You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/09/27 15:11:10 UTC

[2/5] incubator-rya git commit: RYA-151 Rya Query benchmark tool implemented using JMH.

RYA-151 Rya Query benchmark tool implemented using JMH.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e77e839d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e77e839d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e77e839d

Branch: refs/heads/master
Commit: e77e839d76e140bee47eb610cf19c453ef0a79b5
Parents: 62c0794
Author: Kevin Chilton <ke...@parsons.com>
Authored: Wed Aug 17 22:29:41 2016 -0400
Committer: pujav65 <pu...@gmail.com>
Committed: Tue Sep 27 11:05:06 2016 -0400

----------------------------------------------------------------------
 .../java/mvm/rya/api/client/BatchUpdatePCJ.java |  22 ++
 .../api/client/PCJDoesNotExistException.java    |  20 ++
 .../main/java/mvm/rya/api/client/RyaClient.java |  13 +-
 .../api/instance/RyaDetailsToConfiguration.java |  72 ++--
 .../java/mvm/rya/accumulo/AccumuloITBase.java   |   8 +-
 .../client/accumulo/AccumuloBatchUpdatePCJ.java | 226 +++++++++++++
 .../api/client/accumulo/AccumuloCreatePCJ.java  |  64 ++--
 .../accumulo/AccumuloRyaClientFactory.java      |   1 +
 .../PrecomputedJoinStorageSupplier.java         |   3 +-
 .../accumulo/AccumuloBatchUpdatePCJIT.java      | 135 ++++++++
 extras/pom.xml                                  |   7 +-
 extras/rya.benchmark/pom.xml                    | 250 ++++++++++++++
 .../query/QueriesBenchmarkConfReader.java       |  83 +++++
 .../rya/benchmark/query/QueryBenchmark.java     | 336 +++++++++++++++++++
 .../src/main/resources/LICENSE.txt              |  16 +
 .../src/main/xsd/queries-benchmark-conf.xsd     |  74 ++++
 .../query/QueriesBenchmarkConfReaderIT.java     | 105 ++++++
 .../benchmark/query/QueryBenchmarkRunIT.java    | 196 +++++++++++
 extras/rya.console/pom.xml                      |  17 +-
 .../java/mvm/rya/shell/RyaAdminCommands.java    |  72 ++--
 .../java/mvm/rya/shell/util/InstallPrompt.java  |  16 +-
 .../mvm/rya/shell/RyaAdminCommandsTest.java     |  37 +-
 22 files changed, 1672 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java b/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java
