You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2018/01/12 21:25:30 UTC
[2/8] incubator-rya git commit: RYA-303 Mongo PCJ Support. Closes
#172.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjStorage.java
new file mode 100644
index 0000000..f4e4e9e
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjStorage.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.api.instance.RyaDetailsUpdater;
+import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository;
+import org.openrdf.query.BindingSet;
+
+import com.mongodb.MongoClient;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A MongoDB-backed implementation of {@link PrecomputedJoinStorage}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoPcjStorage implements PrecomputedJoinStorage {
+ public static final String PCJ_COLLECTION_NAME = "pcjs";
+ // Used to read and update the Rya instance's details.
+ private final MongoRyaInstanceDetailsRepository ryaDetailsRepo;
+
+ private final String ryaInstanceName;
+
+ // Factory that is used to create the IDs of new PCJs.
+ private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
+
+ private final MongoPcjDocuments pcjDocs;
+
+ /**
+ * Constructs an instance of {@link MongoPcjStorage}.
+ *
+ * @param client - The {@link MongoClient} that will be used to connect to MongoDB. (not null)
+ * @param ryaInstanceName - The name of the Rya instance that will be accessed. (not null)
+ */
+ public MongoPcjStorage(final MongoClient client, final String ryaInstanceName) {
+ requireNonNull(client);
+ this.ryaInstanceName = requireNonNull(ryaInstanceName);
+ pcjDocs = new MongoPcjDocuments(client, ryaInstanceName);
+ ryaDetailsRepo = new MongoRyaInstanceDetailsRepository(client, ryaInstanceName);
+ }
+
+ @Override
+ public String createPcj(final String sparql) throws PCJStorageException {
+ requireNonNull(sparql);
+
+ // Generate the new PCJ's ID, then update the Rya Details for this
+ // instance to include it.
+ final String pcjId = pcjIdFactory.nextId();
+
+ try {
+ new RyaDetailsUpdater(ryaDetailsRepo).update(originalDetails -> {
+ // Create the new PCJ's details.
+ final PCJDetails.Builder newPcjDetails = PCJDetails.builder().setId(pcjId);
+
+ // Add them to the instance's details.
+ final RyaDetails.Builder mutated = RyaDetails.builder(originalDetails);
+ mutated.getPCJIndexDetails().addPCJDetails(newPcjDetails);
+ return mutated.build();
+ });
+ } catch (final RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
+ throw new PCJStorageException(String.format("Could not create a new PCJ for Rya instance '%s' "
+ + "because of a problem while updating the instance's details.", ryaInstanceName), e);
+ }
+
+ // Create the objectID of the document to house the PCJ results.
+ pcjDocs.createPcj(pcjId, sparql);
+
+ // NOTE(review): nothing in this method grants users access to the PCJ;
+ // it only returns the new PCJ's ID.
+ return pcjId;
+ }
+
+
+ @Override
+ public PcjMetadata getPcjMetadata(final String pcjId) throws PCJStorageException {
+ requireNonNull(pcjId);
+ return pcjDocs.getPcjMetadata(pcjId);
+ }
+
+ @Override
+ public void addResults(final String pcjId, final Collection<VisibilityBindingSet> results) throws PCJStorageException {
+ requireNonNull(pcjId);
+ requireNonNull(results);
+ pcjDocs.addResults(pcjId, results);
+ }
+
+
+ @Override
+ public CloseableIterator<BindingSet> listResults(final String pcjId) throws PCJStorageException {
+ requireNonNull(pcjId);
+ // Delegate to the PCJ documents to iterate over the stored results.
+ return pcjDocs.listResults(pcjId);
+ }
+
+ @Override
+ public void purge(final String pcjId) throws PCJStorageException {
+ requireNonNull(pcjId);
+ pcjDocs.purgePcjs(pcjId);
+ }
+
+ @Override
+ public void dropPcj(final String pcjId) throws PCJStorageException {
+ requireNonNull(pcjId);
+
+ // Update the Rya Details for this instance to no longer include the
+ // PCJ.
+ try {
+ new RyaDetailsUpdater(ryaDetailsRepo).update(originalDetails -> {
+ // Drop the PCJ's metadata from the instance's metadata.
+ final RyaDetails.Builder mutated = RyaDetails.builder(originalDetails);
+ mutated.getPCJIndexDetails().removePCJDetails(pcjId);
+ return mutated.build();
+ });
+ } catch (final RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
+ throw new PCJStorageException(String.format("Could not drop an existing PCJ for Rya instance '%s' "
+ + "because of a problem while updating the instance's details.", ryaInstanceName), e);
+ }
+
+ // Drop the PCJ itself from the PCJ documents.
+ pcjDocs.dropPcj(pcjId);
+ }
+
+ @Override
+ public List<String> listPcjs() throws PCJStorageException {
+ try {
+ final RyaDetails details = ryaDetailsRepo.getRyaInstanceDetails();
+ final PCJIndexDetails pcjIndexDetails = details.getPCJIndexDetails();
+ final List<String> pcjIds = new ArrayList<>(pcjIndexDetails.getPCJDetails().keySet());
+ return pcjIds;
+ } catch (final RyaDetailsRepositoryException e) {
+ throw new PCJStorageException("Could not check to see if RyaDetails exist for the instance.", e);
+ }
+ }
+
+ @Override
+ public void close() throws PCJStorageException {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocumentsTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocumentsTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocumentsTest.java
new file mode 100644
index 0000000..f522fac
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocumentsTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.mongo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.mongodb.MongoITBase;
+import org.bson.Document;
+import org.junit.Test;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class MongoPcjDocumentsTest extends MongoITBase {
+ @Test
+ public void pcjToMetadata() throws Exception {
+ final MongoPcjDocuments docConverter = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName());
+ final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+ final Document actual = docConverter.makeMetadataDocument("pcjTest", sparql);
+ final Document expected = new Document()
+ .append(MongoPcjDocuments.CARDINALITY_FIELD, 0)
+ .append(MongoPcjDocuments.PCJ_METADATA_ID, "pcjTest_METADATA")
+ .append(MongoPcjDocuments.SPARQL_FIELD, sparql)
+ .append(MongoPcjDocuments.VAR_ORDER_FIELD, Sets.newHashSet(new VariableOrder("a", "b"), new VariableOrder("b", "a")));
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void metadataExists() throws Exception {
+ final List<VariableOrder> varOrders = Lists.newArrayList(new VariableOrder("b", "a"), new VariableOrder("a", "b"));
+ final MongoPcjDocuments docConverter = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName());
+ final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+ docConverter.createPcj("pcjTest", sparql);
+
+ PcjMetadata actual = docConverter.getPcjMetadata("pcjTest");
+ PcjMetadata expected = new PcjMetadata(sparql, 0, varOrders);
+ assertEquals(expected, actual);
+
+ // Set up the first binding set that will be added as a PCJ result.
+ final MapBindingSet originalBindingSet1 = new MapBindingSet();
+ originalBindingSet1.addBinding("x", new URIImpl("http://a"));
+ originalBindingSet1.addBinding("y", new URIImpl("http://b"));
+ originalBindingSet1.addBinding("z", new URIImpl("http://c"));
+ final VisibilityBindingSet results1 = new VisibilityBindingSet(originalBindingSet1, "A&B&C");
+
+ // Set up the second binding set that will be added as a PCJ result.
+ final MapBindingSet originalBindingSet2 = new MapBindingSet();
+ originalBindingSet2.addBinding("x", new URIImpl("http://1"));
+ originalBindingSet2.addBinding("y", new URIImpl("http://2"));
+ originalBindingSet2.addBinding("z", new URIImpl("http://3"));
+ final VisibilityBindingSet results2 = new VisibilityBindingSet(originalBindingSet2, "A&B&C");
+
+ final List<VisibilityBindingSet> bindingSets = new ArrayList<>();
+ bindingSets.add(results1);
+ bindingSets.add(results2);
+
+ docConverter.addResults("pcjTest", bindingSets);
+ actual = docConverter.getPcjMetadata("pcjTest");
+ expected = new PcjMetadata(sparql, 2, varOrders);
+ assertEquals(expected, actual);
+
+ docConverter.purgePcjs("pcjTest");
+ actual = docConverter.getPcjMetadata("pcjTest");
+ expected = new PcjMetadata(sparql, 0, varOrders);
+ assertEquals(expected, actual);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjStorageIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjStorageIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjStorageIT.java
new file mode 100644
index 0000000..6747558
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjStorageIT.java
@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.mongo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.EntityCentricIndexDetails;
+import org.apache.rya.api.instance.RyaDetails.FreeTextIndexDetails;
+import org.apache.rya.api.instance.RyaDetails.JoinSelectivityDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import org.apache.rya.api.instance.RyaDetails.ProspectorDetails;
+import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.mongodb.MongoITBase;
+import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository;
+import org.junit.Test;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Integration tests the methods of {@link MongoPcjStorage}.
+ * <p>
+ * These tests ensure that the PCJs are maintained and that these operations
+ * also update the Rya instance's details.
+ */
+public class MongoPcjStorageIT extends MongoITBase {
+
+ @Test
+ public void createPCJ() throws Exception {
+ // Set up the PCJ storage that will be tested against.
+ try(final PrecomputedJoinStorage pcjStorage = new MongoPcjStorage(getMongoClient(), conf.getRyaInstanceName())) {
+ // Initialize the Rya instance's details, then create a PCJ.
+ final MongoRyaInstanceDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(getMongoClient(), conf.getRyaInstanceName());
+ detailsRepo.initialize(
+ RyaDetails.builder()
+ .setRyaInstanceName(conf.getRyaInstanceName())
+ .setRyaVersion("test")
+ .setEntityCentricIndexDetails(new EntityCentricIndexDetails(false))
+ .setTemporalIndexDetails(new TemporalIndexDetails(false))
+ .setFreeTextDetails(new FreeTextIndexDetails(false))
+ .setProspectorDetails(new ProspectorDetails(Optional.absent()))
+ .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.absent()))
+ .setPCJIndexDetails(PCJIndexDetails.builder().setEnabled(true))
+ .build()
+ );
+ final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
+
+ // Ensure the Rya details have been updated to include the PCJ's ID.
+ final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails()
+ .getPCJIndexDetails()
+ .getPCJDetails();
+
+ final PCJDetails expectedDetails = PCJDetails.builder()
+ .setId( pcjId )
+ .build();
+
+ assertEquals(expectedDetails, detailsMap.get(pcjId));
+ }
+ }
+
+
+ @Test
+ public void dropPCJ() throws Exception {
+ try(final PrecomputedJoinStorage pcjStorage = new MongoPcjStorage(getMongoClient(), conf.getRyaInstanceName())) {
+ final MongoRyaInstanceDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(getMongoClient(), conf.getRyaInstanceName());
+ detailsRepo.initialize(
+ RyaDetails.builder()
+ .setRyaInstanceName(conf.getRyaInstanceName())
+ .setRyaVersion("test")
+ .setEntityCentricIndexDetails(new EntityCentricIndexDetails(false))
+ .setTemporalIndexDetails(new TemporalIndexDetails(false))
+ .setFreeTextDetails(new FreeTextIndexDetails(false))
+ .setProspectorDetails(new ProspectorDetails(Optional.absent()))
+ .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.absent()))
+ .setPCJIndexDetails(PCJIndexDetails.builder().setEnabled(true))
+ .build()
+ );
+ // Create a PCJ.
+ final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
+
+ // Delete the PCJ that was just created.
+ pcjStorage.dropPcj(pcjId);
+
+ // Ensure the Rya details have been updated to no longer include the PCJ's ID.
+
+ final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails()
+ .getPCJIndexDetails()
+ .getPCJDetails();
+
+ assertFalse( detailsMap.containsKey(pcjId) );
+ }
+ }
+
+ @Test
+ public void listPcjs() throws Exception {
+ try(final PrecomputedJoinStorage pcjStorage = new MongoPcjStorage(getMongoClient(), conf.getRyaInstanceName())) {
+ final MongoRyaInstanceDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(getMongoClient(), conf.getRyaInstanceName());
+ detailsRepo.initialize(
+ RyaDetails.builder()
+ .setRyaInstanceName(conf.getRyaInstanceName())
+ .setRyaVersion("test")
+ .setEntityCentricIndexDetails(new EntityCentricIndexDetails(false))
+ .setTemporalIndexDetails(new TemporalIndexDetails(false))
+ .setFreeTextDetails(new FreeTextIndexDetails(false))
+ .setProspectorDetails(new ProspectorDetails(Optional.absent()))
+ .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.absent()))
+ .setPCJIndexDetails(PCJIndexDetails.builder().setEnabled(true))
+ .build()
+ );
+ // Create a few PCJs and hold onto their IDs.
+ final List<String> expectedIds = new ArrayList<>();
+
+ String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
+ expectedIds.add( pcjId );
+
+ pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
+ expectedIds.add( pcjId );
+
+ pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
+ expectedIds.add( pcjId );
+
+ // Fetch the PCJ IDs.
+ final List<String> pcjIds = pcjStorage.listPcjs();
+
+ // Ensure the expected IDs match the fetched IDs.
+ Collections.sort(expectedIds);
+ Collections.sort(pcjIds);
+ assertEquals(expectedIds, pcjIds);
+ }
+ }
+
+ @Test
+ public void getPcjMetadata() throws Exception {
+ try(final PrecomputedJoinStorage pcjStorage = new MongoPcjStorage(getMongoClient(), conf.getRyaInstanceName())) {
+ final MongoRyaInstanceDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(getMongoClient(), conf.getRyaInstanceName());
+ detailsRepo.initialize(
+ RyaDetails.builder()
+ .setRyaInstanceName(conf.getRyaInstanceName())
+ .setRyaVersion("test")
+ .setEntityCentricIndexDetails(new EntityCentricIndexDetails(false))
+ .setTemporalIndexDetails(new TemporalIndexDetails(false))
+ .setFreeTextDetails(new FreeTextIndexDetails(false))
+ .setProspectorDetails(new ProspectorDetails(Optional.absent()))
+ .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.absent()))
+ .setPCJIndexDetails(PCJIndexDetails.builder().setEnabled(true))
+ .build()
+ );
+ // Create a PCJ.
+ final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+ final String pcjId = pcjStorage.createPcj(sparql);
+
+ // Fetch the PCJ's metadata.
+ final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+
+ // Ensure it has the expected values.
+ final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
+ final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders);
+ assertEquals(expectedMetadata, metadata);
+ }
+ }
+
+ @Test
+ public void addResults() throws Exception {
+ try(final PrecomputedJoinStorage pcjStorage = new MongoPcjStorage(getMongoClient(), conf.getRyaInstanceName())) {
+ final MongoRyaInstanceDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(getMongoClient(), conf.getRyaInstanceName());
+ detailsRepo.initialize(
+ RyaDetails.builder()
+ .setRyaInstanceName(conf.getRyaInstanceName())
+ .setRyaVersion("test")
+ .setEntityCentricIndexDetails(new EntityCentricIndexDetails(false))
+ .setTemporalIndexDetails(new TemporalIndexDetails(false))
+ .setFreeTextDetails(new FreeTextIndexDetails(false))
+ .setProspectorDetails(new ProspectorDetails(Optional.absent()))
+ .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.absent()))
+ .setPCJIndexDetails(PCJIndexDetails.builder().setEnabled(true))
+ .build()
+ );
+ // Create a PCJ.
+ final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+ final String pcjId = pcjStorage.createPcj(sparql);
+
+ // Add some binding sets to it.
+ final Set<VisibilityBindingSet> results = new HashSet<>();
+
+ final MapBindingSet aliceBS = new MapBindingSet();
+ aliceBS.addBinding("a", new URIImpl("http://Alice"));
+ aliceBS.addBinding("b", new URIImpl("http://Person"));
+ results.add( new VisibilityBindingSet(aliceBS, "") );
+
+ final MapBindingSet charlieBS = new MapBindingSet();
+ charlieBS.addBinding("a", new URIImpl("http://Charlie"));
+ charlieBS.addBinding("b", new URIImpl("http://Comedian"));
+ results.add( new VisibilityBindingSet(charlieBS, "") );
+
+ pcjStorage.addResults(pcjId, results);
+
+ // Make sure the PCJ metadata was updated.
+ final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+
+ final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
+ final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 2L, varOrders);
+ assertEquals(expectedMetadata, metadata);
+ }
+ }
+
+ @Test
+ public void listResults() throws Exception {
+ try(final PrecomputedJoinStorage pcjStorage = new MongoPcjStorage(getMongoClient(), conf.getRyaInstanceName())) {
+ final MongoRyaInstanceDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(getMongoClient(), conf.getRyaInstanceName());
+ detailsRepo.initialize(
+ RyaDetails.builder()
+ .setRyaInstanceName(conf.getRyaInstanceName())
+ .setRyaVersion("test")
+ .setEntityCentricIndexDetails(new EntityCentricIndexDetails(false))
+ .setTemporalIndexDetails(new TemporalIndexDetails(false))
+ .setFreeTextDetails(new FreeTextIndexDetails(false))
+ .setProspectorDetails(new ProspectorDetails(Optional.absent()))
+ .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.absent()))
+ .setPCJIndexDetails(PCJIndexDetails.builder().setEnabled(true))
+ .build()
+ );
+ // Create a PCJ.
+ final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+ final String pcjId = pcjStorage.createPcj(sparql);
+
+ // Add some binding sets to it.
+ final Set<VisibilityBindingSet> visiSets = new HashSet<>();
+ final Set<BindingSet> expectedResults = new HashSet<>();
+
+ final MapBindingSet aliceBS = new MapBindingSet();
+ aliceBS.addBinding("a", new URIImpl("http://Alice"));
+ aliceBS.addBinding("b", new URIImpl("http://Person"));
+ visiSets.add( new VisibilityBindingSet(aliceBS, "") );
+ expectedResults.add(aliceBS);
+
+ final MapBindingSet charlieBS = new MapBindingSet();
+ charlieBS.addBinding("a", new URIImpl("http://Charlie"));
+ charlieBS.addBinding("b", new URIImpl("http://Comedian"));
+ visiSets.add( new VisibilityBindingSet(charlieBS, "") );
+ expectedResults.add(charlieBS);
+
+ pcjStorage.addResults(pcjId, visiSets);
+
+ // List the results that were stored.
+ final Set<BindingSet> results = new HashSet<>();
+ try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+ while(resultsIt.hasNext()) {
+ results.add( resultsIt.next() );
+ }
+ }
+
+ assertEquals(expectedResults, results);
+ }
+ }
+
+ @Test
+ public void purge() throws Exception {
+ try(final PrecomputedJoinStorage pcjStorage = new MongoPcjStorage(getMongoClient(), conf.getRyaInstanceName())) {
+ final MongoRyaInstanceDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(getMongoClient(), conf.getRyaInstanceName());
+ detailsRepo.initialize(
+ RyaDetails.builder()
+ .setRyaInstanceName(conf.getRyaInstanceName())
+ .setRyaVersion("test")
+ .setEntityCentricIndexDetails(new EntityCentricIndexDetails(false))
+ .setTemporalIndexDetails(new TemporalIndexDetails(false))
+ .setFreeTextDetails(new FreeTextIndexDetails(false))
+ .setProspectorDetails(new ProspectorDetails(Optional.absent()))
+ .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.absent()))
+ .setPCJIndexDetails(PCJIndexDetails.builder().setEnabled(true))
+ .build()
+ );
+ // Create a PCJ.
+ final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
+ final String pcjId = pcjStorage.createPcj(sparql);
+
+ // Add some binding sets to it.
+ final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
+
+ final MapBindingSet aliceBS = new MapBindingSet();
+ aliceBS.addBinding("a", new URIImpl("http://Alice"));
+ aliceBS.addBinding("b", new URIImpl("http://Person"));
+ expectedResults.add( new VisibilityBindingSet(aliceBS, "") );
+
+ final MapBindingSet charlieBS = new MapBindingSet();
+ charlieBS.addBinding("a", new URIImpl("http://Charlie"));
+ charlieBS.addBinding("b", new URIImpl("http://Comedian"));
+ expectedResults.add( new VisibilityBindingSet(charlieBS, "") );
+
+ pcjStorage.addResults(pcjId, expectedResults);
+
+ // Purge the PCJ.
+ pcjStorage.purge(pcjId);
+
+ // List the results that remain after the purge.
+ final Set<BindingSet> results = new HashSet<>();
+ try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
+ while(resultsIt.hasNext()) {
+ results.add( resultsIt.next() );
+ }
+ }
+
+ assertTrue( results.isEmpty() );
+
+ // Make sure the PCJ metadata was updated.
+ final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
+
+ final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
+ final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders);
+ assertEquals(expectedMetadata, metadata);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/PcjDocumentsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/PcjDocumentsIntegrationTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/PcjDocumentsIntegrationTest.java
new file mode 100644
index 0000000..0c71c9f
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/PcjDocumentsIntegrationTest.java
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.mongo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.utils.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.PcjException;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.mongodb.MongoDBRyaDAO;
+import org.apache.rya.mongodb.MongoITBase;
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
+import org.apache.rya.rdftriplestore.RyaSailRepository;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.NumericLiteralImpl;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+import org.openrdf.repository.RepositoryConnection;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+
+/**
+ * Performs integration tests using the Mongo client supplied by
+ * {@link MongoITBase} to ensure the functions of {@link MongoPcjDocuments} work.
+ */
+public class PcjDocumentsIntegrationTest extends MongoITBase {
+ /**
+ * Turns on query plan display for the supplied configuration.
+ *
+ * @param conf - The configuration that will be updated.
+ */
+ @Override
+ protected void updateConfiguration(final MongoDBRdfConfiguration conf) {
+ conf.setDisplayQueryPlan(true);
+ }
+
+ /**
+ * Ensure that a newly created PCJ starts out with the correct metadata:
+ * its SPARQL, a cardinality of zero, and its variable orders.
+ * <p>
+ * The method being tested is {@link MongoPcjDocuments#createPcj}
+ */
+ @Test
+ public void createPcjTable() throws PcjException, AccumuloException, AccumuloSecurityException {
+ final String query =
+ "SELECT ?name ?age " +
+ "{" +
+ "FILTER(?age < 30) ." +
+ "?name <http://hasAge> ?age." +
+ "?name <http://playsSport> \"Soccer\" " +
+ "}";
+
+ final String pcjName = "testPcj";
+ final MongoPcjDocuments pcjDocs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName());
+ pcjDocs.createPcj(pcjName, query);
+
+ // Nothing has been added yet, so the cardinality is 0 and both orderings of the projected variables are expected.
+ final Set<VariableOrder> expectedVarOrders = Sets.newHashSet(
+ new VariableOrder("name", "age"),
+ new VariableOrder("age", "name"));
+ final PcjMetadata expectedMetadata = new PcjMetadata(query, 0L, expectedVarOrders);
+
+ // The metadata that was stored for the PCJ must match.
+ assertEquals(expectedMetadata, pcjDocs.getPcjMetadata(pcjName));
+ }
+
+ /**
+ * Ensure that results written to a PCJ can be read back and that the
+ * PCJ's cardinality counts them.
+ * <p>
+ * The method being tested is {@link MongoPcjDocuments#addResults}
+ */
+ @Test
+ public void addResults() throws Exception {
+ final String query =
+ "SELECT ?name ?age " +
+ "{" +
+ "FILTER(?age < 30) ." +
+ "?name <http://hasAge> ?age." +
+ "?name <http://playsSport> \"Soccer\" " +
+ "}";
+
+ final String pcjName = "testPcj";
+ final MongoPcjDocuments pcjDocs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName());
+ pcjDocs.createPcj(pcjName, query);
+
+ // Build the binding sets that will be written to the PCJ.
+ final MapBindingSet aliceBs = new MapBindingSet();
+ aliceBs.addBinding("name", new URIImpl("http://Alice"));
+ aliceBs.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
+
+ final MapBindingSet bobBs = new MapBindingSet();
+ bobBs.addBinding("name", new URIImpl("http://Bob"));
+ bobBs.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
+
+ final MapBindingSet charlieBs = new MapBindingSet();
+ charlieBs.addBinding("name", new URIImpl("http://Charlie"));
+ charlieBs.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER));
+
+ // Write them to the PCJ.
+ final Set<VisibilityBindingSet> toStore = Sets.<VisibilityBindingSet>newHashSet(
+ new VisibilityBindingSet(aliceBs),
+ new VisibilityBindingSet(bobBs),
+ new VisibilityBindingSet(charlieBs));
+ pcjDocs.addResults(pcjName, toStore);
+
+ // The cardinality must count every stored result.
+ assertEquals(3, pcjDocs.getPcjMetadata(pcjName).getCardinality());
+
+ // What is read back must equal the binding sets that were written.
+ final Set<BindingSet> expectedResults = Sets.<BindingSet>newHashSet(aliceBs, bobBs, charlieBs);
+ assertEquals(expectedResults, loadPcjResults(pcjName));
+ }
+
+ /**
+ * Ensure the results that were added to a PCJ are the ones returned when
+ * the PCJ's results are listed.
+ */
+ @Test
+ public void listResults() throws Exception {
+ final String query =
+ "SELECT ?name ?age " +
+ "{" +
+ "FILTER(?age < 30) ." +
+ "?name <http://hasAge> ?age." +
+ "?name <http://playsSport> \"Soccer\" " +
+ "}";
+
+ final String pcjName = "testPcj";
+ final MongoPcjDocuments pcjDocs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName());
+ pcjDocs.createPcj(pcjName, query);
+
+ // Build the binding sets that will be written to the PCJ.
+ final MapBindingSet aliceBs = new MapBindingSet();
+ aliceBs.addBinding("name", new URIImpl("http://Alice"));
+ aliceBs.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
+
+ final MapBindingSet bobBs = new MapBindingSet();
+ bobBs.addBinding("name", new URIImpl("http://Bob"));
+ bobBs.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
+
+ final MapBindingSet charlieBs = new MapBindingSet();
+ charlieBs.addBinding("name", new URIImpl("http://Charlie"));
+ charlieBs.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER));
+
+ final Set<VisibilityBindingSet> toStore = Sets.<VisibilityBindingSet>newHashSet(
+ new VisibilityBindingSet(aliceBs),
+ new VisibilityBindingSet(bobBs),
+ new VisibilityBindingSet(charlieBs));
+ pcjDocs.addResults(pcjName, toStore);
+
+ // Drain the listing into a set; the iterator is closed when the block exits.
+ final Set<BindingSet> listed = new HashSet<>();
+ try(final CloseableIterator<BindingSet> it = pcjDocs.listResults(pcjName)) {
+ while(it.hasNext()) {
+ listed.add( it.next() );
+ }
+ }
+
+ // Verify the listed results match the ones that were written.
+ final Set<BindingSet> expectedResults = Sets.<BindingSet>newHashSet(aliceBs, bobBs, charlieBs);
+ assertEquals(expectedResults, listed);
+ }
+
+ /**
+ * Ensure when results are already stored in Rya, that we are able to populate
+ * a PCJ for a new SPARQL query using those results.
+ * <p>
+ * The method being tested is: {@link MongoPcjDocuments#populatePcj}
+ */
+ @Test
+ public void populatePcj() throws Exception {
+ final MongoDBRyaDAO dao = new MongoDBRyaDAO();
+ dao.setConf(new StatefulMongoDBRdfConfiguration(conf, getMongoClient()));
+ dao.init();
+ final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore();
+ ryaStore.setRyaDAO(dao);
+ ryaStore.initialize();
+ final SailRepositoryConnection ryaConn = new RyaSailRepository(ryaStore).getConnection();
+ ryaConn.begin();
+
+ try {
+ // Load some Triples into Rya. Eve is older than 30, so the query's FILTER excludes her.
+ final Set<Statement> triples = new HashSet<>();
+ triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+ triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+ triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+ triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+
+ for(final Statement triple : triples) {
+ ryaConn.add(triple);
+ }
+
+ // Create a PCJ that will include those triples in its results.
+ final String sparql =
+ "SELECT ?name ?age " +
+ "{" +
+ "FILTER(?age < 30) ." +
+ "?name <http://hasAge> ?age." +
+ "?name <http://playsSport> \"Soccer\" " +
+ "}";
+
+ final String pcjTableName = "testPcj";
+ final MongoPcjDocuments pcjs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName());
+ pcjs.createPcj(pcjTableName, sparql);
+
+ // Populate the PCJ using a Rya connection.
+ pcjs.populatePcj(pcjTableName, ryaConn);
+
+ // Fetch the results that were stored for the PCJ.
+ final Collection<BindingSet> fetchedResults = loadPcjResults(pcjTableName);
+
+ // Make sure the cardinality was updated.
+ final PcjMetadata metadata = pcjs.getPcjMetadata(pcjTableName);
+ assertEquals(3, metadata.getCardinality());
+
+ // Ensure the expected results match those that were stored.
+ final MapBindingSet alice = new MapBindingSet();
+ alice.addBinding("name", new URIImpl("http://Alice"));
+ alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
+
+ final MapBindingSet bob = new MapBindingSet();
+ bob.addBinding("name", new URIImpl("http://Bob"));
+ bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
+
+ final MapBindingSet charlie = new MapBindingSet();
+ charlie.addBinding("name", new URIImpl("http://Charlie"));
+ charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER));
+
+ final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(alice, bob, charlie);
+
+ assertEquals(expected, fetchedResults);
+ } finally {
+ // Shut the store down even when closing the connection fails.
+ try {
+ ryaConn.close();
+ } finally {
+ ryaStore.shutDown();
+ }
+ }
+ }
+
+ /**
+ * Ensure the method that creates a new PCJ, scans Rya for matches, and
+ * stores them in the PCJ works.
+ * <p>
+ * The method being tested is: {@link MongoPcjDocuments#createAndPopulatePcj}
+ */
+ @Test
+ public void createAndPopulatePcj() throws Exception {
+ final MongoDBRyaDAO dao = new MongoDBRyaDAO();
+ dao.setConf(new StatefulMongoDBRdfConfiguration(conf, getMongoClient()));
+ dao.init();
+ final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore();
+ ryaStore.setRyaDAO(dao);
+ ryaStore.initialize();
+ final SailRepositoryConnection ryaConn = new RyaSailRepository(ryaStore).getConnection();
+ ryaConn.begin();
+
+ try {
+ // Load some Triples into Rya. Eve is older than 30, so the query's FILTER excludes her.
+ final Set<Statement> triples = new HashSet<>();
+ triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+ triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+ triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+ triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+
+ for(final Statement triple : triples) {
+ ryaConn.add(triple);
+ }
+
+ // Create a PCJ that will include those triples in its results.
+ final String sparql =
+ "SELECT ?name ?age " +
+ "{" +
+ "FILTER(?age < 30) ." +
+ "?name <http://hasAge> ?age." +
+ "?name <http://playsSport> \"Soccer\" " +
+ "}";
+
+ final String pcjTableName = "testPcj";
+
+ // Create and populate the PCJ.
+ final MongoPcjDocuments pcjs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName());
+ pcjs.createAndPopulatePcj(ryaConn, pcjTableName, sparql);
+
+ // Make sure the cardinality was updated.
+ final PcjMetadata metadata = pcjs.getPcjMetadata(pcjTableName);
+ assertEquals(3, metadata.getCardinality());
+
+ // Fetch the results that were stored for the PCJ.
+ final Collection<BindingSet> fetchedResults = loadPcjResults(pcjTableName);
+
+ // Ensure the expected results match those that were stored.
+ final MapBindingSet alice = new MapBindingSet();
+ alice.addBinding("name", new URIImpl("http://Alice"));
+ alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
+
+ final MapBindingSet bob = new MapBindingSet();
+ bob.addBinding("name", new URIImpl("http://Bob"));
+ bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
+
+ final MapBindingSet charlie = new MapBindingSet();
+ charlie.addBinding("name", new URIImpl("http://Charlie"));
+ charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER));
+
+ final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(alice, bob, charlie);
+
+ assertEquals(expected, fetchedResults);
+ } finally {
+ // Shut the store down even when closing the connection fails.
+ try {
+ ryaConn.close();
+ } finally {
+ ryaStore.shutDown();
+ }
+ }
+ }
+
+ /**
+ * Ensure each {@link MongoPcjDocuments} lists exactly the PCJs that were
+ * created through it.
+ */
+ @Test
+ public void listPcjs() throws Exception {
+ // The two instance names the PCJs are split between.
+ final String firstInstance = "instance1_";
+ final String secondInstance = "instance2_";
+
+ // Derive the PCJ names that will be used for each instance.
+ final PcjTableNameFactory nameFactory = new PcjTableNameFactory();
+ final String firstTable1 = nameFactory.makeTableName(firstInstance, "table1");
+ final String firstTable2 = nameFactory.makeTableName(firstInstance, "table2");
+ final String firstTable3 = nameFactory.makeTableName(firstInstance, "table3");
+ final String secondTable1 = nameFactory.makeTableName(secondInstance, "table1");
+
+ // Every PCJ uses the same query; only the names matter to this test.
+ final String query = "SELECT ?x WHERE { ?x <http://isA> <http://Food> }";
+
+ final MongoPcjDocuments firstPcjs = new MongoPcjDocuments(getMongoClient(), firstInstance);
+ final MongoPcjDocuments secondPcjs = new MongoPcjDocuments(getMongoClient(), secondInstance);
+
+ // Three PCJs are created through the first instance, one through the second.
+ for(final String pcjName : new String[] { firstTable1, firstTable2, firstTable3 }) {
+ firstPcjs.createPcj(pcjName, query);
+ }
+ secondPcjs.createPcj(secondTable1, query);
+
+ // The first instance lists only its own three PCJs.
+ final Set<String> firstExpected = Sets.newHashSet(firstTable1, firstTable2, firstTable3);
+ final Set<String> firstListed = Sets.newHashSet( firstPcjs.listPcjDocuments() );
+ assertEquals(firstExpected, firstListed);
+
+ // The second instance lists only its single PCJ.
+ final Set<String> secondExpected = Sets.newHashSet(secondTable1);
+ final Set<String> secondListed = Sets.newHashSet( secondPcjs.listPcjDocuments() );
+ assertEquals(secondExpected, secondListed);
+ }
+
+ /**
+ * Ensure purging a PCJ resets its cardinality to zero.
+ */
+ @Test
+ public void purge() throws Exception {
+ final String query =
+ "SELECT ?name ?age " +
+ "{" +
+ "FILTER(?age < 30) ." +
+ "?name <http://hasAge> ?age." +
+ "?name <http://playsSport> \"Soccer\" " +
+ "}";
+
+ final String pcjName = "testPcj";
+ final MongoPcjDocuments pcjDocs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName());
+ pcjDocs.createPcj(pcjName, query);
+
+ // Build the binding sets that will be written to the PCJ.
+ final MapBindingSet aliceBs = new MapBindingSet();
+ aliceBs.addBinding("name", new URIImpl("http://Alice"));
+ aliceBs.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER));
+
+ final MapBindingSet bobBs = new MapBindingSet();
+ bobBs.addBinding("name", new URIImpl("http://Bob"));
+ bobBs.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER));
+
+ final MapBindingSet charlieBs = new MapBindingSet();
+ charlieBs.addBinding("name", new URIImpl("http://Charlie"));
+ charlieBs.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER));
+
+ final Set<VisibilityBindingSet> toStore = Sets.<VisibilityBindingSet>newHashSet(
+ new VisibilityBindingSet(aliceBs),
+ new VisibilityBindingSet(bobBs),
+ new VisibilityBindingSet(charlieBs));
+ pcjDocs.addResults(pcjName, toStore);
+
+ // All three results must be counted before the purge.
+ final PcjMetadata beforePurge = pcjDocs.getPcjMetadata(pcjName);
+ assertEquals(3, beforePurge.getCardinality());
+
+ // Purge the data.
+ pcjDocs.purgePcjs(pcjName);
+
+ // Nothing may be counted after the purge.
+ final PcjMetadata afterPurge = pcjDocs.getPcjMetadata(pcjName);
+ assertEquals(0, afterPurge.getCardinality());
+ }
+
+ /**
+ * Ensure that once a PCJ has been dropped, its metadata can no longer be fetched.
+ */
+ @Test
+ public void dropPcj() throws Exception {
+ // Create a PCJ index. The query has to be well formed SPARQL so that the
+ // creation succeeds and the drop below is actually exercised.
+ final String pcjTableName = "testPcj";
+ final String sparql = "SELECT ?x WHERE { ?x <http://isA> <http://Food> }";
+
+ final MongoPcjDocuments pcjs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName());
+ pcjs.createPcj(pcjTableName, sparql);
+
+ // Fetch its metadata to show that it has actually been created.
+ // The query projects a single variable, so a single variable order is expected.
+ final Set<VariableOrder> expectedVarOrders = new HashSet<>();
+ expectedVarOrders.add(new VariableOrder("x"));
+ final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, expectedVarOrders);
+ assertEquals(expectedMetadata, pcjs.getPcjMetadata(pcjTableName));
+
+ // Drop it.
+ pcjs.dropPcj(pcjTableName);
+
+ // Show the metadata is no longer present. Only this lookup is allowed to fail,
+ // so a storage exception thrown by any earlier step fails the test.
+ try {
+ pcjs.getPcjMetadata(pcjTableName);
+ } catch (final PCJStorageException expected) {
+ return;
+ }
+ throw new AssertionError("Fetching the metadata of a dropped PCJ should have thrown a PCJStorageException.");
+ }
+
+ /**
+ * Reads every binding set that is currently stored for a PCJ.
+ *
+ * @param pcjTableName - The name of the PCJ whose results will be read.
+ * @return The binding sets that were listed for the PCJ.
+ * @throws Exception The results could not be listed or the results iterator could not be closed.
+ */
+ private Collection<BindingSet> loadPcjResults(final String pcjTableName) throws Exception {
+ final MongoPcjDocuments pcjs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName());
+ final Set<BindingSet> bindingSets = new HashSet<>();
+
+ // The iterator is a closeable resource, so release it once the results have been copied.
+ try(final CloseableIterator<BindingSet> bindings = pcjs.listResults(pcjTableName)) {
+ while(bindings.hasNext()) {
+ bindingSets.add(bindings.next());
+ }
+ }
+ return bindingSets;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/PcjDocumentsWithMockTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/PcjDocumentsWithMockTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/PcjDocumentsWithMockTest.java
new file mode 100644
index 0000000..a3ba747
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/PcjDocumentsWithMockTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.rya.indexing.pcj.storage.mongo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.mongodb.MongoDBRyaDAO;
+import org.apache.rya.mongodb.MongoITBase;
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
+import org.apache.rya.rdftriplestore.RyaSailRepository;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.NumericLiteralImpl;
+import org.openrdf.model.impl.StatementImpl;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+
+public class PcjDocumentsWithMockTest extends MongoITBase {
+ /**
+ * Turns off query plan display for the supplied configuration.
+ *
+ * @param conf - The configuration that will be updated.
+ */
+ @Override
+ protected void updateConfiguration(final MongoDBRdfConfiguration conf) {
+ conf.setDisplayQueryPlan(false);
+ }
+
+ /**
+ * Ensure that creating and populating a PCJ from triples already stored in
+ * Rya updates the PCJ's cardinality.
+ */
+ @Test
+ public void populatePcj() throws Exception {
+ final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore();
+ final MongoDBRyaDAO dao = new MongoDBRyaDAO();
+ dao.setConf(new StatefulMongoDBRdfConfiguration(conf, getMongoClient()));
+ dao.init();
+ ryaStore.setRyaDAO(dao);
+ ryaStore.initialize();
+ final SailRepositoryConnection ryaConn = new RyaSailRepository(ryaStore).getConnection();
+
+ try {
+ // Load some Triples into Rya.
+ final Set<Statement> triples = new HashSet<>();
+ triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+ triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+ triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+ triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) );
+ triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) );
+
+ for(final Statement triple : triples) {
+ ryaConn.add(triple);
+ }
+
+ // Create a PCJ that will include those triples in its results.
+ // The query has no age filter, so all four people are expected to match.
+ final String sparql =
+ "SELECT ?name ?age " +
+ "{" +
+ "?name <http://hasAge> ?age." +
+ "?name <http://playsSport> \"Soccer\" " +
+ "}";
+
+ final String pcjTableName = new PcjTableNameFactory().makeTableName(conf.getRyaInstanceName(), "testPcj");
+ final MongoPcjDocuments pcjs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName());
+ pcjs.createAndPopulatePcj(ryaConn, pcjTableName, sparql);
+
+ // Make sure the cardinality was updated.
+ final PcjMetadata metadata = pcjs.getPcjMetadata(pcjTableName);
+ assertEquals(4, metadata.getCardinality());
+ } finally {
+ // Shut the store down even when closing the connection fails.
+ try {
+ ryaConn.close();
+ } finally {
+ ryaStore.shutDown();
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
index 7d6b241..ef5ab34 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java
@@ -132,7 +132,7 @@ public class CreateDeleteIT extends RyaExportITBase {
// Register the PCJ with Rya.
final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), getAccumuloConnector());
- final String pcjId = ryaClient.getCreatePCJ().get().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet());
+ final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet());
// Write the data to Rya.
final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
index 610f502..11415eb 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java
@@ -556,7 +556,7 @@ public class QueryIT extends RyaExportITBase {
final Set<BindingSet> expectedResults = new HashSet<>();
final long period = 1800000;
- final long binId = (currentTime / period) * period;
+ final long binId = currentTime / period * period;
MapBindingSet bs = new MapBindingSet();
bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
@@ -657,7 +657,7 @@ public class QueryIT extends RyaExportITBase {
final Set<BindingSet> expectedResults = new HashSet<>();
final long period = 1800000;
- final long binId = (currentTime / period) * period;
+ final long binId = currentTime / period * period;
MapBindingSet bs = new MapBindingSet();
bs.addBinding("total", vf.createLiteral("4", XMLSchema.INTEGER));
@@ -734,7 +734,7 @@ public class QueryIT extends RyaExportITBase {
final Set<BindingSet> expectedResults = new HashSet<>();
final long period = 1800000;
- final long binId = (currentTime / period) * period;
+ final long binId = currentTime / period * period;
MapBindingSet bs = new MapBindingSet();
bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
@@ -854,7 +854,7 @@ public class QueryIT extends RyaExportITBase {
final Set<BindingSet> expectedResults = new HashSet<>();
final long period = 1800000;
- final long binId = (currentTime / period) * period;
+ final long binId = currentTime / period * period;
MapBindingSet bs = new MapBindingSet();
bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
@@ -935,7 +935,7 @@ public class QueryIT extends RyaExportITBase {
final Set<BindingSet> expectedResults = new HashSet<>();
final long period = 1800000;
- final long binId = (currentTime / period) * period;
+ final long binId = currentTime / period * period;
MapBindingSet bs = new MapBindingSet();
bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
@@ -996,7 +996,7 @@ public class QueryIT extends RyaExportITBase {
switch (strategy) {
case RYA:
- ryaClient.getCreatePCJ().get().createPCJ(getRyaInstanceName(), sparql);
+ ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql);
addStatementsAndWait(statements);
// Fetch the value that is stored within the PCJ table.
try (final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName())) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
index 90ed01a..45be971 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java
@@ -104,7 +104,7 @@ public class PcjVisibilityIT extends RyaExportITBase {
final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), accumuloConn);
- final String pcjId = ryaClient.getCreatePCJ().get().createPCJ(getRyaInstanceName(), sparql);
+ final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql);
// Grant the root user the "u" authorization.
super.getAccumuloConnector().securityOperations().changeUserAuthorizations(getUsername(), new Authorizations("u"));
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
index 3b1b160..465e089 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java
@@ -342,7 +342,7 @@ public class KafkaExportITBase extends AccumuloExportITBase {
final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(ACCUMULO_USER,
ACCUMULO_PASSWORD.toCharArray(), accInstance.getInstanceName(), accInstance.getZooKeepers()), accumuloConn);
- final String pcjId = ryaClient.getCreatePCJ().get().createPCJ(RYA_INSTANCE_NAME, sparql, Sets.newHashSet(ExportStrategy.KAFKA));
+ final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql, Sets.newHashSet(ExportStrategy.KAFKA));
loadData(statements);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java
index 894421a..f540a2e 100644
--- a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java
+++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java
@@ -342,7 +342,7 @@ public class GeoFunctionsIT extends RyaExportITBase {
accInstance.getInstanceName(),
accInstance.getZooKeepers()), accumuloConn);
- ryaClient.getCreatePCJ().get().createPCJ(getRyaInstanceName(), sparql);
+ ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql);
// Write the data to Rya.
final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java
----------------------------------------------------------------------
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java
index ef33df1..17a2a23 100644
--- a/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java
+++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java
@@ -166,11 +166,34 @@ public class RyaAdminCommands implements CommandMarker {
*/
@CliAvailabilityIndicator({
CREATE_PCJ_CMD,
- DELETE_PCJ_CMD,
+ DELETE_PCJ_CMD})
+ public boolean arePCJCommandsAvailable() {
+ // The PCJ commands are only available if the Shell is connected to an instance of Rya
+ // that is new enough to use the RyaDetailsRepository and is configured to maintain PCJs.
+ final ShellState shellState = state.getShellState();
+ if(shellState.getConnectionState() == ConnectionState.CONNECTED_TO_INSTANCE) {
+ final GetInstanceDetails getInstanceDetails = shellState.getConnectedCommands().get().getGetInstanceDetails();
+ final String ryaInstanceName = state.getShellState().getRyaInstanceName().get();
+ try {
+ final Optional<RyaDetails> instanceDetails = getInstanceDetails.getDetails( ryaInstanceName );
+ if(instanceDetails.isPresent()) {
+ return instanceDetails.get().getPCJIndexDetails().isEnabled();
+ }
+ } catch (final RyaClientException e) {
+ return false;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Enables the periodic PCJ and incremental query commands, which are available when the Shell is connected to a Rya Instance that supports PCJ Indexing.
+ */
+ @CliAvailabilityIndicator({
CREATE_PERIODIC_PCJ_CMD,
DELETE_PERIODIC_PCJ_CMD,
LIST_INCREMENTAL_QUERIES})
- public boolean arePCJCommandsAvailable() {
+ public boolean arePeriodicPCJCommandsAvailable() {
// The PCJ commands are only available if the Shell is connected to an instance of Rya
// that is new enough to use the RyaDetailsRepository and is configured to maintain PCJs.
final ShellState shellState = state.getShellState();
@@ -267,8 +290,8 @@ public class RyaAdminCommands implements CommandMarker {
final boolean enableFreeTextIndex,
// TODO RYA-215
-// @CliOption(key = {"enableGeospatialIndex"}, mandatory = false, help = "Use Geospatial Indexing.", unspecifiedDefaultValue = "false", specifiedDefaultValue = "true")
-// final boolean enableGeospatialIndex,
+ // @CliOption(key = {"enableGeospatialIndex"}, mandatory = false, help = "Use Geospatial Indexing.", unspecifiedDefaultValue = "false", specifiedDefaultValue = "true")
+ // final boolean enableGeospatialIndex,
@CliOption(key = {"enableTemporalIndex"}, mandatory = false, help = "Use Temporal Indexing.", unspecifiedDefaultValue = "false", specifiedDefaultValue = "true")
final boolean enableTemporalIndex,
@@ -289,7 +312,7 @@ public class RyaAdminCommands implements CommandMarker {
.setEnableEntityCentricIndex(enableEntityCentricIndex)
.setEnableFreeTextIndex(enableFreeTextIndex)
// TODO RYA-215
-// .setEnableGeoIndex(enableGeospatialIndex)
+ // .setEnableGeoIndex(enableGeospatialIndex)
.setEnableTemporalIndex(enableTemporalIndex)
.setEnablePcjIndex(enablePcjIndex)
.setFluoPcjAppName(fluoPcjAppName)
@@ -320,11 +343,14 @@ public class RyaAdminCommands implements CommandMarker {
final boolean enableFreeTextIndex,
// TODO RYA-215
-// @CliOption(key = {"enableGeospatialIndex"}, mandatory = false, help = "Use Geospatial Indexing.", unspecifiedDefaultValue = "false", specifiedDefaultValue = "true")
-// final boolean enableGeospatialIndex,
+ // @CliOption(key = {"enableGeospatialIndex"}, mandatory = false, help = "Use Geospatial Indexing.", unspecifiedDefaultValue = "false", specifiedDefaultValue = "true")
+ // final boolean enableGeospatialIndex,
@CliOption(key = {"enableTemporalIndex"}, mandatory = false, help = "Use Temporal Indexing.", unspecifiedDefaultValue = "false", specifiedDefaultValue = "true")
- final boolean enableTemporalIndex) {
+ final boolean enableTemporalIndex,
+
+ @CliOption(key = {"enablePcjIndex"}, mandatory = false, help = "Use Precomputed Join (PCJ) Indexing.", unspecifiedDefaultValue = "false", specifiedDefaultValue = "true")
+ final boolean enablePcjIndex) {
// Fetch the commands that are connected to the store.
final RyaClient commands = state.getShellState().getConnectedCommands().get();
@@ -333,8 +359,9 @@ public class RyaAdminCommands implements CommandMarker {
final InstallConfiguration installConfig = InstallConfiguration.builder()
.setEnableFreeTextIndex(enableFreeTextIndex)
// TODO RYA-215
-// .setEnableGeoIndex(enableGeospatialIndex)
+ // .setEnableGeoIndex(enableGeospatialIndex)
.setEnableTemporalIndex(enableTemporalIndex)
+ .setEnablePcjIndex(enablePcjIndex)
.build();
// Verify the configuration is what the user actually wants to do.
@@ -401,7 +428,7 @@ public class RyaAdminCommands implements CommandMarker {
final Optional<String> sparql = sparqlPrompt.getSparql();
if (sparql.isPresent()) {
// Execute the command.
- final String pcjId = commands.getCreatePCJ().get().createPCJ(ryaInstance, sparql.get(), strategies);
+ final String pcjId = commands.getCreatePCJ().createPCJ(ryaInstance, sparql.get(), strategies);
// Return a message that indicates the ID of the newly created PCJ.
return String.format("The PCJ has been created. Its ID is '%s'.", pcjId);
} else {
@@ -425,7 +452,7 @@ public class RyaAdminCommands implements CommandMarker {
try {
// Execute the command.
- commands.getDeletePCJ().get().deletePCJ(ryaInstance, pcjId);
+ commands.getDeletePCJ().deletePCJ(ryaInstance, pcjId);
return "The PCJ has been deleted.";
} catch (final InstanceDoesNotExistException e) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/shell/src/main/java/org/apache/rya/shell/util/InstallPrompt.java
----------------------------------------------------------------------
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/util/InstallPrompt.java b/extras/shell/src/main/java/org/apache/rya/shell/util/InstallPrompt.java
index e92eadd..31480db 100644
--- a/extras/shell/src/main/java/org/apache/rya/shell/util/InstallPrompt.java
+++ b/extras/shell/src/main/java/org/apache/rya/shell/util/InstallPrompt.java
@@ -108,14 +108,14 @@ public interface InstallPrompt {
checkState(storageType.isPresent(), "The shell must be connected to a storage to use the install prompt.");
switch(sharedShellState.getShellState().getStorageType().get()) {
- case ACCUMULO:
- return promptAccumuloVerified(instanceName, installConfig);
+ case ACCUMULO:
+ return promptAccumuloVerified(instanceName, installConfig);
- case MONGO:
- return promptMongoVerified(instanceName, installConfig);
+ case MONGO:
+ return promptMongoVerified(instanceName, installConfig);
- default:
- throw new IllegalStateException("Unsupported storage type: " + storageType.get());
+ default:
+ throw new IllegalStateException("Unsupported storage type: " + storageType.get());
}
}
@@ -143,9 +143,9 @@ public interface InstallPrompt {
final boolean enableFreeTextIndexing = promptBoolean(prompt, Optional.of(true));
builder.setEnableFreeTextIndex( enableFreeTextIndexing );
-// RYA-215 prompt = makeFieldPrompt("Use Geospatial Indexing", true);
-// final boolean enableGeoIndexing = promptBoolean(prompt, Optional.of(true));
-// builder.setEnableGeoIndex( enableGeoIndexing );
+ // RYA-215 prompt = makeFieldPrompt("Use Geospatial Indexing", true);
+ // final boolean enableGeoIndexing = promptBoolean(prompt, Optional.of(true));
+ // builder.setEnableGeoIndex( enableGeoIndexing );
prompt = makeFieldPrompt("Use Temporal Indexing", true);
final boolean useTemporalIndexing = promptBoolean(prompt, Optional.of(true));
@@ -188,7 +188,7 @@ public interface InstallPrompt {
reader.println(" Use Shard Balancing: " + installConfig.isTableHashPrefixEnabled());
reader.println(" Use Entity Centric Indexing: " + installConfig.isEntityCentrixIndexEnabled());
reader.println(" Use Free Text Indexing: " + installConfig.isFreeTextIndexEnabled());
-// RYA-215 reader.println(" Use Geospatial Indexing: " + installConfig.isGeoIndexEnabled());
+ // RYA-215 reader.println(" Use Geospatial Indexing: " + installConfig.isGeoIndexEnabled());
reader.println(" Use Temporal Indexing: " + installConfig.isTemporalIndexEnabled());
reader.println(" Use Precomputed Join Indexing: " + installConfig.isPcjIndexEnabled());
if(installConfig.isPcjIndexEnabled()) {
@@ -224,6 +224,10 @@ public interface InstallPrompt {
final boolean useTemporalIndexing = promptBoolean(prompt, Optional.of(true));
builder.setEnableTemporalIndex( useTemporalIndexing );
+ prompt = makeFieldPrompt("Use PCJ Indexing", true);
+ final boolean usePcjIndexing = promptBoolean(prompt, Optional.of(true));
+ builder.setEnablePcjIndex(usePcjIndexing);
+
return builder.build();
}
@@ -246,6 +250,7 @@ public interface InstallPrompt {
reader.println(" Instance Name: " + instanceName);
reader.println(" Use Free Text Indexing: " + installConfig.isFreeTextIndexEnabled());
reader.println(" Use Temporal Indexing: " + installConfig.isTemporalIndexEnabled());
+ reader.println(" Use PCJ Indexing: " + installConfig.isPcjIndexEnabled());
reader.println("");
return promptBoolean("Continue with the install? (y/n) ", Optional.absent());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/shell/src/main/java/org/apache/rya/shell/util/RyaDetailsFormatter.java
----------------------------------------------------------------------
diff --git a/extras/shell/src/main/java/org/apache/rya/shell/util/RyaDetailsFormatter.java b/extras/shell/src/main/java/org/apache/rya/shell/util/RyaDetailsFormatter.java
index 3eacb54..2af1507 100644
--- a/extras/shell/src/main/java/org/apache/rya/shell/util/RyaDetailsFormatter.java
+++ b/extras/shell/src/main/java/org/apache/rya/shell/util/RyaDetailsFormatter.java
@@ -45,7 +45,7 @@ public class RyaDetailsFormatter {
* @param details - The object to format. (not null)
* @return A pretty render of the object.
*/
- public String format(StorageType storageType, final RyaDetails details) {
+ public String format(final StorageType storageType, final RyaDetails details) {
requireNonNull(details);
final StringBuilder report = new StringBuilder();
@@ -72,41 +72,43 @@ public class RyaDetailsFormatter {
report.append(" Temporal Index:\n");
report.append(" Enabled: ").append( details.getTemporalIndexDetails().isEnabled() ).append("\n");
- if(storageType == StorageType.ACCUMULO) {
- report.append(" PCJ Index:\n");
- final PCJIndexDetails pcjDetails = details.getPCJIndexDetails();
- report.append(" Enabled: ").append( pcjDetails.isEnabled() ).append("\n");
- if(pcjDetails.isEnabled()) {
- if(pcjDetails.getFluoDetails().isPresent()) {
- final String fluoAppName = pcjDetails.getFluoDetails().get().getUpdateAppName();
- report.append(" Fluo App Name: ").append(fluoAppName).append("\n");
- }
+ report.append(" PCJ Index:\n");
+ final PCJIndexDetails pcjDetails = details.getPCJIndexDetails();
+ report.append(" Enabled: ").append( pcjDetails.isEnabled() ).append("\n");
+ if(pcjDetails.isEnabled()) {
+ if(pcjDetails.getFluoDetails().isPresent()) {
+ final String fluoAppName = pcjDetails.getFluoDetails().get().getUpdateAppName();
+ report.append(" Fluo App Name: ").append(fluoAppName).append("\n");
+ }
- final ImmutableMap<String, PCJDetails> pcjs = pcjDetails.getPCJDetails();
- report.append(" PCJs:\n");
- if(pcjs.isEmpty()) {
- report.append(" No PCJs have been added yet.\n");
- } else {
- for(final PCJDetails pcj : pcjs.values()) {
- report.append(" ID: ").append(pcj.getId()).append("\n");
+ final ImmutableMap<String, PCJDetails> pcjs = pcjDetails.getPCJDetails();
+ report.append(" PCJs:\n");
+ if(pcjs.isEmpty()) {
+ report.append(" No PCJs have been added yet.\n");
+ } else {
+ for(final PCJDetails pcj : pcjs.values()) {
+ report.append(" ID: ").append(pcj.getId()).append("\n");
- final String updateStrategy = format( pcj.getUpdateStrategy(), "None" );
- report.append(" Update Strategy: ").append(updateStrategy).append("\n");
+ final String updateStrategy = format( pcj.getUpdateStrategy(), "None" );
+ report.append(" Update Strategy: ").append(updateStrategy).append("\n");
- final String lastUpdateTime = format( pcj.getLastUpdateTime(), "unavailable");
- report.append(" Last Update Time: ").append(lastUpdateTime).append("\n");
- }
+ final String lastUpdateTime = format( pcj.getLastUpdateTime(), "unavailable");
+ report.append(" Last Update Time: ").append(lastUpdateTime).append("\n");
}
}
- report.append("Statistics:\n");
- report.append(" Prospector:\n");
- final String prospectorLastUpdateTime = format(details.getProspectorDetails().getLastUpdated(), "unavailable");
- report.append(" Last Update Time: ").append( prospectorLastUpdateTime).append("\n");
-
- report.append(" Join Selectivity:\n");
- final String jsLastUpdateTime = format(details.getJoinSelectivityDetails().getLastUpdated(), "unavailable");
- report.append(" Last Updated Time: ").append( jsLastUpdateTime ).append("\n");
+ if (storageType == StorageType.ACCUMULO) {
+ report.append("Statistics:\n");
+ report.append(" Prospector:\n");
+ final String prospectorLastUpdateTime = format(details.getProspectorDetails().getLastUpdated(),
+ "unavailable");
+ report.append(" Last Update Time: ").append(prospectorLastUpdateTime).append("\n");
+
+ report.append(" Join Selectivity:\n");
+ final String jsLastUpdateTime = format(details.getJoinSelectivityDetails().getLastUpdated(),
+ "unavailable");
+ report.append(" Last Updated Time: ").append(jsLastUpdateTime).append("\n");
+ }
}
return report.toString();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/shell/src/test/java/org/apache/rya/shell/MongoRyaShellIT.java
----------------------------------------------------------------------
diff --git a/extras/shell/src/test/java/org/apache/rya/shell/MongoRyaShellIT.java b/extras/shell/src/test/java/org/apache/rya/shell/MongoRyaShellIT.java
index d214afa..79fe95d 100644
--- a/extras/shell/src/test/java/org/apache/rya/shell/MongoRyaShellIT.java
+++ b/extras/shell/src/test/java/org/apache/rya/shell/MongoRyaShellIT.java
@@ -56,19 +56,6 @@ public class MongoRyaShellIT extends RyaShellMongoITBase {
// Ensure the connection was successful.
assertTrue(connectResult.isSuccess());
}
-
- @Test
- public void connectMongo_noConnection() throws IOException {
- final JLineShellComponent shell = getTestShell();
- // Attempt to connect to a mongo instance. The bad hostname should make this fail.
- final String cmd =
- RyaConnectionCommands.CONNECT_MONGO_CMD + " " +
- "--hostname badhostname " +
- "--port " + super.conf.getMongoPort();
-
- final CommandResult rez = shell.executeCommand(cmd);
- assertEquals(RuntimeException.class, rez.getException().getClass());
- }
@Test
public void printConnectionDetails_notConnected() {