You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2016/09/27 15:11:09 UTC
[1/5] incubator-rya git commit: RYA-151 Rya Query benchmark tool
implemented using JMH.
Repository: incubator-rya
Updated Branches:
refs/heads/master 62c07940a -> c40dbbcfd
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java
new file mode 100644
index 0000000..e45002f
--- /dev/null
+++ b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.query;
+
+import java.io.File;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.accumulo.minicluster.MiniAccumuloConfig;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.rya.benchmark.query.QueryBenchmark.QueryBenchmarkRun;
+import org.apache.rya.benchmark.query.QueryBenchmark.QueryBenchmarkRun.NotEnoughResultsException;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.zookeeper.ClientCnxn;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.client.Install.InstallConfiguration;
+import mvm.rya.api.client.RyaClient;
+import mvm.rya.api.client.accumulo.AccumuloConnectionDetails;
+import mvm.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
+import mvm.rya.sail.config.RyaSailFactory;
+
+/**
+ * Integration tests {@link QueryBenchmarkRun}.
+ */
+public class QueryBenchmarkRunIT {
+ private static final Logger log = Logger.getLogger(QueryBenchmarkRunIT.class);
+
+ private static final String RYA_INSTANCE_NAME = "test_";
+ private static final String ACCUMULO_USER = "root";
+ private static final String ACCUMULO_PASSWORD = "password";
+ private static final String SPARQL_QUERY = "SELECT ?name WHERE { ?name <urn:likes> <urn:icecream> . ?name <urn:hasEyeColor> <urn:blue> . }";
+
+ private static MiniAccumuloCluster cluster = null;
+ private static Sail sail = null;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ // Squash loud logs.
+ Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR);
+
+ // Setup the Mini Accumulo Cluster.
+ final File miniDataDir = com.google.common.io.Files.createTempDir();
+ final MiniAccumuloConfig cfg = new MiniAccumuloConfig( miniDataDir, ACCUMULO_PASSWORD);
+ cluster = new MiniAccumuloCluster(cfg);
+ cluster.start();
+
+ // Create a Rya Client connected to the Mini Accumulo Cluster.
+ final AccumuloConnectionDetails connDetails = new AccumuloConnectionDetails(
+ ACCUMULO_USER,
+ ACCUMULO_PASSWORD.toCharArray(),
+ cluster.getInstanceName(),
+ cluster.getZooKeepers());
+ final Connector connector = cluster.getConnector(ACCUMULO_USER, ACCUMULO_PASSWORD);
+ final RyaClient ryaClient = AccumuloRyaClientFactory.build(connDetails, connector);
+
+ // Install an instance of Rya on the mini cluster.
+ installRya(ryaClient);
+
+ // Get a Sail object that is backed by the Rya store that is on the mini cluster.
+ final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+ ryaConf.setTablePrefix(RYA_INSTANCE_NAME);
+ ryaConf.set(ConfigUtils.CLOUDBASE_USER, ACCUMULO_USER);
+ ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, ACCUMULO_PASSWORD);
+ ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, cluster.getZooKeepers());
+ ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, cluster.getInstanceName());
+ ryaConf.set(ConfigUtils.USE_PCJ, "true");
+ ryaConf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.ACCUMULO.toString());
+ ryaConf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinUpdaterType.NO_UPDATE.toString());
+
+ sail = RyaSailFactory.getInstance( ryaConf );
+
+ // Load some data into the cluster that will match the query we're testing against.
+ loadTestStatements();
+
+ // Add a PCJ to the application that summarizes the query.
+ createTestPCJ(ryaClient);
+ }
+
+ private static void installRya(final RyaClient ryaClient) throws Exception {
+ // Use the client to install the instance of Rya that will be used for the tests.
+ ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder()
+ .setEnableTableHashPrefix(false)
+ .setEnableGeoIndex(false)
+ .setEnableTemporalIndex(false)
+ .setEnableFreeTextIndex(false)
+ .setEnableEntityCentricIndex(false)
+ .setEnablePcjIndex(true)
+ .build());
+ }
+
+ private static void loadTestStatements() throws Exception {
+ final ValueFactory vf = sail.getValueFactory();
+
+ final SailConnection sailConn = sail.getConnection();
+ sailConn.begin();
+ sailConn.addStatement(vf.createURI("urn:Alice"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+ sailConn.addStatement(vf.createURI("urn:Bob"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+ sailConn.addStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+ sailConn.addStatement(vf.createURI("urn:David"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+ sailConn.addStatement(vf.createURI("urn:Eve"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+ sailConn.addStatement(vf.createURI("urn:Frank"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+ sailConn.addStatement(vf.createURI("urn:George"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+ sailConn.addStatement(vf.createURI("urn:Hillary"), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+
+ sailConn.addStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+ sailConn.addStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+ sailConn.addStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+ sailConn.addStatement(vf.createURI("urn:David"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+ sailConn.addStatement(vf.createURI("urn:Eve"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+ sailConn.addStatement(vf.createURI("urn:Frank"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:blue"));
+ sailConn.addStatement(vf.createURI("urn:George"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:green"));
+ sailConn.addStatement(vf.createURI("urn:Hillary"), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:brown"));
+ sailConn.commit();
+ sailConn.close();
+ }
+
+ private static void createTestPCJ(final RyaClient ryaClient) throws Exception {
+ // Create an empty PCJ within the Rya instance's PCJ storage for the test query.
+ final PrecomputedJoinStorage pcjs = new AccumuloPcjStorage(cluster.getConnector(ACCUMULO_USER, ACCUMULO_PASSWORD), RYA_INSTANCE_NAME);
+ final String pcjId = pcjs.createPcj(SPARQL_QUERY);
+
+
+ // Batch update the PCJ using the Rya Client.
+ ryaClient.getBatchUpdatePCJ().batchUpdate(RYA_INSTANCE_NAME, pcjId);
+ }
+
+ @AfterClass
+ public static void teardown() {
+ if(sail != null) {
+ try {
+ log.info("Shutting down the Sail.");
+ sail.shutDown();
+ } catch (final SailException e) {
+ log.error("Could not shut down the Sail.", e);
+ }
+ }
+
+ if(cluster != null) {
+ try {
+ log.info("Shutting down the mini accumulo cluster.");
+ cluster.stop();
+ } catch (final Exception e) {
+ log.error("Could not shut down the mini accumulo cluster.", e);
+ }
+ }
+ }
+
+ @Test
+ public void read1() throws Exception {
+ new QueryBenchmarkRun(sail.getConnection(), SPARQL_QUERY, 1L).run();
+ }
+
+ @Test
+ public void read5() throws Exception {
+ new QueryBenchmarkRun(sail.getConnection(), SPARQL_QUERY, 5L).run();
+ }
+
+ @Test(expected = NotEnoughResultsException.class)
+ public void read10() throws Exception {
+ new QueryBenchmarkRun(sail.getConnection(), SPARQL_QUERY, 10L).run();
+ }
+
+ @Test
+ public void readAll() throws Exception {
+ new QueryBenchmarkRun(sail.getConnection(), SPARQL_QUERY).run();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.console/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.console/pom.xml b/extras/rya.console/pom.xml
index c2332aa..df9e78a 100644
--- a/extras/rya.console/pom.xml
+++ b/extras/rya.console/pom.xml
@@ -143,6 +143,21 @@
<mainClass>org.springframework.shell.Bootstrap</mainClass>
</transformer>
</transformers>
+
+ <!--
+ Shading signed JARs will fail without this.
+ http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
+ -->
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
</configuration>
</execution>
</executions>
@@ -185,4 +200,4 @@
</plugin>
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.console/src/main/java/mvm/rya/shell/RyaAdminCommands.java
----------------------------------------------------------------------
diff --git a/extras/rya.console/src/main/java/mvm/rya/shell/RyaAdminCommands.java b/extras/rya.console/src/main/java/mvm/rya/shell/RyaAdminCommands.java
index bd91333..3c3e244 100644
--- a/extras/rya.console/src/main/java/mvm/rya/shell/RyaAdminCommands.java
+++ b/extras/rya.console/src/main/java/mvm/rya/shell/RyaAdminCommands.java
@@ -33,12 +33,13 @@ import org.springframework.stereotype.Component;
import com.google.common.base.Optional;
-import mvm.rya.api.client.RyaClientException;
-import mvm.rya.api.client.RyaClient;
import mvm.rya.api.client.GetInstanceDetails;
-import mvm.rya.api.client.InstanceDoesNotExistException;
import mvm.rya.api.client.Install.DuplicateInstanceNameException;
import mvm.rya.api.client.Install.InstallConfiguration;
+import mvm.rya.api.client.InstanceDoesNotExistException;
+import mvm.rya.api.client.PCJDoesNotExistException;
+import mvm.rya.api.client.RyaClient;
+import mvm.rya.api.client.RyaClientException;
import mvm.rya.api.instance.RyaDetails;
import mvm.rya.shell.SharedShellState.ConnectionState;
import mvm.rya.shell.SharedShellState.ShellState;
@@ -55,6 +56,7 @@ public class RyaAdminCommands implements CommandMarker {
public static final String CREATE_PCJ_CMD = "create-pcj";
public static final String DELETE_PCJ_CMD = "delete-pcj";
+ public static final String BATCH_UPDATE_PCJ_CMD = "batch-update-pcj";
public static final String GET_INSTANCE_DETAILS_CMD = "get-instance-details";
public static final String INSTALL_CMD = "install";
public static final String LIST_INSTANCES_CMD = "list-instances";
@@ -69,7 +71,7 @@ public class RyaAdminCommands implements CommandMarker {
*
* @param state - Holds shared state between all of the command classes. (not null)
* @param installPrompt - Prompts a user for installation details. (not null)
- * @param sparqlPrompt - TODO doc
+ * @param sparqlPrompt - Prompts a user for create PCJ details. (not null)
*/
@Autowired
public RyaAdminCommands(final SharedShellState state, final InstallPrompt installPrompt, final SparqlPrompt sparqlPrompt) {
@@ -114,7 +116,8 @@ public class RyaAdminCommands implements CommandMarker {
*/
@CliAvailabilityIndicator({
CREATE_PCJ_CMD,
- DELETE_PCJ_CMD })
+ DELETE_PCJ_CMD,
+ BATCH_UPDATE_PCJ_CMD})
public boolean arePCJCommandsAvailable() {
// The PCJ commands are only available if the Shell is connected to an instance of Rya
// that is new enough to use the RyaDetailsRepository and is configured to maintain PCJs.
@@ -138,18 +141,18 @@ public class RyaAdminCommands implements CommandMarker {
public String listInstances() {
// Fetch the command that is connected to the store.
final ShellState shellState = state.getShellState();
- final RyaClient commands = shellState.getConnectedCommands().get();
- final Optional<String> ryaInstance = shellState.getRyaInstanceName();
+ final RyaClient ryaClient = shellState.getConnectedCommands().get();
+ final Optional<String> ryaInstanceName = shellState.getRyaInstanceName();
try {
// Sort the names alphabetically.
- final List<String> instanceNames = commands.getListInstances().listInstances();
+ final List<String> instanceNames = ryaClient.getListInstances().listInstances();
Collections.sort( instanceNames );
final String report;
final InstanceNamesFormatter formatter = new InstanceNamesFormatter();
- if(ryaInstance.isPresent()) {
- report = formatter.format(instanceNames, ryaInstance.get());
+ if(ryaInstanceName.isPresent()) {
+ report = formatter.format(instanceNames, ryaInstanceName.get());
} else {
report = formatter.format(instanceNames);
}
@@ -163,7 +166,7 @@ public class RyaAdminCommands implements CommandMarker {
@CliCommand(value = INSTALL_CMD, help = "Create a new instance of Rya.")
public String install() {
// Fetch the commands that are connected to the store.
- final RyaClient commands = state.getShellState().getConnectedCommands().get();
+ final RyaClient ryaClient = state.getShellState().getConnectedCommands().get();
String instanceName = null;
InstallConfiguration installConfig = null;
@@ -179,7 +182,7 @@ public class RyaAdminCommands implements CommandMarker {
}
// Execute the command.
- commands.getInstall().install(instanceName, installConfig);
+ ryaClient.getInstall().install(instanceName, installConfig);
return String.format("The Rya instance named '%s' has been installed.", instanceName);
} catch(final DuplicateInstanceNameException e) {
@@ -193,18 +196,18 @@ public class RyaAdminCommands implements CommandMarker {
public String getInstanceDetails() {
// Fetch the command that is connected to the store.
final ShellState shellState = state.getShellState();
- final RyaClient commands = shellState.getConnectedCommands().get();
- final String ryaInstance = shellState.getRyaInstanceName().get();
+ final RyaClient ryaClient = shellState.getConnectedCommands().get();
+ final String ryaInstanceName = shellState.getRyaInstanceName().get();
try {
- final Optional<RyaDetails> details = commands.getGetInstanceDetails().getDetails(ryaInstance);
+ final Optional<RyaDetails> details = ryaClient.getGetInstanceDetails().getDetails(ryaInstanceName);
if(details.isPresent()) {
return new RyaDetailsFormatter().format(details.get());
} else {
return "This instance of Rya does not have a Rya Details table. Consider migrating to a newer version of Rya.";
}
} catch(final InstanceDoesNotExistException e) {
- throw new RuntimeException(String.format("A Rya instance named '%s' does not exist.", ryaInstance), e);
+ throw new RuntimeException(String.format("A Rya instance named '%s' does not exist.", ryaInstanceName), e);
} catch (final RyaClientException e) {
throw new RuntimeException("Could not get the instance details. Reason: " + e.getMessage(), e);
}
@@ -214,18 +217,18 @@ public class RyaAdminCommands implements CommandMarker {
public String createPcj() {
// Fetch the command that is connected to the store.
final ShellState shellState = state.getShellState();
- final RyaClient commands = shellState.getConnectedCommands().get();
- final String ryaInstance = shellState.getRyaInstanceName().get();
+ final RyaClient ryaClient = shellState.getConnectedCommands().get();
+ final String ryaInstanceName = shellState.getRyaInstanceName().get();
try {
// Prompt the user for the SPARQL.
final String sparql = sparqlPrompt.getSparql();
// Execute the command.
- final String pcjId = commands.getCreatePCJ().createPCJ(ryaInstance, sparql);
+ final String pcjId = ryaClient.getCreatePCJ().createPCJ(ryaInstanceName, sparql);
// Return a message that indicates the ID of the newly created ID.
return String.format("The PCJ has been created. Its ID is '%s'.", pcjId);
} catch (final InstanceDoesNotExistException e) {
- throw new RuntimeException(String.format("A Rya instance named '%s' does not exist.", ryaInstance), e);
+ throw new RuntimeException(String.format("A Rya instance named '%s' does not exist.", ryaInstanceName), e);
} catch (final IOException | RyaClientException e) {
throw new RuntimeException("Could not create the PCJ. Provided reasons: " + e.getMessage(), e);
}
@@ -237,18 +240,39 @@ public class RyaAdminCommands implements CommandMarker {
final String pcjId) {
// Fetch the command that is connected to the store.
final ShellState shellState = state.getShellState();
- final RyaClient commands = shellState.getConnectedCommands().get();
- final String ryaInstance = shellState.getRyaInstanceName().get();
+ final RyaClient ryaClient = shellState.getConnectedCommands().get();
+ final String ryaInstanceName = shellState.getRyaInstanceName().get();
try {
// Execute the command.
- commands.getDeletePCJ().deletePCJ(ryaInstance, pcjId);
+ ryaClient.getDeletePCJ().deletePCJ(ryaInstanceName, pcjId);
return "The PCJ has been deleted.";
} catch (final InstanceDoesNotExistException e) {
- throw new RuntimeException(String.format("A Rya instance named '%s' does not exist.", ryaInstance), e);
+ throw new RuntimeException(String.format("A Rya instance named '%s' does not exist.", ryaInstanceName), e);
} catch (final RyaClientException e) {
throw new RuntimeException("The PCJ could not be deleted. Provided reason: " + e.getMessage(), e);
}
}
+
+    /**
+     * Batch updates the results of an existing PCJ using the statements that are
+     * currently stored in the Rya instance the shell is connected to.
+     *
+     * @param pcjId - The ID of the PCJ that will be updated. (not null)
+     * @return A message stating that the PCJ's results have been updated.
+     * @throws RuntimeException The Rya instance or the PCJ does not exist, or the
+     *   update failed for some other reason.
+     */
+    @CliCommand(value = BATCH_UPDATE_PCJ_CMD, help = "Batch update a PCJ index using this client. This operation may take a long time.")
+    public String batchUpdatePcj(
+            @CliOption(key={"pcjId"}, mandatory = true, help = "The ID of the PCJ that will be updated.")
+            final String pcjId) {
+        // Fetch the command that is connected to the store.
+        final ShellState shellState = state.getShellState();
+        final RyaClient ryaClient = shellState.getConnectedCommands().get();
+        final String ryaInstanceName = shellState.getRyaInstanceName().get();
+
+        try {
+            ryaClient.getBatchUpdatePCJ().batchUpdate(ryaInstanceName, pcjId);
+            return "The PCJ's results have been updated.";
+        } catch(final InstanceDoesNotExistException e) {
+            throw new RuntimeException(String.format("A Rya instance named '%s' does not exist.", ryaInstanceName), e);
+        } catch(final PCJDoesNotExistException e) {
+            throw new RuntimeException(String.format("A PCJ with ID '%s' does not exist.", pcjId), e);
+        } catch(final RyaClientException e) {
+            // This command updates a PCJ; the message previously said "deleted" (copy-paste from delete-pcj).
+            throw new RuntimeException("The PCJ could not be updated. Provided reason: " + e.getMessage(), e);
+        }
+    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.console/src/main/java/mvm/rya/shell/util/InstallPrompt.java
----------------------------------------------------------------------
diff --git a/extras/rya.console/src/main/java/mvm/rya/shell/util/InstallPrompt.java b/extras/rya.console/src/main/java/mvm/rya/shell/util/InstallPrompt.java
index c4a81d2..44aa270 100644
--- a/extras/rya.console/src/main/java/mvm/rya/shell/util/InstallPrompt.java
+++ b/extras/rya.console/src/main/java/mvm/rya/shell/util/InstallPrompt.java
@@ -94,9 +94,13 @@ public interface InstallPrompt {
builder.setEnablePcjIndex( enablePCJIndexing );
if(enablePCJIndexing) {
- prompt = "PCJ Updater Fluo Application Name: ";
- final String fluoAppName = promptString(prompt, Optional.<String>absent());
- builder.setFluoPcjAppName(fluoAppName);
+ final boolean useFluoApp = promptBoolean("Use a Fluo application to update the PCJ? ", Optional.absent());
+
+ if(useFluoApp) {
+ prompt = "PCJ Updater Fluo Application Name: ";
+ final String fluoAppName = promptString(prompt, Optional.<String>absent());
+ builder.setFluoPcjAppName(fluoAppName);
+ }
}
prompt = makeFieldPrompt("Use Temporal Indexing", true);
@@ -119,7 +123,11 @@ public interface InstallPrompt {
reader.println(" Use Precomputed Join Indexing: " + installConfig.isPcjIndexEnabled());
if(installConfig.isPcjIndexEnabled()) {
- reader.println(" PCJ Updater Fluo Application Name: " + installConfig.getFluoPcjAppName().get());
+ if(installConfig.getFluoPcjAppName().isPresent()) {
+ reader.println(" PCJ Updater Fluo Application Name: " + installConfig.getFluoPcjAppName().get());
+ } else {
+ reader.println(" Not using a PCJ Updater Fluo Application");
+ }
}
reader.println(" Use Temporal Indexing: " + installConfig.isTemporalIndexEnabled());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.console/src/test/java/mvm/rya/shell/RyaAdminCommandsTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.console/src/test/java/mvm/rya/shell/RyaAdminCommandsTest.java b/extras/rya.console/src/test/java/mvm/rya/shell/RyaAdminCommandsTest.java
index f842a93..430e946 100644
--- a/extras/rya.console/src/test/java/mvm/rya/shell/RyaAdminCommandsTest.java
+++ b/extras/rya.console/src/test/java/mvm/rya/shell/RyaAdminCommandsTest.java
@@ -34,16 +34,18 @@ import org.junit.Test;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
-import mvm.rya.api.client.RyaClientException;
-import mvm.rya.api.client.RyaClient;
+import mvm.rya.api.client.BatchUpdatePCJ;
import mvm.rya.api.client.CreatePCJ;
import mvm.rya.api.client.DeletePCJ;
import mvm.rya.api.client.GetInstanceDetails;
import mvm.rya.api.client.Install;
-import mvm.rya.api.client.InstanceDoesNotExistException;
-import mvm.rya.api.client.ListInstances;
import mvm.rya.api.client.Install.DuplicateInstanceNameException;
import mvm.rya.api.client.Install.InstallConfiguration;
+import mvm.rya.api.client.InstanceDoesNotExistException;
+import mvm.rya.api.client.ListInstances;
+import mvm.rya.api.client.PCJDoesNotExistException;
+import mvm.rya.api.client.RyaClient;
+import mvm.rya.api.client.RyaClientException;
import mvm.rya.api.client.accumulo.AccumuloConnectionDetails;
import mvm.rya.api.instance.RyaDetails;
import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails;
@@ -123,6 +125,33 @@ public class RyaAdminCommandsTest {
}
@Test
+    public void batchUpdatePCJ() throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException {
+        // Mock the object that performs the batch update PCJ operation.
+        final BatchUpdatePCJ mockBatchUpdatePCJ = mock(BatchUpdatePCJ.class);
+
+        final RyaClient mockRyaClient = mock(RyaClient.class);
+        when(mockRyaClient.getBatchUpdatePCJ()).thenReturn( mockBatchUpdatePCJ );
+
+        // Connect the shell's state to a (mocked) Accumulo and to a Rya instance.
+        final SharedShellState state = new SharedShellState();
+        state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mockRyaClient);
+        final String instanceName = "unitTests";
+        state.connectedToInstance(instanceName);
+
+        // Execute the command.
+        final String pcjId = "12343214312";
+
+        final RyaAdminCommands commands = new RyaAdminCommands(state, mock(InstallPrompt.class), mock(SparqlPrompt.class));
+        final String message = commands.batchUpdatePcj(pcjId);
+
+        // Verify the values that were provided to the command were passed through to the BatchUpdatePCJ.
+        verify(mockBatchUpdatePCJ).batchUpdate(eq(instanceName), eq(pcjId));
+
+        // Verify a message is returned that explains what was updated.
+        // JUnit's assertEquals takes (expected, actual); the order matters for failure reports.
+        final String expected = "The PCJ's results have been updated.";
+        assertEquals(expected, message);
+    }
+
+ @Test
public void getInstanceDetails() throws InstanceDoesNotExistException, RyaClientException {
// This test is failed if the default timezone was not EST, so now it's fixed at EST.
// If you get assert mismatch of EST!=EDT, try the deprecated getTimeZone("EST") instead.
[5/5] incubator-rya git commit: RYA-151 Fixed Arrays import
statement. Closes #85 kchilton2/RYA-151.
Posted by pu...@apache.org.
RYA-151 Fixed Arrays import statement. Closes #85 kchilton2/RYA-151.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/c40dbbcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/c40dbbcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/c40dbbcf
Branch: refs/heads/master
Commit: c40dbbcfda1257f4586dbf4a6ea51a1cbda9072f
Parents: 55df2fd
Author: Kevin Chilton <ke...@parsons.com>
Authored: Wed Sep 14 16:03:07 2016 -0400
Committer: pujav65 <pu...@gmail.com>
Committed: Tue Sep 27 11:09:50 2016 -0400
----------------------------------------------------------------------
.../mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c40dbbcf/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
index 7c48315..d79989c 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
@@ -21,13 +21,14 @@ package mvm.rya.indexing.external;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Arrays;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
-import cern.colt.Arrays;
import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier;
[2/5] incubator-rya git commit: RYA-151 Rya Query benchmark tool
implemented using JMH.
Posted by pu...@apache.org.
RYA-151 Rya Query benchmark tool implemented using JMH.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e77e839d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e77e839d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e77e839d
Branch: refs/heads/master
Commit: e77e839d76e140bee47eb610cf19c453ef0a79b5
Parents: 62c0794
Author: Kevin Chilton <ke...@parsons.com>
Authored: Wed Aug 17 22:29:41 2016 -0400
Committer: pujav65 <pu...@gmail.com>
Committed: Tue Sep 27 11:05:06 2016 -0400
----------------------------------------------------------------------
.../java/mvm/rya/api/client/BatchUpdatePCJ.java | 22 ++
.../api/client/PCJDoesNotExistException.java | 20 ++
.../main/java/mvm/rya/api/client/RyaClient.java | 13 +-
.../api/instance/RyaDetailsToConfiguration.java | 72 ++--
.../java/mvm/rya/accumulo/AccumuloITBase.java | 8 +-
.../client/accumulo/AccumuloBatchUpdatePCJ.java | 226 +++++++++++++
.../api/client/accumulo/AccumuloCreatePCJ.java | 64 ++--
.../accumulo/AccumuloRyaClientFactory.java | 1 +
.../PrecomputedJoinStorageSupplier.java | 3 +-
.../accumulo/AccumuloBatchUpdatePCJIT.java | 135 ++++++++
extras/pom.xml | 7 +-
extras/rya.benchmark/pom.xml | 250 ++++++++++++++
.../query/QueriesBenchmarkConfReader.java | 83 +++++
.../rya/benchmark/query/QueryBenchmark.java | 336 +++++++++++++++++++
.../src/main/resources/LICENSE.txt | 16 +
.../src/main/xsd/queries-benchmark-conf.xsd | 74 ++++
.../query/QueriesBenchmarkConfReaderIT.java | 105 ++++++
.../benchmark/query/QueryBenchmarkRunIT.java | 196 +++++++++++
extras/rya.console/pom.xml | 17 +-
.../java/mvm/rya/shell/RyaAdminCommands.java | 72 ++--
.../java/mvm/rya/shell/util/InstallPrompt.java | 16 +-
.../mvm/rya/shell/RyaAdminCommandsTest.java | 37 +-
22 files changed, 1672 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java b/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java
new file mode 100644
index 0000000..20d90e0
--- /dev/null
+++ b/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.api.client;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+/**
+ * Batch update a PCJ index.
+ */
+@ParametersAreNonnullByDefault
+public interface BatchUpdatePCJ {
+
+    /**
+     * Batch update a specific PCJ index using the {@link org.openrdf.model.Statement}s
+     * that are currently in the Rya instance.
+     *
+     * @param ryaInstanceName - The Rya instance whose PCJ will be updated. (not null)
+     * @param pcjId - Identifies the PCJ index to update. (not null)
+     * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name.
+     * @throws PCJDoesNotExistException No PCJ exists for the provided PCJ ID.
+     * @throws RyaClientException Something caused the command to fail.
+     */
+    public void batchUpdate(String ryaInstanceName, String pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java b/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java
new file mode 100644
index 0000000..63efe0c
--- /dev/null
+++ b/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.api.client;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+/**
+ * One of the {@link RyaClient} commands could not execute because the connected
+ * instance of Rya does not have a PCJ matching the provided PCJ ID.
+ */
+@ParametersAreNonnullByDefault
+public class PCJDoesNotExistException extends RyaClientException {
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Constructs an instance of {@link PCJDoesNotExistException}.
+     *
+     * @param message - Describes which PCJ could not be found. (not null)
+     */
+    public PCJDoesNotExistException(final String message) {
+        super(message);
+    }
+
+    /**
+     * Constructs an instance of {@link PCJDoesNotExistException}.
+     *
+     * @param message - Describes which PCJ could not be found. (not null)
+     * @param cause - The exception that revealed the PCJ is missing. (not null)
+     */
+    public PCJDoesNotExistException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java b/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java
index 173e1e0..851a273 100644
--- a/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java
+++ b/common/rya.api/src/main/java/mvm/rya/api/client/RyaClient.java
@@ -33,23 +33,26 @@ public class RyaClient {
private final Install install;
private final CreatePCJ createPcj;
private final DeletePCJ deletePcj;
+ private final BatchUpdatePCJ batchUpdatePcj;
private final GetInstanceDetails getInstanceDetails;
private final InstanceExists instanceExists;
private final ListInstances listInstances;
/**
- * Constructs an isntance of {@link RyaClient}.
+ * Constructs an instance of {@link RyaClient}.
*/
public RyaClient(
final Install install,
final CreatePCJ createPcj,
final DeletePCJ deletePcj,
+ final BatchUpdatePCJ batchUpdatePcj,
final GetInstanceDetails getInstanceDetails,
final InstanceExists instanceExists,
final ListInstances listInstances) {
this.install = requireNonNull(install);
this.createPcj = requireNonNull(createPcj);
this.deletePcj = requireNonNull(deletePcj);
+ this.batchUpdatePcj = requireNonNull(batchUpdatePcj);
this.getInstanceDetails = requireNonNull(getInstanceDetails);
this.instanceExists = requireNonNull(instanceExists);
this.listInstances = requireNonNull(listInstances);
@@ -79,6 +82,14 @@ public class RyaClient {
}
/**
+ * @return An instance of {@link BatchUpdatePCJ} that is connected to a Rya storage
+ * if the Rya instance supports PCJ indexing.
+ */
+ public BatchUpdatePCJ getBatchUpdatePCJ() {
+ return batchUpdatePcj;
+ }
+
+ /**
* @return An instance of {@link GetInstanceDetails} that is connected to a Rya storage.
*/
public GetInstanceDetails getGetInstanceDetails() {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java b/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java
index faec0ff..8734adc 100644
--- a/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java
+++ b/common/rya.api/src/main/java/mvm/rya/api/instance/RyaDetailsToConfiguration.java
@@ -18,29 +18,34 @@
*/
package mvm.rya.api.instance;
+import static java.util.Objects.requireNonNull;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
/**
* Used to fetch {@link RyaDetails} from a {@link RyaDetailsRepository} and
* add them to the application's {@link Configuration}.
*/
+@ParametersAreNonnullByDefault
public class RyaDetailsToConfiguration {
- private static final Logger LOG = Logger.getLogger(RyaDetailsToConfiguration.class);
+ private static final Logger log = Logger.getLogger(RyaDetailsToConfiguration.class);
+
/**
* Ensures the values in the {@link Configuration} do not conflict with the values in {@link RyaDetails}.
* If they do, the values in {@link RyaDetails} take precedent and the {@link Configuration} value will
* be overwritten.
*
- * @param details - The {@link RyaDetails} to add to the {@link Configuration}.
- * @param conf - The {@link Configuration} to add {@link RyaDetails} to.
+ * @param details - The {@link RyaDetails} to add to the {@link Configuration}. (not null)
+ * @param conf - The {@link Configuration} to add {@link RyaDetails} to. (not null)
*/
public static void addRyaDetailsToConfiguration(final RyaDetails details, final Configuration conf) {
- Preconditions.checkNotNull(details);
- Preconditions.checkNotNull(conf);
+ requireNonNull(details);
+ requireNonNull(conf);
checkAndSet(conf, ConfigurationFields.USE_ENTITY, details.getEntityCentricIndexDetails().isEnabled());
checkAndSet(conf, ConfigurationFields.USE_FREETEXT, details.getFreeTextIndexDetails().isEnabled());
@@ -50,23 +55,44 @@ public class RyaDetailsToConfiguration {
}
/**
- * Checks to see if the configuration has a value in the specified field.
- * If the value exists and does not match what is expected by the {@link RyaDetails},
- * an error will be logged and the value will be overwritten.
- * @param conf - The {@link Configuration} to potentially change.
- * @param field - The field to check and set.
- * @param value - The new value in the field (is not used if the value doesn't need to be changed).
+ * Ensures a Rya Client will not try to use a secondary index that is not supported by the Rya Instance
+ * it is connecting to.
+ * <p>
+ * If the configuration...
+ * <ul>
+ * <li>provides an 'on' value for an index that is supported, then nothing changes.</li>
+ * <li>provides an 'off' value for an index that is or is not supported, then nothing changes.</li>
+ * <li>provides an 'on' value for an index that is not supported, then the index is turned
+ * off and a warning is logged.</li>
+ * <li>does not provide any value for an index, then it will be turned on if supported.</li>
+ * </ul>
+ *
+ * @param conf - The {@link Configuration} to potentially change. (not null)
+ * @param useIndexField - The field within {@code conf} that indicates if the client will utilize the index. (not null)
+ * @param indexSupported - {@code true} if the Rya Instance supports the index; otherwise {@code false}.
*/
- private static void checkAndSet(final Configuration conf, final String field, final boolean value) {
- final Optional<String> opt = Optional.fromNullable(conf.get(field));
- if(opt.isPresent()) {
- final Boolean curVal = new Boolean(opt.get());
- if(curVal != value) {
- LOG.error("The user configured value in: " + field + " will be overwritten by what has been configured by the admin.");
- conf.setBoolean(field, value);
- }
- } else {
- conf.setBoolean(field, value);
+ private static void checkAndSet(final Configuration conf, final String useIndexField, final boolean indexSupported) {
+ requireNonNull(conf);
+ requireNonNull(useIndexField);
+
+ final Optional<String> useIndexStr = Optional.fromNullable( conf.get(useIndexField) );
+
+ // If no value was provided, default to using the index if it is supported.
+ if(!useIndexStr.isPresent()) {
+ log.info("No Rya Client configuration was provided for the " + useIndexField +
+ " index, so it is being defaulted to " + indexSupported);
+ conf.setBoolean(useIndexField, indexSupported);
+ return;
+ }
+
+ // If configured to use the index, but the Rya Instance does not support it, then turn it off.
+ final boolean useIndex = Boolean.parseBoolean( useIndexStr.get() );
+ if(useIndex && !indexSupported) {
+ log.warn("The Rya Client indicates it wants to use a secondary index that the Rya Instance does not support. " +
+ "This is not allowed, so the index will be turned off. Index Configuration Field: " + useIndexField);
+ conf.setBoolean(useIndexField, false);
}
+
+ // Otherwise use whatever the Client wants to use.
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java
----------------------------------------------------------------------
diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java
index 2a1c384..7dd23e6 100644
--- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java
+++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/AccumuloITBase.java
@@ -95,11 +95,9 @@ public class AccumuloITBase {
}
/**
- * TODO doc
- *
- * @return
- * @throws AccumuloSecurityException
- * @throws AccumuloException
+ * @return A {@link Connector} that creates connections to the mini accumulo cluster.
+ * @throws AccumuloException Could not connect to the cluster.
+ * @throws AccumuloSecurityException Could not connect to the cluster because of a security violation.
*/
public Connector getConnector() throws AccumuloException, AccumuloSecurityException {
return cluster.getConnector();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
new file mode 100644
index 0000000..ee773b0
--- /dev/null
+++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
@@ -0,0 +1,226 @@
+package mvm.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+
+import com.google.common.base.Optional;
+
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
+import mvm.rya.api.client.BatchUpdatePCJ;
+import mvm.rya.api.client.InstanceDoesNotExistException;
+import mvm.rya.api.client.PCJDoesNotExistException;
+import mvm.rya.api.client.RyaClientException;
+import mvm.rya.api.instance.RyaDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
+import mvm.rya.api.instance.RyaDetailsRepository;
+import mvm.rya.api.instance.RyaDetailsRepository.NotInitializedException;
+import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import mvm.rya.api.instance.RyaDetailsUpdater;
+import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
+import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.rdftriplestore.inference.InferenceEngineException;
+import mvm.rya.sail.config.RyaSailFactory;
+
+/**
+ * Batch updates a PCJ index. The PCJ's SPARQL query is evaluated by a {@link Sail} that is created
+ * within this process with PCJ use turned off, so only the Rya instance's core tables are scanned.
+ * The PCJ's old results are purged and replaced by the new ones, and the PCJ's details are then
+ * marked as batch updated.
+ */
+public class AccumuloBatchUpdatePCJ extends AccumuloCommand implements BatchUpdatePCJ {
+
+    private static final Logger log = Logger.getLogger(AccumuloBatchUpdatePCJ.class);
+
+    /**
+     * The number of query results that are buffered before they are written to the PCJ storage.
+     */
+    private static final int RESULT_BATCH_SIZE = 1000;
+
+    /**
+     * Constructs an instance of {@link AccumuloBatchUpdatePCJ}.
+     *
+     * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null)
+     * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
+     */
+    public AccumuloBatchUpdatePCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
+        super(connectionDetails, connector);
+    }
+
+    @Override
+    public void batchUpdate(final String ryaInstanceName, final String pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException {
+        requireNonNull(ryaInstanceName);
+        requireNonNull(pcjId);
+        verifyPCJState(ryaInstanceName, pcjId);
+        updatePCJResults(ryaInstanceName, pcjId);
+        updatePCJMetadata(ryaInstanceName, pcjId);
+    }
+
+    /**
+     * Ensures the PCJ may be batch updated. The Rya instance must have PCJ indexing enabled, the
+     * PCJ must exist, and the PCJ must not already be updated incrementally.
+     *
+     * @param ryaInstanceName - The name of the Rya instance that hosts the PCJ. (not null)
+     * @param pcjId - The ID of the PCJ that will be checked. (not null)
+     * @throws InstanceDoesNotExistException The Rya instance's details have not been initialized.
+     * @throws PCJDoesNotExistException The Rya instance has no PCJ with the provided ID.
+     * @throws RyaClientException PCJs are disabled, the PCJ is incrementally updated, or the details could not be read.
+     */
+    private void verifyPCJState(final String ryaInstanceName, final String pcjId) throws RyaClientException {
+        try {
+            // Fetch the Rya instance's details.
+            final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(super.getConnector(), ryaInstanceName);
+            final RyaDetails ryaDetails = detailsRepo.getRyaInstanceDetails();
+
+            // Ensure PCJs are enabled.
+            if(!ryaDetails.getPCJIndexDetails().isEnabled()) {
+                throw new RyaClientException("PCJs are not enabled for the Rya instance named '" + ryaInstanceName + "'.");
+            }
+
+            // Ensure the PCJ exists.
+            if(!ryaDetails.getPCJIndexDetails().getPCJDetails().containsKey(pcjId)) {
+                throw new PCJDoesNotExistException("The PCJ with id '" + pcjId + "' does not exist within Rya instance '" + ryaInstanceName + "'.");
+            }
+
+            // Ensure the PCJ is not already being incrementally updated.
+            final PCJDetails pcjDetails = ryaDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
+            final Optional<PCJUpdateStrategy> updateStrategy = pcjDetails.getUpdateStrategy();
+            if(updateStrategy.isPresent() && updateStrategy.get() == PCJUpdateStrategy.INCREMENTAL) {
+                throw new RyaClientException("The PCJ with id '" + pcjId + "' is already being updated incrementally.");
+            }
+        } catch(final NotInitializedException e) {
+            throw new InstanceDoesNotExistException("No RyaDetails are initialized for the Rya instance named '" + ryaInstanceName + "'.", e);
+        } catch (final RyaDetailsRepositoryException e) {
+            throw new RyaClientException("Could not fetch the RyaDetails for the Rya instance named '" + ryaInstanceName + "'.", e);
+        }
+    }
+
+    /**
+     * Replaces the PCJ's stored results with the results of evaluating its SPARQL query.
+     * <p>
+     * The query is fetched and parsed before anything is purged, so a missing or malformed query
+     * leaves the existing results untouched. If evaluation or writing fails after the purge, the
+     * PCJ may be left empty or partially populated.
+     *
+     * @param ryaInstanceName - The name of the Rya instance that hosts the PCJ. (not null)
+     * @param pcjId - The ID of the PCJ whose results will be replaced. (not null)
+     * @throws RyaClientException The query could not be loaded, evaluated, or its results stored.
+     */
+    private void updatePCJResults(final String ryaInstanceName, final String pcjId) throws InstanceDoesNotExistException, PCJDoesNotExistException, RyaClientException {
+        // Things that have to be closed before we exit.
+        final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), ryaInstanceName);
+        Sail sail = null;
+        SailConnection sailConn = null;
+        CloseableIteration<? extends BindingSet, QueryEvaluationException> results = null;
+
+        try {
+            // Fetch and parse the PCJ's SPARQL query before purging so a failure here does not lose data.
+            final ParsedQuery parsedQuery;
+            try {
+                final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+                parsedQuery = new SPARQLParser().parseQuery(metadata.getSparql(), null);
+            } catch(final MalformedQueryException | PCJStorageException e) {
+                throw new RyaClientException("Fail to batch load new results into the PCJ with ID '" + pcjId + "'.", e);
+            }
+
+            // Create an instance of Sail backed by the Rya instance.
+            sail = connectToRya(ryaInstanceName);
+
+            // Purge the old results from the PCJ.
+            try {
+                pcjStorage.purge(pcjId);
+            } catch (final PCJStorageException e) {
+                throw new RyaClientException("Could not batch update PCJ with ID '" + pcjId + "' because the old " +
+                        "results could not be purged from it.", e);
+            }
+
+            try {
+                // Execute the query.
+                sailConn = sail.getConnection();
+                results = sailConn.evaluate(parsedQuery.getTupleExpr(), null, null, false);
+
+                // Load the results into the PCJ table, RESULT_BATCH_SIZE at a time.
+                final List<VisibilityBindingSet> batch = new ArrayList<>(RESULT_BATCH_SIZE);
+                while(results.hasNext()) {
+                    batch.add( new VisibilityBindingSet(results.next(), "") );
+                    if(batch.size() == RESULT_BATCH_SIZE) {
+                        pcjStorage.addResults(pcjId, batch);
+                        batch.clear();
+                    }
+                }
+
+                // Flush whatever is left over from the final partial batch.
+                if(!batch.isEmpty()) {
+                    pcjStorage.addResults(pcjId, batch);
+                }
+            } catch(final PCJStorageException | SailException | QueryEvaluationException e) {
+                throw new RyaClientException("Fail to batch load new results into the PCJ with ID '" + pcjId + "'.", e);
+            }
+        } finally {
+            if(results != null) {
+                try {
+                    results.close();
+                } catch (final QueryEvaluationException e) {
+                    log.warn(e.getMessage(), e);
+                }
+            }
+
+            if(sailConn != null) {
+                try {
+                    sailConn.close();
+                } catch (final SailException e) {
+                    log.warn(e.getMessage(), e);
+                }
+            }
+
+            if(sail != null) {
+                try {
+                    sail.shutDown();
+                } catch (final SailException e) {
+                    log.warn(e.getMessage(), e);
+                }
+            }
+
+            try {
+                pcjStorage.close();
+            } catch (final PCJStorageException e) {
+                log.warn(e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Creates a {@link Sail} over the named Rya instance using this command's Accumulo connection
+     * details. PCJs are turned off so that only the core Rya tables are scanned.
+     *
+     * @param ryaInstanceName - The name of the Rya instance to connect to. (not null)
+     * @return A {@link Sail} that must be shut down by the caller.
+     * @throws RyaClientException The Sail could not be created.
+     */
+    private Sail connectToRya(final String ryaInstanceName) throws RyaClientException {
+        try {
+            final AccumuloConnectionDetails connectionDetails = super.getAccumuloConnectionDetails();
+
+            final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+            ryaConf.setTablePrefix(ryaInstanceName);
+            ryaConf.set(ConfigUtils.CLOUDBASE_USER, connectionDetails.getUsername());
+            ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, new String(connectionDetails.getPassword()));
+            ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, connectionDetails.getZookeepers());
+            ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, connectionDetails.getInstanceName());
+
+            // Turn PCJs off so that we will only scan the core Rya tables while building the PCJ results.
+            ryaConf.set(ConfigUtils.USE_PCJ, "false");
+
+            return RyaSailFactory.getInstance(ryaConf);
+        } catch (SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) {
+            throw new RyaClientException("Could not connect to the Rya instance named '" + ryaInstanceName + "'.", e);
+        }
+    }
+
+    /**
+     * Marks the PCJ's details as batch updated and records the current time as its last update time.
+     *
+     * @param ryaInstanceName - The name of the Rya instance that hosts the PCJ. (not null)
+     * @param pcjId - The ID of the PCJ whose details will be updated. (not null)
+     * @throws RyaClientException The details could not be updated, for example because the PCJ was removed.
+     */
+    private void updatePCJMetadata(final String ryaInstanceName, final String pcjId) throws RyaClientException {
+        try {
+            final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(super.getConnector(), ryaInstanceName);
+
+            new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() {
+                @Override
+                public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException {
+                    // The PCJ may have been deleted after it was verified; fail cleanly instead of with an NPE.
+                    final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
+                    if(originalPCJDetails == null) {
+                        throw new CouldNotApplyMutationException("The PCJ with id '" + pcjId +
+                                "' no longer exists, so its details could not be updated.");
+                    }
+
+                    // Update the original PCJ Details to indicate they were batch updated.
+                    final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails )
+                            .setUpdateStrategy( PCJUpdateStrategy.BATCH )
+                            .setLastUpdateTime( new Date() );
+
+                    // Replace the old PCJ Details with the updated ones.
+                    final RyaDetails.Builder builder = RyaDetails.builder(originalDetails);
+                    builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails );
+                    return builder.build();
+                }
+            });
+        } catch (final RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
+            throw new RyaClientException("Could not update the PCJ's metadata.", e);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java
index 92b5d8c..30be548 100644
--- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java
+++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java
@@ -92,13 +92,6 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
}
- final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
- final boolean usingFluo = fluoDetailsHolder.isPresent();
- if(!usingFluo) {
- throw new RyaClientException( String.format("Can not create a PCJ for the '%s' instance of Rya because it does" +
- "not have a Fluo application associated with it. Update the instance's PCJ Index Details to fix this problem.", instanceName) );
- }
-
// Create the PCJ table that will receive the index results.
final String pcjId;
final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getConnector(), instanceName);
@@ -108,33 +101,36 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
throw new RyaClientException("Problem while initializing the PCJ table.", e);
}
- // Task the Fluo application with updating the PCJ.
- final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
- try {
- updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId);
- } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException e) {
- throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
- }
-
- // Update the Rya Details to indicate the PCJ is being updated incrementally.
- final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName);
- try {
- new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() {
- @Override
- public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException {
- // Update the original PCJ Details to indicate they are incrementally updated.
- final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
- final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails )
- .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL );
-
- // Replace the old PCJ Details with the updated ones.
- final RyaDetails.Builder builder = RyaDetails.builder(originalDetails);
- builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails );
- return builder.build();
- }
- });
- } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
- throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e);
+ // If a Fluo application is being used, task it with updating the PCJ.
+ final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
+ if(fluoDetailsHolder.isPresent()) {
+ final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
+ try {
+ updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId);
+ } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException e) {
+ throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
+ }
+
+ // Update the Rya Details to indicate the PCJ is being updated incrementally.
+ final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName);
+ try {
+ new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() {
+ @Override
+ public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException {
+ // Update the original PCJ Details to indicate they are incrementally updated.
+ final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId);
+ final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails )
+ .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL );
+
+ // Replace the old PCJ Details with the updated ones.
+ final RyaDetails.Builder builder = RyaDetails.builder(originalDetails);
+ builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails );
+ return builder.build();
+ }
+ });
+ } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
+ throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e);
+ }
}
// Return the ID that was assigned to the PCJ.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java
index 8c276a8..102f667 100644
--- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java
+++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloRyaClientFactory.java
@@ -52,6 +52,7 @@ public class AccumuloRyaClientFactory {
new AccumuloInstall(connectionDetails, connector),
new AccumuloCreatePCJ(connectionDetails, connector),
new AccumuloDeletePCJ(connectionDetails, connector),
+ new AccumuloBatchUpdatePCJ(connectionDetails, connector),
new AccumuloGetInstanceDetails(connectionDetails, connector),
new AccumuloInstanceExists(connectionDetails, connector),
new AccumuloListInstances(connectionDetails, connector));
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
index bf10c84..7c48315 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/PrecomputedJoinStorageSupplier.java
@@ -27,6 +27,7 @@ import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
+import cern.colt.Arrays;
import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
import mvm.rya.indexing.external.accumulo.AccumuloPcjStorageSupplier;
@@ -64,7 +65,7 @@ public class PrecomputedJoinStorageSupplier implements Supplier<PrecomputedJoinS
// Ensure the storage type has been set.
final Optional<PrecomputedJoinStorageType> storageType = indexerConfig.getPcjStorageType();
checkArgument(storageType.isPresent(), "The '" + PrecomputedJoinIndexerConfig.PCJ_STORAGE_TYPE +
- "' property must have one of the following values: " + PrecomputedJoinStorageType.values());
+ "' property must have one of the following values: " + Arrays.toString(PrecomputedJoinStorageType.values()));
// Create and return the configured storage.
switch(storageType.get()) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
new file mode 100644
index 0000000..f23f1c4
--- /dev/null
+++ b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
@@ -0,0 +1,135 @@
+package mvm.rya.api.client.accumulo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+
+import mvm.rya.accumulo.AccumuloITBase;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.api.client.Install.InstallConfiguration;
+import mvm.rya.api.client.RyaClient;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
+import mvm.rya.sail.config.RyaSailFactory;
+
+/**
+ * Integration tests the methods of {@link AccumuloBatchUpdatePCJ}.
+ */
+public class AccumuloBatchUpdatePCJIT extends AccumuloITBase {
+
+    private static final String RYA_INSTANCE_NAME = "test_";
+
+    /**
+     * Loads statements into a fresh Rya instance, creates a PCJ over them, batch updates it,
+     * and verifies the PCJ's stored results match the query's answer.
+     */
+    @Test
+    public void batchUpdate() throws Exception {
+        // Setup a Rya Client.
+        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                super.getUsername(),
+                super.getPassword().toCharArray(),
+                super.getInstanceName(),
+                super.getZookeepers());
+        final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, super.getConnector());
+
+        // Install an instance of Rya on the mini accumulo cluster.
+        ryaClient.getInstall().install(RYA_INSTANCE_NAME, InstallConfiguration.builder()
+                .setEnablePcjIndex(true)
+                .build());
+
+        Sail sail = null;
+        PrecomputedJoinStorage pcjStorage = null;
+        try {
+            // Get a Sail connection backed by the installed Rya instance.
+            final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+            ryaConf.setTablePrefix(RYA_INSTANCE_NAME);
+            ryaConf.set(ConfigUtils.CLOUDBASE_USER, super.getUsername());
+            ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, super.getPassword());
+            ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, super.getZookeepers());
+            ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, super.getInstanceName());
+            ryaConf.set(ConfigUtils.USE_PCJ, "true");
+            ryaConf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.ACCUMULO.toString());
+            ryaConf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinUpdaterType.NO_UPDATE.toString());
+            sail = RyaSailFactory.getInstance( ryaConf );
+
+            // Everyone likes icecream, but only Alice through Frank have blue eyes.
+            final ValueFactory vf = sail.getValueFactory();
+            final String[] people = { "Alice", "Bob", "Charlie", "David", "Eve", "Frank", "George", "Hillary" };
+            final String[] eyeColors = { "blue", "blue", "blue", "blue", "blue", "blue", "green", "brown" };
+
+            // Load the statements into the Rya instance; always release the connection.
+            final SailConnection sailConn = sail.getConnection();
+            try {
+                sailConn.begin();
+                for(final String person : people) {
+                    sailConn.addStatement(vf.createURI("urn:" + person), vf.createURI("urn:likes"), vf.createURI("urn:icecream"));
+                }
+                for(int i = 0; i < people.length; i++) {
+                    sailConn.addStatement(vf.createURI("urn:" + people[i]), vf.createURI("urn:hasEyeColor"), vf.createURI("urn:" + eyeColors[i]));
+                }
+                sailConn.commit();
+            } finally {
+                sailConn.close();
+            }
+
+            // Create a PCJ for a SPARQL query.
+            pcjStorage = new AccumuloPcjStorage(super.getConnector(), RYA_INSTANCE_NAME);
+            final String sparql = "SELECT ?name WHERE { ?name <urn:likes> <urn:icecream> . ?name <urn:hasEyeColor> <urn:blue> . }";
+            final String pcjId = pcjStorage.createPcj(sparql);
+
+            // Run the test.
+            ryaClient.getBatchUpdatePCJ().batchUpdate(RYA_INSTANCE_NAME, pcjId);
+
+            // Verify the correct results were loaded into the PCJ table.
+            final Set<BindingSet> expectedResults = new HashSet<>();
+            for(final String name : new String[] { "Alice", "Bob", "Charlie", "David", "Eve", "Frank" }) {
+                final MapBindingSet bs = new MapBindingSet();
+                bs.addBinding("name", vf.createURI("urn:" + name));
+                expectedResults.add(bs);
+            }
+
+            final Set<BindingSet> results = new HashSet<>();
+            for(final BindingSet result : pcjStorage.listResults(pcjId)) {
+                results.add( result );
+            }
+
+            assertEquals(expectedResults, results);
+
+        } finally {
+            // Release the PCJ storage first, but make sure the Sail is shut down even if that fails.
+            try {
+                if(pcjStorage != null) {
+                    pcjStorage.close();
+                }
+            } finally {
+                if(sail != null) {
+                    sail.shutDown();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/pom.xml
----------------------------------------------------------------------
diff --git a/extras/pom.xml b/extras/pom.xml
index 9568e27..0ffeb7a 100644
--- a/extras/pom.xml
+++ b/extras/pom.xml
@@ -1,5 +1,4 @@
-<?xml version="1.0" encoding="utf-8"?>
-
+<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@@ -18,7 +17,6 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
@@ -44,5 +42,6 @@ under the License.
<module>vagrantExample</module>
<module>rya.pcj.fluo</module>
<module>rya.merger</module>
+ <module>rya.benchmark</module>
</modules>
-</project>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/pom.xml b/extras/rya.benchmark/pom.xml
new file mode 100644
index 0000000..5b9eb68
--- /dev/null
+++ b/extras/rya.benchmark/pom.xml
@@ -0,0 +1,250 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <artifactId>rya.extras</artifactId>
+ <groupId>org.apache.rya</groupId>
+ <version>3.2.10-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>rya.benchmark</artifactId>
+
+ <name>Apache Rya Benchmarks</name>
+
+ <dependencies>
+ <!-- JMH Benchmark Framework dependencies -->
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-core</artifactId>
+ <version>${jmh.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ <version>${jmh.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.indexing</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>3.0.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-minicluster</artifactId>
+ <version>${accumulo.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+
+ <!--
+ JMH version to use with this project.
+ -->
+ <jmh.version>1.13</jmh.version>
+
+ <!--
+ Java source/target to use for compilation.
+ -->
+ <javac.target>1.8</javac.target>
+
+ <!--
+ Name of the benchmark Uber-JAR to generate.
+ -->
+ <uberjar.name>benchmarks</uberjar.name>
+ </properties>
+
+ <build>
+ <resources>
+ <resource>
+ <directory>src/main/xsd</directory>
+ </resource>
+ </resources>
+
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <compilerVersion>${javac.target}</compilerVersion>
+ <source>${javac.target}</source>
+ <target>${javac.target}</target>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>jaxb2-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>xjc</id>
+ <goals>
+ <goal>xjc</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <!-- Place the generated source within the 'src' directory so license-maven-plugin will find it. -->
+ <outputDirectory>src/main/gen</outputDirectory>
+ <packageName>org.apache.rya.benchmark.query</packageName>
+ </configuration>
+ </plugin>
+
+ <!-- Automatically place Apache 2 license headers at the top of all of the project's Java files.
+ Rat runs during the 'validate' lifecycle step, so it will fail the build before this one
+ executes if any of the headers are missing. Run the build with rat turned off to add
+ missing headers to the Java files. -->
+ <plugin>
+ <groupId>com.mycila</groupId>
+ <artifactId>license-maven-plugin</artifactId>
+ <version>2.6</version>
+ <configuration>
+ <!-- We use a custom Apache 2.0 license header because we do not include a copyright section. -->
+ <header>src/main/resources/LICENSE.txt</header>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>process-sources</phase>
+ <goals>
+ <goal>format</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.2</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <finalName>${uberjar.name}</finalName>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>org.openjdk.jmh.Main</mainClass>
+ </transformer>
+ </transformers>
+ <filters>
+ <filter>
+ <!--
+ Shading signed JARs will fail without this.
+ http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar
+ -->
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.19.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.5</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-deploy-plugin</artifactId>
+ <version>2.8.1</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-install-plugin</artifactId>
+ <version>2.5.1</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <version>2.9.1</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-resources-plugin</artifactId>
+ <version>2.6</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-site-plugin</artifactId>
+ <version>3.3</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-source-plugin</artifactId>
+ <version>2.2.1</version>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.17</version>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReader.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReader.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReader.java
new file mode 100644
index 0000000..8cbf203
--- /dev/null
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReader.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.query;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.InputStream;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+import javax.xml.XMLConstants;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+
+import org.xml.sax.SAXException;
+
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
+
+/**
+ * Unmarshalls instances of {@link QueriesBenchmarkConf}.
+ * <p>
+ * Every document that is read is validated against the "queries-benchmark-conf.xsd" schema,
+ * which is loaded from the classpath the first time it is needed and then reused.
+ */
+@ParametersAreNonnullByDefault
+public final class QueriesBenchmarkConfReader {
+
+    // It is assumed the schema file is held within the root directory of the packaged jar.
+    private static final String SCHEMA_LOCATION = "queries-benchmark-conf.xsd";
+
+    // Only load the Schema once.
+    private static final Supplier<Schema> SCHEMA_SUPPLIER = Suppliers.memoize(
+            new Supplier<Schema>() {
+                @Override
+                public Schema get() {
+                    return loadSchema();
+                }
+            });
+
+    /**
+     * Loads and compiles the schema held at {@link #SCHEMA_LOCATION}.
+     *
+     * @return The compiled schema.
+     * @throws RuntimeException The schema file could not be found, read, or parsed.
+     */
+    private static Schema loadSchema() {
+        // Use the loader that loaded this class instead of the system class loader so the schema is
+        // still found when this class was not loaded by the system class loader.
+        final ClassLoader classLoader = QueriesBenchmarkConfReader.class.getClassLoader();
+
+        // try-with-resources closes the schema stream once the schema has been compiled.
+        try (final InputStream schemaStream = classLoader.getResourceAsStream(SCHEMA_LOCATION)) {
+            if(schemaStream == null) {
+                throw new IllegalStateException("Could not find the '" + SCHEMA_LOCATION + "' schema file. Make sure it is on the classpath.");
+            }
+            final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+            return schemaFactory.newSchema( new StreamSource( schemaStream ) );
+        } catch (final SAXException | java.io.IOException e) {
+            throw new RuntimeException("Could not load the '" + SCHEMA_LOCATION + "' schema file. Make sure it is on the classpath.", e);
+        }
+    }
+
+    /**
+     * Unmarshall an instance of {@link QueriesBenchmarkConf} from the XML that
+     * is retrieved from an {@link InputStream}. The XML is validated against the
+     * benchmark configuration schema while it is read.
+     * <p>
+     * The stream is not closed by this method; the caller owns it.
+     *
+     * @param xmlStream - The input stream holding the XML. (not null)
+     * @return The {@link QueriesBenchmarkConf} instance that was read from the stream.
+     * @throws JAXBException There was a problem with the formatting of the XML, or it
+     *   did not conform to the schema.
+     */
+    public QueriesBenchmarkConf load(final InputStream xmlStream) throws JAXBException {
+        requireNonNull(xmlStream);
+
+        // Load the schema that describes the stream.
+        final Schema schema = SCHEMA_SUPPLIER.get();
+
+        // Unmarshal the object from the stream.
+        final JAXBContext context = JAXBContext.newInstance( QueriesBenchmarkConf.class );
+        final Unmarshaller unmarshaller = context.createUnmarshaller();
+        unmarshaller.setSchema(schema);
+        return (QueriesBenchmarkConf) unmarshaller.unmarshal(xmlStream);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueryBenchmark.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueryBenchmark.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueryBenchmark.java
new file mode 100644
index 0000000..404f183
--- /dev/null
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/QueryBenchmark.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.query;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.rya.benchmark.query.Parameters.NumReadsRuns;
+import org.apache.rya.benchmark.query.QueryBenchmark.QueryBenchmarkRun.NotEnoughResultsException;
+import org.apache.rya.benchmark.query.Rya.Accumulo;
+import org.apache.rya.benchmark.query.Rya.SecondaryIndexing;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.CommandLineOptions;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.sail.Sail;
+import org.openrdf.sail.SailConnection;
+import org.openrdf.sail.SailException;
+
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType;
+import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType;
+import mvm.rya.sail.config.RyaSailFactory;
+
+/**
+ * A benchmark that may be used to evaluate the performance of SPARQL queries
+ * over a living instance of Rya. It pivots over two dimensions:
+ * <ul>
+ *   <li>Which SPARQL query to execute</li>
+ *   <li>How many of the query's results to read</li>
+ * </ul>
+ * <p>
+ * These parameters are configured by placing a file named "queries-benchmark-conf.xml"
+ * within the directory the benchmark is being executed from. The schema that defines
+ * this XML file is named "queries-benchmark-conf.xsd" and may be found embedded within
+ * the benchmark's jar file.
+ * <p>
+ * To execute this benchmark, build the project by executing:
+ * <pre>
+ * mvn clean install
+ * </pre>
+ * Transport the "target/benchmarks.jar" file to the system that will execute
+ * the benchmark, write the configuration file, and then execute:
+ * <pre>
+ * java -cp benchmarks.jar org.apache.rya.benchmark.query.QueryBenchmark
+ * </pre>
+ */
+@State(Scope.Thread)
+public class QueryBenchmark {
+
+    private static final Logger log = Logger.getLogger(QueryBenchmark.class);
+
+    /**
+     * The path to the configuration file that this benchmark uses to connect to Rya.
+     */
+    public static final Path QUERY_BENCHMARK_CONFIGURATION_FILE = Paths.get("queries-benchmark-conf.xml");
+
+    /**
+     * Indicates all query results will be read during the benchmark.
+     */
+    public static final String READ_ALL = "ALL";
+
+    /**
+     * How many results to read from the query: either a Long or {@link #READ_ALL}. These defaults
+     * are replaced by the configuration file's NumReadsRuns values when they are provided.
+     */
+    @Param({"1", "10", "100", READ_ALL})
+    public String numReads;
+
+    /**
+     * The SPARQL query to benchmark. There are no default values; they are supplied at run time
+     * (see {@link #main(String[])}).
+     */
+    @Param({})
+    public String sparql;
+
+    private Sail sail = null;
+    private SailConnection sailConn = null;
+
+    /**
+     * Configures console logging, reads the benchmark's configuration file, and connects
+     * to the configured Rya instance.
+     *
+     * @throws Exception The configuration could not be read or Rya could not be connected to.
+     */
+    @Setup
+    public void setup() throws Exception {
+        // Setup logging.
+        final ConsoleAppender console = new ConsoleAppender();
+        console.setLayout(new PatternLayout("%d [%p|%c|%C{1}] %m%n"));
+        console.setThreshold(Level.INFO);
+        console.activateOptions();
+        Logger.getRootLogger().addAppender(console);
+
+        // Load the benchmark's configuration file.
+        final QueriesBenchmarkConf benchmarkConf = readConfiguration();
+
+        // Create the Rya Configuration object using the benchmark's configuration.
+        final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration();
+
+        final Rya rya = benchmarkConf.getRya();
+        ryaConf.setTablePrefix(rya.getRyaInstanceName());
+
+        final Accumulo accumulo = rya.getAccumulo();
+        ryaConf.set(ConfigUtils.CLOUDBASE_USER, accumulo.getUsername());
+        ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, accumulo.getPassword());
+        ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, accumulo.getZookeepers());
+        ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, accumulo.getInstanceName());
+
+        // Print the query plan so that you can visually inspect how PCJs are being applied for each benchmark.
+        ryaConf.set(ConfigUtils.DISPLAY_QUERY_PLAN, "true");
+
+        // Turn on PCJs if we are configured to use them.
+        final SecondaryIndexing secondaryIndexing = rya.getSecondaryIndexing();
+        if(secondaryIndexing.isUsePCJ()) {
+            ryaConf.set(ConfigUtils.USE_PCJ, "true");
+            ryaConf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.ACCUMULO.toString());
+            ryaConf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinUpdaterType.NO_UPDATE.toString());
+        } else {
+            ryaConf.set(ConfigUtils.USE_PCJ, "false");
+        }
+
+        // Create the connections used to execute the benchmark.
+        sail = RyaSailFactory.getInstance( ryaConf );
+        sailConn = sail.getConnection();
+    }
+
+    /**
+     * Releases the connection and the Sail created by {@link #setup()}.
+     * <p>
+     * Cleanup is best-effort: failures are logged instead of thrown so that a failure to
+     * close the connection does not prevent the Sail from being shut down.
+     */
+    @TearDown
+    public void tearDown() {
+        if(sailConn != null) {
+            try {
+                sailConn.close();
+            } catch(final Exception e) {
+                log.warn("Could not close the SailConnection.", e);
+            }
+        }
+
+        if(sail != null) {
+            try {
+                sail.shutDown();
+            } catch(final Exception e) {
+                log.warn("Could not shut down the Sail.", e);
+            }
+        }
+    }
+
+    /**
+     * Executes {@link #sparql} once and reads {@link #numReads} of its results.
+     */
+    @Benchmark
+    @BenchmarkMode(Mode.SingleShotTime)
+    @Timeout(time = 1, timeUnit = TimeUnit.HOURS)
+    public void queryRya() throws MalformedQueryException, QueryEvaluationException, SailException, NotEnoughResultsException {
+        final QueryBenchmarkRun benchmark;
+
+        if(numReads.equals( READ_ALL )) {
+            benchmark = new QueryBenchmarkRun(sailConn, sparql);
+        } else {
+            benchmark = new QueryBenchmarkRun(sailConn, sparql, Long.parseLong(numReads));
+        }
+
+        benchmark.run();
+    }
+
+    /**
+     * Reads and unmarshals {@link #QUERY_BENCHMARK_CONFIGURATION_FILE}. The file is closed once it has been read.
+     *
+     * @return The benchmark's configuration.
+     * @throws java.io.IOException The configuration file could not be opened or read.
+     * @throws javax.xml.bind.JAXBException The configuration file's XML could not be unmarshalled.
+     */
+    private static QueriesBenchmarkConf readConfiguration() throws java.io.IOException, javax.xml.bind.JAXBException {
+        try (final InputStream queriesStream = Files.newInputStream(QUERY_BENCHMARK_CONFIGURATION_FILE)) {
+            return new QueriesBenchmarkConfReader().load(queriesStream);
+        }
+    }
+
+    /**
+     * Runs the query benchmarks.
+     * <p>
+     * Example command line:
+     * <pre>
+     * java -cp benchmarks.jar org.apache.rya.benchmark.query.QueryBenchmark
+     * </pre>
+     *
+     * @param args - The command line arguments that will be fed into the benchmark.
+     * @throws Exception The benchmark could not be run.
+     */
+    public static void main(final String[] args) throws Exception {
+        // Read the queries that will be benchmarked from the configuration file.
+        final QueriesBenchmarkConf benchmarkConf = readConfiguration();
+        final Parameters parameters = benchmarkConf.getParameters();
+
+        // Setup the options that will be used to run the benchmark.
+        final OptionsBuilder options = new OptionsBuilder();
+        options.parent( new CommandLineOptions(args) );
+        options.include(QueryBenchmark.class.getSimpleName());
+
+        // Provide the SPARQL queries that will be injected into the benchmark's 'sparql' parameter.
+        // That parameter has no default values, so at least one query must be configured.
+        final Parameters.Queries queries = parameters.getQueries();
+        if(queries == null || queries.getSPARQL().isEmpty()) {
+            throw new RuntimeException("There is a problem with the benchmark's configuration. At least one " +
+                    "SPARQL query must be provided within the Queries element.");
+        }
+
+        // Clean up the sparql's whitespace.
+        final List<String> sparql = queries.getSPARQL();
+        final String[] sparqlArray = new String[ sparql.size() ];
+        for(int i = 0; i < sparqlArray.length; i++) {
+            sparqlArray[i] = sparql.get(i).trim();
+        }
+
+        options.param("sparql", sparqlArray);
+
+        // If numReadsRuns was specified, inject them into the benchmark's 'numReads' parameter.
+        final NumReadsRuns numReadsRuns = parameters.getNumReadsRuns();
+        if(numReadsRuns != null) {
+            // Validate the list.
+            final List<String> numReadsList = numReadsRuns.getNumReads();
+            for(final String numReads : numReadsList) {
+                // It may be the READ_ALL flag.
+                if(numReads.equals(READ_ALL)) {
+                    continue;
+                }
+
+                // Or it must be a Long.
+                try {
+                    Long.parseLong(numReads);
+                } catch(final NumberFormatException e) {
+                    throw new RuntimeException("There is a problem with the benchmark's configuration. Encountered " +
+                            "a numReads value of '" + numReads + "', which is invalid. The value must be a Long or " +
+                            "'" + READ_ALL + "'", e);
+                }
+            }
+
+            // Configure the benchmark with the numRuns.
+            final String[] numReadsArray = numReadsList.toArray( new String[ numReadsList.size() ] );
+            options.param("numReads", numReadsArray);
+        }
+
+        // Execute the benchmark.
+        new Runner(options.build()).run();
+    }
+
+    /**
+     * Executes an iteration of the benchmarked logic.
+     */
+    @ParametersAreNonnullByDefault
+    public static final class QueryBenchmarkRun {
+
+        private final SailConnection sailConn;
+        private final String sparql;
+        private final Optional<Long> numReads;
+
+        /**
+         * Constructs an instance of {@link QueryBenchmarkRun} that will read all of the results of the query.
+         *
+         * @param sailConn - The connection to the Rya instance the query will be executed against. (not null)
+         * @param sparql - The query that will be executed. (not null)
+         */
+        public QueryBenchmarkRun(final SailConnection sailConn, final String sparql) {
+            this.sailConn = requireNonNull(sailConn);
+            this.sparql = requireNonNull(sparql);
+            this.numReads = Optional.empty();
+        }
+
+        /**
+         * Constructs an instance of {@link QueryBenchmarkRun} that will only read a specific number of results.
+         *
+         * @param sailConn - The connection to the Rya instance the query will be executed against. (not null)
+         * @param sparql - The query that will be executed. (not null)
+         * @param numReads - The number of results that will be read. (not null)
+         */
+        public QueryBenchmarkRun(final SailConnection sailConn, final String sparql, final Long numReads) {
+            this.sailConn = requireNonNull(sailConn);
+            this.sparql = requireNonNull(sparql);
+            this.numReads = Optional.of( requireNonNull(numReads) );
+        }
+
+        /**
+         * Parses and evaluates the query, then reads either the configured number of results or all of them.
+         * The result iteration is always closed, even if reading fails.
+         *
+         * @throws MalformedQueryException The SPARQL could not be parsed.
+         * @throws QueryEvaluationException The query could not be evaluated or its results could not be read.
+         * @throws NotEnoughResultsException Fewer results were available than were requested.
+         * @throws SailException The query could not be submitted to the Sail.
+         */
+        public void run() throws MalformedQueryException, QueryEvaluationException, NotEnoughResultsException, SailException {
+            CloseableIteration<? extends BindingSet, QueryEvaluationException> it = null;
+
+            try {
+                // Execute the query.
+                final SPARQLParser sparqlParser = new SPARQLParser();
+                final ParsedQuery parsedQuery = sparqlParser.parseQuery(sparql, null);
+                it = sailConn.evaluate(parsedQuery.getTupleExpr(), null, null, false);
+
+                // Perform the reads.
+                if(numReads.isPresent()) {
+                    read(it, numReads.get() );
+                } else {
+                    readAll(it);
+                }
+            } finally {
+                if(it != null) {
+                    it.close();
+                }
+            }
+        }
+
+        /**
+         * Reads exactly {@code numReads} results, failing if the iteration runs out first.
+         */
+        private void read(final CloseableIteration<? extends BindingSet, QueryEvaluationException> it, final long numReads) throws QueryEvaluationException, NotEnoughResultsException {
+            requireNonNull(it);
+            long i = 0;
+            while(i < numReads) {
+                if(!it.hasNext()) {
+                    throw new NotEnoughResultsException(String.format("The SPARQL query did not result in enough results. Needed: %d Found: %d", numReads, i));
+                }
+                it.next();
+                i++;
+            }
+        }
+
+        /**
+         * Reads every remaining result of the iteration.
+         */
+        private void readAll(final CloseableIteration<? extends BindingSet, QueryEvaluationException> it) throws QueryEvaluationException {
+            requireNonNull(it);
+            while(it.hasNext()) {
+                it.next();
+            }
+        }
+
+        /**
+         * The benchmark must read a specific number of results, but the benchmarked query
+         * does not have enough results to meet that number.
+         */
+        public static final class NotEnoughResultsException extends Exception {
+            private static final long serialVersionUID = 1L;
+
+            public NotEnoughResultsException(final String message) {
+                super(message);
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/main/resources/LICENSE.txt
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/resources/LICENSE.txt b/extras/rya.benchmark/src/main/resources/LICENSE.txt
new file mode 100644
index 0000000..4a9fe83
--- /dev/null
+++ b/extras/rya.benchmark/src/main/resources/LICENSE.txt
@@ -0,0 +1,16 @@
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/main/xsd/queries-benchmark-conf.xsd
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/xsd/queries-benchmark-conf.xsd b/extras/rya.benchmark/src/main/xsd/queries-benchmark-conf.xsd
new file mode 100644
index 0000000..826083e
--- /dev/null
+++ b/extras/rya.benchmark/src/main/xsd/queries-benchmark-conf.xsd
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema">
+
+    <!-- Root element: how to connect to Rya and which parameters the benchmark pivots over. -->
+    <xsd:element name="QueriesBenchmarkConf">
+        <xsd:complexType>
+            <xsd:sequence>
+                <xsd:element name="Rya" type="Rya"/>
+                <xsd:element name="Parameters" type="Parameters"/>
+            </xsd:sequence>
+        </xsd:complexType>
+    </xsd:element>
+
+    <!-- Connection details for the Rya instance the queries are executed against. -->
+    <xsd:complexType name="Rya">
+        <xsd:sequence>
+            <xsd:element name="ryaInstanceName" type="xsd:string" />
+            <xsd:element name="accumulo">
+                <xsd:complexType>
+                    <xsd:sequence>
+                        <xsd:element name="username" type="xsd:string"/>
+                        <xsd:element name="password" type="xsd:string"/>
+                        <xsd:element name="zookeepers" type="xsd:string"/>
+                        <xsd:element name="instanceName" type="xsd:string"/>
+                    </xsd:sequence>
+                </xsd:complexType>
+            </xsd:element>
+            <xsd:element name="secondaryIndexing">
+                <xsd:complexType>
+                    <xsd:sequence>
+                        <xsd:element name="usePCJ" type="xsd:boolean"/>
+                    </xsd:sequence>
+                </xsd:complexType>
+            </xsd:element>
+        </xsd:sequence>
+    </xsd:complexType>
+
+    <xsd:complexType name="Parameters">
+        <xsd:sequence>
+            <!-- Optional. How many results to read from each query; each value is a Long or "ALL".
+                 When omitted, the benchmark's built-in numReads values are used. -->
+            <xsd:element name="NumReadsRuns" minOccurs="0">
+                <xsd:complexType>
+                    <xsd:sequence>
+                        <xsd:element name="NumReads" type="xsd:string" maxOccurs="unbounded"/>
+                    </xsd:sequence>
+                </xsd:complexType>
+            </xsd:element>
+            <!-- Required. The SPARQL queries to benchmark; the benchmark has no default queries. -->
+            <xsd:element name="Queries">
+                <xsd:complexType>
+                    <xsd:sequence>
+                        <xsd:element name="SPARQL" type="xsd:string" maxOccurs="unbounded"/>
+                    </xsd:sequence>
+                </xsd:complexType>
+            </xsd:element>
+        </xsd:sequence>
+    </xsd:complexType>
+</xsd:schema>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e77e839d/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReaderIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReaderIT.java b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReaderIT.java
new file mode 100644
index 0000000..f229dc4
--- /dev/null
+++ b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueriesBenchmarkConfReaderIT.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.query;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.List;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.rya.benchmark.query.Parameters.NumReadsRuns;
+import org.apache.rya.benchmark.query.Parameters.Queries;
+import org.apache.rya.benchmark.query.Rya.Accumulo;
+import org.apache.rya.benchmark.query.Rya.SecondaryIndexing;
+import org.junit.Test;
+import org.xml.sax.SAXException;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+
+/**
+ * Tests the methods of {@link QueriesBenchmarkConfReader}.
+ */
+public class QueriesBenchmarkConfReaderIT {
+
+    /**
+     * Unmarshals a complete configuration document and verifies that every value, including
+     * the order of the NumReads and SPARQL lists, is read back exactly.
+     */
+    @Test
+    public void load() throws JAXBException, SAXException {
+        // Unmarshal some XML.
+        final String xml =
+                "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
+                "<QueriesBenchmarkConf>\n" +
+                " <Rya>\n" +
+                " <ryaInstanceName>test_</ryaInstanceName>\n" +
+                " <accumulo>\n" +
+                " <username>test</username>\n" +
+                " <password>t3stP@ssw0rd</password>\n" +
+                " <zookeepers>zoo-server-1,zoo-server-2</zookeepers>\n" +
+                " <instanceName>testInstance</instanceName>\n" +
+                " </accumulo>\n" +
+                " <secondaryIndexing>\n" +
+                " <usePCJ>true</usePCJ>\n" +
+                " </secondaryIndexing>\n" +
+                " </Rya>\n" +
+                " <Parameters>" +
+                " <NumReadsRuns>" +
+                " <NumReads>1</NumReads>" +
+                " <NumReads>10</NumReads>" +
+                " <NumReads>100</NumReads>" +
+                " <NumReads>ALL</NumReads>" +
+                " </NumReadsRuns>" +
+                " <Queries>\n" +
+                " <SPARQL><![CDATA[SELECT ?a WHERE { ?a <http://likes> <urn:icecream> . }]]></SPARQL>\n" +
+                " <SPARQL><![CDATA[SELECT ?a ?b WHERE { ?a <http://knows> ?b . }]]></SPARQL>\n" +
+                " </Queries>\n" +
+                " </Parameters>" +
+                "</QueriesBenchmarkConf>";
+
+        final InputStream xmlStream = new ByteArrayInputStream( xml.getBytes(Charsets.UTF_8) );
+        final QueriesBenchmarkConf benchmarkConf = new QueriesBenchmarkConfReader().load( xmlStream );
+
+        // Ensure the Rya connection details were unmarshalled correctly.
+        final Rya rya = benchmarkConf.getRya();
+        assertEquals("test_", rya.getRyaInstanceName());
+
+        final Accumulo accumulo = rya.getAccumulo();
+        assertEquals("test", accumulo.getUsername());
+        assertEquals("t3stP@ssw0rd", accumulo.getPassword());
+        assertEquals("zoo-server-1,zoo-server-2", accumulo.getZookeepers());
+        assertEquals("testInstance", accumulo.getInstanceName());
+
+        final SecondaryIndexing secondaryIndexing = rya.getSecondaryIndexing();
+        assertTrue(secondaryIndexing.isUsePCJ());
+
+        // Ensure the benchmark parameters were unmarshalled correctly and in document order.
+        final Parameters parameters = benchmarkConf.getParameters();
+        final List<String> expectedNumReads = Lists.newArrayList("1", "10", "100", "ALL");
+        final NumReadsRuns numReadsRuns = parameters.getNumReadsRuns();
+        assertEquals(expectedNumReads, numReadsRuns.getNumReads());
+
+        final List<String> expectedQueries = Lists.newArrayList(
+                "SELECT ?a WHERE { ?a <http://likes> <urn:icecream> . }",
+                "SELECT ?a ?b WHERE { ?a <http://knows> ?b . }");
+        final Queries queries = parameters.getQueries();
+        assertEquals(expectedQueries, queries.getSPARQL());
+    }
+}
\ No newline at end of file
[4/5] incubator-rya git commit: RYA-151 Fixed POMs.
Posted by pu...@apache.org.
RYA-151 Fixed POMs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/55df2fd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/55df2fd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/55df2fd8
Branch: refs/heads/master
Commit: 55df2fd8411820dc1d82e6db3ae72bced31ff406
Parents: 3e17a25
Author: Kevin Chilton <ke...@parsons.com>
Authored: Wed Sep 14 15:58:57 2016 -0400
Committer: pujav65 <pu...@gmail.com>
Committed: Tue Sep 27 11:05:08 2016 -0400
----------------------------------------------------------------------
extras/rya.benchmark/pom.xml | 39 ++++-----------------------------------
pom.xml | 29 +++++++++++++++++++++++++++++
2 files changed, 33 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/55df2fd8/extras/rya.benchmark/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/pom.xml b/extras/rya.benchmark/pom.xml
index 5b9eb68..f63be52 100644
--- a/extras/rya.benchmark/pom.xml
+++ b/extras/rya.benchmark/pom.xml
@@ -39,24 +39,23 @@
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
- <version>${jmh.version}</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
- <version>${jmh.version}</version>
<scope>provided</scope>
</dependency>
+ <!-- Rya -->
<dependency>
<groupId>org.apache.rya</groupId>
<artifactId>rya.indexing</artifactId>
</dependency>
+ <!-- Utils -->
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
- <version>3.0.1</version>
</dependency>
<dependency>
@@ -64,6 +63,7 @@
<artifactId>guava</artifactId>
</dependency>
+ <!-- Testing -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -73,31 +73,10 @@
<dependency>
<groupId>org.apache.accumulo</groupId>
<artifactId>accumulo-minicluster</artifactId>
- <version>${accumulo.version}</version>
<scope>test</scope>
</dependency>
-
</dependencies>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-
- <!--
- JMH version to use with this project.
- -->
- <jmh.version>1.13</jmh.version>
-
- <!--
- Java source/target to use for compilation.
- -->
- <javac.target>1.8</javac.target>
-
- <!--
- Name of the benchmark Uber-JAR to generate.
- -->
- <uberjar.name>benchmarks</uberjar.name>
- </properties>
-
<build>
<resources>
<resource>
@@ -107,16 +86,6 @@
<plugins>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <compilerVersion>${javac.target}</compilerVersion>
- <source>${javac.target}</source>
- <target>${javac.target}</target>
- </configuration>
- </plugin>
-
- <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>jaxb2-maven-plugin</artifactId>
<executions>
@@ -167,7 +136,7 @@
<goal>shade</goal>
</goals>
<configuration>
- <finalName>${uberjar.name}</finalName>
+ <finalName>benchmarks</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.openjdk.jmh.Main</mainClass>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/55df2fd8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 26781eb..d5a34f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -120,6 +120,9 @@ under the License.
<maven.min-version>3.0.4</maven.min-version>
<fluo.version>1.0.0-beta-2</fluo.version>
+
+ <jmh.version>1.13</jmh.version>
+ <jsr305.version>3.0.1</jsr305.version>
</properties>
<dependencyManagement>
<dependencies>
@@ -553,6 +556,32 @@ under the License.
<version>${powermock.version}</version>
<scope>test</scope>
</dependency>
+
+ <!-- JMH Benchmarking tool dependencies. -->
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-core</artifactId>
+ <version>${jmh.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.openjdk.jmh</groupId>
+ <artifactId>jmh-generator-annprocess</artifactId>
+ <version>${jmh.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>${jsr305.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.accumulo</groupId>
+ <artifactId>accumulo-minicluster</artifactId>
+ <version>${accumulo.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
[3/5] incubator-rya git commit: RYA-151 Implemented a PCJOptimizer
benchmark tool.
Posted by pu...@apache.org.
RYA-151 Implemented a PCJOptimizer benchmark tool.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/3e17a258
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/3e17a258
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/3e17a258
Branch: refs/heads/master
Commit: 3e17a2582a9ad50c2126c038f38389bf7a83c6aa
Parents: e77e839
Author: Kevin Chilton <ke...@parsons.com>
Authored: Fri Sep 2 16:30:00 2016 -0400
Committer: pujav65 <pu...@gmail.com>
Committed: Tue Sep 27 11:05:07 2016 -0400
----------------------------------------------------------------------
.../java/mvm/rya/api/client/BatchUpdatePCJ.java | 18 +
.../api/client/PCJDoesNotExistException.java | 18 +
.../client/accumulo/AccumuloBatchUpdatePCJ.java | 18 +
.../tupleSet/SimpleExternalTupleSet.java | 73 +---
.../rya/indexing/pcj/matching/PCJOptimizer.java | 6 +-
.../accumulo/AccumuloBatchUpdatePCJIT.java | 18 +
.../tupleSet/SimpleExternalTupleSetTest.java | 173 ++++++++
.../benchmark/query/PCJOptimizerBenchmark.java | 421 +++++++++++++++++++
8 files changed, 691 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3e17a258/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java b/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java
index 20d90e0..d6f3454 100644
--- a/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java
+++ b/common/rya.api/src/main/java/mvm/rya/api/client/BatchUpdatePCJ.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package mvm.rya.api.client;
import javax.annotation.ParametersAreNonnullByDefault;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3e17a258/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java b/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java
index 63efe0c..89f095f 100644
--- a/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java
+++ b/common/rya.api/src/main/java/mvm/rya/api/client/PCJDoesNotExistException.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package mvm.rya.api.client;
import javax.annotation.ParametersAreNonnullByDefault;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3e17a258/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
index ee773b0..53f29f4 100644
--- a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
+++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package mvm.rya.api.client.accumulo;
import static java.util.Objects.requireNonNull;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3e17a258/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSet.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSet.java b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSet.java
index 2c5ef44..ccdb7a8 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSet.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSet.java
@@ -1,5 +1,3 @@
-package mvm.rya.indexing.external.tupleSet;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,13 +16,10 @@ package mvm.rya.indexing.external.tupleSet;
* specific language governing permissions and limitations
* under the License.
*/
+package mvm.rya.indexing.external.tupleSet;
-import info.aduna.iteration.CloseableIteration;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+import java.util.ArrayList;
+import java.util.List;
import org.openrdf.query.BindingSet;
import org.openrdf.query.QueryEvaluationException;
@@ -32,51 +27,46 @@ import org.openrdf.query.algebra.Projection;
import org.openrdf.query.algebra.QueryModelVisitor;
import com.google.common.base.Joiner;
-import com.google.common.collect.Sets;
+
+import info.aduna.iteration.CloseableIteration;
/**
* This a testing class to create mock pre-computed join nodes in order to
* test the {@link PrecompJoinOptimizer} for query planning.
- *
*/
-
public class SimpleExternalTupleSet extends ExternalTupleSet {
- public SimpleExternalTupleSet(Projection tuple) {
+ /**
+ * Constructs an instance of {@link SimpleExternalTupleSet}.
+ *
+ * @param tuple - An expression that represents the PCJ. (not null)
+ */
+ public SimpleExternalTupleSet(final Projection tuple) {
this.setProjectionExpr(tuple);
setSupportedVarOrders();
}
private void setSupportedVarOrders() {
+ final List<String> varOrders = new ArrayList<>();
- final Set<String> varSet = Sets.newHashSet();
- final Map<String, Set<String>> supportedVarOrders = new HashMap<>();
- String t = "";
-
- for (final String s : this.getTupleExpr().getAssuredBindingNames()) {
- if (t.length() == 0) {
- t = s;
- } else {
- t = t + VAR_ORDER_DELIM + s;
- }
+ String varOrder = "";
+ for(final String var : this.getTupleExpr().getAssuredBindingNames()) {
+ varOrder = varOrder.isEmpty() ? var : varOrder + VAR_ORDER_DELIM + var;
+ varOrders.add( varOrder );
+ }
- varSet.add(s);
- supportedVarOrders.put(t, new HashSet<String>(varSet));
-
- }
- this.setSupportedVariableOrderMap(supportedVarOrders);
+ this.setSupportedVariableOrderMap(varOrders);
}
@Override
- public <X extends Exception> void visit(QueryModelVisitor<X> visitor)
+ public <X extends Exception> void visit(final QueryModelVisitor<X> visitor)
throws X {
visitor.meetOther(this);
}
@Override
- public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(
- BindingSet bindings) throws QueryEvaluationException {
- // TODO Auto-generated method stub
+ public CloseableIteration<BindingSet, QueryEvaluationException> evaluate( final BindingSet bindings) throws QueryEvaluationException {
+ // Intentionally does nothing.
return null;
}
@@ -88,23 +78,4 @@ public class SimpleExternalTupleSet extends ExternalTupleSet {
.getElements()).replaceAll("\\s+", " ");
}
-
- @Override
- public boolean equals(Object other) {
-
- if (!(other instanceof SimpleExternalTupleSet)) {
- return false;
- } else {
-
- final SimpleExternalTupleSet arg = (SimpleExternalTupleSet) other;
- if (this.getTupleExpr().equals(arg.getTupleExpr())) {
- return true;
- } else {
- return false;
- }
-
- }
-
- }
-
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3e17a258/extras/indexing/src/main/java/mvm/rya/indexing/pcj/matching/PCJOptimizer.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/mvm/rya/indexing/pcj/matching/PCJOptimizer.java b/extras/indexing/src/main/java/mvm/rya/indexing/pcj/matching/PCJOptimizer.java
index 046bd53..8ce89bf 100644
--- a/extras/indexing/src/main/java/mvm/rya/indexing/pcj/matching/PCJOptimizer.java
+++ b/extras/indexing/src/main/java/mvm/rya/indexing/pcj/matching/PCJOptimizer.java
@@ -83,8 +83,8 @@ import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
*
*/
public class PCJOptimizer implements QueryOptimizer, Configurable {
-
private static final Logger log = Logger.getLogger(PCJOptimizer.class);
+
private List<ExternalTupleSet> indexSet;
private Configuration conf;
private boolean init = false;
@@ -104,7 +104,7 @@ public class PCJOptimizer implements QueryOptimizer, Configurable {
} catch (MalformedQueryException | SailException
| QueryEvaluationException | TableNotFoundException
| AccumuloException | AccumuloSecurityException | PcjException e) {
- e.printStackTrace();
+ log.error(e.getMessage(), e);
}
init = true;
}
@@ -352,7 +352,7 @@ public class PCJOptimizer implements QueryOptimizer, Configurable {
//use table name sparql map (indexTables) to create {@link AccumuloIndexSet}
final List<ExternalTupleSet> index = Lists.newArrayList();
if (indexTables.isEmpty()) {
- System.out.println("No Index found");
+ log.info("No Index found");
} else {
for (final String table : indexTables.keySet()) {
final String indexSparqlString = indexTables.get(table);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3e17a258/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
index f23f1c4..1f98f88 100644
--- a/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
+++ b/extras/indexing/src/test/java/mvm/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package mvm.rya.api.client.accumulo;
import static org.junit.Assert.assertEquals;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3e17a258/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSetTest.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSetTest.java b/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSetTest.java
new file mode 100644
index 0000000..6354490
--- /dev/null
+++ b/extras/indexing/src/test/java/mvm/rya/indexing/external/tupleSet/SimpleExternalTupleSetTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package mvm.rya.indexing.external.tupleSet;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Test;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+/**
+ * Tests {@link SimpleExternalTupleSet}.
+ */
+public class SimpleExternalTupleSetTest {
+
+ @Test
+ public void equals_equals() throws MalformedQueryException {
+ // The common PCJ expression.
+ final String sparql =
+ "SELECT ?f ?m ?d { " +
+ "?f <urn:talksTo> ?m . " +
+ "?m <uri:associatesWith> ?d . " +
+ "}";
+
+ final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
+ final Projection pcjExpression = (Projection) query.getTupleExpr();
+
+ // Create two SimpleExternalTupleSet pbjects using the same expression.
+ final SimpleExternalTupleSet testSet = new SimpleExternalTupleSet(pcjExpression);
+ final SimpleExternalTupleSet identicalTestSet = new SimpleExternalTupleSet(pcjExpression);
+
+ // Show that they are equal.
+ assertEquals(testSet, identicalTestSet);
+ }
+
+ @Test
+ public void equals_notEquals() throws MalformedQueryException {
+ // Create the first SimpleExternalTupleSet object.
+ final String sparql1 =
+ "SELECT ?f ?m ?d { " +
+ "?f <urn:talksTo> ?m . " +
+ "?m <uri:associatesWith> ?d . " +
+ "}";
+
+ final ParsedQuery query1 = new SPARQLParser().parseQuery(sparql1, null);
+ final Projection pcjExpression1 = (Projection) query1.getTupleExpr();
+ final SimpleExternalTupleSet set1 = new SimpleExternalTupleSet(pcjExpression1);
+
+ // Create another one using a different expression.
+ final String sparql2 =
+ "SELECT ?f ?m ?d { " +
+ "?f <urn:talksTo> ?m . " +
+ "?m <uri:friendsWith> ?d . " +
+ "}";
+
+ final ParsedQuery query2 = new SPARQLParser().parseQuery(sparql2, null);
+ final Projection pcjExpression2 = (Projection) query2.getTupleExpr();
+ final SimpleExternalTupleSet set2 = new SimpleExternalTupleSet(pcjExpression2);
+
+ // Show they are not equal.
+ assertNotEquals(set1, set2);
+ }
+
+ @Test
+ public void hashCode_same() throws MalformedQueryException {
+ // The common PCJ expression.
+ final String sparql =
+ "SELECT ?f ?m ?d { " +
+ "?f <urn:talksTo> ?m . " +
+ "?m <uri:associatesWith> ?d . " +
+ "}";
+
+ final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
+ final Projection pcjExpression = (Projection) query.getTupleExpr();
+
+ // Create two SimpleExternalTupleSet pbjects using the same expression.
+ final SimpleExternalTupleSet testSet = new SimpleExternalTupleSet(pcjExpression);
+ final SimpleExternalTupleSet identicalTestSet = new SimpleExternalTupleSet(pcjExpression);
+
+ // Show that they are equal.
+ assertEquals(testSet.hashCode(), identicalTestSet.hashCode());
+ }
+
+ public void hashCode_notSame() throws MalformedQueryException {
+ // Create the first SimpleExternalTupleSet object.
+ final String sparql1 =
+ "SELECT ?f ?m ?d { " +
+ "?f <urn:talksTo> ?m . " +
+ "?m <uri:associatesWith> ?d . " +
+ "}";
+
+ final ParsedQuery query1 = new SPARQLParser().parseQuery(sparql1, null);
+ final Projection pcjExpression1 = (Projection) query1.getTupleExpr();
+ final SimpleExternalTupleSet set1 = new SimpleExternalTupleSet(pcjExpression1);
+
+ // Create another one using a different expression.
+ final String sparql2 =
+ "SELECT ?f ?m ?d { " +
+ "?f <urn:talksTo> ?m . " +
+ "?m <uri:friendsWith> ?d . " +
+ "}";
+
+ final ParsedQuery query2 = new SPARQLParser().parseQuery(sparql2, null);
+ final Projection pcjExpression2 = (Projection) query2.getTupleExpr();
+ final SimpleExternalTupleSet set2 = new SimpleExternalTupleSet(pcjExpression2);
+
+ // Show they are not equal.
+ assertNotEquals(set1.hashCode(), set2.hashCode());
+ }
+
+ @Test
+ public void getSupportedVariableOrderMap() throws MalformedQueryException {
+ // Create the PCJ expression.
+ final String sparql =
+ "SELECT ?f ?m ?d { " +
+ "?f <urn:talksTo> ?m . " +
+ "?m <uri:associatesWith> ?d . " +
+ "}";
+
+ final ParsedQuery query = new SPARQLParser().parseQuery(sparql, null);
+ final Projection pcjExpression = (Projection) query.getTupleExpr();
+
+ // Create the object that is being tested.
+ final SimpleExternalTupleSet testSet = new SimpleExternalTupleSet(pcjExpression);
+
+ // Verify the correct Supported Variable Order Map is created.
+ final Map<String, Set<String>> expected = new HashMap<>();
+
+ String varOrder = "f";
+ Set<String> vars = new HashSet<>();
+ vars.add("f");
+ expected.put(varOrder, vars);
+
+ varOrder = "f;m";
+ vars = new HashSet<>();
+ vars.add("f");
+ vars.add("m");
+ expected.put(varOrder, vars);
+
+ varOrder = "f;m;d";
+ vars = new HashSet<>();
+ vars.add("f");
+ vars.add("m");
+ vars.add("d");
+ expected.put(varOrder, vars);
+
+ assertEquals(expected, testSet.getSupportedVariableOrderMap());
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/3e17a258/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java
----------------------------------------------------------------------
diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java
new file mode 100644
index 0000000..ff6285b
--- /dev/null
+++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java
@@ -0,0 +1,421 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.query;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.CommandLineOptionException;
+import org.openjdk.jmh.runner.options.CommandLineOptions;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+import mvm.rya.indexing.external.tupleSet.ExternalTupleSet;
+import mvm.rya.indexing.external.tupleSet.SimpleExternalTupleSet;
+import mvm.rya.indexing.pcj.matching.PCJOptimizer;
+
+/**
+ * A benchmark that may be used to evaluate the performance of {@link PCJOptimizer}.
+ * It pivots over three dimensions:
+ * <ul>
+ * <li>How many Statement Patterns the optimized query has.</li>
+ * <li>How many PCJ indices the optimizer has available to it.</li>
+ * <li>How many Statement Patterns each PCJ has.</li>
+ * </ul>
+ * To execute this benchmark, build the project by executing:
+ * <pre>
+ * mvn clean install
+ * </pre>
+ * Transport the "target/benchmarks.jar" file to the system that will execute
+ * the benchmark, write the configuration file, and then execute:
+ * <pre>
+ * java -cp benchmarks.jar org.apache.rya.benchmark.query.PCJOptimizerBenchmark
+ * </pre>
+ */
+@State(Scope.Thread)
+@ParametersAreNonnullByDefault
+public class PCJOptimizerBenchmark {
+
+ /**
+ * Variables that may be used when building SPARQL queries. The query and PCJ
+ * builders consume them from the front of this list, in order.
+ */
+ private static final List<String> variables = Lists.newArrayList("?a","?b",
+ "?c","?d","?e","?f","?g","?h","?i","?j","?k","?l","?m","?n","?o",
+ "?p","?q","?r","?s","?t","?u","?v","?w","?x","?y","?z");
+
+ // Parameters that affect which PCJs are used by the benchmark.
+
+ /**
+ * How many PCJ indices the optimizer has available to it.
+ */
+ @Param({"0", "1", "2", "3", "4", "5", "6"})
+ public int numPCJs;
+
+ /**
+ * How many Statement Patterns each PCJ has.
+ */
+ @Param({"2", "3", "4", "5", "6"})
+ public int pcjSPCount;
+
+ // Parameters that affect the Query that is being optimized by the benchmark.
+
+ /**
+ * How many Statement Patterns the optimized query has.
+ */
+ @Param({"1", "2", "3", "4", "5", "6"})
+ public int querySPCount;
+
+ // Cached benchmark data that is generated during the setup phase, keyed by parameter combination.
+ private final Map<BenchmarkParams, BenchmarkValues> chainedBenchmarkValues = new HashMap<>();
+ private final Map<BenchmarkParams, BenchmarkValues> unchainedBenchmarkValues = new HashMap<>();
+
+ /**
+ * Generates the query and {@link PCJOptimizer} that the benchmark methods use for the
+ * current values of {@link #numPCJs}, {@link #pcjSPCount}, and {@link #querySPCount}.
+ * <p>
+ * JMH runs a separate trial for every combination of {@link Param} values and invokes
+ * this trial-level setup method at the start of each one, so only the current
+ * combination has to be built. Building exactly that combination (instead of looping
+ * over hard-coded ranges) also means values supplied on the command line with
+ * {@code -p} are present in the cache rather than looked up as {@code null}.
+ *
+ * @throws MalformedQueryException A generated SPARQL query could not be parsed.
+ */
+ @Setup
+ public void buildBenchmarkValues() throws MalformedQueryException {
+ final BenchmarkParams benchmarkParams = new BenchmarkParams(numPCJs, pcjSPCount, querySPCount);
+
+ final BenchmarkValues chainedValues = new BenchmarkValues(
+ makeChainedQuery(benchmarkParams),
+ makeChainedPCJOptimizer(benchmarkParams));
+ chainedBenchmarkValues.put(benchmarkParams, chainedValues);
+
+ final BenchmarkValues unchainedValues = new BenchmarkValues(
+ makeUnchainedQuery(benchmarkParams),
+ makeUnchainedPCJOptimizer(benchmarkParams));
+ unchainedBenchmarkValues.put(benchmarkParams, unchainedValues);
+ }
+
+ /**
+ * Measures how long {@link PCJOptimizer} takes to optimize a query whose statement
+ * patterns do not share variables, for the current parameter combination.
+ */
+ @Benchmark
+ public void optimizeQuery_unchained() throws MalformedQueryException {
+ // Fetch the pieces that benchmark uses.
+ final BenchmarkValues values = unchainedBenchmarkValues.get( new BenchmarkParams(numPCJs, pcjSPCount, querySPCount) );
+ final PCJOptimizer pcjOptimizer = values.getPCJOptimizer();
+
+ // QueryOptimizer#optimize returns nothing, so any rewrite it performs is applied to the
+ // query it is given. Optimize a copy so every invocation starts from the original,
+ // unoptimized query rather than one a previous invocation already rewrote.
+ // (The copy's cost is included in the measurement.)
+ final TupleExpr query = values.getQuery().clone();
+
+ // Perform the optimization.
+ pcjOptimizer.optimize(query, null, null);
+ }
+
+ /**
+ * Measures how long {@link PCJOptimizer} takes to optimize a query whose statement
+ * patterns form a chain, for the current parameter combination.
+ */
+ @Benchmark
+ public void optimizeQuery_chained() throws MalformedQueryException {
+ // Fetch the pieces that benchmark uses.
+ final BenchmarkValues values = chainedBenchmarkValues.get( new BenchmarkParams(numPCJs, pcjSPCount, querySPCount) );
+ final PCJOptimizer pcjOptimizer = values.getPCJOptimizer();
+
+ // QueryOptimizer#optimize returns nothing, so any rewrite it performs is applied to the
+ // query it is given. Optimize a copy so every invocation starts from the original,
+ // unoptimized query rather than one a previous invocation already rewrote.
+ // (The copy's cost is included in the measurement.)
+ final TupleExpr query = values.getQuery().clone();
+
+ // Perform the optimization.
+ pcjOptimizer.optimize(query, null, null);
+ }
+
+ /**
+ * Builds the query that the unchained benchmark optimizes. Every statement pattern
+ * gets two variables of its own, so no statement pattern shares a variable with another.
+ *
+ * @param params - Determines how many statement patterns the query has. (not null)
+ * @return The algebra expression of the parsed query.
+ * @throws MalformedQueryException The generated SPARQL could not be parsed.
+ */
+ private static TupleExpr makeUnchainedQuery(final BenchmarkParams params) throws MalformedQueryException {
+ final Queue<String> varQueue= Lists.newLinkedList(variables);
+ final SPARQLParser parser = new SPARQLParser();
+
+ final List<String> queryVars = new ArrayList<>();
+
+ // The first statement pattern has two variables.
+ queryVars.add( varQueue.remove() );
+ queryVars.add( varQueue.remove() );
+
+ // Each extra statement pattern does not join with the previous one, so it needs two new variables.
+ for(int i = 1; i < params.getQuerySPCount(); i++) {
+ queryVars.add( varQueue.remove() );
+ queryVars.add( varQueue.remove() );
+ }
+
+ final String sparql = buildUnchainedSPARQL(queryVars);
+ return parser.parseQuery(sparql, null).getTupleExpr();
+ }
+
+ /**
+ * Builds the query that the chained benchmark optimizes. The first statement pattern
+ * takes two variables; every later one joins with its predecessor and so only draws
+ * one additional variable.
+ *
+ * @param params - Determines how many statement patterns the query has. (not null)
+ * @return The algebra expression of the parsed query.
+ * @throws MalformedQueryException The generated SPARQL could not be parsed.
+ */
+ private static TupleExpr makeChainedQuery(final BenchmarkParams params) throws MalformedQueryException {
+ final Queue<String> availableVars = Lists.newLinkedList(variables);
+
+ // Two variables for the first pattern, plus one per additional pattern.
+ final int neededVars = 2 + Math.max(0, params.getQuerySPCount() - 1);
+
+ final List<String> chainVars = new ArrayList<>(neededVars);
+ while(chainVars.size() < neededVars) {
+ chainVars.add( availableVars.remove() );
+ }
+
+ return new SPARQLParser()
+ .parseQuery(buildChainedSPARQL(chainVars), null)
+ .getTupleExpr();
+ }
+
+ /**
+ * Builds the {@link PCJOptimizer} used by the unchained benchmark. Every statement
+ * pattern in a PCJ gets two variables of its own. Each successive PCJ drops the first
+ * statement pattern of the previous PCJ and appends a new one.
+ *
+ * @param params - Determines how many PCJs are built and how many statement patterns each has. (not null)
+ * @return An optimizer with {@code params.getNumPCJS()} indices available to it (none when that is zero).
+ * @throws MalformedQueryException A generated SPARQL query could not be parsed.
+ */
+ private static PCJOptimizer makeUnchainedPCJOptimizer(final BenchmarkParams params) throws MalformedQueryException {
+ final Queue<String> varQueue= Lists.newLinkedList(variables);
+ final SPARQLParser parser = new SPARQLParser();
+
+ final List<ExternalTupleSet> indices = new ArrayList<>();
+
+ // A run with zero PCJs must not give the optimizer any indices. Without this check the
+ // first PCJ was always created, so numPCJs=0 measured the same thing as numPCJs=1.
+ if(params.getNumPCJS() <= 0) {
+ return new PCJOptimizer(indices, false);
+ }
+
+ // Create the first PCJ.
+ final List<String> pcjVars = new ArrayList<>();
+ pcjVars.add( varQueue.remove() );
+ pcjVars.add( varQueue.remove() );
+
+ for(int spI = 1; spI < params.getPCJSPCount(); spI++) {
+ pcjVars.add( varQueue.remove() );
+ pcjVars.add( varQueue.remove() );
+ }
+
+ String pcjSparql = buildUnchainedSPARQL(pcjVars);
+ Projection projection = (Projection) parser.parseQuery(pcjSparql, null).getTupleExpr();
+ indices.add( new SimpleExternalTupleSet(projection) );
+
+ // Add the rest of the PCJs.
+ for(int pcjI = 1; pcjI < params.getNumPCJS(); pcjI++) {
+ // Remove the previous PCJ's first statement pattern (its first two variables).
+ pcjVars.remove(0);
+ pcjVars.remove(0);
+
+ // And add a new statement pattern to the end of it.
+ pcjVars.add( varQueue.remove() );
+ pcjVars.add( varQueue.remove() );
+
+ // Build the index.
+ pcjSparql = buildUnchainedSPARQL(pcjVars);
+ projection = (Projection) parser.parseQuery(pcjSparql, null).getTupleExpr();
+ indices.add( new SimpleExternalTupleSet(projection) );
+ }
+
+ // Create the optimizer.
+ return new PCJOptimizer(indices, false);
+ }
+
+ /**
+ * Builds the {@link PCJOptimizer} used by the chained benchmark. Within a PCJ the first
+ * statement pattern takes two variables and every later one draws a single new variable.
+ * Each successive PCJ drops the first variable of the previous PCJ and appends a new one.
+ *
+ * @param params - Determines how many PCJs are built and how many statement patterns each has. (not null)
+ * @return An optimizer with {@code params.getNumPCJS()} indices available to it (none when that is zero).
+ * @throws MalformedQueryException A generated SPARQL query could not be parsed.
+ */
+ private static PCJOptimizer makeChainedPCJOptimizer(final BenchmarkParams params) throws MalformedQueryException {
+ final Queue<String> varQueue= Lists.newLinkedList(variables);
+ final SPARQLParser parser = new SPARQLParser();
+
+ final List<ExternalTupleSet> indices = new ArrayList<>();
+
+ // A run with zero PCJs must not give the optimizer any indices. Without this check the
+ // first PCJ was always created, so numPCJs=0 measured the same thing as numPCJs=1.
+ if(params.getNumPCJS() <= 0) {
+ return new PCJOptimizer(indices, false);
+ }
+
+ // Create the first PCJ.
+ final List<String> pcjVars = new ArrayList<>();
+ pcjVars.add( varQueue.remove() );
+ pcjVars.add( varQueue.remove() );
+
+ for(int spI = 1; spI < params.getPCJSPCount(); spI++) {
+ pcjVars.add( varQueue.remove() );
+ }
+
+ String pcjSparql = buildChainedSPARQL(pcjVars);
+ Projection projection = (Projection) parser.parseQuery(pcjSparql, null).getTupleExpr();
+ indices.add( new SimpleExternalTupleSet(projection) );
+
+ // Add the rest of the PCJs.
+ for(int pcjI = 1; pcjI < params.getNumPCJS(); pcjI++) {
+ // Remove the previous PCJ's first variable.
+ pcjVars.remove(0);
+
+ // And add a new one to the end of it.
+ pcjVars.add( varQueue.remove() );
+
+ // Build the index.
+ pcjSparql = buildChainedSPARQL(pcjVars);
+ projection = (Projection) parser.parseQuery(pcjSparql, null).getTupleExpr();
+ indices.add( new SimpleExternalTupleSet(projection) );
+ }
+
+ // Create the optimizer.
+ return new PCJOptimizer(indices, false);
+ }
+
+    /**
+     * Builds a SPARQL query whose Statement Patterns are not chained together. Each consecutive
+     * pair of variables becomes the subject and object of one Statement Pattern, and every
+     * variable is projected in the select clause.
+     *
+     * @param vars - The query's variables. Must be non-empty and contain an even number of elements. (not null)
+     * @return The SPARQL query.
+     * @throws IllegalArgumentException {@code vars} is empty or contains an odd number of elements.
+     */
+    private static String buildUnchainedSPARQL(final List<String> vars) {
+        // An empty list used to pass the even-size check and then fail with NoSuchElementException.
+        checkArgument(!vars.isEmpty() && vars.size() % 2 == 0,
+                "An unchained query needs a non-empty, even number of variables, but %s were provided.", vars.size());
+
+        // Every Statement Pattern consumes its own pair of variables.
+        final List<String> statementPatterns = new ArrayList<>(vars.size() / 2);
+        for(int i = 0; i < vars.size(); i += 2) {
+            statementPatterns.add( vars.get(i) + " <urn:predicate> " + vars.get(i + 1) );
+        }
+
+        // Build the SPARQL query from the pieces.
+        return "select " + Joiner.on(" ").join(vars) + " where { " +
+                Joiner.on(" . ").join(statementPatterns) +
+                " . }" ;
+    }
+
+    /**
+     * Builds a SPARQL query whose Statement Patterns form a chain. Each adjacent pair of variables
+     * becomes the subject and object of one Statement Pattern, so each Statement Pattern's object
+     * is the next one's subject. Every variable is projected in the select clause.
+     *
+     * @param vars - The query's variables, in chain order. Must contain at least two elements. (not null)
+     * @return The SPARQL query.
+     * @throws IllegalArgumentException {@code vars} contains fewer than two elements.
+     */
+    private static String buildChainedSPARQL(final List<String> vars) {
+        // Validate up front, consistent with buildUnchainedSPARQL, instead of failing with NoSuchElementException.
+        checkArgument(vars.size() >= 2,
+                "A chained query needs at least two variables, but %s were provided.", vars.size());
+
+        // Chain the Statement Patterns: each one links a variable to the variable that follows it.
+        final List<String> statementPatterns = new ArrayList<>(vars.size() - 1);
+        for(int i = 1; i < vars.size(); i++) {
+            statementPatterns.add( vars.get(i - 1) + " <urn:predicate> " + vars.get(i) );
+        }
+
+        // Build the SPARQL query from the pieces.
+        return "select " + Joiner.on(" ").join(vars) + " where { " +
+                Joiner.on(" . ").join(statementPatterns) +
+                " . }" ;
+    }
+
+    /**
+     * The parameter values used by the benchmark. Used to look up a benchmark's {@link BenchmarkValues}.
+     */
+    @ParametersAreNonnullByDefault
+    public static class BenchmarkParams {
+        private final int numPCJs;
+        private final int pcjSPCount;
+        private final int querySPCount;
+
+        /**
+         * Constructs an instance of {@link BenchmarkParams}.
+         *
+         * @param numPCJs - The number of PCJs that will be available to the {@link PCJOptimizer}.
+         * @param pcjSPCount - The number of Statement Patterns that are in each PCJ.
+         * @param querySPCount - The number of Statement Patterns that are in the query that will be optimized.
+         */
+        public BenchmarkParams(final int numPCJs, final int pcjSPCount, final int querySPCount){
+            this.numPCJs = numPCJs;
+            this.pcjSPCount = pcjSPCount;
+            this.querySPCount = querySPCount;
+        }
+
+        /**
+         * @return The number of PCJs that will be available to the {@link PCJOptimizer}.
+         */
+        public int getNumPCJS() {
+            return numPCJs;
+        }
+
+        /**
+         * @return The number of Statement Patterns that are in each PCJ.
+         */
+        public int getPCJSPCount() {
+            return pcjSPCount;
+        }
+
+        /**
+         * @return The number of Statement Patterns that are in the query that will be optimized.
+         */
+        public int getQuerySPCount() {
+            return querySPCount;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(numPCJs, pcjSPCount, querySPCount);
+        }
+
+        @Override
+        public boolean equals(final Object other) {
+            if(this == other) {
+                return true;
+            }
+            if(other instanceof BenchmarkParams) {
+                final BenchmarkParams key = (BenchmarkParams) other;
+                return numPCJs == key.numPCJs &&
+                        pcjSPCount == key.pcjSPCount &&
+                        querySPCount == key.querySPCount;
+            }
+            return false;
+        }
+
+        /**
+         * @return A human readable description of these parameters, useful when logging or debugging a benchmark run.
+         */
+        @Override
+        public String toString() {
+            return "BenchmarkParams{numPCJs=" + numPCJs +
+                    ", pcjSPCount=" + pcjSPCount +
+                    ", querySPCount=" + querySPCount + "}";
+        }
+    }
+
+    /**
+     * Holds onto the SPARQL query that will be optimized as well as the optimizer
+     * that will be used to optimize the query.
+     */
+    @ParametersAreNonnullByDefault
+    public static class BenchmarkValues {
+        private final TupleExpr query;
+        private final PCJOptimizer optimizer;
+
+        /**
+         * Constructs an instance of {@link BenchmarkValues}.
+         *
+         * @param query - The SPARQL query to optimize. (not null)
+         * @param optimizer - The optimizer used to optimize the query. (not null)
+         * @throws NullPointerException {@code query} or {@code optimizer} is null.
+         */
+        public BenchmarkValues(final TupleExpr query, final PCJOptimizer optimizer) {
+            this.query = requireNonNull(query, "query must not be null");
+            this.optimizer = requireNonNull(optimizer, "optimizer must not be null");
+        }
+
+        /**
+         * @return The SPARQL query to optimize.
+         */
+        public TupleExpr getQuery() {
+            return query;
+        }
+
+        /**
+         * @return The optimizer used to optimize the query.
+         */
+        public PCJOptimizer getPCJOptimizer() {
+            return optimizer;
+        }
+    }
+
+    /**
+     * Runs the PCJOptimizer benchmarks.
+     * <p>
+     * The command line arguments are parsed as JMH {@link CommandLineOptions}, and only benchmarks
+     * matching this class's simple name are included in the run.
+     * <p>
+     * Example command line:
+     * <pre>
+     * java -cp benchmarks.jar org.apache.rya.benchmark.query.PCJOptimizerBenchmark
+     * </pre>
+     *
+     * @param args - The command line arguments that will be fed into the benchmark. (not null)
+     * @throws RunnerException The benchmark could not be run.
+     * @throws MalformedQueryException Part of the declared signature; not expected from the calls made in this method.
+     * @throws CommandLineOptionException The command line arguments could not be parsed.
+     */
+    public static void main(final String[] args) throws RunnerException, MalformedQueryException, CommandLineOptionException {
+        final OptionsBuilder opts = new OptionsBuilder();
+        opts.parent( new CommandLineOptions(args) );
+        opts.include(PCJOptimizerBenchmark.class.getSimpleName());
+
+        new Runner(opts.build()).run();
+    }
+}
\ No newline at end of file