You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/09/27 15:11:10 UTC
[2/5] incubator-rya git commit: RYA-151 Rya Query benchmark tool
implemented using JMH.
RYA-151 Rya Query benchmark tool implemented using JMH.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e77e839d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e77e839d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e77e839d
Branch: refs/heads/master
Commit: e77e839d76e140bee47eb610cf19c453ef0a79b5
Parents: 62c0794
Author: Kevin Chilton <ke...@parsons.com>
Authored: Wed Aug 17 22:29:41 2016 -0400
Committer: pujav65 <pu...@gmail.com>
Committed: Tue Sep 27 11:05:06 2016 -0400
----------------------------------------------------------------------
.../java/mvm/rya/api/client/BatchUpdatePCJ.java | 22 ++
.../api/client/PCJDoesNotExistException.java | 20 ++
.../main/java/mvm/rya/api/client/RyaClient.java | 13 +-
.../api/instance/RyaDetailsToConfiguration.java | 72 ++--
.../java/mvm/rya/accumulo/AccumuloITBase.java | 8 +-
.../client/accumulo/AccumuloBatchUpdatePCJ.java | 226 +++++++++++++
.../api/client/accumulo/AccumuloCreatePCJ.java | 64 ++--
.../accumulo/AccumuloRyaClientFactory.java | 1 +
.../PrecomputedJoinStorageSupplier.java | 3 +-
.../accumulo/AccumuloBatchUpdatePCJIT.java | 135 ++++++++
extras/pom.xml | 7 +-
extras/rya.benchmark/pom.xml | 250 ++++++++++++++
.../query/QueriesBenchmarkConfReader.java | 83 +++++
.../rya/benchmark/query/QueryBenchmark.java | 336 +++++++++++++++++++
.../src/main/resources/LICENSE.txt | 16 +
.../src/main/xsd/queries-benchmark-conf.xsd | 74 ++++
.../query/QueriesBenchmarkConfReaderIT.java | 105 ++++++
.../benchmark/query/QueryBenchmarkRunIT.java | 196 +++++++++++
extras/rya.console/pom.xml | 17 +-
.../java/mvm/rya/shell/RyaAdminCommands.java | 72 ++--
.../java/mvm/rya/shell/util/InstallPrompt.java | 16 +-
.../mvm/rya/shell/RyaAdminCommandsTest.java | 37 +-
22 files changed, 1672 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java b/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java
new file mode 100644
index 0000000..20d90e0
--- /dev/null
+++ b/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java
@@ -0,0 +1,22 @@
+package mvm.rya.api.client;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+/**
+ * Batch update a PCJ index.
+ */
+@ParametersAreNonnullByDefault
+public interface BatchUpdatePCJ {
+
+ /**
+ * Batch update a specific PCJ index using the {@code Statement}s that are
+ * currently in the Rya instance.
+ *
+ * @param ryaInstanceName - The Rya instance whose PCJ will be updated. (not null)
+ * @param pcjId - Identifies the PCJ index to update. (not null)
+ * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name.
+ * @throws PCJDoesNotExistException No PCJ exists for the provided PCJ ID.
+ * @throws RyaClientException Something caused the command to fail.
+ */
+ public void batchUpdate(String ryaInstanceName, String pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java b/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java
new file mode 100644
index 0000000..63efe0c
--- /dev/null
+++ b/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java
@@ -0,0 +1,20 @@
+package mvm.rya.api.client;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+/**
+ * One of the {@link RyaClient} commands could not execute because the connected
+ * instance of Rya does not have a PCJ matching the provided PCJ ID.
+ */
+@ParametersAreNonnullByDefault
+public class PCJDoesNotExistException extends RyaClientException {
+ private static final long serialVersionUID = 1L;
+
+ public PCJDoesNotExistException(final String message) {
+ super(message);
+ }
+
+ public PCJDoesNotExistException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java b/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java
index 173e1e0..851a273 100644
--- a/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java
+++ b/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java
@@ -33,23 +33,26 @@ public class RyaClient {
private final Install install;
private final CreatePCJ createPcj;
private final DeletePCJ deletePcj;
+ private final BatchUpdatePCJ batchUpdatePcj;
private final GetInstanceDetails getInstanceDetails;
private final InstanceExists instanceExists;
private final ListInstances listInstances;
/**
- * Constructs an isntance of {@link RyaClient}.
+ * Constructs an instance of {@link RyaClient}.
*/
public RyaClient(
final Install install,
final CreatePCJ createPcj,
final DeletePCJ deletePcj,
+ final BatchUpdatePCJ batchUpdatePcj,
final GetInstanceDetails getInstanceDetails,
final InstanceExists instanceExists,
final ListInstances listInstances) {
this.install = requireNonNull(install);
this.createPcj = requireNonNull(createPcj);
this.deletePcj = requireNonNull(deletePcj);
+ this.batchUpdatePcj = requireNonNull(batchUpdatePcj);
this.getInstanceDetails = requireNonNull(getInstanceDetails);
this.instanceExists = requireNonNull(instanceExists);
this.listInstances = requireNonNull(listInstances);
@@ -79,6 +82,14 @@ public class RyaClient {
}
/**
+ * @return An instance of {@link BatchUpdatePCJ} that is connected to a Rya storage
+ * if the Rya instance supports PCJ indexing.
+ */
+ public BatchUpdatePCJ getBatchUpdatePCJ() {
+ return batchUpdatePcj;
+ }
+
+ /**
* @return An instance of {@link GetInstanceDetails} that is connected to a Rya storage.
*/
public GetInstanceDetails getGetInstanceDetails() {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java b/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java
index faec0ff..8734adc 100644
--- a/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java
+++ b/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java
@@ -18,29 +18,34 @@
*/
package mvm.rya.api.instance;
+import static java.util.Objects.requireNonNull;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
/**
* Used to fetch {@link RyaDetails} from a {@link RyaDetailsRepository} and
* add them to the application's {@link Configuration}.
*/
+@ParametersAreNonnullByDefault
public class RyaDetailsToConfiguration {
- private static final Logger LOG = Logger.getLogger(RyaDetailsToConfiguration.class);
+ private static final Logger log = Logger.getLogger(RyaDetailsToConfiguration.class);
+
/**
* Ensures the values in the {@link Configuration} do not conflict with the values in {@link RyaDetails}.
* If they do, the values in {@link RyaDetails} take precedent and the {@link Configuration} value will
* be overwritten.
*
- * @param details - The {@link RyaDetails} to add to the {@link Configuration}.
- * @param conf - The {@link Configuration} to add {@link RyaDetails} to.
+ * @param details - The {@link RyaDetails} to add to the {@link Configuration}. (not null)
+ * @param conf - The {@link Configuration} to add {@link RyaDetails} to. (not null)
*/
public static void addRyaDetailsToConfiguration(final RyaDetails details, final Configuration conf) {
- Preconditions.checkNotNull(details);
- Preconditions.checkNotNull(conf);
+ requireNonNull(details);
+ requireNonNull(conf);
checkAndSet(conf, ConfigurationFields.USE_ENTITY, details.getEntityCentricIndexDetails().isEnabled());
checkAndSet(conf, ConfigurationFields.USE_FREETEXT, details.getFreeTextIndexDetails().isEnabled());
@@ -50,23 +55,44 @@ public class RyaDetailsToConfiguration {
}
/**
- * Checks to see if the configuration has a value in the specified field.
- * If the value exists and does not match what is expected by the {@link RyaDetails},
- * an error will be logged and the value will be overwritten.
- * @param conf - The {@link Configuration} to potentially change.
- * @param field - The field to check and set.
- * @param value - The new value in the field (is not used if the value doesn't need to be changed).
+ * Ensures a Rya Client will not try to use a secondary index that is not supported by the Rya Instance
+ * it is connecting to.
+ * <p>
+ * If the configuration...
+ * <ul>
+ * <li>provides an 'on' value for an index that is supported, then nothing changes.</li>
+ * <li>provides an 'off' value for an index that is or is not supported, then nothing changes.</li>
+ * <li>provides an 'on' value for an index that is not supported, then the index is turned
+ * off and a warning is logged.</li>
+ * <li>does not provide any value for an index, then it will be turned on if supported.</li>
+ * </ul>
+ *
+ * @param conf - The {@link Configuration} to potentially change. (not null)
+ * @param useIndexField - The field within {@code conf} that indicates if the client will utilize the index. (not null)
+ * @param indexSupported - {@code true} if the Rya Instance supports the index; otherwise {@code false}.
*/
- private static void checkAndSet(final Configuration conf, final String field, final boolean value) {
- final Optional<String> opt = Optional.fromNullable(conf.get(field));
- if(opt.isPresent()) {
- final Boolean curVal = new Boolean(opt.get());
- if(curVal != value) {
- LOG.error("The user configured value in: " + field + " will be overwritten by what has been configured by the admin.");
- conf.setBoolean(field, value);
- }
- } else {
- conf.setBoolean(field, value);
+ private static void checkAndSet(final Configuration conf, final String useIndexField, final boolean indexSupported) {
+ requireNonNull(conf);
+ requireNonNull(useIndexField);
+
+ final Optional<String> useIndexStr = Optional.fromNullable( conf.get(useIndexField) );
+
+ // If no value was provided, default to using the index if it is supported.
+ if(!useIndexStr.isPresent()) {
+ log.info("No Rya Client configuration was provided for the " + useIndexField +
+ " index, so it is being defaulted to " + indexSupported);
+ conf.setBoolean(useIndexField, indexSupported);
+ return;
+ }
+
+ // If configured to use the index, but the Rya Instance does not support it, then turn it off.
+ final boolean useIndex = Boolean.parseBoolean( useIndexStr.get() );
+ if(useIndex && !indexSupported) {
+ log.warn("The Rya Client indicates it wants to use a secondary index that the Rya Instance does not support. " +
+ "This is not allowed, so the index will be turned off. Index Configuration Field: " + useIndexField);
+ conf.setBoolean(useIndexField, false);
}
+
+ // Otherwise use whatever the Client wants to use.
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java
index 2a1c384..7dd23e6 100644
--- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java
+++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java
@@ -95,11 +95,9 @@ public class AccumuloITBase {
}
/**
- * TODO doc
- *
- * @return
- * @throws AccumuloSecurityException
- * @throws AccumuloException
+ * @return A {@link Connector} that creates connections to the mini accumulo cluster.
+ * @throws AccumuloException Could not connect to the cluster.
+ * @throws AccumuloSecurityException Could not connect to the cluster because of a security violation.
*/
public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
return cluster.getConnector();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
new file mode 100644
index 0000000..ee773b0
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
@@ -0,0 +1,226 @@
+package mvm.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+
+import com.google.common.base.Optional;
+
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
+import mvm.rya.api.client.BatchUpdatePCJ;
+import mvm.rya.api.client.InstanceDoesNotExistException;
+import mvm.rya.api.client.PCJDoesNotExistException;
+import mvm.rya.api.client.RyaClientException;
+import mvm.rya.api.instance.RyaDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
+import mvm.rya.api.instance.RyaDetailsRepository;
+import mvm.rya.api.instance.RyaDetailsRepository.NotInitializedException;
+import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import mvm.rya.api.instance.RyaDetailsUpdater;
+import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
+import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.rdftriplestore.inference.InferenceEngineException;
+import mvm.rya.sail.config.RyaSailFactory;
+
+/**
+ * Uses an in-memory Rya Client to batch update a PCJ index.
+ */
+public class AccumuloBatchUpdatePCJ extends AccumuloCommand implements BatchUpdatePCJ {
+
+ private static final Logger log = Logger.getLogger(AccumuloBatchUpdatePCJ.class);
+
+ /**
+ * Constructs an instance of {@link AccumuloBatchUpdatePCJ}.
+ *
+ * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null)
+ * @param connector - Provides programmatic access to the instance of Accumulo that hosts the Rya instance. (not null)
+ */
+ public AccumuloBatchUpdatePCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
+ super(connectionDetails, connector);
+ }
+
+ @Override
+ public void batchUpdate(final String ryaInstanceName, final String pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException {
+ requireNonNull(ryaInstanceName);
+ requireNonNull(pcjId);
+ verifyPCJState(ryaInstanceName, pcjId);
+ updatePCJResults(ryaInstanceName, pcjId);
+ updatePCJMetadata(ryaInstanceName, pcjId);
+ }
+
+ private void verifyPCJState(final String ryaInstanceName, final String pcjId) throws RyaClientException {
+ try {
+ // Fetch the Rya instance's details.
+ final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(super.getConnector(), ryaInstanceName);
+ final RyaDetails ryaDetails = detailsRepo.getRyaInstanceDetails();
+
+ // Ensure PCJs are enabled.
+ if(!ryaDetails.getPCJIndexDetails().isEnabled()) {
+ throw new RyaClientException("PCJs are not enabled for the Rya instance named '" + ryaInstanceName + "'.");
+ }
+
+ // Ensure the PCJ exists.
+ if(!ryaDetails.getPCJIndexDetails().getPCJDetails().containsKey(pcjId)) {
+ throw new PCJDoesNotExistException("The PCJ with id '" + pcjId + "' does not exist within Rya instance '" + ryaInstanceName + "'.");
+ }
+
+ // Ensure the PCJ is not already being incrementally updated.
+ final PCJDetails pcjDetails = ryaDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
+ final Optional<PCJUpdateStrategy> updateStrategy = pcjDetails.getUpdateStrategy();
+ if(updateStrategy.isPresent() && updateStrategy.get() == PCJUpdateStrategy.INCREMENTAL) {
+ throw new RyaClientException("The PCJ with id '" + pcjId + "' is already being updated incrementally.");
+ }
+ } catch(final NotInitializedException e) {
+ throw new InstanceDoesNotExistException("No RyaDetails are initialized for the Rya instance named '" + ryaInstanceName + "'.", e);
+ } catch (final RyaDetailsRepositoryException e) {
+ throw new RyaClientException("Could not fetch the RyaDetails for the Rya instance named '" + ryaInstanceName + "'.", e);
+ }
+ }
+
+ private void updatePCJResults(final String ryaInstanceName, final String pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException {
+ // Things that have to be closed before we exit.
+ Sail sail = null;
+ SailConnection sailConn = null;
+ CloseableIteration<? extends BindingSet, QueryEvaluationException> results = null;
+
+ try {
+ // Create an instance of Sail backed by the Rya instance.
+ sail = connectToRya(ryaInstanceName);
+
+ // Purge the old results from the PCJ.
+ final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), ryaInstanceName);
+ try {
+ pcjStorage.purge(pcjId);
+ } catch (final PCJStorageException e) {
+ throw new RyaClientException("Could not batch update PCJ with ID '" + pcjId + "' because the old " +
+ "results could not be purged from it.", e);
+ }
+
+ try {
+ // Parse the PCJ's SPARQL query.
+ final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+ final String sparql = metadata.getSparql();
+ final SPARQLParser parser = new SPARQLParser();
+ final ParsedQuery parsedQuery = parser.parseQuery(sparql, null);
+
+ // Execute the query.
+ sailConn = sail.getConnection();
+ results = sailConn.evaluate(parsedQuery.getTupleExpr(), null, null, false);
+
+ // Load the results into the PCJ table.
+ final List<VisibilityBindingSet> batch = new ArrayList<>(1000);
+
+ while(results.hasNext()) {
+ final VisibilityBindingSet result = new VisibilityBindingSet(results.next(), "");
+ batch.add(result);
+
+ if(batch.size() == 1000) {
+ pcjStorage.addResults(pcjId, batch);
+ batch.clear();
+ }
+ }
+
+ if(!batch.isEmpty()) {
+ pcjStorage.addResults(pcjId, batch);
+ batch.clear();
+ }
+ } catch(final MalformedQueryException | PCJStorageException | SailException | QueryEvaluationException e) {
+ throw new RyaClientException("Fail to batch load new results into the PCJ with ID '" + pcjId + "'.", e);
+ }
+ } finally {
+ if(results != null) {
+ try {
+ results.close();
+ } catch (final QueryEvaluationException e) {
+ log.warn(e.getMessage(), e);
+ }
+ }
+
+ if(sailConn != null) {
+ try {
+ sailConn.close();
+ } catch (final SailException e) {
+ log.warn(e.getMessage(), e);
+ }
+ }
+
+ if(sail != null) {
+ try {
+ sail.shutDown();
+ } catch (final SailException e) {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ }
+ }
+
+ private Sail connectToRya(final String ryaInstanceName) throws RyaClientException {
+ try {
+ final AccumuloConnectionDetails connectionDetails = super.getAccumuloConnectionDetails();
+
+ final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+ ryaConf.setTablePrefix(ryaInstanceName);
+ ryaConf.set(ConfigUtils.CLOUDBASE_USER, connectionDetails.getUsername());
+ ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, new String(connectionDetails.getPassword()));
+ ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, connectionDetails.getZookeepers());
+ ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, connectionDetails.getInstanceName());
+
+ // Turn PCJs off so that we will only scan the core Rya tables while building the PCJ results.
+ ryaConf.set(ConfigUtils.USE_PCJ, "false");
+
+ return RyaSailFactory.getInstance(ryaConf);
+ } catch (SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) {
+ throw new RyaClientException("Could not connect to the Rya instance named '" + ryaInstanceName + "'.", e);
+ }
+ }
+
+ private void updatePCJMetadata(final String ryaInstanceName, final String pcjId) throws RyaClientException {
+ // Update the PCJ's metadata to indicate it was just batch updated.
+ try {
+ final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(super.getConnector(), ryaInstanceName);
+
+ new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() {
+ @Override
+ public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException {
+ // Update the original PCJ Details to indicate they were batch updated.
+ final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
+ final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails )
+ .setUpdateStrategy( PCJUpdateStrategy.BATCH )
+ .setLastUpdateTime( new Date());
+
+ // Replace the old PCJ Details with the updated ones.
+ final RyaDetails.Builder builder = RyaDetails.builder(originalDetails);
+ builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails );
+ return builder.build();
+ }
+ });
+ } catch (final RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
+ throw new RyaClientException("Could not update the PCJ's metadata.", e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java
index 92b5d8c..30be548 100644
--- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java
+++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java
@@ -92,13 +92,6 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
}
- final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
- final boolean usingFluo = fluoDetailsHolder.isPresent();
- if(!usingFluo) {
- throw new RyaClientException( String.format("Can not create a PCJ for the '%s' instance of Rya because it does" +
- "not have a Fluo application associated with it. Update the instance's PCJ Index Details to fix this problem.", instanceName) );
- }
-
// Create the PCJ table that will receive the index results.
final String pcjId;
final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getConnector(), instanceName);
@@ -108,33 +101,36 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
throw new RyaClientException("Problem while initializing the PCJ table.", e);
}
- // Task the Fluo application with updating the PCJ.
- final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
- try {
- updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId);
- } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException e) {
- throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
- }
-
- // Update the Rya Details to indicate the PCJ is being updated incrementally.
- final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName);
- try {
- new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() {
- @Override
- public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException {
- // Update the original PCJ Details to indicate they are incrementally updated.
- final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
- final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails )
- .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL );
-
- // Replace the old PCJ Details with the updated ones.
- final RyaDetails.Builder builder = RyaDetails.builder(originalDetails);
- builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails );
- return builder.build();
- }
- });
- } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
- throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e);
+ // If a Fluo application is being used, task it with updating the PCJ.
+ final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
+ if(fluoDetailsHolder.isPresent()) {
+ final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
+ try {
+ updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId);
+ } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException e) {
+ throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
+ }
+
+ // Update the Rya Details to indicate the PCJ is being updated incrementally.
+ final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName);
+ try {
+ new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() {
+ @Override
+ public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException {
+ // Update the original PCJ Details to indicate they are incrementally updated.
+ final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
+ final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails )
+ .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL );
+
+ // Replace the old PCJ Details with the updated ones.
+ final RyaDetails.Builder builder = RyaDetails.builder(originalDetails);
+ builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails );
+ return builder.build();
+ }
+ });
+ } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
+ throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e);
+ }
}
// Return the ID that was assigned to the PCJ.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java
index 8c276a8..102f667 100644
--- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java
+++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java
@@ -52,6 +52,7 @@ public class AccumuloRyaClientFactory {
new AccumuloInstall(connectionDetails, connector),
new AccumuloCreatePCJ(connectionDetails, connector),
new AccumuloDeletePCJ(connectionDetails, connector),
+ new AccumuloBatchUpdatePCJ(connectionDetails, connector),
new AccumuloGetInstanceDetails(connectionDetails, connector),
new AccumuloInstanceExists(connectionDetails, connector),
new AccumuloListInstances(connectionDetails, connector));
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
index bf10c84..7c48315 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
@@ -27,6 +27,7 @@ import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
+import cern.colt.Arrays;
import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier;
@@ -64,7 +65,7 @@ public class PrecomputedJoinStorageSupplier implements Supplier<PrecomputedJoinS
// Ensure the storage type has been set.
final Optional<PrecomputedJoinStorageType> storageType = indexerConfig.getPcjStorageType();
checkArgument(storageType.isPresent(), "The '" + PrecomputedJoinIndexerConfig.PCJ_STORAGE_TYPE +
- "' property must have one of the following values: " + PrecomputedJoinStorageType.values());
+ "' property must have one of the following values: " + Arrays.toString(PrecomputedJoinStorageType.values()));
// Create and return the configured storage.
switch(storageType.get()) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
new file mode 100644
index 0000000..f23f1c4
--- /dev/null
+++ b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
@@ -0,0 +1,135 @@
+package mvm.rya.api.client.accumulo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+
+import mvm.rya.accumulo.AccumuloITBase;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.client.Install.InstallConfiguration;
+import mvm.rya.api.client.RyaClient;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
+import mvm.rya.sail.config.RyaSailFactory;
+
+/**
+ * Integration tests the methods of {@link AccumuloBatchUpdatePCJ}.
+ */
+public class AccumuloBatchUpdatePCJIT extends AccumuloITBase {
+
+ private static final String RYA_INSTANCE_NAME = "test_";
+
+ @Test
+ public void batchUpdate() throws Exception {
+ // Setup a Rya Client.
+ final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+ super.getUsername(),
+ super.getPassword().toCharArray(),
+ super.getInstanceName(),
+ super.getZookeepers());
+ final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, super.getConnector());
+
+ // Install an instance of Rya on the mini accumulo cluster.
+ ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder()
+ .setEnablePcjIndex(true)
+ .build());
+
+ Sail sail = null;
+ try {
+ // Get a Sail connection backed by the installed Rya instance.
+ final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+ ryaConf.setTablePrefix(RYA_INSTANCE_NAME);
+ ryaConf.set(ConfigUtils.CLOUDBASE_USER, super.getUsername());
+ ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, super.getPassword());
+ ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, super.getZookeepers());
+ ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, super.getInstanceName());
+ ryaConf.set(ConfigUtils.USE_PCJ, "true");
+ ryaConf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.ACCUMULO.toString());
+ ryaConf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinUpdaterType.NO_UPDATE.toString());
+ sail = RyaSailFactory.getInstance( ryaConf );
+
+ // Load some statements into the Rya instance.
+ final ValueFactory vf = sail.getValueFactory();
+
+ final SailConnection sailConn = sail.getConnection();
+ sailConn.begin();
+ sailConn.addStatement(vf.createURI("urn:Alice"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+ sailConn.addStatement(vf.createURI("urn:Bob"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+ sailConn.addStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+ sailConn.addStatement(vf.createURI("urn:David"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+ sailConn.addStatement(vf.createURI("urn:Eve"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+ sailConn.addStatement(vf.createURI("urn:Frank"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+ sailConn.addStatement(vf.createURI("urn:George"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+ sailConn.addStatement(vf.createURI("urn:Hillary"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+
+ sailConn.addStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+ sailConn.addStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+ sailConn.addStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+ sailConn.addStatement(vf.createURI("urn:David"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+ sailConn.addStatement(vf.createURI("urn:Eve"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+ sailConn.addStatement(vf.createURI("urn:Frank"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+ sailConn.addStatement(vf.createURI("urn:George"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:green"));
+ sailConn.addStatement(vf.createURI("urn:Hillary"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:brown"));
+ sailConn.commit();
+ sailConn.close();
+
+ // Create a PCJ for a SPARQL query.
+ final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), RYA_INSTANCE_NAME);
+ final String sparql = "SELECT ?name WHERE { ?name <urn:likes> <urn:icecream> . ?name <urn:hasEyeColor> <urn:blue> . }";
+ final String pcjId = pcjStorage.createPcj(sparql);
+
+ // Run the test.
+ ryaClient.getBatchUpdatePCJ().batchUpdate(RYA_INSTANCE_NAME, pcjId);
+
+ // Verify the correct results were loaded into the PCJ table.
+ final Set<BindingSet> expectedResults = new HashSet<>();
+
+ MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("name", vf.createURI("urn:Alice"));
+ expectedResults.add(bs);
+
+ bs = new MapBindingSet();
+ bs.addBinding("name", vf.createURI("urn:Bob"));
+ expectedResults.add(bs);
+
+ bs = new MapBindingSet();
+ bs.addBinding("name", vf.createURI("urn:Charlie"));
+ expectedResults.add(bs);
+
+ bs = new MapBindingSet();
+ bs.addBinding("name", vf.createURI("urn:David"));
+ expectedResults.add(bs);
+
+ bs = new MapBindingSet();
+ bs.addBinding("name", vf.createURI("urn:Eve"));
+ expectedResults.add(bs);
+
+ bs = new MapBindingSet();
+ bs.addBinding("name", vf.createURI("urn:Frank"));
+ expectedResults.add(bs);
+
+ final Set<BindingSet> results = new HashSet<>();
+ for(final BindingSet result : pcjStorage.listResults(pcjId)) {
+ results.add( result );
+ }
+
+ assertEquals(expectedResults, results);
+
+ } finally {
+ if(sail != null) {
+ sail.shutDown();
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/pom.xml
----------------------------------------------------------------------
diff --git a/extras/pom.xml b/extras/pom.xml
index 9568e27..0ffeb7a 100644
--- a/extras/pom.xml
+++ b/extras/pom.xml
@@ -1,5 +1,4 @@
-<?xml version="1.0" encoding="utf-8"?>
-
+<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -18,7 +17,6 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -44,5 +42,6 @@ under the License.
<module>vagrantExample</module>
<module>rya.pcj.fluo</module>
<module>rya.merger</module>
+ <module>rya.benchmark</module>
</modules>
-</project>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/pom.xml b/extras/rya.benchmark/pom.xml
new file mode 100644
index 0000000..5b9eb68
--- /dev/null
+++ b/extras/rya.benchmark/pom.xml
@@ -0,0 +1,250 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <artifactId>rya.extras</artifactId>
+ <groupId>org.apache.rya</groupId>
+ <version>3.2.10-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>rya.benchmark</artifactId>
+
+ <name>Apache Rya Benchmarks</name>
+
+ <dependencies>
+ <!-- JMH Benchmark Framework dependencies -->
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-core</artifactId>
+ <version>${jmh.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ <version>${jmh.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.indexing</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>3.0.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-minicluster</artifactId>
+ <version>${accumulo.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+
+ <!--
+ JMH version to use with this project.
+ -->
+ <jmh.version>1.13</jmh.version>
+
+ <!--
+ Java source/target to use for compilation.
+ -->
+ <javac.target>1.8</javac.target>
+
+ <!--
+ Name of the benchmark Uber-JAR to generate.
+ -->
+ <uberjar.name>benchmarks</uberjar.name>
+ </properties>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/xsd</directory>
+ </resource>
+ </resources>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <compilerVersion>${javac.target}</compilerVersion>
+ <source>${javac.target}</source>
+ <target>${javac.target}</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>jaxb2-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>xjc</id>
+ <goals>
+ <goal>xjc</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <!-- Place the generated source within the 'src' directory so license-maven-plugin will find it. -->
+ <outputDirectory>src/main/gen</outputDirectory>
+ <packageName>org.apache.rya.benchmark.query</packageName>
+ </configuration>
+ </plugin>
+
+ <!-- Automatically place Apache 2 license headers at the top of all of the project's Java files.
+ Rat runs during the 'validate' lifecycle step, so it will fail the build before this one
+ executes if any of the headers are missing. Run the build with rat turned off to add
+ missing headers to the Java files. -->
+ <plugin>
+ <groupId>com.mycila</groupId>
+ <artifactId>license-maven-plugin</artifactId>
+ <version>2.6</version>
+ <configuration>
+ <!-- We use a custom Apache 2.0 license header because we do not include a copyright section. -->
+ <header>src/main/resources/LICENSE.txt</header>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>process-sources</phase>
+ <goals>
+ <goal>format</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.2</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>${uberjar.name}</finalName>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.openjdk.jmh.Main</mainClass>
+ </transformer>
+ </transformers>
+ <filters>
+ <filter>
+ <!--
+ Shading signed JARs will fail without this.
+ http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
+ -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.19.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.5</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>2.8.1</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-install-plugin</artifactId>
+ <version>2.5.1</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.9.1</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.6</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.3</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.2.1</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.17</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReader.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReader.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReader.java
new file mode 100644
index 0000000..8cbf203
--- /dev/null
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReader.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.query;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.InputStream;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.xml.XMLConstants;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+
+import org.xml.sax.SAXException;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+
+/**
+ * Unmarshalls instances of {@link QueriesBenchmarkConf}.
+ */
+@ParametersAreNonnullByDefault
+public final class QueriesBenchmarkConfReader {
+
+    // It is assumed the schema file is held within the root directory of the packaged jar.
+    private static final String SCHEMA_LOCATION = "queries-benchmark-conf.xsd";
+
+    // Only load the Schema once.
+    private static final Supplier<Schema> SCHEMA_SUPPLIER = Suppliers.memoize(
+            new Supplier<Schema>() {
+                @Override
+                public Schema get() {
+                    final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+
+                    // The stream is closed as soon as the schema has been parsed so that it is not leaked.
+                    try(final InputStream schemaStream = ClassLoader.getSystemResourceAsStream(SCHEMA_LOCATION)) {
+                        // A null stream means the resource could not be found. Fail with a clear message
+                        // instead of handing the null to the schema parser.
+                        if(schemaStream == null) {
+                            throw new RuntimeException("Could not find the '" + SCHEMA_LOCATION + "' schema file. Make sure it is on the classpath.");
+                        }
+
+                        return schemaFactory.newSchema( new StreamSource( schemaStream ) );
+                    } catch (final SAXException | java.io.IOException e) {
+                        throw new RuntimeException("Could not load the '" + SCHEMA_LOCATION + "' schema file. Make sure it is on the classpath.", e);
+                    }
+                }
+            });
+
+    /**
+     * Unmarshall an instance of {@link QueriesBenchmarkConf} from the XML that
+     * is retrieved from an {@link InputStream}. The XML is validated against the
+     * "queries-benchmark-conf.xsd" schema while it is being read. This method
+     * does not close the provided stream.
+     *
+     * @param xmlStream - The input stream holding the XML. (not null)
+     * @return The {@link QueriesBenchmarkConf} instance that was read from the stream.
+     * @throws JAXBException There was a problem with the formatting of the XML.
+     * @throws RuntimeException The schema file could not be found or parsed.
+     */
+    public QueriesBenchmarkConf load(final InputStream xmlStream) throws JAXBException {
+        requireNonNull(xmlStream);
+
+        // Load the schema that describes the stream.
+        final Schema schema = SCHEMA_SUPPLIER.get();
+
+        // Unmarshal the object from the stream.
+        final JAXBContext context = JAXBContext.newInstance( QueriesBenchmarkConf.class );
+        final Unmarshaller unmarshaller = context.createUnmarshaller();
+        unmarshaller.setSchema(schema);
+        return (QueriesBenchmarkConf) unmarshaller.unmarshal(xmlStream);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueryBenchmark.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueryBenchmark.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueryBenchmark.java
new file mode 100644
index 0000000..404f183
--- /dev/null
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueryBenchmark.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.query;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.rya.benchmark.query.Parameters.NumReadsRuns;
+import org.apache.rya.benchmark.query.QueryBenchmark.QueryBenchmarkRun.NotEnoughResultsException;
+import org.apache.rya.benchmark.query.Rya.Accumulo;
+import org.apache.rya.benchmark.query.Rya.SecondaryIndexing;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.CommandLineOptions;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
+import mvm.rya.sail.config.RyaSailFactory;
+
+/**
+ * A benchmark that may be used to evaluate the performance of SPARQL queries
+ * over a living instance of Rya. It pivots over two dimensions:
+ * <ul>
+ * <li>Which SPARQL query to execute</li>
+ * <li>How many of the query's results to read</li>
+ * </ul>
+ * </p>
+ * These parameters are configured by placing a file named "queries-benchmark-conf.xml"
+ * within the directory the benchmark is being executed from. The schema that defines
+ * this XML file is named "queries-benchmark-conf.xsd" and may be found embedded within
+ * the benchmark's jar file.
+ * </p>
+ * To execute this benchmark, build the project by executing:
+ * <pre>
+ * mvn clean install
+ * </pre>
+ * Transport the "target/benchmarks.jar" file to the system that will execute
+ * the benchmark, write the configuration file, and then execute:
+ * <pre>
+ * java -cp benchmarks.jar org.apache.rya.benchmark.query.QueryBenchmark
+ * </pre>
+ */
+@State(Scope.Thread)
+public class QueryBenchmark {
+
+ /**
+ * The path to the configuration file this benchmark reads. The file describes how
+ * to connect to Rya and which parameters the benchmark is run with. The path is
+ * relative, so the file is looked up in the directory the benchmark is executed from.
+ */
+ public static final Path QUERY_BENCHMARK_CONFIGURATION_FILE = Paths.get("queries-benchmark-conf.xml");
+
+ /**
+ * Indicates all query results will be read during the benchmark.
+ */
+ public static final String READ_ALL = "ALL";
+
+ /**
+ * How many of the query's results are read during a run. The value is either
+ * {@link #READ_ALL} or text that parses as a long. The values listed here are the
+ * defaults; {@link #main(String[])} replaces them when the configuration file
+ * provides its own list.
+ */
+ @Param({"1", "10", "100", READ_ALL})
+ public String numReads;
+
+ /**
+ * The SPARQL query that is executed during a run. No default is listed here;
+ * {@link #main(String[])} supplies the queries found in the configuration file.
+ */
+ @Param({})
+ public String sparql;
+
+ // Created by setup() and released by tearDown().
+ private Sail sail = null;
+ private SailConnection sailConn = null;
+
+    /**
+     * Prepares the benchmark: configures console logging, reads the benchmark's
+     * configuration file, builds the Rya configuration from it, and opens the
+     * {@link Sail} and {@link SailConnection} the benchmarked query is executed with.
+     *
+     * @throws Exception The configuration file could not be read or the connection
+     *   to Rya could not be created.
+     */
+    @Setup
+    public void setup() throws Exception {
+        // Setup logging.
+        final ConsoleAppender console = new ConsoleAppender();
+        console.setLayout(new PatternLayout("%d [%p|%c|%C{1}] %m%n"));
+        console.setThreshold(Level.INFO);
+        console.activateOptions();
+        Logger.getRootLogger().addAppender(console);
+
+        // Load the benchmark's configuration file. The stream is closed as soon as
+        // the configuration has been read so that the file handle is not leaked.
+        final QueriesBenchmarkConf benchmarkConf;
+        try(final InputStream queriesStream = Files.newInputStream(QUERY_BENCHMARK_CONFIGURATION_FILE)) {
+            benchmarkConf = new QueriesBenchmarkConfReader().load(queriesStream);
+        }
+
+        // Create the Rya Configuration object using the benchmark's configuration.
+        final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+
+        final Rya rya = benchmarkConf.getRya();
+        ryaConf.setTablePrefix(rya.getRyaInstanceName());
+
+        final Accumulo accumulo = rya.getAccumulo();
+        ryaConf.set(ConfigUtils.CLOUDBASE_USER, accumulo.getUsername());
+        ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, accumulo.getPassword());
+        ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, accumulo.getZookeepers());
+        ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, accumulo.getInstanceName());
+
+        // Print the query plan so that you can visually inspect how PCJs are being applied for each benchmark.
+        ryaConf.set(ConfigUtils.DISPLAY_QUERY_PLAN, "true");
+
+        // Turn on PCJs if we are configured to use them.
+        final SecondaryIndexing secondaryIndexing = rya.getSecondaryIndexing();
+        if(secondaryIndexing.isUsePCJ()) {
+            ryaConf.set(ConfigUtils.USE_PCJ, "true");
+            ryaConf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.ACCUMULO.toString());
+            ryaConf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinUpdaterType.NO_UPDATE.toString());
+        } else {
+            ryaConf.set(ConfigUtils.USE_PCJ, "false");
+        }
+
+        // Create the connections used to execute the benchmark.
+        sail = RyaSailFactory.getInstance( ryaConf );
+        sailConn = sail.getConnection();
+    }
+
+    /**
+     * Releases the {@link SailConnection} and {@link Sail} that were opened by
+     * {@link #setup()}. This is best effort: a failure to release one of them is
+     * logged and does not prevent the other from being released.
+     */
+    @TearDown
+    public void tearDown() {
+        // Either field may still be null if setup() failed before creating it.
+        if(sailConn != null) {
+            try {
+                sailConn.close();
+            } catch(final Exception e) {
+                Logger.getLogger(QueryBenchmark.class).warn("Could not close the Sail connection.", e);
+            }
+        }
+
+        if(sail != null) {
+            try {
+                sail.shutDown();
+            } catch(final Exception e) {
+                Logger.getLogger(QueryBenchmark.class).warn("Could not shut down the Sail.", e);
+            }
+        }
+    }
+
+    /**
+     * The benchmarked logic. Executes the {@link #sparql} query and reads either
+     * all of its results or the number of results indicated by {@link #numReads}.
+     *
+     * @throws MalformedQueryException The SPARQL query could not be parsed.
+     * @throws QueryEvaluationException The query could not be evaluated.
+     * @throws SailException A problem was reported by the Sail connection.
+     * @throws NotEnoughResultsException The query has fewer results than must be read.
+     */
+    @Benchmark
+    @BenchmarkMode(Mode.SingleShotTime)
+    @Timeout(time = 1, timeUnit = TimeUnit.HOURS)
+    public void queryRya() throws MalformedQueryException, QueryEvaluationException, SailException, NotEnoughResultsException {
+        // The READ_ALL flag selects the unbounded run; anything else is a result count.
+        final QueryBenchmarkRun run = numReads.equals( READ_ALL )
+                ? new QueryBenchmarkRun(sailConn, sparql)
+                : new QueryBenchmarkRun(sailConn, sparql, Long.parseLong(numReads));
+
+        run.run();
+    }
+
+    /**
+     * Runs the query benchmarks.
+     * <p>
+     * The SPARQL queries, and optionally the numbers of results to read, are taken
+     * from the "queries-benchmark-conf.xml" file in the working directory and are
+     * injected into the benchmark's 'sparql' and 'numReads' parameters.
+     * <p>
+     * Example command line:
+     * <pre>
+     * java -cp benchmarks.jar org.apache.rya.benchmark.query.QueryBenchmark
+     * </pre>
+     *
+     * @param args - The command line arguments that will be fed into the benchmark.
+     * @throws Exception The benchmark could not be run.
+     */
+    public static void main(final String[] args) throws Exception {
+        // Read the queries that will be benchmarked from the provided path. The stream
+        // is closed as soon as the configuration has been read.
+        final QueriesBenchmarkConf benchmarkConf;
+        try(final InputStream queriesStream = Files.newInputStream( QUERY_BENCHMARK_CONFIGURATION_FILE )) {
+            benchmarkConf = new QueriesBenchmarkConfReader().load(queriesStream);
+        }
+        final Parameters parameters = benchmarkConf.getParameters();
+
+        // Setup the options that will be used to run the benchmark.
+        final OptionsBuilder options = new OptionsBuilder();
+        options.parent( new CommandLineOptions(args) );
+        options.include(QueryBenchmark.class.getSimpleName());
+
+        // Provide the SPARQL queries that will be injected into the benchmark's 'sparql' parameter.
+        // The whitespace that surrounds each query in the XML is trimmed off.
+        final List<String> sparql = parameters.getQueries().getSPARQL();
+        final String[] sparqlArray = new String[ sparql.size() ];
+        for(int i = 0; i < sparqlArray.length; i++) {
+            sparqlArray[i] = sparql.get(i).trim();
+        }
+
+        options.param("sparql", sparqlArray);
+
+        // If numReadsRuns was specified, inject them into the benchmark's 'numReads' parameter.
+        final NumReadsRuns numReadsRuns = parameters.getNumReadsRuns();
+        if(numReadsRuns != null) {
+            // Validate the list.
+            final List<String> numReadsList = numReadsRuns.getNumReads();
+            for(final String numReads : numReadsList) {
+                // It may be the READ_ALL flag.
+                if(numReads.equals(READ_ALL)) {
+                    continue;
+                }
+
+                // Or it must be a Long.
+                try {
+                    Long.parseLong(numReads);
+                } catch(final NumberFormatException e) {
+                    // Keep the parse failure as the cause so the original problem is not lost.
+                    throw new RuntimeException("There is a problem with the benchmark's configuration. Encountered " +
+                            "a numReads value of '" + numReads + "', which is invalid. The value must be a Long or " +
+                            "'" + READ_ALL + "'", e);
+                }
+            }
+
+            // Configure the benchmark with the numReads.
+            final String[] numReadsArray = new String[ numReadsList.size() ];
+            numReadsList.toArray( numReadsArray );
+            options.param("numReads", numReadsArray);
+        }
+
+        // Execute the benchmark.
+        new Runner(options.build()).run();
+    }
+
+    /**
+     * A single execution of the benchmarked logic: evaluates a SPARQL query over a
+     * {@link SailConnection} and then reads either all of the results or a fixed
+     * number of them.
+     */
+    @ParametersAreNonnullByDefault
+    public static final class QueryBenchmarkRun {
+
+        private final SailConnection sailConn;
+        private final String sparql;
+
+        // Empty when every result of the query is to be read.
+        private final Optional<Long> numReads;
+
+        /**
+         * Constructs an instance of {@link QueryBenchmarkRun} that reads every result of the query.
+         *
+         * @param sailConn - The connection to the Rya instance the query will be executed against. (not null)
+         * @param sparql - The query that will be executed. (not null)
+         */
+        public QueryBenchmarkRun(final SailConnection sailConn, final String sparql) {
+            this.sailConn = requireNonNull(sailConn);
+            this.sparql = requireNonNull(sparql);
+            numReads = Optional.empty();
+        }
+
+        /**
+         * Constructs an instance of {@link QueryBenchmarkRun} that reads a fixed number of results.
+         *
+         * @param sailConn - The connection to the Rya instance the query will be executed against. (not null)
+         * @param sparql - The query that will be executed. (not null)
+         * @param numReads - The number of results that will be read. (not null)
+         */
+        public QueryBenchmarkRun(final SailConnection sailConn, final String sparql, final Long numReads) {
+            this.sailConn = requireNonNull(sailConn);
+            this.sparql = requireNonNull(sparql);
+            this.numReads = Optional.of( requireNonNull(numReads) );
+        }
+
+        /**
+         * Parses and evaluates the query, performs the configured reads, and then closes
+         * the query's result iteration.
+         *
+         * @throws MalformedQueryException The SPARQL query could not be parsed.
+         * @throws QueryEvaluationException The query could not be evaluated.
+         * @throws NotEnoughResultsException The query has fewer results than must be read.
+         * @throws SailException A problem was reported by the Sail connection.
+         */
+        public void run() throws MalformedQueryException, QueryEvaluationException, NotEnoughResultsException, SailException {
+            // Parse the query before anything that needs to be cleaned up is opened.
+            final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
+
+            CloseableIteration<? extends BindingSet, QueryEvaluationException> results = null;
+            try {
+                results = sailConn.evaluate(query.getTupleExpr(), null, null, false);
+
+                if(!numReads.isPresent()) {
+                    readAll(results);
+                } else {
+                    read(results, numReads.get());
+                }
+            } finally {
+                if(results != null) {
+                    results.close();
+                }
+            }
+        }
+
+        /**
+         * Reads exactly {@code numReads} results, failing if the iteration runs out first.
+         */
+        private void read(final CloseableIteration<? extends BindingSet, QueryEvaluationException> results, final long numReads) throws QueryEvaluationException, NotEnoughResultsException {
+            requireNonNull(results);
+            for(long found = 0; found < numReads; found++) {
+                if(!results.hasNext()) {
+                    throw new NotEnoughResultsException(String.format("The SPARQL query did not result in enough results. Needed: %d Found: %d", numReads, found));
+                }
+                results.next();
+            }
+        }
+
+        /**
+         * Reads results until the iteration is exhausted.
+         */
+        private void readAll(final CloseableIteration<? extends BindingSet, QueryEvaluationException> results) throws QueryEvaluationException {
+            requireNonNull(results);
+            while(results.hasNext()) {
+                results.next();
+            }
+        }
+
+        /**
+         * The benchmark must read a specific number of results, but the benchmarked query
+         * does not have enough results to meet that number.
+         */
+        public static final class NotEnoughResultsException extends Exception {
+            private static final long serialVersionUID = 1L;
+
+            /**
+             * @param message - Describes how many results were needed and how many were found.
+             */
+            public NotEnoughResultsException(final String message) {
+                super(message);
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/main/resources/LICENSE.txt
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/resources/LICENSE.txt b/extras/rya.benchmark/src/main/resources/LICENSE.txt
new file mode 100644
index 0000000..4a9fe83
--- /dev/null
+++ b/extras/rya.benchmark/src/main/resources/LICENSE.txt
@@ -0,0 +1,16 @@
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/main/xsd/queries-benchmark-conf.xsd
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/xsd/queries-benchmark-conf.xsd b/extras/rya.benchmark/src/main/xsd/queries-benchmark-conf.xsd
new file mode 100644
index 0000000..826083e
--- /dev/null
+++ b/extras/rya.benchmark/src/main/xsd/queries-benchmark-conf.xsd
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
+ <!-- Root element of a benchmark configuration: exactly one Rya element followed by exactly one Parameters element. -->
+ <xsd:element name="QueriesBenchmarkConf">
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element name="Rya" type="Rya"/>
+ <xsd:element name="Parameters" type="Parameters"/>
+ </xsd:sequence>
+ </xsd:complexType>
+ </xsd:element>
+ <!-- Rya: the Rya instance name, the Accumulo username/password/zookeepers/instance name, and the boolean usePCJ secondary indexing flag. -->
+ <xsd:complexType name="Rya">
+ <xsd:sequence>
+ <xsd:element name="ryaInstanceName" type="xsd:string" />
+ <xsd:element name="accumulo">
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element name="username" type="xsd:string"/>
+ <xsd:element name="password" type="xsd:string"/>
+ <xsd:element name="zookeepers" type="xsd:string"/>
+ <xsd:element name="instanceName" type="xsd:string"/>
+ </xsd:sequence>
+ </xsd:complexType>
+ </xsd:element>
+ <xsd:element name="secondaryIndexing">
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element name="usePCJ" type="xsd:boolean"/>
+ </xsd:sequence>
+ </xsd:complexType>
+ </xsd:element>
+ </xsd:sequence>
+ </xsd:complexType>
+ <!-- Parameters: a required NumReadsRuns holding one or more NumReads strings, then an optional Queries holding one or more SPARQL strings. -->
+ <xsd:complexType name="Parameters">
+ <xsd:sequence>
+ <xsd:element name="NumReadsRuns">
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element name="NumReads" type="xsd:string" maxOccurs="unbounded"/>
+ </xsd:sequence>
+ </xsd:complexType>
+ </xsd:element>
+ <xsd:element name="Queries" minOccurs="0">
+ <xsd:complexType>
+ <xsd:sequence>
+ <xsd:element name="SPARQL" type="xsd:string" maxOccurs="unbounded"/>
+ </xsd:sequence>
+ </xsd:complexType>
+ </xsd:element>
+ </xsd:sequence>
+ </xsd:complexType>
+</xsd:schema>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReaderIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReaderIT.java b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReaderIT.java
new file mode 100644
index 0000000..f229dc4
--- /dev/null
+++ b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReaderIT.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.query;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.List;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.rya.benchmark.query.Parameters.NumReadsRuns;
+import org.apache.rya.benchmark.query.Parameters.Queries;
+import org.apache.rya.benchmark.query.Rya.Accumulo;
+import org.apache.rya.benchmark.query.Rya.SecondaryIndexing;
+import org.junit.Test;
+import org.xml.sax.SAXException;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
+/**
+ * Tests the methods of {@link QueriesBenchmarkConfReader}.
+ */
+public class QueriesBenchmarkConfReaderIT {
+ /** Unmarshals a complete configuration document through the reader and checks the values that are read back. */
+ @Test
+ public void load() throws JAXBException, SAXException {
+ // Unmarshal an XML document that populates every element of the configuration.
+ final String xml =
+ "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
+ "<QueriesBenchmarkConf>\n" +
+ " <Rya>\n" +
+ " <ryaInstanceName>test_</ryaInstanceName>\n" +
+ " <accumulo>\n" +
+ " <username>test</username>\n" +
+ " <password>t3stP@ssw0rd</password>\n" +
+ " <zookeepers>zoo-server-1,zoo-server-2</zookeepers>\n" +
+ " <instanceName>testInstance</instanceName>\n" +
+ " </accumulo>\n" +
+ " <secondaryIndexing>\n" +
+ " <usePCJ>true</usePCJ>\n" +
+ " </secondaryIndexing>\n" +
+ " </Rya>\n" +
+ " <Parameters>" +
+ " <NumReadsRuns>" +
+ " <NumReads>1</NumReads>" +
+ " <NumReads>10</NumReads>" +
+ " <NumReads>100</NumReads>" +
+ " <NumReads>ALL</NumReads>" +
+ " </NumReadsRuns>" +
+ " <Queries>\n" +
+ " <SPARQL><![CDATA[SELECT ?a WHERE { ?a <http://likes> <urn:icecream> . }]]></SPARQL>\n" +
+ " <SPARQL><![CDATA[SELECT ?a ?b WHERE { ?a <http://knows> ?b . }]]></SPARQL>\n" +
+ " </Queries>\n" +
+ " </Parameters>" +
+ "</QueriesBenchmarkConf>";
+
+ final InputStream xmlStream = new ByteArrayInputStream( xml.getBytes(Charsets.UTF_8) );
+ final QueriesBenchmarkConf benchmarkConf = new QueriesBenchmarkConfReader().load( xmlStream );
+
+ // Ensure the Rya section was unmarshalled correctly.
+ final Rya rya = benchmarkConf.getRya();
+ assertEquals("test_", rya.getRyaInstanceName());
+
+ final Accumulo accumulo = rya.getAccumulo();
+ assertEquals("test", accumulo.getUsername());
+ assertEquals("t3stP@ssw0rd", accumulo.getPassword());
+ assertEquals("zoo-server-1,zoo-server-2", accumulo.getZookeepers());
+ assertEquals("testInstance", accumulo.getInstanceName());
+
+ final SecondaryIndexing secondaryIndexing = rya.getSecondaryIndexing();
+ assertTrue(secondaryIndexing.isUsePCJ());
+
+ // Ensure the Parameters section was unmarshalled correctly; values and order must match the XML.
+ final Parameters parameters = benchmarkConf.getParameters();
+ final List<String> expectedNumReads = Lists.newArrayList("1", "10", "100", "ALL");
+ final NumReadsRuns numReadsRuns = parameters.getNumReadsRuns();
+ assertEquals(expectedNumReads, numReadsRuns.getNumReads());
+
+ final List<String> expectedQueries = Lists.newArrayList(
+ "SELECT ?a WHERE { ?a <http://likes> <urn:icecream> . }",
+ "SELECT ?a ?b WHERE { ?a <http://knows> ?b . }");
+ final Queries queries = parameters.getQueries();
+ assertEquals(expectedQueries, queries.getSPARQL());
+ }
+}
\ No newline at end of file