new file mode 100644
index 0000000..20d90e0
--- /dev/null
+++ b/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java
@@ -0,0 +1,22 @@
+package mvm.rya.api.client;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+/**
+ * Batch update a PCJ index.
+ */
+@ParametersAreNonnullByDefault
+public interface BatchUpdatePCJ {
+
+    /**
+     * Batch update a specific PCJ index using the {@link Statement}s that are
+     * currently in the Rya instance.
+     *
+     * @param ryaInstanceName - The Rya instance whose PCJ will be updated. (not null)
+     * @param pcjId - Identifies the PCJ index to update. (not null)
+     * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name.
+     * @throws PCJDoesNotExistException No PCJ exists for the provided PCJ ID.
+     * @throws RyaClientException Something caused the command to fail.
+     */
+    public void batchUpdate(String ryaInstanceName, String pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java b/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java
new file mode 100644
index 0000000..63efe0c
--- /dev/null
+++ b/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java
@@ -0,0 +1,20 @@
+package mvm.rya.api.client;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+/**
+ * One of the {@link RyaClient} commands could not execute because the connected
+ * instance of Rya does not have a PCJ matching the provided PCJ ID.
+ */
+@ParametersAreNonnullByDefault
+public class PCJDoesNotExistException extends RyaClientException {
+    private static final long serialVersionUID = 1L;
+
+    public PCJDoesNotExistException(final String message) {
+        super(message);
+    }
+
+    public PCJDoesNotExistException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java b/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java
index 173e1e0..851a273 100644
--- a/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java
+++ b/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java
@@ -33,23 +33,26 @@ public class RyaClient {
     private final Install install;
     private final CreatePCJ createPcj;
     private final DeletePCJ deletePcj;
+    private final BatchUpdatePCJ batchUpdatePcj;
     private final GetInstanceDetails getInstanceDetails;
     private final InstanceExists instanceExists;
     private final ListInstances listInstances;
 
     /**
-     * Constructs an isntance of {@link RyaClient}.
+     * Constructs an instance of {@link RyaClient}.
      */
     public RyaClient(
             final Install install,
             final CreatePCJ createPcj,
             final DeletePCJ deletePcj,
+            final BatchUpdatePCJ batchUpdatePcj,
             final GetInstanceDetails getInstanceDetails,
             final InstanceExists instanceExists,
             final ListInstances listInstances) {
         this.install = requireNonNull(install);
         this.createPcj = requireNonNull(createPcj);
         this.deletePcj = requireNonNull(deletePcj);
+        this.batchUpdatePcj = requireNonNull(batchUpdatePcj);
         this.getInstanceDetails = requireNonNull(getInstanceDetails);
         this.instanceExists = requireNonNull(instanceExists);
         this.listInstances = requireNonNull(listInstances);
@@ -79,6 +82,14 @@ public class RyaClient {
     }
 
     /**
+     * @return An instance of {@link BatchUpdatePCJ} that is connected to a Rya storage
+     *   if the Rya instance supports PCJ indexing.
+     */
+    public BatchUpdatePCJ getBatchUpdatePCJ() {
+        return batchUpdatePcj;
+    }
+
+    /**
      * @return An instance of {@link GetInstanceDetails} that is connected to a Rya storage.
      */
     public GetInstanceDetails getGetInstanceDetails() {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java b/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java
index faec0ff..8734adc 100644
--- a/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java
+++ b/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java
@@ -18,29 +18,34 @@
  */
 package mvm.rya.api.instance;
 
+import static java.util.Objects.requireNonNull;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 
 /**
  * Used to fetch {@link RyaDetails} from a {@link RyaDetailsRepository} and
  * add them to the application's {@link Configuration}.
  */
+@ParametersAreNonnullByDefault
 public class RyaDetailsToConfiguration {
-    private static final Logger LOG = Logger.getLogger(RyaDetailsToConfiguration.class);
+    private static final Logger log = Logger.getLogger(RyaDetailsToConfiguration.class);
+
     /**
      * Ensures the values in the {@link Configuration} do not conflict with the values in {@link RyaDetails}.
      * If they do, the values in {@link RyaDetails} take precedent and the {@link Configuration} value will
      * be overwritten.
      *
-     * @param details - The {@link RyaDetails} to add to the {@link Configuration}.
-     * @param conf - The {@link Configuration} to add {@link RyaDetails} to.
+     * @param details - The {@link RyaDetails} to add to the {@link Configuration}. (not null)
+     * @param conf - The {@link Configuration} to add {@link RyaDetails} to. (not null)
      */
     public static void addRyaDetailsToConfiguration(final RyaDetails details, final Configuration conf) {
-        Preconditions.checkNotNull(details);
-        Preconditions.checkNotNull(conf);
+        requireNonNull(details);
+        requireNonNull(conf);
 
         checkAndSet(conf, ConfigurationFields.USE_ENTITY, details.getEntityCentricIndexDetails().isEnabled());
         checkAndSet(conf, ConfigurationFields.USE_FREETEXT, details.getFreeTextIndexDetails().isEnabled());
@@ -50,23 +55,44 @@ public class RyaDetailsToConfiguration {
     }
 
     /**
-     * Checks to see if the configuration has a value in the specified field.
-     * If the value exists and does not match what is expected by the {@link RyaDetails},
-     * an error will be logged and the value will be overwritten.
-     * @param conf - The {@link Configuration} to potentially change.
-     * @param field - The field to check and set.
-     * @param value - The new value in the field (is not used if the value doesn't need to be changed).
+     * Ensures a Rya Client will not try to use a secondary index that is not supported by the Rya Instance
+     * it is connecting to.
+     * <p>
+     * If the configuration...
+     * <ul>
+     *   <li>provides an 'on' value for an index that is supported, then nothing changes.</li>
+     *   <li>provides an 'off' value for an index that is or is not supported, then nothing changes.</li>
+     *   <li>provides an 'on' value for an index that is not supported, then the index is turned
+     *       off and a warning is logged.</li>
+     *   <li>does not provide any value for an index, then it will be turned on if supported.</li>
+     * </ul>
+     *
+     * @param conf - The {@link Configuration} to potentially change. (not null)
+     * @param useIndexField - The field within {@code conf} that indicates if the client will utilize the index. (not null)
+     * @param indexSupported - {@code true} if the Rya Instance supports the index; otherwise {@code false}.
      */
-    private static void checkAndSet(final Configuration conf, final String field, final boolean value) {
-        final Optional<String> opt = Optional.fromNullable(conf.get(field));
-        if(opt.isPresent()) {
-            final Boolean curVal = new Boolean(opt.get());
-            if(curVal != value) {
-                LOG.error("The user configured value in: " + field + " will be overwritten by what has been configured by the admin.");
-                conf.setBoolean(field, value);
-            }
-        } else {
-            conf.setBoolean(field, value);
+    private static void checkAndSet(final Configuration conf, final String useIndexField, final boolean indexSupported) {
+        requireNonNull(conf);
+        requireNonNull(useIndexField);
+
+        final Optional<String> useIndexStr = Optional.fromNullable( conf.get(useIndexField) );
+
+        // If no value was provided, default to using the index if it is supported.
+        if(!useIndexStr.isPresent()) {
+            log.info("No Rya Client configuration was provided for the " + useIndexField +
+                    " index, so it is being defaulted to " + indexSupported);
+            conf.setBoolean(useIndexField, indexSupported);
+            return;
+        }
+
+        // If configured to use the index, but the Rya Instance does not support it, then turn it off.
+        final boolean useIndex = Boolean.parseBoolean( useIndexStr.get() );
+        if(useIndex && !indexSupported) {
+            log.warn("The Rya Client indicates it wants to use a secondary index that the Rya Instance does not support. " +
+                    "This is not allowed, so the index will be turned off. Index Configuration Field: " + useIndexField);
+            conf.setBoolean(useIndexField, false);
         }
+
+        // Otherwise use whatever the Client wants to use.
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java
index 2a1c384..7dd23e6 100644
--- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java
+++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java
@@ -95,11 +95,9 @@ public class AccumuloITBase {
     }
 
     /**
-     * TODO doc
-     *
-     * @return
-     * @throws AccumuloSecurityException
-     * @throws AccumuloException
+     * @return A {@link Connector} that creates connections to the mini accumulo cluster.
+     * @throws AccumuloException Could not connect to the cluster.
+     * @throws AccumuloSecurityException Could not connect to the cluster because of a security violation.
      */
     public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
         return cluster.getConnector();

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
new file mode 100644
index 0000000..ee773b0
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
@@ -0,0 +1,226 @@
+package mvm.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+
+import com.google.common.base.Optional;
+
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
+import mvm.rya.api.client.BatchUpdatePCJ;
+import mvm.rya.api.client.InstanceDoesNotExistException;
+import mvm.rya.api.client.PCJDoesNotExistException;
+import mvm.rya.api.client.RyaClientException;
+import mvm.rya.api.instance.RyaDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
+import mvm.rya.api.instance.RyaDetailsRepository;
+import mvm.rya.api.instance.RyaDetailsRepository.NotInitializedException;
+import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import mvm.rya.api.instance.RyaDetailsUpdater;
+import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
+import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.rdftriplestore.inference.InferenceEngineException;
+import mvm.rya.sail.config.RyaSailFactory;
+
+/**
+ * Uses an in memory Rya Client to batch update a PCJ index.
+ */
+public class AccumuloBatchUpdatePCJ extends AccumuloCommand implements BatchUpdatePCJ {
+
+    private static final Logger log = Logger.getLogger(AccumuloBatchUpdatePCJ.class);
+
+    /**
+     * Constructs an instance of {@link AccumuloBatchUpdatePCJ}.
+     *
+     * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null)
+     * @param connector - Provides programmatic access to the instance of Accumulo that hosts the Rya instance. (not null)
+     */
+    public AccumuloBatchUpdatePCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
+        super(connectionDetails, connector);
+    }
+
+    @Override
+    public void batchUpdate(final String ryaInstanceName, final String pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException {
+        requireNonNull(ryaInstanceName);
+        requireNonNull(pcjId);
+        verifyPCJState(ryaInstanceName, pcjId);
+        updatePCJResults(ryaInstanceName, pcjId);
+        updatePCJMetadata(ryaInstanceName, pcjId);
+    }
+
+    private void verifyPCJState(final String ryaInstanceName, final String pcjId) throws RyaClientException {
+        try {
+            // Fetch the Rya instance's details.
+            final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(super.getConnector(), ryaInstanceName);
+            final RyaDetails ryaDetails = detailsRepo.getRyaInstanceDetails();
+
+            // Ensure PCJs are enabled.
+            if(!ryaDetails.getPCJIndexDetails().isEnabled()) {
+                throw new RyaClientException("PCJs are not enabled for the Rya instance named '" + ryaInstanceName + "'.");
+            }
+
+            // Ensure the PCJ exists.
+            if(!ryaDetails.getPCJIndexDetails().getPCJDetails().containsKey(pcjId)) {
+                throw new PCJDoesNotExistException("The PCJ with id '" + pcjId + "' does not exist within Rya instance '" + ryaInstanceName + "'.");
+            }
+
+            // Ensure the PCJ is not already being incrementally updated.
+            final PCJDetails pcjDetails = ryaDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
+            final Optional<PCJUpdateStrategy> updateStrategy = pcjDetails.getUpdateStrategy();
+            if(updateStrategy.isPresent() && updateStrategy.get() == PCJUpdateStrategy.INCREMENTAL) {
+                throw new RyaClientException("The PCJ with id '" + pcjId + "' is already being updated incrementally.");
+            }
+        } catch(final NotInitializedException e) {
+            throw new InstanceDoesNotExistException("No RyaDetails are initialized for the Rya instance named '" + ryaInstanceName + "'.", e);
+        } catch (final RyaDetailsRepositoryException e) {
+            throw new RyaClientException("Could not fetch the RyaDetails for the Rya instance named '" + ryaInstanceName + "'.", e);
+        }
+    }
+
+    private void updatePCJResults(final String ryaInstanceName, final String pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException {
+        // Things that have to be closed before we exit.
+        Sail sail = null;
+        SailConnection sailConn = null;
+        CloseableIteration<? extends BindingSet, QueryEvaluationException> results = null;
+
+        try {
+            // Create an instance of Sail backed by the Rya instance.
+            sail = connectToRya(ryaInstanceName);
+
+            // Purge the old results from the PCJ.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), ryaInstanceName);
+            try {
+                pcjStorage.purge(pcjId);
+            } catch (final PCJStorageException e) {
+                throw new RyaClientException("Could not batch update PCJ with ID '" + pcjId + "' because the old " +
+                        "results could not be purged from it.", e);
+            }
+
+            try {
+                // Parse the PCJ's SPARQL query.
+                final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+                final String sparql = metadata.getSparql();
+                final SPARQLParser parser = new SPARQLParser();
+                final ParsedQuery parsedQuery = parser.parseQuery(sparql, null);
+
+                // Execute the query.
+                sailConn = sail.getConnection();
+                results = sailConn.evaluate(parsedQuery.getTupleExpr(), null, null, false);
+
+                // Load the results into the PCJ table.
+                final List<VisibilityBindingSet> batch = new ArrayList<>(1000);
+
+                while(results.hasNext()) {
+                    final VisibilityBindingSet result = new VisibilityBindingSet(results.next(), "");
+                    batch.add(result);
+
+                    if(batch.size() == 1000) {
+                        pcjStorage.addResults(pcjId, batch);
+                        batch.clear();
+                    }
+                }
+
+                if(!batch.isEmpty()) {
+                    pcjStorage.addResults(pcjId, batch);
+                    batch.clear();
+                }
+            } catch(final MalformedQueryException | PCJStorageException | SailException | QueryEvaluationException e) {
+                throw new RyaClientException("Fail to batch load new results into the PCJ with ID '" + pcjId + "'.", e);
+            }
+        } finally {
+            if(results != null) {
+                try {
+                    results.close();
+                } catch (final QueryEvaluationException e) {
+                    log.warn(e.getMessage(), e);
+                }
+            }
+
+            if(sailConn != null) {
+                try {
+                    sailConn.close();
+                } catch (final SailException e) {
+                    log.warn(e.getMessage(), e);
+                }
+            }
+
+            if(sail != null) {
+                try {
+                    sail.shutDown();
+                } catch (final SailException e) {
+                    log.warn(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    private Sail connectToRya(final String ryaInstanceName) throws RyaClientException {
+        try {
+            final AccumuloConnectionDetails connectionDetails = super.getAccumuloConnectionDetails();
+
+            final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+            ryaConf.setTablePrefix(ryaInstanceName);
+            ryaConf.set(ConfigUtils.CLOUDBASE_USER, connectionDetails.getUsername());
+            ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, new String(connectionDetails.getPassword()));
+            ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, connectionDetails.getZookeepers());
+            ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, connectionDetails.getInstanceName());
+
+            // Turn PCJs off so that we will only scan the core Rya tables while building the PCJ results.
+            ryaConf.set(ConfigUtils.USE_PCJ, "false");
+
+            return RyaSailFactory.getInstance(ryaConf);
+        } catch (SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) {
+            throw new RyaClientException("Could not connect to the Rya instance named '" + ryaInstanceName + "'.", e);
+        }
+    }
+
+    private void updatePCJMetadata(final String ryaInstanceName, final String pcjId) throws RyaClientException {
+        // Update the PCJ's metadata to indicate it was just batch updated.
+        try {
+            final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(super.getConnector(), ryaInstanceName);
+
+            new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() {
+                @Override
+                public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException {
+                    // Update the original PCJ Details to indicate they were batch updated.
+                    final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
+                    final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails )
+                            .setUpdateStrategy( PCJUpdateStrategy.BATCH )
+                            .setLastUpdateTime( new Date());
+
+                    // Replace the old PCJ Details with the updated ones.
+                    final RyaDetails.Builder builder = RyaDetails.builder(originalDetails);
+                    builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails );
+                    return builder.build();
+                }
+            });
+        } catch (final RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
+            throw new RyaClientException("Could not update the PCJ's metadata.", e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java
index 92b5d8c..30be548 100644
--- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java
+++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java
@@ -92,13 +92,6 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
             throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
         }
 
-        final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
-        final boolean usingFluo = fluoDetailsHolder.isPresent();
-        if(!usingFluo) {
-            throw new RyaClientException( String.format("Can not create a PCJ for the '%s' instance of Rya because it does" +
-                    "not have a Fluo application associated with it. Update the instance's PCJ Index Details to fix this problem.", instanceName) );
-        }
-
         // Create the PCJ table that will receive the index results.
         final String pcjId;
         final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getConnector(), instanceName);
@@ -108,33 +101,36 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
             throw new RyaClientException("Problem while initializing the PCJ table.", e);
         }
 
-        // Task the Fluo application with updating the PCJ.
-        final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
-        try {
-            updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId);
-        } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException e) {
-            throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
-        }
-
-        // Update the Rya Details to indicate the PCJ is being updated incrementally.
-        final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName);
-        try {
-            new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() {
-                @Override
-                public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException {
-                    // Update the original PCJ Details to indicate they are incrementally updated.
-                    final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
-                    final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails )
-                        .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL );
-
-                    // Replace the old PCJ Details with the updated ones.
-                    final RyaDetails.Builder builder = RyaDetails.builder(originalDetails);
-                    builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails );
-                    return builder.build();
-                }
-            });
-        } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
-            throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e);
+        // If a Fluo application is being used, task it with updating the PCJ.
+        final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
+        if(fluoDetailsHolder.isPresent()) {
+            final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
+            try {
+                updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId);
+            } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException e) {
+                throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
+            }
+
+            // Update the Rya Details to indicate the PCJ is being updated incrementally.
+            final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName);
+            try {
+                new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() {
+                    @Override
+                    public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException {
+                        // Update the original PCJ Details to indicate they are incrementally updated.
+                        final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
+                        final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails )
+                            .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL );
+
+                        // Replace the old PCJ Details with the updated ones.
+                        final RyaDetails.Builder builder = RyaDetails.builder(originalDetails);
+                        builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails );
+                        return builder.build();
+                    }
+                });
+            } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
+                throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e);
+            }
         }
 
         // Return the ID that was assigned to the PCJ.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java
