You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/02 21:02:02 UTC
[9/9] incubator-rya git commit: RYA-280-Periodic Query Service.
Closes #177.
RYA-280-Periodic Query Service. Closes #177.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/2ca85427
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/2ca85427
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/2ca85427
Branch: refs/heads/master
Commit: 2ca854271c2eb928e9ccd10d40599c8c535952fa
Parents: ab8035a
Author: Caleb Meier <ca...@parsons.com>
Authored: Fri Apr 14 19:20:25 2017 -0700
Committer: Caleb Meier <ca...@parsons.com>
Committed: Wed Aug 2 14:00:45 2017 -0700
----------------------------------------------------------------------
common/rya.api/pom.xml | 6 +-
.../client/accumulo/AccumuloCreatePCJIT.java | 1 +
extras/pom.xml | 1 +
.../pcj/storage/PeriodicQueryResultStorage.java | 115 +++++
.../storage/PeriodicQueryStorageException.java | 49 ++
.../storage/PeriodicQueryStorageMetadata.java | 99 ++++
.../AccumuloPeriodicQueryResultStorage.java | 270 ++++++++++
.../AccumuloValueBindingSetIterator.java | 73 +++
.../pcj/storage/accumulo/PcjTables.java | 5 +-
.../accumulo/PeriodicQueryTableNameFactory.java | 55 ++
.../accumulo/VisibilityBindingSetSerDe.java | 76 +++
.../accumulo/VisibilityBindingSetSerDeTest.java | 52 ++
.../accumulo/accumulo/AccumuloPcjStorageIT.java | 276 ----------
.../integration/AccumuloPcjStorageIT.java | 276 ++++++++++
.../AccumuloPeriodicQueryResultStorageIT.java | 271 ++++++++++
extras/rya.pcj.fluo/README.md | 25 +-
.../rya/indexing/pcj/fluo/api/CreatePcj.java | 184 +++++--
.../rya/indexing/pcj/fluo/api/DeletePcj.java | 53 +-
extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 101 ++--
.../rya/indexing/pcj/fluo/app/FilterFinder.java | 84 ---
.../pcj/fluo/app/FilterResultUpdater.java | 14 +-
.../fluo/app/IncrementalUpdateConstants.java | 5 +
.../pcj/fluo/app/JoinResultUpdater.java | 1 +
.../rya/indexing/pcj/fluo/app/NodeType.java | 4 +
.../pcj/fluo/app/PeriodicQueryUpdater.java | 138 +++++
.../pcj/fluo/app/QueryResultUpdater.java | 3 +-
.../pcj/fluo/app/VisibilityBindingSetSerDe.java | 77 ---
.../batch/AbstractBatchBindingSetUpdater.java | 55 ++
.../app/batch/AbstractSpanBatchInformation.java | 101 ++++
.../fluo/app/batch/BasicBatchInformation.java | 81 +++
.../fluo/app/batch/BatchBindingSetUpdater.java | 43 ++
.../pcj/fluo/app/batch/BatchInformation.java | 57 +++
.../pcj/fluo/app/batch/BatchInformationDAO.java | 59 +++
.../pcj/fluo/app/batch/BatchObserver.java | 63 +++
.../pcj/fluo/app/batch/BatchRowKeyUtil.java | 68 +++
.../app/batch/JoinBatchBindingSetUpdater.java | 184 +++++++
.../fluo/app/batch/JoinBatchInformation.java | 255 ++++++++++
.../app/batch/SpanBatchBindingSetUpdater.java | 128 +++++
.../app/batch/SpanBatchDeleteInformation.java | 95 ++++
.../serializer/BatchInformationSerializer.java | 58 +++
.../serializer/BatchInformationTypeAdapter.java | 73 +++
.../BatchInformationTypeAdapterFactory.java | 65 +++
.../JoinBatchInformationTypeAdapter.java | 94 ++++
.../SpanBatchInformationTypeAdapter.java | 69 +++
.../export/kafka/KafkaBindingSetExporter.java | 8 +-
.../app/export/rya/RyaBindingSetExporter.java | 15 +-
.../rya/RyaBindingSetExporterFactory.java | 7 +-
.../fluo/app/observers/BindingSetUpdater.java | 16 +-
.../pcj/fluo/app/observers/FilterObserver.java | 2 +-
.../pcj/fluo/app/observers/JoinObserver.java | 2 +-
.../app/observers/PeriodicQueryObserver.java | 72 +++
.../fluo/app/observers/QueryResultObserver.java | 2 +-
.../app/observers/StatementPatternObserver.java | 2 +-
.../pcj/fluo/app/observers/TripleObserver.java | 2 +-
.../pcj/fluo/app/query/AggregationMetadata.java | 18 +
.../pcj/fluo/app/query/FilterMetadata.java | 67 +--
.../indexing/pcj/fluo/app/query/FluoQuery.java | 81 ++-
.../pcj/fluo/app/query/FluoQueryColumns.java | 66 ++-
.../fluo/app/query/FluoQueryMetadataDAO.java | 119 ++++-
.../fluo/app/query/PeriodicQueryMetadata.java | 287 +++++++++++
.../pcj/fluo/app/query/PeriodicQueryNode.java | 154 ++++++
.../pcj/fluo/app/query/QueryMetadata.java | 15 +
.../fluo/app/query/SparqlFluoQueryBuilder.java | 89 +++-
.../pcj/fluo/app/util/FilterSerializer.java | 127 +++++
.../pcj/fluo/app/util/FluoClientFactory.java | 56 ++
.../pcj/fluo/app/util/PeriodicQueryUtil.java | 381 ++++++++++++++
.../indexing/pcj/fluo/app/FilterFinderTest.java | 84 ---
.../fluo/app/VisibilityBindingSetSerDeTest.java | 51 --
.../BatchInformationSerializerTest.java | 73 +++
.../fluo/app/query/PeriodicQueryUtilTest.java | 229 +++++++++
.../fluo/client/util/QueryReportRenderer.java | 3 +-
.../rya.pcj.fluo/pcj.fluo.integration/pom.xml | 5 +
.../rya/indexing/pcj/fluo/FluoITBase.java | 282 ----------
.../indexing/pcj/fluo/KafkaExportITBase.java | 370 --------------
.../rya/indexing/pcj/fluo/RyaExportITBase.java | 85 ----
.../pcj/fluo/api/CountStatementsIT.java | 2 +-
.../indexing/pcj/fluo/api/GetPcjMetadataIT.java | 2 +-
.../indexing/pcj/fluo/api/GetQueryReportIT.java | 2 +-
.../indexing/pcj/fluo/api/ListQueryIdsIT.java | 2 +-
.../fluo/app/query/FluoQueryMetadataDAOIT.java | 46 +-
.../pcj/fluo/integration/BatchDeleteIT.java | 316 ++++++++++++
.../pcj/fluo/integration/CreateDeleteIT.java | 2 +-
.../indexing/pcj/fluo/integration/InputIT.java | 2 +-
.../pcj/fluo/integration/KafkaExportIT.java | 2 +-
.../integration/KafkaRyaSubGraphExportIT.java | 2 +-
.../indexing/pcj/fluo/integration/QueryIT.java | 359 ++++++++++++-
.../pcj/fluo/integration/RyaExportIT.java | 2 +-
.../RyaInputIncrementalUpdateIT.java | 2 +-
.../pcj/fluo/integration/StreamingTestIT.java | 2 +-
.../HistoricStreamingVisibilityIT.java | 2 +-
.../pcj/fluo/visibility/PcjVisibilityIT.java | 3 +-
extras/rya.pcj.fluo/pcj.fluo.test.base/pom.xml | 108 ++++
.../org/apache/rya/kafka/base/KafkaITBase.java | 78 +++
.../rya/pcj/fluo/test/base/FluoITBase.java | 300 +++++++++++
.../pcj/fluo/test/base/KafkaExportITBase.java | 370 ++++++++++++++
.../rya/pcj/fluo/test/base/RyaExportITBase.java | 80 +++
extras/rya.pcj.fluo/pom.xml | 1 +
.../periodic.service.integration.tests/pom.xml | 77 +++
.../PeriodicNotificationApplicationIT.java | 509 +++++++++++++++++++
.../PeriodicNotificationProviderIT.java | 68 +++
.../PeriodicNotificationExporterIT.java | 130 +++++
.../PeriodicNotificationProcessorIT.java | 121 +++++
.../pruner/PeriodicNotificationBinPrunerIT.java | 286 +++++++++++
.../PeriodicCommandNotificationConsumerIT.java | 120 +++++
.../src/test/resources/notification.properties | 35 ++
.../periodic.service.notification/pom.xml | 107 ++++
.../periodic/notification/api/BinPruner.java | 40 ++
.../notification/api/BindingSetExporter.java | 38 ++
.../notification/api/CreatePeriodicQuery.java | 113 ++++
.../periodic/notification/api/LifeCycle.java | 45 ++
.../rya/periodic/notification/api/NodeBin.java | 77 +++
.../periodic/notification/api/Notification.java | 34 ++
.../api/NotificationCoordinatorExecutor.java | 41 ++
.../notification/api/NotificationProcessor.java | 41 ++
.../api/PeriodicNotificationClient.java | 64 +++
.../PeriodicApplicationException.java | 47 ++
.../PeriodicNotificationApplication.java | 207 ++++++++
...dicNotificationApplicationConfiguration.java | 254 +++++++++
.../PeriodicNotificationApplicationFactory.java | 140 +++++
...PeriodicNotificationCoordinatorExecutor.java | 159 ++++++
.../notification/exporter/BindingSetRecord.java | 80 +++
.../exporter/KafkaExporterExecutor.java | 109 ++++
.../KafkaPeriodicBindingSetExporter.java | 98 ++++
.../notification/BasicNotification.java | 76 +++
.../notification/CommandNotification.java | 99 ++++
.../notification/PeriodicNotification.java | 178 +++++++
.../notification/TimestampedNotification.java | 69 +++
.../NotificationProcessorExecutor.java | 114 +++++
.../TimestampedNotificationProcessor.java | 203 ++++++++
.../notification/pruner/AccumuloBinPruner.java | 66 +++
.../notification/pruner/FluoBinPruner.java | 76 +++
.../pruner/PeriodicQueryPruner.java | 108 ++++
.../pruner/PeriodicQueryPrunerExecutor.java | 104 ++++
.../recovery/PeriodicNotificationProvider.java | 138 +++++
.../kafka/KafkaNotificationProvider.java | 123 +++++
.../KafkaNotificationRegistrationClient.java | 80 +++
.../kafka/PeriodicNotificationConsumer.java | 88 ++++
.../BasicNotificationTypeAdapter.java | 55 ++
.../serialization/BindingSetSerDe.java | 105 ++++
.../CommandNotificationSerializer.java | 76 +++
.../CommandNotificationTypeAdapter.java | 89 ++++
.../PeriodicNotificationTypeAdapter.java | 73 +++
.../CommandNotificationSerializerTest.java | 60 +++
extras/rya.periodic.service/pom.xml | 39 ++
144 files changed, 11783 insertions(+), 1593 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/common/rya.api/pom.xml
----------------------------------------------------------------------
diff --git a/common/rya.api/pom.xml b/common/rya.api/pom.xml
index 94f191d..3c80a13 100644
--- a/common/rya.api/pom.xml
+++ b/common/rya.api/pom.xml
@@ -71,9 +71,9 @@ under the License.
<artifactId>jcip-annotations</artifactId>
</dependency>
<dependency>
- <groupId>com.esotericsoftware.kryo</groupId>
- <artifactId>kryo</artifactId>
- <version>2.24.0</version>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ <version>2.24.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
index cb4b29a..9bbf01f 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
@@ -80,6 +80,7 @@ public class AccumuloCreatePCJIT extends FluoITBase {
assertEquals(sparql, pcjMetadata.getSparql());
assertEquals(0L, pcjMetadata.getCardinality());
+
// Verify a Query ID was added for the query within the Fluo app.
final List<String> fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient);
assertEquals(1, fluoQueryIds.size());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/pom.xml
----------------------------------------------------------------------
diff --git a/extras/pom.xml b/extras/pom.xml
index 6acb51f..a2c8d58 100644
--- a/extras/pom.xml
+++ b/extras/pom.xml
@@ -33,6 +33,7 @@ under the License.
<modules>
<module>rya.prospector</module>
<module>rya.manual</module>
+ <module>rya.periodic.service</module>
<module>rya.console</module>
<module>indexing</module>
<module>rya.indexing.pcj</module>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java
new file mode 100644
index 0000000..697b350
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryResultStorage.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Interface for storing and retrieving Periodic Query Results.
+ * <p>
+ * Every method that declares {@link PeriodicQueryStorageException} throws it
+ * when the underlying storage operation fails.
+ */
+public interface PeriodicQueryResultStorage {
+
+    /**
+     * Binding name for the periodic bin id
+     */
+    public static String PeriodicBinId = "periodicBinId";
+
+    /**
+     * Creates a PeriodicQuery result storage layer for the given SPARQL query
+     * @param sparql - SPARQL query
+     * @return - id of the storage layer for the given SPARQL query
+     * @throws PeriodicQueryStorageException - if the storage layer could not be created
+     */
+    public String createPeriodicQuery(String sparql) throws PeriodicQueryStorageException;
+
+    /**
+     * Creates a PeriodicQuery result storage layer for the given SPARQL query with the given id
+     * @param queryId - id of the storage layer for the given SPARQL query
+     * @param sparql - SPARQL query whose periodic results will be stored
+     * @return - id of the storage layer
+     * @throws PeriodicQueryStorageException - if the storage layer could not be created
+     */
+    public String createPeriodicQuery(String queryId, String sparql) throws PeriodicQueryStorageException;
+
+    /**
+     * Creates a PeriodicQuery result storage layer for the given SPARQL query with the given id
+     * whose results are written in the order indicated by the specified VariableOrder.
+     * Unlike the other overloads, this method does not return the id.
+     * @param queryId - id of the storage layer for the given SPARQL query
+     * @param sparql - SPARQL query whose periodic results will be stored
+     * @param varOrder - VariableOrder indicating the order that results will be written in
+     * @throws PeriodicQueryStorageException - if the storage layer could not be created
+     */
+    public void createPeriodicQuery(String queryId, String sparql, VariableOrder varOrder) throws PeriodicQueryStorageException;
+
+    /**
+     * Retrieve the {@link PeriodicQueryStorageMetadata} for the given query id
+     * @param queryID - id of the query whose metadata will be returned
+     * @return PeriodicQueryStorageMetadata
+     * @throws PeriodicQueryStorageException - if the metadata could not be retrieved
+     */
+    public PeriodicQueryStorageMetadata getPeriodicQueryMetadata(String queryID) throws PeriodicQueryStorageException;
+
+    /**
+     * Add periodic query results to the storage layer indicated by the given query id
+     * @param queryId - id indicating the storage layer that results will be added to
+     * @param results - query results to be added to storage
+     * @throws PeriodicQueryStorageException - if the results could not be added
+     */
+    public void addPeriodicQueryResults(String queryId, Collection<VisibilityBindingSet> results) throws PeriodicQueryStorageException;
+
+    /**
+     * Deletes periodic query results from the storage layer
+     * @param queryId - id indicating the storage layer that results will be deleted from
+     * @param binID - bin id indicating the periodic id of results to be deleted
+     * @throws PeriodicQueryStorageException - if the results could not be deleted
+     */
+    public void deletePeriodicQueryResults(String queryId, long binID) throws PeriodicQueryStorageException;
+
+    /**
+     * Deletes all results for the storage layer indicated by the given query id
+     * @param queryID - id indicating the storage layer whose results will be deleted
+     * @throws PeriodicQueryStorageException - if the storage layer could not be deleted
+     */
+    public void deletePeriodicQuery(String queryID) throws PeriodicQueryStorageException;
+
+    /**
+     * List results in the given storage layer indicated by the query id
+     * @param queryId - id indicating the storage layer whose results will be listed
+     * @param binID - Optional id; when present, only results with that periodic id are listed
+     * @return - iterator over the listed results
+     * @throws PeriodicQueryStorageException - if the results could not be listed
+     */
+    public CloseableIterator<BindingSet> listResults(String queryId, Optional<Long> binID) throws PeriodicQueryStorageException;
+
+    /**
+     * List all storage tables containing periodic results.
+     * @return List of Strings with names of all tables containing periodic results
+     */
+    public List<String> listPeriodicTables();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageException.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageException.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageException.java
new file mode 100644
index 0000000..f9e6969
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageException.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage;
+
+/**
+ * Checked exception raised by implementations of
+ * {@link PeriodicQueryResultStorage} whenever one of their operations
+ * cannot be completed.
+ */
+public class PeriodicQueryStorageException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Builds a {@link PeriodicQueryStorageException} that wraps an underlying failure.
+     *
+     * @param message - Explains what went wrong.
+     * @param cause - The lower-level exception that led to this one.
+     */
+    public PeriodicQueryStorageException(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    /**
+     * Builds a {@link PeriodicQueryStorageException} carrying only a description.
+     *
+     * @param message - Explains what went wrong.
+     */
+    public PeriodicQueryStorageException(final String message) {
+        super(message);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageMetadata.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageMetadata.java
new file mode 100644
index 0000000..9ce3522
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageMetadata.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage;
+
+import java.util.Objects;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Metadata for a given PeriodicQueryStorage table.
+ * <p>
+ * Instances are immutable value objects: both fields are set once in the
+ * constructor and take part in {@link #equals(Object)} and {@link #hashCode()}.
+ */
+public class PeriodicQueryStorageMetadata {
+
+    private final String sparql;
+    private final VariableOrder varOrder;
+
+    /**
+     * Create a PeriodicQueryStorageMetadata object
+     * @param sparql - SPARQL query whose results are stored in table
+     * @param varOrder - order that BindingSet values are written in in table
+     * @throws NullPointerException if either argument is null
+     */
+    public PeriodicQueryStorageMetadata(String sparql, VariableOrder varOrder) {
+        this.sparql = Preconditions.checkNotNull(sparql);
+        this.varOrder = Preconditions.checkNotNull(varOrder);
+    }
+
+    /**
+     * Converting constructor: builds the metadata from a {@link PcjMetadata},
+     * using its SPARQL string and the first VariableOrder produced by iterating
+     * over {@code metadata.getVarOrders()}.
+     * @param metadata - PcjMetadata whose SPARQL and first VariableOrder are used
+     */
+    public PeriodicQueryStorageMetadata(PcjMetadata metadata) {
+        this(metadata.getSparql(), metadata.getVarOrders().iterator().next());
+    }
+
+
+    /**
+     * @return SPARQL query whose results are stored in the table
+     */
+    public String getSparql() {
+        return sparql;
+    }
+
+    /**
+     * @return VariableOrder indicating the order that BindingSet Values are written in in table
+     */
+    public VariableOrder getVariableOrder() {
+        return varOrder;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(sparql, varOrder);
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (o == this) {
+            return true;
+        }
+
+        if (o instanceof PeriodicQueryStorageMetadata) {
+            final PeriodicQueryStorageMetadata metadata = (PeriodicQueryStorageMetadata) o;
+            return new EqualsBuilder().append(sparql, metadata.sparql).append(varOrder, metadata.varOrder).isEquals();
+        }
+
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        // Plain concatenation; the compiler already turns this into a single builder chain.
+        return "PeriodicQueryStorageMetadata {\n"
+                + " SPARQL: " + sparql + "\n"
+                + " Variable Order: " + varOrder + "\n"
+                + "}";
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
new file mode 100644
index 0000000..d7a50a7
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.AggregateOperatorBase;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is the Accumulo implementation of {@link PeriodicQueryResultStorage} for
+ * creating, deleting, and interacting with tables where PeriodicQuery results are stored.
+ * <p>
+ * Table names are derived from the Rya instance name and the query id through
+ * {@link PeriodicQueryTableNameFactory}; the table operations themselves are
+ * delegated to {@link PcjTables}.
+ */
+public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultStorage {
+
+    private static final PcjTables pcjTables = new PcjTables();
+    private static final PeriodicQueryTableNameFactory tableNameFactory = new PeriodicQueryTableNameFactory();
+
+    private final String ryaInstance;
+    private final Connector accumuloConn;
+    private final Authorizations auths;
+    private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
+    private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
+
+    /**
+     * Creates a AccumuloPeriodicQueryResultStorage Object.
+     * The authorizations of the connector's user are looked up once here and
+     * reused for every scan and delete.
+     * @param accumuloConn - Accumulo Connector for connecting to an Accumulo instance
+     * @param ryaInstance - Rya Instance name for connecting to Rya
+     * @throws RuntimeException if the user's authorizations cannot be retrieved
+     */
+    public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, String ryaInstance) {
+        this.accumuloConn = Preconditions.checkNotNull(accumuloConn);
+        this.ryaInstance = Preconditions.checkNotNull(ryaInstance);
+        final String user = accumuloConn.whoami();
+        try {
+            this.auths = accumuloConn.securityOperations().getUserAuthorizations(user);
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            // Keep the original failure attached so the root cause is not lost.
+            throw new RuntimeException("Unable to access authorizations for user: " + user + ".", e);
+        }
+    }
+
+    /**
+     * Creates the storage table for the query under a newly generated id.
+     * @param sparql - SPARQL query whose periodic results will be stored
+     * @return - the generated id of the storage layer
+     * @throws PeriodicQueryStorageException - if the table could not be created
+     */
+    @Override
+    public String createPeriodicQuery(String sparql) throws PeriodicQueryStorageException {
+        Preconditions.checkNotNull(sparql);
+        final String queryId = pcjIdFactory.nextId();
+        return createPeriodicQuery(queryId, sparql);
+    }
+
+    /**
+     * Creates the storage table for the query under the given id. The variable
+     * order is the periodic bin id followed by the query's non-aggregation
+     * binding names.
+     * @param queryId - id of the storage layer
+     * @param sparql - SPARQL query whose periodic results will be stored
+     * @return - the supplied query id
+     * @throws PeriodicQueryStorageException - if the query cannot be parsed or the table cannot be created
+     */
+    @Override
+    public String createPeriodicQuery(String queryId, String sparql) throws PeriodicQueryStorageException {
+        Preconditions.checkNotNull(queryId);
+        Preconditions.checkNotNull(sparql);
+        final Set<String> bindingNames;
+        try {
+            bindingNames = new AggregateVariableRemover().getNonAggregationVariables(sparql);
+        } catch (MalformedQueryException e) {
+            throw new PeriodicQueryStorageException(e.getMessage(), e);
+        }
+        final List<String> varOrderList = new ArrayList<>();
+        varOrderList.add(PeriodicQueryResultStorage.PeriodicBinId);
+        varOrderList.addAll(bindingNames);
+        createPeriodicQuery(queryId, sparql, new VariableOrder(varOrderList));
+        return queryId;
+    }
+
+    /**
+     * Creates the storage table for the query under the given id using the
+     * supplied variable order, whose first entry must be the periodic bin id.
+     * @param queryId - id of the storage layer
+     * @param sparql - SPARQL query whose periodic results will be stored
+     * @param order - VariableOrder that results will be written in
+     * @throws IllegalArgumentException if the periodic bin id is not first in the order
+     * @throws PeriodicQueryStorageException - if the table could not be created
+     */
+    @Override
+    public void createPeriodicQuery(String queryId, String sparql, VariableOrder order) throws PeriodicQueryStorageException {
+        Preconditions.checkNotNull(sparql);
+        Preconditions.checkNotNull(queryId);
+        Preconditions.checkNotNull(order);
+        Preconditions.checkArgument(PeriodicQueryResultStorage.PeriodicBinId.equals(order.getVariableOrders().get(0)),
+                "periodicBinId binding name must occur first in VariableOrder.");
+        final String tableName = tableNameFactory.makeTableName(ryaInstance, queryId);
+        final Set<VariableOrder> varOrders = new HashSet<>();
+        varOrders.add(order);
+        try {
+            pcjTables.createPcjTable(accumuloConn, tableName, varOrders, sparql);
+        } catch (Exception e) {
+            throw new PeriodicQueryStorageException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Reads the PCJ metadata of the query's table and converts it.
+     * @param queryId - id of the query whose metadata will be returned
+     * @return PeriodicQueryStorageMetadata
+     * @throws PeriodicQueryStorageException - if the metadata could not be read
+     */
+    @Override
+    public PeriodicQueryStorageMetadata getPeriodicQueryMetadata(String queryId) throws PeriodicQueryStorageException {
+        try {
+            return new PeriodicQueryStorageMetadata(
+                    pcjTables.getPcjMetadata(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId)));
+        } catch (Exception e) {
+            throw new PeriodicQueryStorageException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Writes the results to the query's table. Every binding set must carry
+     * the periodic bin id binding.
+     * @param queryId - id indicating the storage layer that results will be added to
+     * @param results - query results to be added to storage
+     * @throws IllegalArgumentException if a binding set lacks the periodic bin id binding
+     * @throws PeriodicQueryStorageException - if the results could not be written
+     */
+    @Override
+    public void addPeriodicQueryResults(String queryId, Collection<VisibilityBindingSet> results) throws PeriodicQueryStorageException {
+        results.forEach(x -> Preconditions.checkArgument(x.hasBinding(PeriodicQueryResultStorage.PeriodicBinId),
+                "BindingSet must contain periodicBinId binding."));
+        try {
+            pcjTables.addResults(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId), results);
+        } catch (Exception e) {
+            throw new PeriodicQueryStorageException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Deletes every row of the query's table whose row id starts with the
+     * serialized form of the given bin id.
+     * @param queryId - id indicating the storage layer that results will be deleted from
+     * @param binId - bin id indicating the periodic id of results to be deleted
+     * @throws PeriodicQueryStorageException - if the results could not be deleted
+     */
+    @Override
+    public void deletePeriodicQueryResults(String queryId, long binId) throws PeriodicQueryStorageException {
+        final String tableName = tableNameFactory.makeTableName(ryaInstance, queryId);
+        try {
+            final Text prefix = getRowPrefix(binId);
+            final BatchDeleter deleter = accumuloConn.createBatchDeleter(tableName, auths, 1, new BatchWriterConfig());
+            try {
+                deleter.setRanges(Collections.singleton(Range.prefix(prefix)));
+                deleter.delete();
+            } finally {
+                // Always release the deleter's resources, even when the delete fails.
+                deleter.close();
+            }
+        } catch (Exception e) {
+            throw new PeriodicQueryStorageException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Purges the query's table via {@code PcjTables#purgePcjTable}; unlike
+     * {@link #deletePeriodicQuery(String)} this does not drop the table.
+     * This method is not part of {@link PeriodicQueryResultStorage}.
+     * @param queryId - id indicating the storage layer whose results will be purged
+     * @throws PeriodicQueryStorageException - if the table could not be purged
+     */
+    public void deletePeriodicQueryResults(String queryId) throws PeriodicQueryStorageException {
+        try {
+            pcjTables.purgePcjTable(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId));
+        } catch (Exception e) {
+            throw new PeriodicQueryStorageException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Drops the query's table.
+     * @param queryId - id indicating the storage layer to be dropped
+     * @throws PeriodicQueryStorageException - if the table could not be dropped
+     */
+    @Override
+    public void deletePeriodicQuery(String queryId) throws PeriodicQueryStorageException {
+        try {
+            pcjTables.dropPcjTable(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId));
+        } catch (Exception e) {
+            throw new PeriodicQueryStorageException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Scans the query's table and returns an iterator over the stored results.
+     * @param queryId - id indicating the storage layer whose results will be listed
+     * @param binId - when present, the scan is restricted to rows with that bin id's prefix
+     * @return - iterator over the results; it wraps the underlying Scanner
+     * @throws PeriodicQueryStorageException - if the metadata cannot be read or the scan cannot be set up
+     */
+    @Override
+    public CloseableIterator<BindingSet> listResults(String queryId, Optional<Long> binId)
+            throws PeriodicQueryStorageException {
+        requireNonNull(queryId);
+
+        final String tableName = tableNameFactory.makeTableName(ryaInstance, queryId);
+        // The variable order recorded in the table metadata names the column
+        // family that is fetched below.
+        final PeriodicQueryStorageMetadata metadata = getPeriodicQueryMetadata(queryId);
+        final VariableOrder varOrder = metadata.getVariableOrder();
+
+        Scanner scanner = null;
+        try {
+            // Fetch only the Binding Sets whose Variable Order matches the
+            // selected one.
+            scanner = accumuloConn.createScanner(tableName, auths);
+            scanner.fetchColumnFamily(new Text(varOrder.toString()));
+            if (binId.isPresent()) {
+                scanner.setRange(Range.prefix(getRowPrefix(binId.get())));
+            }
+            return new AccumuloValueBindingSetIterator(scanner);
+
+        } catch (Exception e) {
+            // The scanner was never handed to the caller, so release it here.
+            if (scanner != null) {
+                scanner.close();
+            }
+            throw new PeriodicQueryStorageException(String.format("PCJ Table does not exist for name '%s'.", tableName), e);
+        }
+    }
+
+    /**
+     * Builds the row prefix for a bin: a binding set holding only the periodic
+     * bin id (as an xsd:long literal) is serialized with the PCJ serializer.
+     * @param binId - periodic bin id
+     * @return - the serialized prefix
+     * @throws BindingSetConversionException - if the binding set cannot be serialized
+     */
+    private Text getRowPrefix(long binId) throws BindingSetConversionException {
+        final QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding(PeriodicQueryResultStorage.PeriodicBinId, new LiteralImpl(Long.toString(binId), XMLSchema.LONG));
+
+        return new Text(converter.convert(bs, new VariableOrder(PeriodicQueryResultStorage.PeriodicBinId)));
+    }
+
+    /**
+     * Lists the tables of this Rya instance whose names start with the
+     * instance name followed by {@code PeriodicQueryTableNameFactory.PeriodicTableSuffix}.
+     * @return List of Strings with names of all tables containing periodic results
+     */
+    @Override
+    public List<String> listPeriodicTables() {
+
+        final List<String> periodicTables = new ArrayList<>();
+        final String periodicPrefix = ryaInstance + PeriodicQueryTableNameFactory.PeriodicTableSuffix;
+        boolean foundInstance = false;
+
+        for (final String tableName : accumuloConn.tableOperations().list()) {
+            if (tableName.startsWith(ryaInstance)) {
+                // This table is part of the target Rya instance.
+                foundInstance = true;
+
+                if (tableName.startsWith(periodicPrefix)) {
+                    periodicTables.add(tableName);
+                }
+            } else if (foundInstance) {
+                // We have encountered the first table name that does not start
+                // with the rya instance name after those that do. Because the
+                // list is sorted, there can't be any more pcj tables for the
+                // target instance in the list.
+                break;
+            }
+        }
+        return periodicTables;
+    }
+
+    /**
+     * Class for removing any aggregate variables from the ProjectionElementList
+     * of the parsed SPARQL queries. This ensures that only non-aggregation
+     * values are contained in the Accumulo row. The non-aggregation variables
+     * are not updated while the aggregation variables are, so they are included in
+     * the serialized BindingSet in the Accumulo Value field, which is overwritten
+     * if an entry with the same Key and different Value (updated aggregation) is
+     * written to the table.
+     * <p>
+     * An instance keeps the binding names of the last parsed query in a field,
+     * so create one instance per query.
+     */
+    static class AggregateVariableRemover extends QueryModelVisitorBase<RuntimeException> {
+
+        private Set<String> bindingNames;
+
+        /**
+         * Parses the query and returns its binding names minus those bound to
+         * an aggregation.
+         * @param sparql - SPARQL query to inspect
+         * @return - binding names that are not produced by an aggregate operator
+         * @throws MalformedQueryException - if the query cannot be parsed
+         */
+        public Set<String> getNonAggregationVariables(String sparql) throws MalformedQueryException {
+            final TupleExpr te = new SPARQLParser().parseQuery(sparql, null).getTupleExpr();
+            bindingNames = te.getBindingNames();
+            te.visit(this);
+            return bindingNames;
+        }
+
+        /**
+         * Drops the name of any extension element whose expression is an
+         * aggregate operator.
+         */
+        @Override
+        public void meet(ExtensionElem node) {
+            if (node.getExpr() instanceof AggregateOperatorBase) {
+                bindingNames.remove(node.getName());
+            }
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java
new file mode 100644
index 0000000..946c712
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloValueBindingSetIterator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.openrdf.query.BindingSet;
+
+/**
+ * Implementation of CloseableIterator for retrieving results from a {@link PeriodicQueryResultStorage}
+ * table.
+ *
+ */
+public class AccumuloValueBindingSetIterator implements CloseableIterator<BindingSet>{
+
+ private final Scanner scanner;
+ private final Iterator<Entry<Key, Value>> iter;
+ private final VisibilityBindingSetSerDe bsSerDe = new VisibilityBindingSetSerDe();
+
+ public AccumuloValueBindingSetIterator(Scanner scanner) {
+ this.scanner = scanner;
+ iter = scanner.iterator();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public BindingSet next() {
+ try {
+ return bsSerDe.deserialize(Bytes.of(iter.next().getValue().get())).set;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ scanner.close();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
index 5d13597..d5451ae 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java
@@ -403,6 +403,7 @@ public class PcjTables {
final Set<Mutation> mutations = new HashSet<>();
final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
+ VisibilityBindingSetSerDe bsSerDe = new VisibilityBindingSetSerDe();
for(final VariableOrder varOrder : varOrders) {
try {
@@ -412,9 +413,9 @@ public class PcjTables {
// Row ID = binding set values, Column Family = variable order of the binding set.
final Mutation addResult = new Mutation(rowKey);
final String visibility = result.getVisibility();
- addResult.put(varOrder.toString(), "", new ColumnVisibility(visibility), "");
+ addResult.put(varOrder.toString(), "", new ColumnVisibility(visibility), new Value(bsSerDe.serialize(result).toArray()));
mutations.add(addResult);
- } catch(final BindingSetConversionException e) {
+ } catch(Exception e) {
throw new PCJStorageException("Could not serialize a result.", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PeriodicQueryTableNameFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PeriodicQueryTableNameFactory.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PeriodicQueryTableNameFactory.java
new file mode 100644
index 0000000..561cad2
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PeriodicQueryTableNameFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+
+/**
+ * Class for creating the names of {@link PeriodicQueryResultStorage} tables.
+ *
+ */
+public class PeriodicQueryTableNameFactory {
+
+ public static final String PeriodicTableSuffix = "PERIODIC_QUERY_";
+
+ /**
+ * Creates name of a table from the indicating rya instance and query id
+ * @param ryaInstance - name of rya instance table will belong to
+ * @param queryId - id of query whose results will be stored in this table
+ * @return - name of PeriodicQueryResultStorage table
+ */
+ public String makeTableName(final String ryaInstance, final String queryId) {
+ requireNonNull(ryaInstance);
+ requireNonNull(queryId);
+ return ryaInstance + PeriodicTableSuffix + queryId.toString().replaceAll("-", "");
+ }
+
+ /**
+ * Extract query id from PeriodicQueryResultStorage table name
+ * @param periodTableName - name of table
+ * @return - query id whose results are stored in table
+ */
+ public String getPeriodicQueryId(final String periodTableName) {
+ requireNonNull(periodTableName);
+ return periodTableName.split(PeriodicTableSuffix)[1];
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDe.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDe.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDe.java
new file mode 100644
index 0000000..ae43a9a
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDe.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
+import org.apache.fluo.api.data.Bytes;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Serializes and deserializes a {@link VisibilityBindingSet} to and from {@link Bytes} objects.
+ */
+@DefaultAnnotation(NonNull.class)
+public class VisibilityBindingSetSerDe {
+
+ /**
+ * Serializes a {@link VisibilityBindingSet} into a {@link Bytes} object.
+ *
+ * @param bindingSet - The binding set that will be serialized. (not null)
+ * @return The serialized object.
+ * @throws Exception A problem was encountered while serializing the object.
+ */
+ public Bytes serialize(final VisibilityBindingSet bindingSet) throws Exception {
+ requireNonNull(bindingSet);
+
+ final ByteArrayOutputStream boas = new ByteArrayOutputStream();
+ try(final ObjectOutputStream oos = new ObjectOutputStream(boas)) {
+ oos.writeObject(bindingSet);
+ }
+
+ return Bytes.of(boas.toByteArray());
+ }
+
+ /**
+ * Deserializes a {@link VisibilityBindingSet} from a {@link Bytes} object.
+ *
+ * @param bytes - The bytes that will be deserialized. (not null)
+ * @return The deserialized object.
+ * @throws Exception A problem was encountered while deserializing the object.
+ */
+ public VisibilityBindingSet deserialize(final Bytes bytes) throws Exception {
+ requireNonNull(bytes);
+
+ try(final ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toArray()))) {
+ final Object o = ois.readObject();
+ if(o instanceof VisibilityBindingSet) {
+ return (VisibilityBindingSet) o;
+ } else {
+ throw new Exception("Deserialized Object is not a VisibilityBindingSet. Was: " + o.getClass());
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDeTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDeTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDeTest.java
new file mode 100644
index 0000000..16f56c1
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDeTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.impl.MapBindingSet;
+
+/**
+ * Tests the methods of {@link VisibilityBindingSetSerDe}.
+ */
+public class VisibilityBindingSetSerDeTest {
+
+ @Test
+ public void rountTrip() throws Exception {
+ final ValueFactory vf = new ValueFactoryImpl();
+
+ final MapBindingSet bs = new MapBindingSet();
+ bs.addBinding("name", vf.createLiteral("Alice"));
+ bs.addBinding("age", vf.createLiteral(5));
+ final VisibilityBindingSet original = new VisibilityBindingSet(bs, "u");
+
+ final VisibilityBindingSetSerDe serde = new VisibilityBindingSetSerDe();
+ final Bytes bytes = serde.serialize(original);
+ final VisibilityBindingSet result = serde.deserialize(bytes);
+
+ assertEquals(original, result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java
deleted file mode 100644
index 98ed4c7..0000000
--- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java
+++ /dev/null
@@ -1,276 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.storage.accumulo.accumulo;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.rya.accumulo.AccumuloRyaITBase;
-import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
-import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
-import org.apache.rya.api.instance.RyaDetailsRepository;
-import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException;
-import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
-import org.apache.rya.indexing.pcj.storage.PcjMetadata;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.junit.Test;
-import org.openrdf.model.impl.URIImpl;
-import org.openrdf.query.BindingSet;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.impl.MapBindingSet;
-
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Integration tests the methods of {@link AccumuloPcjStorage}.
- * </p>
- * These tests ensures that the PCJ tables are maintained and that these operations
- * also update the Rya instance's details.
- */
-public class AccumuloPcjStorageIT extends AccumuloRyaITBase {
-
- @Test
- public void createPCJ() throws AccumuloException, AccumuloSecurityException, PCJStorageException, NotInitializedException, RyaDetailsRepositoryException {
- // Setup the PCJ storage that will be tested against.
- final Connector connector = super.getClusterInstance().getConnector();
- final String ryaInstanceName = super.getRyaInstanceName();
- try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) {
- // Create a PCJ.
- final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
-
- // Ensure the Rya details have been updated to include the PCJ's ID.
- final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName);
-
- final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails()
- .getPCJIndexDetails()
- .getPCJDetails();
-
- final PCJDetails expectedDetails = PCJDetails.builder()
- .setId( pcjId )
- .build();
-
- assertEquals(expectedDetails, detailsMap.get(pcjId));
- }
- }
-
- @Test
- public void dropPCJ() throws AccumuloException, AccumuloSecurityException, PCJStorageException, NotInitializedException, RyaDetailsRepositoryException {
- // Setup the PCJ storage that will be tested against.
- final Connector connector = super.getClusterInstance().getConnector();
- final String ryaInstanceName = super.getRyaInstanceName();
- try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) {
- // Create a PCJ.
- final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
-
- // Delete the PCJ that was just created.
- pcjStorage.dropPcj(pcjId);
-
- // Ensure the Rya details have been updated to no longer include the PCJ's ID.
- final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName);
-
- final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails()
- .getPCJIndexDetails()
- .getPCJDetails();
-
- assertFalse( detailsMap.containsKey(pcjId) );
- }
- }
-
- @Test
- public void listPcjs() throws AccumuloException, AccumuloSecurityException, PCJStorageException {
- // Setup the PCJ storage that will be tested against.
- final Connector connector = super.getClusterInstance().getConnector();
- final String ryaInstanceName = super.getRyaInstanceName();
- try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) {
- // Create a few PCJs and hold onto their IDs.
- final List<String> expectedIds = new ArrayList<>();
-
- String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
- expectedIds.add( pcjId );
-
- pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
- expectedIds.add( pcjId );
-
- pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
- expectedIds.add( pcjId );
-
- // Fetch the PCJ names
- final List<String> pcjIds = pcjStorage.listPcjs();
-
- // Ensure the expected IDs match the fetched IDs.
- Collections.sort(expectedIds);
- Collections.sort(pcjIds);
- assertEquals(expectedIds, pcjIds);
- }
- }
-
- @Test
- public void getPcjMetadata() throws AccumuloException, AccumuloSecurityException, PCJStorageException, MalformedQueryException {
- // Setup the PCJ storage that will be tested against.
- final Connector connector = super.getClusterInstance().getConnector();
- final String ryaInstanceName = super.getRyaInstanceName();
- try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) {
- // Create a PCJ.
- final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
- final String pcjId = pcjStorage.createPcj(sparql);
-
- // Fetch the PCJ's metadata.
- final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
-
- // Ensure it has the expected values.
- final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
- final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders);
- assertEquals(expectedMetadata, metadata);
- }
- }
-
- @Test
- public void addResults() throws AccumuloException, AccumuloSecurityException, PCJStorageException, MalformedQueryException {
- // Setup the PCJ storage that will be tested against.
- final Connector connector = super.getClusterInstance().getConnector();
- final String ryaInstanceName = super.getRyaInstanceName();
- try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) {
- // Create a PCJ.
- final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
- final String pcjId = pcjStorage.createPcj(sparql);
-
- // Add some binding sets to it.
- final Set<VisibilityBindingSet> results = new HashSet<>();
-
- final MapBindingSet aliceBS = new MapBindingSet();
- aliceBS.addBinding("a", new URIImpl("http://Alice"));
- aliceBS.addBinding("b", new URIImpl("http://Person"));
- results.add( new VisibilityBindingSet(aliceBS, "") );
-
- final MapBindingSet charlieBS = new MapBindingSet();
- charlieBS.addBinding("a", new URIImpl("http://Charlie"));
- charlieBS.addBinding("b", new URIImpl("http://Comedian"));
- results.add( new VisibilityBindingSet(charlieBS, "") );
-
- pcjStorage.addResults(pcjId, results);
-
- // Make sure the PCJ metadata was updated.
- final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
-
- final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
- final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 2L, varOrders);
- assertEquals(expectedMetadata, metadata);
- }
- }
-
- @Test
- public void listResults() throws Exception {
- // Setup the PCJ storage that will be tested against.
- final Connector connector = super.getClusterInstance().getConnector();
- final String ryaInstanceName = super.getRyaInstanceName();
- try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) {
- // Create a PCJ.
- final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
- final String pcjId = pcjStorage.createPcj(sparql);
-
- // Add some binding sets to it.
- final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
-
- final MapBindingSet aliceBS = new MapBindingSet();
- aliceBS.addBinding("a", new URIImpl("http://Alice"));
- aliceBS.addBinding("b", new URIImpl("http://Person"));
- expectedResults.add( new VisibilityBindingSet(aliceBS, "") );
-
- final MapBindingSet charlieBS = new MapBindingSet();
- charlieBS.addBinding("a", new URIImpl("http://Charlie"));
- charlieBS.addBinding("b", new URIImpl("http://Comedian"));
- expectedResults.add( new VisibilityBindingSet(charlieBS, "") );
-
- pcjStorage.addResults(pcjId, expectedResults);
-
- // List the results that were stored.
- final Set<BindingSet> results = new HashSet<>();
- try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
- while(resultsIt.hasNext()) {
- results.add( resultsIt.next() );
- }
- }
-
- assertEquals(expectedResults, results);
- }
- }
-
- @Test
- public void purge() throws Exception {
- // Setup the PCJ storage that will be tested against.
- final Connector connector = super.getClusterInstance().getConnector();
- final String ryaInstanceName = super.getRyaInstanceName();
- try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName)) {
- // Create a PCJ.
- final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }";
- final String pcjId = pcjStorage.createPcj(sparql);
-
- // Add some binding sets to it.
- final Set<VisibilityBindingSet> expectedResults = new HashSet<>();
-
- final MapBindingSet aliceBS = new MapBindingSet();
- aliceBS.addBinding("a", new URIImpl("http://Alice"));
- aliceBS.addBinding("b", new URIImpl("http://Person"));
- expectedResults.add( new VisibilityBindingSet(aliceBS, "") );
-
- final MapBindingSet charlieBS = new MapBindingSet();
- charlieBS.addBinding("a", new URIImpl("http://Charlie"));
- charlieBS.addBinding("b", new URIImpl("http://Comedian"));
- expectedResults.add( new VisibilityBindingSet(charlieBS, "") );
-
- pcjStorage.addResults(pcjId, expectedResults);
-
- // Purge the PCJ.
- pcjStorage.purge(pcjId);
-
- // List the results that were stored.
- final Set<BindingSet> results = new HashSet<>();
- try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) {
- while(resultsIt.hasNext()) {
- results.add( resultsIt.next() );
- }
- }
-
- assertTrue( results.isEmpty() );
-
- // Make sure the PCJ metadata was updated.
- final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId);
-
- final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql);
- final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders);
- assertEquals(expectedMetadata, metadata);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java
new file mode 100644
index 0000000..2964d91
--- /dev/null
+++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/integration/AccumuloPcjStorageIT.java
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo.integration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.rya.accumulo.AccumuloRyaITBase;
+import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
+import org.apache.rya.api.instance.RyaDetailsRepository;
+import org.apache.rya.api.instance.RyaDetailsRepository.NotInitializedException;
+import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Integration tests for the methods of {@link AccumuloPcjStorage}.
+ * <p>
+ * These tests ensure that the PCJ tables are maintained and that these operations
+ * also update the Rya instance's details.
+ */
+public class AccumuloPcjStorageIT extends AccumuloRyaITBase {
+
+ @Test
+ public void createPCJ() throws AccumuloException, AccumuloSecurityException, PCJStorageException, NotInitializedException, RyaDetailsRepositoryException {
+     // Open the PCJ storage under test using the cluster connector and Rya instance name.
+     final Connector accumuloConn = super.getClusterInstance().getConnector();
+     final String instanceName = super.getRyaInstanceName();
+     try(final PrecomputedJoinStorage storage = new AccumuloPcjStorage(accumuloConn, instanceName)) {
+         // Create the PCJ whose details will be looked up.
+         final String createdId = storage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
+
+         // The details expected for the new PCJ have nothing set on them but its ID.
+         final PCJDetails wantedDetails = PCJDetails.builder().setId( createdId ).build();
+
+         // Read the PCJ details back out of the Rya instance's details repository.
+         final RyaDetailsRepository repo = new AccumuloRyaInstanceDetailsRepository(accumuloConn, instanceName);
+         final ImmutableMap<String, PCJDetails> storedDetails = repo.getRyaInstanceDetails()
+                 .getPCJIndexDetails()
+                 .getPCJDetails();
+
+         // The entry stored under the new ID must equal the expected details.
+         final PCJDetails actualDetails = storedDetails.get(createdId);
+         assertEquals(wantedDetails, actualDetails);
+     }
+ }
+
+ @Test
+ public void dropPCJ() throws AccumuloException, AccumuloSecurityException, PCJStorageException, NotInitializedException, RyaDetailsRepositoryException {
+     // Open the PCJ storage under test using the cluster connector and Rya instance name.
+     final Connector accumuloConn = super.getClusterInstance().getConnector();
+     final String instanceName = super.getRyaInstanceName();
+     try(final PrecomputedJoinStorage storage = new AccumuloPcjStorage(accumuloConn, instanceName)) {
+         // Create a PCJ and then drop it again straight away.
+         final String droppedId = storage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } ");
+         storage.dropPcj(droppedId);
+
+         // Read the PCJ details that remain in the Rya instance's details repository.
+         final RyaDetailsRepository repo = new AccumuloRyaInstanceDetailsRepository(accumuloConn, instanceName);
+
+         final ImmutableMap<String, PCJDetails> remainingDetails = repo.getRyaInstanceDetails()
+                 .getPCJIndexDetails()
+                 .getPCJDetails();
+
+         // The dropped PCJ's ID must no longer be one of the keys.
+         final boolean stillListed = remainingDetails.containsKey(droppedId);
+         assertFalse( stillListed );
+     }
+ }
+
+ @Test
+ public void listPcjs() throws AccumuloException, AccumuloSecurityException, PCJStorageException {
+     // Open the PCJ storage under test using the cluster connector and Rya instance name.
+     final Connector accumuloConn = super.getClusterInstance().getConnector();
+     final String instanceName = super.getRyaInstanceName();
+     try(final PrecomputedJoinStorage storage = new AccumuloPcjStorage(accumuloConn, instanceName)) {
+         // Every PCJ in this test is created from the same query.
+         final String sparql = "SELECT * WHERE { ?a <http://isA> ?b } ";
+         final int pcjCount = 3;
+
+         // Create the PCJs one after another, collecting the ID returned by each call.
+         final List<String> createdIds = new ArrayList<>();
+         for(int created = 0; created < pcjCount; created++) {
+             createdIds.add( storage.createPcj(sparql) );
+         }
+
+         // Ask the storage for the IDs of the PCJs it holds.
+         final List<String> listedIds = storage.listPcjs();
+
+         // Sort both lists first so that the comparison
+         // does not depend on the order the IDs were listed in.
+         Collections.sort(createdIds);
+         Collections.sort(listedIds);
+
+         assertEquals(createdIds, listedIds);
+     }
+ }
+
+ @Test
+ public void getPcjMetadata() throws AccumuloException, AccumuloSecurityException, PCJStorageException, MalformedQueryException {
+     // Open the PCJ storage under test using the cluster connector and Rya instance name.
+     final Connector accumuloConn = super.getClusterInstance().getConnector();
+     final String instanceName = super.getRyaInstanceName();
+     try(final PrecomputedJoinStorage storage = new AccumuloPcjStorage(accumuloConn, instanceName)) {
+         // Create a PCJ and leave it without any results.
+         final String query = "SELECT * WHERE { ?a <http://isA> ?b }";
+         final String createdId = storage.createPcj(query);
+
+         // Expected: the query itself, a count of 0 and the variable orders made by ShiftVarOrderFactory.
+         final Set<VariableOrder> shiftedOrders = new ShiftVarOrderFactory().makeVarOrders(query);
+         final PcjMetadata wantedMetadata = new PcjMetadata(query, 0L, shiftedOrders);
+
+         // The metadata fetched from the storage must match it.
+         final PcjMetadata storedMetadata = storage.getPcjMetadata(createdId);
+         assertEquals(wantedMetadata, storedMetadata);
+     }
+ }
+
+ @Test
+ public void addResults() throws AccumuloException, AccumuloSecurityException, PCJStorageException, MalformedQueryException {
+     // Open the PCJ storage under test using the cluster connector and Rya instance name.
+     final Connector accumuloConn = super.getClusterInstance().getConnector();
+     final String instanceName = super.getRyaInstanceName();
+     try(final PrecomputedJoinStorage storage = new AccumuloPcjStorage(accumuloConn, instanceName)) {
+         // Create the PCJ that the results will be added to.
+         final String query = "SELECT * WHERE { ?a <http://isA> ?b }";
+         final String createdId = storage.createPcj(query);
+
+         // First result: ?a = http://Alice, ?b = http://Person.
+         final MapBindingSet alice = new MapBindingSet();
+         alice.addBinding("a", new URIImpl("http://Alice"));
+         alice.addBinding("b", new URIImpl("http://Person"));
+
+         // Second result: ?a = http://Charlie, ?b = http://Comedian.
+         final MapBindingSet charlie = new MapBindingSet();
+         charlie.addBinding("a", new URIImpl("http://Charlie"));
+         charlie.addBinding("b", new URIImpl("http://Comedian"));
+
+         // Wrap each result in a VisibilityBindingSet with an empty string (presumably its visibility) and store them.
+         final Set<VisibilityBindingSet> toStore = new HashSet<>();
+         toStore.add( new VisibilityBindingSet(alice, "") );
+         toStore.add( new VisibilityBindingSet(charlie, "") );
+         storage.addResults(createdId, toStore);
+
+         // The metadata is expected to report a count of 2 once both results are stored.
+         final Set<VariableOrder> shiftedOrders = new ShiftVarOrderFactory().makeVarOrders(query);
+         final PcjMetadata wantedMetadata = new PcjMetadata(query, 2L, shiftedOrders);
+         final PcjMetadata storedMetadata = storage.getPcjMetadata(createdId);
+         assertEquals(wantedMetadata, storedMetadata);
+     }
+ }
+
+ @Test
+ public void listResults() throws Exception {
+     // Open the PCJ storage under test using the cluster connector and Rya instance name.
+     final Connector accumuloConn = super.getClusterInstance().getConnector();
+     final String instanceName = super.getRyaInstanceName();
+     try(final PrecomputedJoinStorage storage = new AccumuloPcjStorage(accumuloConn, instanceName)) {
+         // Create the PCJ that the results will be stored in and listed from.
+         final String createdId = storage.createPcj("SELECT * WHERE { ?a <http://isA> ?b }");
+
+         // First result: ?a = http://Alice, ?b = http://Person.
+         final MapBindingSet alice = new MapBindingSet();
+         alice.addBinding("a", new URIImpl("http://Alice"));
+         alice.addBinding("b", new URIImpl("http://Person"));
+
+         // Second result: ?a = http://Charlie, ?b = http://Comedian.
+         final MapBindingSet charlie = new MapBindingSet();
+         charlie.addBinding("a", new URIImpl("http://Charlie"));
+         charlie.addBinding("b", new URIImpl("http://Comedian"));
+
+         // Wrap each result in a VisibilityBindingSet with an empty string (presumably its visibility) and store them.
+         final Set<VisibilityBindingSet> stored = new HashSet<>();
+         stored.add( new VisibilityBindingSet(alice, "") );
+         stored.add( new VisibilityBindingSet(charlie, "") );
+         storage.addResults(createdId, stored);
+
+         // Drain the listing into a set; try-with-resources closes the iterator afterwards.
+         final Set<BindingSet> listed = new HashSet<>();
+         try(CloseableIterator<BindingSet> listing = storage.listResults(createdId)) {
+             while(listing.hasNext()) {
+                 listed.add( listing.next() );
+             }
+         }
+
+         assertEquals(stored, listed);
+     }
+ }
+
+ @Test
+ public void purge() throws Exception {
+     // Open the PCJ storage under test using the cluster connector and Rya instance name.
+     final Connector accumuloConn = super.getClusterInstance().getConnector();
+     final String instanceName = super.getRyaInstanceName();
+     try(final PrecomputedJoinStorage storage = new AccumuloPcjStorage(accumuloConn, instanceName)) {
+         // Create the PCJ that will be filled with results and then purged.
+         final String query = "SELECT * WHERE { ?a <http://isA> ?b }";
+         final String createdId = storage.createPcj(query);
+
+         // First result: ?a = http://Alice, ?b = http://Person.
+         final MapBindingSet alice = new MapBindingSet();
+         alice.addBinding("a", new URIImpl("http://Alice"));
+         alice.addBinding("b", new URIImpl("http://Person"));
+
+         // Second result: ?a = http://Charlie, ?b = http://Comedian.
+         final MapBindingSet charlie = new MapBindingSet();
+         charlie.addBinding("a", new URIImpl("http://Charlie"));
+         charlie.addBinding("b", new URIImpl("http://Comedian"));
+
+         // Wrap each result in a VisibilityBindingSet with an empty string (presumably its visibility) and store them.
+         final Set<VisibilityBindingSet> stored = new HashSet<>();
+         stored.add( new VisibilityBindingSet(alice, "") );
+         stored.add( new VisibilityBindingSet(charlie, "") );
+         storage.addResults(createdId, stored);
+
+         // Purge the PCJ that was just filled.
+         storage.purge(createdId);
+
+         // Collect whatever the PCJ still lists after the purge; nothing is expected to be left.
+         final Set<BindingSet> remaining = new HashSet<>();
+         try(CloseableIterator<BindingSet> listing = storage.listResults(createdId)) {
+             while(listing.hasNext()) {
+                 remaining.add( listing.next() );
+             }
+         }
+
+         assertTrue( remaining.isEmpty() );
+
+         // The metadata is expected to report a count of 0 again after the purge.
+         final Set<VariableOrder> shiftedOrders = new ShiftVarOrderFactory().makeVarOrders(query);
+         final PcjMetadata wantedMetadata = new PcjMetadata(query, 0L, shiftedOrders);
+         final PcjMetadata storedMetadata = storage.getPcjMetadata(createdId);
+         assertEquals(wantedMetadata, storedMetadata);
+     }
+ }
+}
\ No newline at end of file