index 8c276a8..102f667 100644
--- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java
+++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java
@@ -52,6 +52,7 @@ public class AccumuloRyaClientFactory {
                 new AccumuloInstall(connectionDetails, connector),
                 new AccumuloCreatePCJ(connectionDetails, connector),
                 new AccumuloDeletePCJ(connectionDetails, connector),
+                new AccumuloBatchUpdatePCJ(connectionDetails, connector),
                 new AccumuloGetInstanceDetails(connectionDetails, connector),
                 new AccumuloInstanceExists(connectionDetails, connector),
                 new AccumuloListInstances(connectionDetails, connector));

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
index bf10c84..7c48315 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
@@ -27,6 +27,7 @@ import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 
+import java.util.Arrays;
 import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
 import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier;
 
@@ -64,7 +65,7 @@ public class PrecomputedJoinStorageSupplier implements Supplier<PrecomputedJoinS
         // Ensure the storage type has been set.
         final Optional<PrecomputedJoinStorageType> storageType = indexerConfig.getPcjStorageType();
         checkArgument(storageType.isPresent(), "The '" + PrecomputedJoinIndexerConfig.PCJ_STORAGE_TYPE +
-                "' property must have one of the following values: " + PrecomputedJoinStorageType.values());
+                "' property must have one of the following values: " + Arrays.toString(PrecomputedJoinStorageType.values()));
 
         // Create and return the configured storage.
         switch(storageType.get()) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
new file mode 100644
index 0000000..f23f1c4
--- /dev/null
+++ b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
@@ -0,0 +1,135 @@
+package mvm.rya.api.client.accumulo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+
+import mvm.rya.accumulo.AccumuloITBase;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.client.Install.InstallConfiguration;
+import mvm.rya.api.client.RyaClient;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
+import mvm.rya.sail.config.RyaSailFactory;
+
+/**
+ * Integration tests the methods of {@link AccumuloBatchUpdatePCJ}.
+ */
+public class AccumuloBatchUpdatePCJIT extends AccumuloITBase {
+
+    private static final String RYA_INSTANCE_NAME = "test_";
+
+    @Test
+    public void batchUpdate() throws Exception {
+        // Setup a Rya Client.
+        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                super.getUsername(),
+                super.getPassword().toCharArray(),
+                super.getInstanceName(),
+                super.getZookeepers());
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, super.getConnector());
+
+        // Install an instance of Rya on the mini accumulo cluster.
+        ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder()
+                .setEnablePcjIndex(true)
+                .build());
+
+        Sail sail = null;
+        try {
+            // Get a Sail connection backed by the installed Rya instance.
+            final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+            ryaConf.setTablePrefix(RYA_INSTANCE_NAME);
+            ryaConf.set(ConfigUtils.CLOUDBASE_USER, super.getUsername());
+            ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, super.getPassword());
+            ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, super.getZookeepers());
+            ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, super.getInstanceName());
+            ryaConf.set(ConfigUtils.USE_PCJ, "true");
+            ryaConf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.ACCUMULO.toString());
+            ryaConf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinUpdaterType.NO_UPDATE.toString());
+            sail = RyaSailFactory.getInstance( ryaConf );
+
+            // Load some statements into the Rya instance.
+            final ValueFactory vf = sail.getValueFactory();
+
+            final SailConnection sailConn = sail.getConnection();
+            sailConn.begin();
+            sailConn.addStatement(vf.createURI("urn:Alice"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+            sailConn.addStatement(vf.createURI("urn:Bob"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+            sailConn.addStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+            sailConn.addStatement(vf.createURI("urn:David"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+            sailConn.addStatement(vf.createURI("urn:Eve"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+            sailConn.addStatement(vf.createURI("urn:Frank"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+            sailConn.addStatement(vf.createURI("urn:George"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+            sailConn.addStatement(vf.createURI("urn:Hillary"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+
+            sailConn.addStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+            sailConn.addStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+            sailConn.addStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+            sailConn.addStatement(vf.createURI("urn:David"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+            sailConn.addStatement(vf.createURI("urn:Eve"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+            sailConn.addStatement(vf.createURI("urn:Frank"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+            sailConn.addStatement(vf.createURI("urn:George"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:green"));
+            sailConn.addStatement(vf.createURI("urn:Hillary"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:brown"));
+            sailConn.commit();
+            sailConn.close();
+
+            // Create a PCJ for a SPARQL query.
+            final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), RYA_INSTANCE_NAME);
+            final String sparql = "SELECT ?name WHERE { ?name <urn:likes> <urn:icecream> . ?name <urn:hasEyeColor> <urn:blue> . }";
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Run the test.
+            ryaClient.getBatchUpdatePCJ().batchUpdate(RYA_INSTANCE_NAME, pcjId);
+
+            // Verify the correct results were loaded into the PCJ table.
+            final Set<BindingSet> expectedResults = new HashSet<>();
+
+            MapBindingSet bs = new MapBindingSet();
+            bs.addBinding("name", vf.createURI("urn:Alice"));
+            expectedResults.add(bs);
+
+            bs = new MapBindingSet();
+            bs.addBinding("name", vf.createURI("urn:Bob"));
+            expectedResults.add(bs);
+
+            bs = new MapBindingSet();
+            bs.addBinding("name", vf.createURI("urn:Charlie"));
+            expectedResults.add(bs);
+
+            bs = new MapBindingSet();
+            bs.addBinding("name", vf.createURI("urn:David"));
+            expectedResults.add(bs);
+
+            bs = new MapBindingSet();
+            bs.addBinding("name", vf.createURI("urn:Eve"));
+            expectedResults.add(bs);
+
+            bs = new MapBindingSet();
+            bs.addBinding("name", vf.createURI("urn:Frank"));
+            expectedResults.add(bs);
+
+            final Set<BindingSet> results = new HashSet<>();
+            for(final BindingSet result : pcjStorage.listResults(pcjId)) {
+                results.add( result );
+            }
+
+            assertEquals(expectedResults, results);
+
+        } finally {
+            if(sail != null) {
+                sail.shutDown();
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/pom.xml
----------------------------------------------------------------------
diff --git a/extras/pom.xml b/extras/pom.xml
index 9568e27..0ffeb7a 100644
--- a/extras/pom.xml
+++ b/extras/pom.xml
@@ -1,5 +1,4 @@
-<?xml version="1.0" encoding="utf-8"?>
-
+<?xml version="1.0" encoding="UTF-8"?>
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
@@ -18,7 +17,6 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
-
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
@@ -44,5 +42,6 @@ under the License.
         <module>vagrantExample</module>
         <module>rya.pcj.fluo</module>
         <module>rya.merger</module>
+        <module>rya.benchmark</module>
     </modules>
-</project>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/pom.xml b/extras/rya.benchmark/pom.xml
new file mode 100644
index 0000000..5b9eb68
--- /dev/null
+++ b/extras/rya.benchmark/pom.xml
@@ -0,0 +1,250 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+         
+    <parent>
+        <artifactId>rya.extras</artifactId>
+        <groupId>org.apache.rya</groupId>
+        <version>3.2.10-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>rya.benchmark</artifactId>
+
+    <name>Apache Rya Benchmarks</name>
+
+    <dependencies>
+        <!-- JMH Benchmark Framework dependencies -->
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-core</artifactId>
+            <version>${jmh.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.openjdk.jmh</groupId>
+            <artifactId>jmh-generator-annprocess</artifactId>
+            <version>${jmh.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.indexing</artifactId>
+        </dependency>
+        
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <version>3.0.1</version>
+        </dependency>
+        
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.accumulo</groupId>
+            <artifactId>accumulo-minicluster</artifactId>
+            <version>${accumulo.version}</version>
+            <scope>test</scope>
+        </dependency>
+        
+    </dependencies>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+
+        <!--
+            JMH version to use with this project.
+          -->
+        <jmh.version>1.13</jmh.version>
+
+        <!--
+            Java source/target to use for compilation.
+          -->
+        <javac.target>1.8</javac.target>
+
+        <!--
+            Name of the benchmark Uber-JAR to generate.
+          -->
+        <uberjar.name>benchmarks</uberjar.name>
+    </properties>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/xsd</directory>
+            </resource>
+        </resources>
+    
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <compilerVersion>${javac.target}</compilerVersion>
+                    <source>${javac.target}</source>
+                    <target>${javac.target}</target>
+                </configuration>
+            </plugin>
+            
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>jaxb2-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>xjc</id>
+                        <goals>
+                            <goal>xjc</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <!-- Place the generated source within the 'src' directory so license-maven-plugin will find it. -->
+                    <outputDirectory>src/main/gen</outputDirectory>
+                    <packageName>org.apache.rya.benchmark.query</packageName>
+                </configuration>
+            </plugin>
+            
+            <!-- Automatically place Apache 2 license headers at the top of all of the project's Java files.
+                 Rat runs during the 'validate' lifecycle step, so it will fail the build before this one 
+                 executes if any of the headers are missing. Run the build with rat turned off to add
+                 missing headers to the Java files. -->
+            <plugin>
+                <groupId>com.mycila</groupId>
+                <artifactId>license-maven-plugin</artifactId>
+                <version>2.6</version>
+                <configuration>
+                    <!-- We use a custom Apache 2.0 license because we do not include a copyright section. -->
+                    <header>src/main/resources/LICENSE.txt</header>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>process-sources</phase>
+                        <goals>
+                            <goal>format</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.2</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <finalName>${uberjar.name}</finalName>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>org.openjdk.jmh.Main</mainClass>
+                                </transformer>
+                            </transformers>
+                            <filters>
+                                <filter>
+                                    <!--
+                                        Shading signed JARs will fail without this.
+                                        http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
+                                    -->
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <version>2.19.1</version>
+                <executions>
+                  <execution>
+                    <goals>
+                      <goal>integration-test</goal>
+                      <goal>verify</goal>
+                    </goals>
+                  </execution>
+                </executions>
+              </plugin>
+        </plugins>
+        <pluginManagement>
+            <plugins>
+                <plugin>
+                    <artifactId>maven-clean-plugin</artifactId>
+                    <version>2.5</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-deploy-plugin</artifactId>
+                    <version>2.8.1</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-install-plugin</artifactId>
+                    <version>2.5.1</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-jar-plugin</artifactId>
+                    <version>2.4</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-javadoc-plugin</artifactId>
+                    <version>2.9.1</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-resources-plugin</artifactId>
+                    <version>2.6</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-site-plugin</artifactId>
+                    <version>3.3</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-source-plugin</artifactId>
+                    <version>2.2.1</version>
+                </plugin>
+                <plugin>
+                    <artifactId>maven-surefire-plugin</artifactId>
+                    <version>2.17</version>
+                </plugin>
+            </plugins>
+        </pluginManagement>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReader.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReader.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReader.java
new file mode 100644
index 0000000..8cbf203
--- /dev/null
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReader.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.query;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.xml.XMLConstants;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+
+import org.xml.sax.SAXException;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+
+/**
+ * Unmarshalls instances of {@link QueriesBenchmarkConf}.
+ */
+@ParametersAreNonnullByDefault
+public final class QueriesBenchmarkConfReader {
+
+    // It is assumed the schema file is held within the root directory of the packaged jar.
+    private static final String SCHEMA_LOCATION = "queries-benchmark-conf.xsd";
+
+    // Only load the Schema once.
+    private static final Supplier<Schema> SCHEMA_SUPPLIER = Suppliers.memoize(
+            new Supplier<Schema>() {
+                @Override
+                public Schema get() {
+                    // The stream is only needed while the schema is being parsed, so close it afterwards.
+                    try(final InputStream schemaStream = ClassLoader.getSystemResourceAsStream(SCHEMA_LOCATION)) {
+                        // A null stream means the resource was not found. Report that directly
+                        // instead of handing a null stream to the schema parser.
+                        if(schemaStream == null) {
+                            throw new RuntimeException("Could not find the '" + SCHEMA_LOCATION + "' schema file. Make sure it is on the classpath.");
+                        }
+
+                        final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+                        return schemaFactory.newSchema( new StreamSource( schemaStream ) );
+                    } catch (final SAXException | IOException e) {
+                        throw new RuntimeException("Could not load the '" + SCHEMA_LOCATION + "' schema file. Make sure it is on the classpath.", e);
+                    }
+                }
+            });
+
+    /**
+     * Unmarshall an instance of {@link QueriesBenchmarkConf} from the XML that
+     * is retrieved from an {@link InputStream}. The XML is validated against the
+     * "queries-benchmark-conf.xsd" schema while it is being read. The provided
+     * stream is not closed by this method.
+     *
+     * @param xmlStream - The input stream holding the XML. (not null)
+     * @return The {@link QueriesBenchmarkConf} instance that was read from the stream.
+     * @throws JAXBException There was a problem with the formatting of the XML.
+     */
+    public QueriesBenchmarkConf load(final InputStream xmlStream) throws JAXBException {
+        requireNonNull(xmlStream);
+
+        // Load the schema that describes the stream.
+        final Schema schema = SCHEMA_SUPPLIER.get();
+
+        // Unmarshal the object from the stream.
+        final JAXBContext context = JAXBContext.newInstance( QueriesBenchmarkConf.class );
+        final Unmarshaller unmarshaller = context.createUnmarshaller();
+        unmarshaller.setSchema(schema);
+        return (QueriesBenchmarkConf) unmarshaller.unmarshal(xmlStream);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueryBenchmark.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueryBenchmark.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueryBenchmark.java
new file mode 100644
index 0000000..404f183
--- /dev/null
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueryBenchmark.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.query;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.rya.benchmark.query.Parameters.NumReadsRuns;
+import org.apache.rya.benchmark.query.QueryBenchmark.QueryBenchmarkRun.NotEnoughResultsException;
+import org.apache.rya.benchmark.query.Rya.Accumulo;
+import org.apache.rya.benchmark.query.Rya.SecondaryIndexing;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.CommandLineOptions;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
+import mvm.rya.sail.config.RyaSailFactory;
+
+/**
+ * A benchmark that may be used to evaluate the performance of SPARQL queries
+ * over a living instance of Rya. It pivots over two dimensions:
+ * <ul>
+ *     <li>Which SPARQL query to execute</li>
+ *     <li>How many of the query's results to read</li>
+ * </ul>
+ * <p>
+ * These parameters are configured by placing a file named "queries-benchmark-conf.xml"
+ * within the directory the benchmark is being executed from. The schema that defines
+ * this XML file is named "queries-benchmark-conf.xsd" and may be found embedded within
+ * the benchmark's jar file.
+ * <p>
+ * To execute this benchmark, build the project by executing:
+ * <pre>
+ * mvn clean install
+ * </pre>
+ * Transport the "target/benchmarks.jar" file to the system that will execute
+ * the benchmark, write the configuration file, and then execute:
+ * <pre>
+ * java -cp benchmarks.jar org.apache.rya.benchmark.query.QueryBenchmark
+ * </pre>
+ */
+@State(Scope.Thread)
+public class QueryBenchmark {
+
+    /**
+     * The path to the configuration file that this benchmark uses to connect to Rya.
+     */
+    public static final Path QUERY_BENCHMARK_CONFIGURATION_FILE = Paths.get("queries-benchmark-conf.xml");
+
+    /**
+     * Indicates all query results will be read during the benchmark.
+     */
+    public static final String READ_ALL = "ALL";
+
+    /**
+     * How many of the query's results to read. Either {@link #READ_ALL} or a
+     * value that {@link #queryRya()} parses using {@link Long#parseLong(String)}.
+     */
+    @Param({"1", "10", "100", READ_ALL})
+    public String numReads;
+
+    /**
+     * The SPARQL query that will be benchmarked. No values are declared here;
+     * presumably they are supplied by whatever launches the benchmark - TODO confirm.
+     */
+    @Param({})
+    public String sparql;
+
+    // The Rya connections. They are created by setup() and released by tearDown().
+    private Sail sail = null;
+    private SailConnection sailConn = null;
+
+    /**
+     * Prepares the benchmark: attaches a console log appender, reads the benchmark's
+     * configuration from {@link #QUERY_BENCHMARK_CONFIGURATION_FILE}, and opens the
+     * {@link Sail} and {@link SailConnection} the benchmark executes queries against.
+     *
+     * @throws Exception The configuration could not be read or the connection to
+     *   Rya could not be created.
+     */
+    @Setup
+    public void setup() throws Exception {
+        // Setup logging.
+        final ConsoleAppender console = new ConsoleAppender();
+        console.setLayout(new PatternLayout("%d [%p|%c|%C{1}] %m%n"));
+        console.setThreshold(Level.INFO);
+        console.activateOptions();
+        Logger.getRootLogger().addAppender(console);
+
+        // Load the benchmark's configuration file. The stream is closed as soon as it has been read.
+        final QueriesBenchmarkConf benchmarkConf;
+        try(final InputStream queriesStream = Files.newInputStream(QUERY_BENCHMARK_CONFIGURATION_FILE)) {
+            benchmarkConf = new QueriesBenchmarkConfReader().load(queriesStream);
+        }
+
+        // Create the Rya Configuration object using the benchmark's configuration.
+        final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+
+        final Rya rya = benchmarkConf.getRya();
+        ryaConf.setTablePrefix(rya.getRyaInstanceName());
+
+        final Accumulo accumulo = rya.getAccumulo();
+        ryaConf.set(ConfigUtils.CLOUDBASE_USER, accumulo.getUsername());
+        ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, accumulo.getPassword());
+        ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, accumulo.getZookeepers());
+        ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, accumulo.getInstanceName());
+
+        // Print the query plan so that you can visually inspect how PCJs are being applied for each benchmark.
+        ryaConf.set(ConfigUtils.DISPLAY_QUERY_PLAN, "true");
+
+        // Turn on PCJs if we are configured to use them.
+        final SecondaryIndexing secondaryIndexing = rya.getSecondaryIndexing();
+        if(secondaryIndexing.isUsePCJ()) {
+            ryaConf.set(ConfigUtils.USE_PCJ, "true");
+            ryaConf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.ACCUMULO.toString());
+            ryaConf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinUpdaterType.NO_UPDATE.toString());
+        } else {
+            ryaConf.set(ConfigUtils.USE_PCJ, "false");
+        }
+
+        // Create the connections used to execute the benchmark.
+        sail = RyaSailFactory.getInstance( ryaConf );
+        sailConn = sail.getConnection();
+    }
+
+    /**
+     * Releases the resources opened by {@link #setup()}. Each step is best effort: a failure
+     * to close the connection does not prevent the Sail from being shut down, and neither
+     * failure is propagated, but both are logged instead of being silently discarded.
+     */
+    @TearDown
+    public void tearDown() {
+        // The fields are still null if setup() failed before creating them.
+        if(sailConn != null) {
+            try {
+                sailConn.close();
+            } catch(final Exception e) {
+                Logger.getRootLogger().warn("Could not close the benchmark's SailConnection.", e);
+            }
+        }
+
+        if(sail != null) {
+            try {
+                sail.shutDown();
+            } catch(final Exception e) {
+                Logger.getRootLogger().warn("Could not shut down the benchmark's Sail.", e);
+            }
+        }
+    }
+
+    /**
+     * The benchmarked operation: evaluates the {@code sparql} parameter against Rya and reads
+     * either every result or the number of results named by the {@code numReads} parameter.
+     */
+    @Benchmark
+    @BenchmarkMode(Mode.SingleShotTime)
+    @Timeout(time = 1, timeUnit = TimeUnit.HOURS)
+    public void queryRya() throws MalformedQueryException, QueryEvaluationException, SailException, NotEnoughResultsException {
+        // READ_ALL selects the unbounded run; anything else is parsed as the number of reads.
+        final QueryBenchmarkRun run = numReads.equals(READ_ALL)
+                ? new QueryBenchmarkRun(sailConn, sparql)
+                : new QueryBenchmarkRun(sailConn, sparql, Long.parseLong(numReads));
+
+        run.run();
+    }
+
+    /**
+     * Runs the query benchmarks.
+     * <p>
+     * The SPARQL queries, and optionally the numReads values, are read from the benchmark's
+     * configuration file and injected into the benchmark's parameters.
+     * <p>
+     * Example command line:
+     * <pre>
+     * java -cp benchmarks.jar org.apache.rya.benchmark.query.QueryBenchmark
+     * </pre>
+     *
+     * @param args - The command line arguments that will be fed into the benchmark.
+     * @throws Exception The benchmark could not be run.
+     */
+    public static void main(final String[] args) throws Exception {
+        // Read the queries that will be benchmarked from the configuration file. The stream
+        // is closed once the configuration has been unmarshalled.
+        final QueriesBenchmarkConf benchmarkConf;
+        try(final InputStream queriesStream = Files.newInputStream( QUERY_BENCHMARK_CONFIGURATION_FILE )) {
+            benchmarkConf = new QueriesBenchmarkConfReader().load(queriesStream);
+        }
+        final Parameters parameters = benchmarkConf.getParameters();
+
+        // Setup the options that will be used to run the benchmark.
+        final OptionsBuilder options = new OptionsBuilder();
+        options.parent( new CommandLineOptions(args) );
+        options.include(QueryBenchmark.class.getSimpleName());
+
+        // The schema allows the Queries element to be omitted, but there is nothing to
+        // benchmark without it, so fail with a description of the problem.
+        if(parameters.getQueries() == null) {
+            throw new RuntimeException("There is a problem with the benchmark's configuration. " +
+                    "No SPARQL queries were provided.");
+        }
+
+        // Provide the SPARQL queries that will be injected into the benchmark's 'sparql' parameter.
+        final List<String> sparql = parameters.getQueries().getSPARQL();
+        final String[] sparqlArray = new String[ sparql.size() ];
+        sparql.toArray( sparqlArray );
+
+        // Clean up the sparql's whitespace.
+        for(int i = 0; i < sparqlArray.length; i++) {
+            sparqlArray[i] = sparqlArray[i].trim();
+        }
+
+        options.param("sparql", sparqlArray);
+
+        // If numReadsRuns was specified, inject them into the benchmark's 'numReads' parameter.
+        final NumReadsRuns numReadsRuns = parameters.getNumReadsRuns();
+        if(numReadsRuns != null) {
+            // Validate the list.
+            final List<String> numReadsList = numReadsRuns.getNumReads();
+            for(final String numReads : numReadsList) {
+                // It may be the READ_ALL flag.
+                if(numReads.equals(READ_ALL)) {
+                    continue;
+                }
+
+                // Or it must be a Long.
+                try {
+                    Long.parseLong(numReads);
+                } catch(final NumberFormatException e) {
+                    // Keep the parse failure as the cause so the stack trace shows why the value was rejected.
+                    throw new RuntimeException("There is a problem with the benchmark's configuration. Encountered " +
+                            "a numReads value of '" + numReads + "', which is invalid. The value must be a Long or " +
+                            "'" + READ_ALL + "'", e);
+                }
+            }
+
+            // Configure the benchmark with the numReads.
+            final String[] numReadsArray = new String[ numReadsList.size() ];
+            numReadsList.toArray( numReadsArray );
+            options.param("numReads", numReadsArray);
+        }
+
+        // Execute the benchmark.
+        new Runner(options.build()).run();
+    }
+
+    /**
+     * A single execution of the benchmarked logic: evaluate a SPARQL query over a
+     * {@link SailConnection} and read either all of its results or a fixed number of them.
+     */
+    @ParametersAreNonnullByDefault
+    public static final class QueryBenchmarkRun {
+
+        private final SailConnection sailConn;
+        private final String sparql;
+        private final Optional<Long> numReads;
+
+        /**
+         * Creates a {@link QueryBenchmarkRun} that reads every result of the query.
+         *
+         * @param sailConn - The connection to the Rya instance the query will be executed against. (not null)
+         * @param sparql - The query that will be executed. (not null)
+         */
+        public QueryBenchmarkRun(final SailConnection sailConn, final String sparql) {
+            this.sailConn = requireNonNull(sailConn);
+            this.sparql = requireNonNull(sparql);
+            this.numReads = Optional.empty();
+        }
+
+        /**
+         * Creates a {@link QueryBenchmarkRun} that reads only a specific number of results.
+         *
+         * @param sailConn - The connection to the Rya instance the query will be executed against. (not null)
+         * @param sparql - The query that will be executed. (not null)
+         * @param numReads - The number of results that will be read. (not null)
+         */
+        public QueryBenchmarkRun(final SailConnection sailConn, final String sparql, final Long numReads) {
+            this.sailConn = requireNonNull(sailConn);
+            this.sparql = requireNonNull(sparql);
+            this.numReads = Optional.of( requireNonNull(numReads) );
+        }
+
+        /**
+         * Parses and evaluates the query, performs the configured reads, and closes the
+         * result iteration whether or not the reads succeed.
+         *
+         * @throws MalformedQueryException The SPARQL could not be parsed.
+         * @throws QueryEvaluationException The query could not be evaluated or its results could not be read.
+         * @throws NotEnoughResultsException The query produced fewer results than the requested number of reads.
+         * @throws SailException The query could not be evaluated by the Sail.
+         */
+        public void run() throws MalformedQueryException, QueryEvaluationException, NotEnoughResultsException, SailException {
+            // Execute the query.
+            final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null);
+            final CloseableIteration<? extends BindingSet, QueryEvaluationException> results =
+                    sailConn.evaluate(parsedQuery.getTupleExpr(), null, null, false);
+
+            try {
+                // Perform the reads.
+                if(!numReads.isPresent()) {
+                    readAll(results);
+                } else {
+                    read(results, numReads.get());
+                }
+            } finally {
+                if(results != null) {
+                    results.close();
+                }
+            }
+        }
+
+        /**
+         * Reads exactly {@code numReads} results, failing if the iteration runs out first.
+         */
+        private void read(final CloseableIteration<? extends BindingSet, QueryEvaluationException> it, final long numReads) throws QueryEvaluationException, NotEnoughResultsException {
+            requireNonNull(it);
+            for(long readCount = 0; readCount < numReads; readCount++) {
+                if(!it.hasNext()) {
+                    throw new NotEnoughResultsException(String.format("The SPARQL query did not result in enough results. Needed: %d Found: %d", numReads, readCount));
+                }
+                it.next();
+            }
+        }
+
+        /**
+         * Reads results until the iteration is exhausted.
+         */
+        private  void readAll(final CloseableIteration<? extends BindingSet, QueryEvaluationException> it) throws QueryEvaluationException {
+            requireNonNull(it);
+            while(it.hasNext()) {
+                it.next();
+            }
+        }
+
+        /**
+         * Thrown when a run must read a specific number of results but the benchmarked
+         * query does not produce that many.
+         */
+        public static final class NotEnoughResultsException extends Exception {
+            private static final long serialVersionUID = 1L;
+
+            public NotEnoughResultsException(final String message) {
+                super(message);
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/main/resources/LICENSE.txt
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/resources/LICENSE.txt b/extras/rya.benchmark/src/main/resources/LICENSE.txt
new file mode 100644
index 0000000..4a9fe83
--- /dev/null
+++ b/extras/rya.benchmark/src/main/resources/LICENSE.txt
@@ -0,0 +1,16 @@
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+ 
+    http://www.apache.org/licenses/LICENSE-2.0
+ 
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/main/xsd/queries-benchmark-conf.xsd
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/xsd/queries-benchmark-conf.xsd b/extras/rya.benchmark/src/main/xsd/queries-benchmark-conf.xsd
new file mode 100644
index 0000000..826083e
--- /dev/null
+++ b/extras/rya.benchmark/src/main/xsd/queries-benchmark-conf.xsd
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+        http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
+
+    <!-- Root element of the query benchmark's configuration file. -->
+    <xsd:element name="QueriesBenchmarkConf">
+        <xsd:complexType>
+            <xsd:sequence>
+                <xsd:element name="Rya" type="Rya"/>
+                <xsd:element name="Parameters" type="Parameters"/>
+            </xsd:sequence>
+        </xsd:complexType>
+    </xsd:element>
+
+    <!-- Describes the Rya instance the benchmark connects to. -->
+    <xsd:complexType name="Rya">
+        <xsd:sequence>
+            <!-- The benchmark uses this value as the Rya table prefix. -->
+            <xsd:element name="ryaInstanceName" type="xsd:string" />
+            <!-- The Accumulo user, password, Zookeepers and instance name the benchmark connects with. -->
+            <xsd:element name="accumulo">
+                <xsd:complexType>
+                    <xsd:sequence>
+                        <xsd:element name="username" type="xsd:string"/>
+                        <xsd:element name="password" type="xsd:string"/>
+                        <xsd:element name="zookeepers" type="xsd:string"/>
+                        <xsd:element name="instanceName" type="xsd:string"/>
+                    </xsd:sequence>
+                </xsd:complexType>
+            </xsd:element>
+            <xsd:element name="secondaryIndexing">
+                <xsd:complexType>
+                    <xsd:sequence>
+                        <!-- When true, the benchmark turns on Precomputed Joins (PCJs) in the Rya configuration. -->
+                        <xsd:element name="usePCJ" type="xsd:boolean"/>
+                    </xsd:sequence>
+                </xsd:complexType>
+            </xsd:element>
+        </xsd:sequence>
+    </xsd:complexType>
+
+    <!-- The values that are injected into the benchmark's parameters. -->
+    <xsd:complexType name="Parameters">
+        <xsd:sequence>
+            <!-- Optional: the benchmark falls back to its built in numReads values when this is absent.
+                 Each NumReads is either a whole number or ALL. -->
+            <xsd:element name="NumReadsRuns" minOccurs="0">
+                <xsd:complexType>
+                    <xsd:sequence>
+                        <xsd:element name="NumReads" type="xsd:string" maxOccurs="unbounded"/>
+                    </xsd:sequence>
+                </xsd:complexType>
+            </xsd:element>
+            <!-- Required: the benchmark has nothing to run without at least one SPARQL query. -->
+            <xsd:element name="Queries">
+                <xsd:complexType>
+                    <xsd:sequence>
+                        <xsd:element name="SPARQL" type="xsd:string" maxOccurs="unbounded"/>
+                    </xsd:sequence>
+                </xsd:complexType>
+            </xsd:element>
+        </xsd:sequence>
+    </xsd:complexType>
+</xsd:schema>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReaderIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReaderIT.java b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReaderIT.java
new file mode 100644
index 0000000..f229dc4
--- /dev/null
+++ b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReaderIT.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.query;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.List;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.rya.benchmark.query.Parameters.NumReadsRuns;
+import org.apache.rya.benchmark.query.Parameters.Queries;
+import org.apache.rya.benchmark.query.Rya.Accumulo;
+import org.apache.rya.benchmark.query.Rya.SecondaryIndexing;
+import org.junit.Test;
+import org.xml.sax.SAXException;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
+/**
+ * Tests the methods of {@link QueriesBenchmarkConfReader}.
+ */
+public class QueriesBenchmarkConfReaderIT {
+
+    /**
+     * Unmarshals a complete configuration document and verifies that every section of it
+     * (Rya connection details, numReads runs, and SPARQL queries) is read back as written.
+     */
+    @Test
+    public void load() throws JAXBException, SAXException {
+        // Unmarshal some XML.
+        final String xml =
+                "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
+                "<QueriesBenchmarkConf>\n" +
+                "    <Rya>\n" +
+                "        <ryaInstanceName>test_</ryaInstanceName>\n" +
+                "        <accumulo>\n" +
+                "            <username>test</username>\n" +
+                "            <password>t3stP@ssw0rd</password>\n" +
+                "            <zookeepers>zoo-server-1,zoo-server-2</zookeepers>\n" +
+                "            <instanceName>testInstance</instanceName>\n" +
+                "        </accumulo>\n" +
+                "        <secondaryIndexing>\n" +
+                "            <usePCJ>true</usePCJ>\n" +
+                "        </secondaryIndexing>\n" +
+                "    </Rya>\n" +
+                "    <Parameters>" +
+                "        <NumReadsRuns>" +
+                "            <NumReads>1</NumReads>" +
+                "            <NumReads>10</NumReads>" +
+                "            <NumReads>100</NumReads>" +
+                "            <NumReads>ALL</NumReads>" +
+                "        </NumReadsRuns>" +
+                "        <Queries>\n" +
+                "            <SPARQL><![CDATA[SELECT ?a WHERE { ?a <http://likes> <urn:icecream> . }]]></SPARQL>\n" +
+                "            <SPARQL><![CDATA[SELECT ?a ?b WHERE { ?a <http://knows> ?b . }]]></SPARQL>\n" +
+                "        </Queries>\n" +
+                "    </Parameters>" +
+                "</QueriesBenchmarkConf>";
+
+        final InputStream xmlStream = new ByteArrayInputStream( xml.getBytes(Charsets.UTF_8) );
+        final QueriesBenchmarkConf benchmarkConf = new QueriesBenchmarkConfReader().load( xmlStream );
+
+        // Ensure it was unmarshalled correctly.
+        final Rya rya = benchmarkConf.getRya();
+        assertEquals("test_", rya.getRyaInstanceName());
+
+        final Accumulo accumulo = rya.getAccumulo();
+        assertEquals("test", accumulo.getUsername());
+        assertEquals("t3stP@ssw0rd", accumulo.getPassword());
+        assertEquals("zoo-server-1,zoo-server-2", accumulo.getZookeepers());
+        assertEquals("testInstance", accumulo.getInstanceName());
+
+        final SecondaryIndexing secondaryIndexing = rya.getSecondaryIndexing();
+        assertTrue(secondaryIndexing.isUsePCJ());
+
+        final Parameters parameters = benchmarkConf.getParameters();
+        final List<String> expectedNumReads = Lists.newArrayList("1", "10", "100", "ALL");
+        final NumReadsRuns numReadsRuns = parameters.getNumReadsRuns();
+        assertEquals(expectedNumReads, numReadsRuns.getNumReads());
+
+        final List<String> expectedQueries = Lists.newArrayList(
+                "SELECT ?a WHERE { ?a <http://likes> <urn:icecream> . }",
+                "SELECT ?a ?b WHERE { ?a <http://knows> ?b . }");
+        final Queries queries = parameters.getQueries();
+        assertEquals(expectedQueries, queries.getSPARQL());
+    }
+}
\ No newline at end of file