You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/18 18:51:30 UTC
[5/5] incubator-rya git commit: RYA-282-Nested-Query. Closes #192.
RYA-282-Nested-Query. Closes #192.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e387818b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e387818b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e387818b
Branch: refs/heads/master
Commit: e387818ba22b07a07432d62eea172da6c33d793f
Parents: 6ce0b00
Author: Caleb Meier <ca...@parsons.com>
Authored: Thu Jul 20 06:57:38 2017 -0700
Committer: Caleb Meier <ca...@parsons.com>
Committed: Fri Aug 18 11:50:36 2017 -0700
----------------------------------------------------------------------
.../api/client/accumulo/AccumuloCreatePCJ.java | 3 +-
.../api/client/accumulo/AccumuloDeletePCJ.java | 4 +-
.../client/accumulo/AccumuloCreatePCJIT.java | 1 +
.../rya/api/client/accumulo/FluoITBase.java | 4 +
.../indexing/pcj/fluo/api/CreateFluoPcj.java | 385 ++++++++++++++
.../rya/indexing/pcj/fluo/api/CreatePcj.java | 450 ----------------
.../indexing/pcj/fluo/api/DeleteFluoPcj.java | 312 +++++++++++
.../rya/indexing/pcj/fluo/api/DeletePcj.java | 305 -----------
.../pcj/fluo/app/ConstructProjection.java | 4 -
.../fluo/app/ConstructQueryResultUpdater.java | 46 +-
.../pcj/fluo/app/FilterResultUpdater.java | 23 +-
.../pcj/fluo/app/FluoStringConverter.java | 2 -
.../fluo/app/IncrementalUpdateConstants.java | 4 +
.../pcj/fluo/app/JoinResultUpdater.java | 31 +-
.../rya/indexing/pcj/fluo/app/NodeType.java | 57 +-
.../pcj/fluo/app/PeriodicQueryUpdater.java | 1 -
.../pcj/fluo/app/ProjectionResultUpdater.java | 89 ++++
.../pcj/fluo/app/QueryResultUpdater.java | 20 +-
.../export/kafka/KafkaRyaSubGraphExporter.java | 2 -
.../fluo/app/observers/BindingSetUpdater.java | 14 +-
.../fluo/app/observers/ProjectionObserver.java | 65 +++
.../fluo/app/observers/QueryResultObserver.java | 2 +-
.../pcj/fluo/app/query/AggregationMetadata.java | 8 +-
.../pcj/fluo/app/query/CommonNodeMetadata.java | 14 +
.../fluo/app/query/ConstructQueryMetadata.java | 94 ++--
.../pcj/fluo/app/query/FilterMetadata.java | 11 +-
.../indexing/pcj/fluo/app/query/FluoQuery.java | 234 ++++++---
.../pcj/fluo/app/query/FluoQueryColumns.java | 45 +-
.../fluo/app/query/FluoQueryMetadataDAO.java | 148 ++++--
.../pcj/fluo/app/query/JoinMetadata.java | 17 +-
.../fluo/app/query/PeriodicQueryMetadata.java | 9 +-
.../pcj/fluo/app/query/ProjectionMetadata.java | 236 +++++++++
.../fluo/app/query/QueryBuilderVisitorBase.java | 119 +++++
.../pcj/fluo/app/query/QueryMetadata.java | 98 +++-
.../app/query/QueryMetadataVisitorBase.java | 113 ++++
.../fluo/app/query/SparqlFluoQueryBuilder.java | 444 +++++++++++-----
.../app/query/StatementPatternMetadata.java | 7 +-
.../pcj/fluo/app/util/FluoQueryUtils.java | 62 +++
.../pcj/fluo/app/util/PeriodicQueryUtil.java | 76 +--
.../app/util/VariableOrderUpdateVisitor.java | 166 ++++++
.../fluo/app/query/PeriodicQueryUtilTest.java | 10 +-
.../fluo/app/query/QueryBuilderVisitorTest.java | 105 ++++
.../app/query/QueryMetadataVisitorTest.java | 109 ++++
.../fluo/client/command/NewQueryCommand.java | 4 +-
.../fluo/client/util/QueryReportRenderer.java | 41 +-
.../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java | 4 +-
.../pcj/fluo/ConstructGraphTestUtils.java | 15 +-
.../indexing/pcj/fluo/api/GetPcjMetadataIT.java | 4 +-
.../indexing/pcj/fluo/api/GetQueryReportIT.java | 4 +-
.../fluo/app/query/FluoQueryMetadataDAOIT.java | 163 +++++-
.../pcj/fluo/integration/BatchDeleteIT.java | 8 +-
.../pcj/fluo/integration/CreateDeleteIT.java | 10 +-
.../indexing/pcj/fluo/integration/InputIT.java | 10 +-
.../pcj/fluo/integration/KafkaExportIT.java | 163 +++++-
.../integration/KafkaRyaSubGraphExportIT.java | 86 ++-
.../indexing/pcj/fluo/integration/QueryIT.java | 517 ++++++++++++-------
.../pcj/fluo/integration/RyaExportIT.java | 4 +-
.../RyaInputIncrementalUpdateIT.java | 8 +-
.../pcj/fluo/integration/StreamingTestIT.java | 4 +-
.../HistoricStreamingVisibilityIT.java | 4 +-
.../pcj/fluo/visibility/PcjVisibilityIT.java | 4 +-
.../pcj/fluo/test/base/KafkaExportITBase.java | 4 +-
.../rya/pcj/fluo/test/base/RyaExportITBase.java | 2 +
.../PeriodicNotificationProviderIT.java | 9 +-
.../notification/api/CreatePeriodicQuery.java | 10 +-
.../recovery/PeriodicNotificationProvider.java | 9 +-
66 files changed, 3569 insertions(+), 1467 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
index 3fe1042..644189a 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
@@ -38,6 +38,7 @@ import org.apache.rya.api.instance.RyaDetailsUpdater;
import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
@@ -145,7 +146,7 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
cd.getZookeepers(),
fluoAppName);) {
// Initialize the PCJ within the Fluo application.
- final org.apache.rya.indexing.pcj.fluo.api.CreatePcj fluoCreatePcj = new org.apache.rya.indexing.pcj.fluo.api.CreatePcj();
+ final CreateFluoPcj fluoCreatePcj = new CreateFluoPcj();
fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, getConnector(), ryaInstance);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
index 96e6d58..eb2b2d7 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
@@ -31,7 +31,7 @@ import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
-import org.apache.rya.indexing.pcj.fluo.api.DeletePcj;
+import org.apache.rya.indexing.pcj.fluo.api.DeleteFluoPcj;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
@@ -123,7 +123,7 @@ public class AccumuloDeletePCJ extends AccumuloCommand implements DeletePCJ {
cd.getZookeepers(),
fluoAppName)) {
// Delete the PCJ from the Fluo App.
- new DeletePcj(1000).deletePcj(fluoClient, pcjId);
+ new DeleteFluoPcj(1000).deletePcj(fluoClient, pcjId);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
index 9bbf01f..3463a02 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java
@@ -34,6 +34,7 @@ import org.apache.rya.api.instance.RyaDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy;
import org.apache.rya.indexing.pcj.fluo.api.ListQueryIds;
+import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java
index 2e16412..113b397 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java
@@ -39,8 +39,10 @@ import org.apache.rya.accumulo.MiniAccumuloClusterInstance;
import org.apache.rya.accumulo.MiniAccumuloSingleton;
import org.apache.rya.accumulo.RyaTestInstanceRule;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
+import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
+import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
@@ -219,6 +221,8 @@ public abstract class FluoITBase {
observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
observers.add(new ObserverSpecification(JoinObserver.class.getName()));
observers.add(new ObserverSpecification(FilterObserver.class.getName()));
+ observers.add(new ObserverSpecification(ProjectionObserver.class.getName()));
+ observers.add(new ObserverSpecification(ConstructQueryResultObserver.class.getName()));
// Provide export parameters child test classes may provide to the
// export observer.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
new file mode 100644
index 0000000..e450960
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.api.persist.query.BatchRyaQuery;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.apache.rya.indexing.pcj.storage.PcjException;
+import org.apache.rya.indexing.pcj.storage.PcjMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.calrissian.mango.collect.CloseableIterable;
+import org.openrdf.model.Resource;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.StatementPattern;
+
+import com.google.common.base.Preconditions;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query.
+ * <p>
+ * This is a two phase process.
+ * <ol>
+ * <li>Setup metadata about each node of the query using a single Fluo transaction. </li>
+ * <li>Scan Rya for binding sets that match each Statement Pattern from the query
+ * and use a separate Fluo transaction for each batch that is inserted. This
+ * ensures historic triples will be included in the query's results.</li>
+ * </ol>
+ * After the first step is finished, any new Triples that are added to the Fluo
+ * application will be matched against statement patterns, the final results
+ * will percolate to the top of the query, and those results will be exported to
+ * Rya's query system.
+ */
+@DefaultAnnotation(NonNull.class)
+public class CreateFluoPcj {
+ private static final Logger log = Logger.getLogger(CreateFluoPcj.class);
+
+ /**
+ * The default Statement Pattern batch insert size is 1000.
+ */
+ private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000;
+
+ /**
+ * The maximum number of binding sets that will be inserted into each Statement
+ * Pattern's result set per Fluo transaction.
+ */
+ private final int spInsertBatchSize;
+
+ /**
+ * Constructs an instance of {@link CreateFluoPcj} that uses
+ * {@link #DEFAULT_SP_INSERT_BATCH_SIZE} as the default batch insert size.
+ */
+ public CreateFluoPcj() {
+ this(DEFAULT_SP_INSERT_BATCH_SIZE);
+ }
+
+ /**
+ * Constructs an instance of {@link CreateFluoPcj}.
+ *
+ * @param spInsertBatchSize - The maximum number of binding sets that will be
+ * inserted into each Statement Pattern's result set per Fluo transaction.
+ */
+ public CreateFluoPcj(final int spInsertBatchSize) {
+ checkArgument(spInsertBatchSize > 0, "The SP insert batch size '" + spInsertBatchSize + "' must be greater than 0.");
+ this.spInsertBatchSize = spInsertBatchSize;
+ }
+
+
+ /**
+ * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method
+ * creates the FluoQuery (metadata) inside of Fluo so that results can be incrementally generated
+ * inside of Fluo. This method assumes that the user will export the results to Kafka or
+ * some other external resource. The export id is equivalent to the queryId that is returned,
+ * which is in contrast to the other createPcj methods in this class which accept an external pcjId
+ * that is used to identify the Accumulo table or Kafka topic for exporting results.
+ *
+ * @param sparql - sparql query String to be registered with Fluo
+ * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
+ * @return The metadata that was written to the Fluo application for the PCJ.
+ * @throws MalformedQueryException The {@code sparql} query is malformed and
+ * could not be converted into a {@link FluoQuery}.
+ */
+ public FluoQuery createPcj(String sparql, FluoClient fluo) throws MalformedQueryException {
+ Preconditions.checkNotNull(sparql);
+ Preconditions.checkNotNull(fluo);
+
+ String pcjId = UUID.randomUUID().toString().replaceAll("-", "");
+ return createPcj(pcjId, sparql, fluo);
+ }
+
+ /**
+ * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method provides
+ * no guarantees that a PCJ with the given pcjId exists outside of Fluo. This method merely
+ * creates the FluoQuery (metadata) inside of Fluo so that results can be incrementally generated
+ * inside of Fluo. This method assumes that the user will export the results to Kafka or
+ * some other external resource.
+ *
+ * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
+ * @param sparql - sparql query String to be registered with Fluo
+ * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
+ * @return The metadata that was written to the Fluo application for the PCJ.
+ * @throws MalformedQueryException The {@code sparql} query is malformed.
+ */
+ public FluoQuery createPcj(
+ final String pcjId,
+ final String sparql,
+ final FluoClient fluo) throws MalformedQueryException {
+ requireNonNull(pcjId);
+ requireNonNull(sparql);
+ requireNonNull(fluo);
+
+ FluoQuery fluoQuery = makeFluoQuery(sparql, pcjId);
+ writeFluoQuery(fluo, fluoQuery, pcjId);
+
+ return fluoQuery;
+ }
+
+ private FluoQuery makeFluoQuery(String sparql, String pcjId) throws MalformedQueryException {
+
+ String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
+
+ SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
+ builder.setFluoQueryId(queryId);
+ builder.setSparql(sparql);
+
+ return builder.build();
+ }
+
+ private void writeFluoQuery(FluoClient fluo, FluoQuery fluoQuery, String pcjId) {
+ try (Transaction tx = fluo.newTransaction()) {
+ // Write the query's structure to Fluo.
+ new FluoQueryMetadataDAO().write(tx, fluoQuery);
+
+ // Flush the changes to Fluo.
+ tx.commit();
+ }
+ }
+
+
+ /**
+ * Tells the Fluo PCJ Updater application to maintain a new PCJ. The method takes in an
+ * instance of {@link PrecomputedJoinStorage} to verify that a PCJ with the given pcjId exists.
+ *
+ * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
+ * @param pcjStorage - Provides access to the PCJ index. (not null)
+ * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
+ * @return The metadata that was written to the Fluo application for the PCJ.
+ * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
+ * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
+ */
+ public FluoQuery createPcj(
+ final String pcjId,
+ final PrecomputedJoinStorage pcjStorage,
+ final FluoClient fluo) throws MalformedQueryException, PcjException {
+ requireNonNull(pcjId);
+ requireNonNull(pcjStorage);
+ requireNonNull(fluo);
+
+ // Parse the query's structure for the metadata that will be written to fluo.
+ final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
+ final String sparql = pcjMetadata.getSparql();
+ return createPcj(pcjId, sparql, fluo);
+ }
+
+ /**
+ * Tells the Fluo PCJ Updater application to maintain a new PCJ.
+ * <p>
+ * This call scans Rya for Statement Pattern matches and inserts them into
+ * the Fluo application. This method does not verify that a PcjTable with the
+ * given pcjId actually exists. It is assumed that results for any query registered
+ * using this method will be exported to Kafka or some other external service.
+ *
+ * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
+ * @param sparql - sparql query that will be registered with Fluo. (not null)
+ * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
+ * @param accumulo - Accumulo connector used, with {@code ryaInstance} as the table prefix, to query Rya for historic results. (not null)
+ * @return The Fluo application's Query ID of the query that was created.
+ * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
+ * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
+ * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
+ */
+ public String withRyaIntegration(
+ final String pcjId,
+ final String sparql,
+ final FluoClient fluo,
+ final Connector accumulo,
+ final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException {
+ requireNonNull(pcjId);
+ requireNonNull(sparql);
+ requireNonNull(fluo);
+ requireNonNull(accumulo);
+ requireNonNull(ryaInstance);
+
+
+ // Write the SPARQL query's structure to the Fluo Application.
+ final FluoQuery fluoQuery = createPcj(pcjId, sparql, fluo);
+ //import results already ingested into Rya that match query
+ importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance);
+ // return queryId to the caller for later monitoring from the export.
+ return fluoQuery.getQueryMetadata().getNodeId();
+ }
+
+
+ /**
+ * Tells the Fluo PCJ Updater application to maintain a new PCJ.
+ * <p>
+ * This call scans Rya for Statement Pattern matches and inserts them into
+ * the Fluo application. The Fluo application will then maintain the intermediate
+ * results as new triples are inserted and export any new query results to the
+ * {@code pcjId} within the provided {@code pcjStorage}. This method requires that a
+ * PCJ table already exist for the query corresponding to the pcjId. Results will be exported
+ * to this table.
+ *
+ * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
+ * @param pcjStorage - Provides access to the PCJ index. (not null)
+ * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
+ * @param accumulo - Accumulo connector used, with {@code ryaInstance} as the table prefix, to query Rya for historic results. (not null)
+ * @return The Fluo application's Query ID of the query that was created.
+ * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
+ * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
+ * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
+ */
+ public String withRyaIntegration(
+ final String pcjId,
+ final PrecomputedJoinStorage pcjStorage,
+ final FluoClient fluo,
+ final Connector accumulo,
+ final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException {
+ requireNonNull(pcjId);
+ requireNonNull(pcjStorage);
+ requireNonNull(fluo);
+ requireNonNull(accumulo);
+ requireNonNull(ryaInstance);
+
+ // Parse the query's structure for the metadata that will be written to fluo.
+ final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
+ final String sparql = pcjMetadata.getSparql();
+
+ return withRyaIntegration(pcjId, sparql, fluo, accumulo, ryaInstance);
+ }
+
+ private void importHistoricResultsIntoFluo(FluoClient fluo, FluoQuery fluoQuery, Connector accumulo, String ryaInstance)
+ throws RyaDAOException {
+ // Reuse the same set object while performing batch inserts.
+ final Set<RyaStatement> queryBatch = new HashSet<>();
+
+ // Iterate through each of the statement patterns and insert their
+ // historic matches into Fluo.
+ for (final StatementPatternMetadata patternMetadata : fluoQuery.getStatementPatternMetadata()) {
+ // Convert the statement pattern into a RyaStatement and add it to
+ // the batch query that is run against Rya below.
+ final StatementPattern pattern = FluoStringConverter.toStatementPattern(patternMetadata.getStatementPattern());
+ queryBatch.add(spToRyaStatement(pattern));
+ }
+
+ // Create AccumuloRyaQueryEngine to query for historic results
+ final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
+ conf.setTablePrefix(ryaInstance);
+ conf.setAuths(getAuths(accumulo));
+
+ try (final AccumuloRyaQueryEngine queryEngine = new AccumuloRyaQueryEngine(accumulo, conf);
+ CloseableIterable<RyaStatement> queryIterable = queryEngine.query(new BatchRyaQuery(queryBatch))) {
+ final Set<RyaStatement> triplesBatch = new HashSet<>();
+
+ // Insert batches of the matching statements into Fluo.
+ for (final RyaStatement ryaStatement : queryIterable) {
+ if (triplesBatch.size() == spInsertBatchSize) {
+ writeBatch(fluo, triplesBatch);
+ triplesBatch.clear();
+ }
+
+ triplesBatch.add(ryaStatement);
+ }
+
+ if (!triplesBatch.isEmpty()) {
+ writeBatch(fluo, triplesBatch);
+ triplesBatch.clear();
+ }
+ } catch (final IOException e) {
+ log.warn("Ignoring IOException thrown while closing the AccumuloRyaQueryEngine used by CreatePCJ.", e);
+ }
+ }
+
+ private static void writeBatch(final FluoClient fluo, final Set<RyaStatement> batch) {
+ checkNotNull(fluo);
+ checkNotNull(batch);
+ new InsertTriples().insert(fluo, batch);
+ }
+
+ private static RyaStatement spToRyaStatement(final StatementPattern sp) {
+ final Value subjVal = sp.getSubjectVar().getValue();
+ final Value predVal = sp.getPredicateVar().getValue();
+ final Value objVal = sp.getObjectVar().getValue();
+
+ RyaURI subjURI = null;
+ RyaURI predURI = null;
+ RyaType objType = null;
+
+ if(subjVal != null) {
+ if(!(subjVal instanceof Resource)) {
+ throw new AssertionError("Subject must be a Resource.");
+ }
+ subjURI = RdfToRyaConversions.convertResource((Resource) subjVal);
+ }
+
+ if (predVal != null) {
+ if(!(predVal instanceof URI)) {
+ throw new AssertionError("Predicate must be a URI.");
+ }
+ predURI = RdfToRyaConversions.convertURI((URI) predVal);
+ }
+
+ if (objVal != null ) {
+ objType = RdfToRyaConversions.convertValue(objVal);
+ }
+
+ return new RyaStatement(subjURI, predURI, objType);
+ }
+
+ private String[] getAuths(final Connector accumulo) {
+ Authorizations auths;
+ try {
+ auths = accumulo.securityOperations().getUserAuthorizations(accumulo.whoami());
+ final List<byte[]> authList = auths.getAuthorizations();
+ final String[] authArray = new String[authList.size()];
+ for(int i = 0; i < authList.size(); i++){
+ authArray[i] = new String(authList.get(i), "UTF-8");
+ }
+ return authArray;
+ } catch (AccumuloException | AccumuloSecurityException | UnsupportedEncodingException e) {
+ throw new RuntimeException("Cannot read authorizations for user: " + accumulo.whoami());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
deleted file mode 100644
index 767d9d2..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.api;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.Objects.requireNonNull;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.security.Authorizations;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.AccumuloRdfConfiguration;
-import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
-import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.domain.RyaType;
-import org.apache.rya.api.domain.RyaURI;
-import org.apache.rya.api.persist.RyaDAOException;
-import org.apache.rya.api.persist.query.BatchRyaQuery;
-import org.apache.rya.api.resolver.RdfToRyaConversions;
-import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
-import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder;
-import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds;
-import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
-import org.apache.rya.indexing.pcj.storage.PcjException;
-import org.apache.rya.indexing.pcj.storage.PcjMetadata;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.calrissian.mango.collect.CloseableIterable;
-import org.openrdf.model.Resource;
-import org.openrdf.model.URI;
-import org.openrdf.model.Value;
-import org.openrdf.query.MalformedQueryException;
-import org.openrdf.query.algebra.StatementPattern;
-import org.openrdf.query.parser.ParsedQuery;
-import org.openrdf.query.parser.sparql.SPARQLParser;
-
-import com.google.common.base.Preconditions;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-/**
- * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query.
- * <p>
- * This is a two phase process.
- * <ol>
- * <li>Setup metadata about each node of the query using a single Fluo transaction. </li>
- * <li>Scan Rya for binding sets that match each Statement Pattern from the query
- * and use a separate Fluo transaction for each batch that is inserted. This
- * ensure historic triples will be included in the query's results.</li>
- * </ol>
- * After the first step is finished, any new Triples that are added to the Fluo
- * application will be matched against statement patterns, the final results
- * will percolate to the top of the query, and those results will be exported to
- * Rya's query system.
- */
-@DefaultAnnotation(NonNull.class)
-public class CreatePcj {
- private static final Logger log = Logger.getLogger(CreatePcj.class);
-
- /**
- * The default Statement Pattern batch insert size is 1000.
- */
- private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000;
-
- /**
- * The maximum number of binding sets that will be inserted into each Statement
- * Pattern's result set per Fluo transaction.
- */
- private final int spInsertBatchSize;
-
- /**
- * Constructs an instance of {@link CreatePcj} that uses
- * {@link #DEFAULT_SP_INSERT_BATCH_SIZE} as the default batch insert size.
- */
- public CreatePcj() {
- this(DEFAULT_SP_INSERT_BATCH_SIZE);
- }
-
- /**
- * Constructs an instance of {@link CreatePcj}.
- *
- * @param spInsertBatchSize - The maximum number of binding sets that will be
- * inserted into each Statement Pattern's result set per Fluo transaction.
- */
- public CreatePcj(final int spInsertBatchSize) {
- checkArgument(spInsertBatchSize > 0, "The SP insert batch size '" + spInsertBatchSize + "' must be greater than 0.");
- this.spInsertBatchSize = spInsertBatchSize;
- }
-
-
- /**
- * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method does not
- * require a pcjId and does not require a PCJ table to have already been created via {@link PrecomputedJoinStorage}.
- * This method only adds the metadata to the Fluo table to incrementally generate query results. Since there
- * is no PCJ table, the incremental results must be exported to some external queuing service such as Kafka.
- * This method currently only supports SPARQL COSNTRUCT queries, as they only export to Kafka by default.
- *
- * @param sparql - SPARQL query whose results will be updated in the Fluo table
- * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
- * @return The metadata that was written to the Fluo application for the PCJ.
- * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
- * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
- * @throws RuntimeException If SPARQL query is not a CONSTRUCT query.
- */
- public FluoQuery createFluoPcj(final FluoClient fluo, String sparql) throws MalformedQueryException, PcjException {
- requireNonNull(sparql);
- requireNonNull(fluo);
-
- // Keeps track of the IDs that are assigned to each of the query's nodes in Fluo.
- // We use these IDs later when scanning Rya for historic Statement Pattern matches
- // as well as setting up automatic exports.
- final NodeIds nodeIds = new NodeIds();
- final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null);
- final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
- checkArgument(fluoQuery.getConstructQueryMetadata().isPresent(), "Sparql query: " + sparql + " must begin with a construct.");
-
- try (Transaction tx = fluo.newTransaction()) {
- // Write the query's structure to Fluo.
- new FluoQueryMetadataDAO().write(tx, fluoQuery);
- tx.commit();
- }
-
- return fluoQuery;
- }
-
-
-
-
- /**
- * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method
- * creates the FluoQuery (metadata) inside of Fluo so that results can be incrementally generated
- * inside of Fluo. This method assumes that the user will export the results to Kafka or
- * some other external resource. The export id is equivalent to the queryId that is returned,
- * which is in contrast to the other createPcj methods in this class which accept an external pcjId
- * that is used to identify the Accumulo table or Kafka topic for exporting results.
- *
- * @param sparql - sparql query String to be registered with Fluo
- * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
- * @return queryId - The id of the root of the query metadata tree in Fluo
- * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
- * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
- */
- public String createPcj(String sparql, FluoClient fluo) throws MalformedQueryException {
- Preconditions.checkNotNull(sparql);
- Preconditions.checkNotNull(fluo);
-
- FluoQuery fluoQuery = makeFluoQuery(sparql);
- String queryId = null;
- if(fluoQuery.getQueryMetadata().isPresent()) {
- queryId = fluoQuery.getQueryMetadata().get().getNodeId();
- queryId = queryId.split(IncrementalUpdateConstants.QUERY_PREFIX)[1];
- } else {
- queryId = fluoQuery.getConstructQueryMetadata().get().getNodeId();
- queryId = queryId.split(IncrementalUpdateConstants.CONSTRUCT_PREFIX)[1];
- }
-
- String[] idArray = queryId.split("_");
- String id = idArray[idArray.length - 1];
-
- writeFluoQuery(fluo, fluoQuery, id);
- return id;
- }
-
- /**
- * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method provides
- * no guarantees that a PCJ with the given pcjId exists outside of Fluo. This method merely
- * creates the FluoQuery (metadata) inside of Fluo so that results and be incrementally generated
- * inside of Fluo. This method assumes that the user will export the results to Kafka or
- * some other external resource.
- *
- * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
- * @param sparql - sparql query String to be registered with Fluo
- * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
- * @return The metadata that was written to the Fluo application for the PCJ.
- * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
- * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
- */
- public FluoQuery createPcj(
- final String pcjId,
- final String sparql,
- final FluoClient fluo) throws MalformedQueryException, PcjException {
- requireNonNull(pcjId);
- requireNonNull(sparql);
- requireNonNull(fluo);
-
- FluoQuery fluoQuery = makeFluoQuery(sparql);
- writeFluoQuery(fluo, fluoQuery, pcjId);
-
- return fluoQuery;
- }
-
- private FluoQuery makeFluoQuery(String sparql) throws MalformedQueryException {
-
- // Keeps track of the IDs that are assigned to each of the query's nodes in Fluo.
- // We use these IDs later when scanning Rya for historic Statement Pattern matches
- // as well as setting up automatic exports.
- final NodeIds nodeIds = new NodeIds();
-
- // Parse the query's structure for the metadata that will be written to fluo.
- final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null);
- return new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
- }
-
- private void writeFluoQuery(FluoClient fluo, FluoQuery fluoQuery, String pcjId) {
- try (Transaction tx = fluo.newTransaction()) {
- // Write the query's structure to Fluo.
- new FluoQueryMetadataDAO().write(tx, fluoQuery);
-
- // The results of the query are eventually exported to an instance
- // of Rya, so store the Rya ID for the PCJ.
- QueryMetadata metadata = fluoQuery.getQueryMetadata().orNull();
- if (metadata != null) {
- String queryId = metadata.getNodeId();
- tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
- tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId);
- }
-
- // Flush the changes to Fluo.
- tx.commit();
- }
- }
-
-
- /**
- * Tells the Fluo PCJ Updater application to maintain a new PCJ. The method takes in an
- * instance of {@link PrecomputedJoinStorage} to verify that a PCJ with the given pcjId exists.
- *
- * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
- * @param pcjStorage - Provides access to the PCJ index. (not null)
- * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
- * @return The metadata that was written to the Fluo application for the PCJ.
- * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
- * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
- */
- public FluoQuery createPcj(
- final String pcjId,
- final PrecomputedJoinStorage pcjStorage,
- final FluoClient fluo) throws MalformedQueryException, PcjException {
- requireNonNull(pcjId);
- requireNonNull(pcjStorage);
- requireNonNull(fluo);
-
- // Parse the query's structure for the metadata that will be written to fluo.
- final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
- final String sparql = pcjMetadata.getSparql();
- return createPcj(pcjId, sparql, fluo);
- }
-
- /**
- * Tells the Fluo PCJ Updater application to maintain a new PCJ.
- * <p>
- * This call scans Rya for Statement Pattern matches and inserts them into
- * the Fluo application. This method does not verify that a PcjTable with the
- * the given pcjId actually exists. It is assumed that results for any query registered
- * using this method will be exported to Kafka or some other external service.
- *
- * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
- * @param sparql - sparql query that will registered with Fluo. (not null)
- * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
- * @param queryEngine - QueryEngine for a given Rya Instance, (not null)
- * @return The Fluo application's Query ID of the query that was created.
- * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
- * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
- * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
- */
- public String withRyaIntegration(
- final String pcjId,
- final String sparql,
- final FluoClient fluo,
- final Connector accumulo,
- final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException {
- requireNonNull(pcjId);
- requireNonNull(sparql);
- requireNonNull(fluo);
- requireNonNull(accumulo);
- requireNonNull(ryaInstance);
-
-
- // Write the SPARQL query's structure to the Fluo Application.
- final FluoQuery fluoQuery = createPcj(pcjId, sparql, fluo);
- //import results already ingested into Rya that match query
- importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance);
- // return queryId to the caller for later monitoring from the export.
- return fluoQuery.getQueryMetadata().get().getNodeId();
- }
-
-
- /**
- * Tells the Fluo PCJ Updater application to maintain a new PCJ.
- * <p>
- * This call scans Rya for Statement Pattern matches and inserts them into
- * the Fluo application. The Fluo application will then maintain the intermediate
- * results as new triples are inserted and export any new query results to the
- * {@code pcjId} within the provided {@code pcjStorage}. This method requires that a
- * PCJ table already exist for the query corresponding to the pcjId. Results will be exported
- * to this table.
- *
- * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
- * @param pcjStorage - Provides access to the PCJ index. (not null)
- * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
- * @param queryEngine - QueryEngine for a given Rya Instance, (not null)
- * @return The Fluo application's Query ID of the query that was created.
- * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
- * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
- * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
- */
- public String withRyaIntegration(
- final String pcjId,
- final PrecomputedJoinStorage pcjStorage,
- final FluoClient fluo,
- final Connector accumulo,
- final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException {
- requireNonNull(pcjId);
- requireNonNull(pcjStorage);
- requireNonNull(fluo);
- requireNonNull(accumulo);
- requireNonNull(ryaInstance);
-
- // Parse the query's structure for the metadata that will be written to fluo.
- final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
- final String sparql = pcjMetadata.getSparql();
-
- return withRyaIntegration(pcjId, sparql, fluo, accumulo, ryaInstance);
- }
-
- private void importHistoricResultsIntoFluo(FluoClient fluo, FluoQuery fluoQuery, Connector accumulo, String ryaInstance)
- throws RyaDAOException {
- // Reuse the same set object while performing batch inserts.
- final Set<RyaStatement> queryBatch = new HashSet<>();
-
- // Iterate through each of the statement patterns and insert their
- // historic matches into Fluo.
- for (final StatementPatternMetadata patternMetadata : fluoQuery.getStatementPatternMetadata()) {
- // Get an iterator over all of the binding sets that match the
- // statement pattern.
- final StatementPattern pattern = FluoStringConverter.toStatementPattern(patternMetadata.getStatementPattern());
- queryBatch.add(spToRyaStatement(pattern));
- }
-
- // Create AccumuloRyaQueryEngine to query for historic results
- final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
- conf.setTablePrefix(ryaInstance);
- conf.setAuths(getAuths(accumulo));
-
- try (final AccumuloRyaQueryEngine queryEngine = new AccumuloRyaQueryEngine(accumulo, conf);
- CloseableIterable<RyaStatement> queryIterable = queryEngine.query(new BatchRyaQuery(queryBatch))) {
- final Set<RyaStatement> triplesBatch = new HashSet<>();
-
- // Insert batches of the binding sets into Fluo.
- for (final RyaStatement ryaStatement : queryIterable) {
- if (triplesBatch.size() == spInsertBatchSize) {
- writeBatch(fluo, triplesBatch);
- triplesBatch.clear();
- }
-
- triplesBatch.add(ryaStatement);
- }
-
- if (!triplesBatch.isEmpty()) {
- writeBatch(fluo, triplesBatch);
- triplesBatch.clear();
- }
- } catch (final IOException e) {
- log.warn("Ignoring IOException thrown while closing the AccumuloRyaQueryEngine used by CreatePCJ.", e);
- }
- }
-
- private static void writeBatch(final FluoClient fluo, final Set<RyaStatement> batch) {
- checkNotNull(fluo);
- checkNotNull(batch);
- new InsertTriples().insert(fluo, batch);
- }
-
- private static RyaStatement spToRyaStatement(final StatementPattern sp) {
- final Value subjVal = sp.getSubjectVar().getValue();
- final Value predVal = sp.getPredicateVar().getValue();
- final Value objVal = sp.getObjectVar().getValue();
-
- RyaURI subjURI = null;
- RyaURI predURI = null;
- RyaType objType = null;
-
- if(subjVal != null) {
- if(!(subjVal instanceof Resource)) {
- throw new AssertionError("Subject must be a Resource.");
- }
- subjURI = RdfToRyaConversions.convertResource((Resource) subjVal);
- }
-
- if (predVal != null) {
- if(!(predVal instanceof URI)) {
- throw new AssertionError("Predicate must be a URI.");
- }
- predURI = RdfToRyaConversions.convertURI((URI) predVal);
- }
-
- if (objVal != null ) {
- objType = RdfToRyaConversions.convertValue(objVal);
- }
-
- return new RyaStatement(subjURI, predURI, objType);
- }
-
- private String[] getAuths(final Connector accumulo) {
- Authorizations auths;
- try {
- auths = accumulo.securityOperations().getUserAuthorizations(accumulo.whoami());
- final List<byte[]> authList = auths.getAuthorizations();
- final String[] authArray = new String[authList.size()];
- for(int i = 0; i < authList.size(); i++){
- authArray[i] = new String(authList.get(i), "UTF-8");
- }
- return authArray;
- } catch (AccumuloException | AccumuloSecurityException | UnsupportedEncodingException e) {
- throw new RuntimeException("Cannot read authorizations for user: " + accumulo.whoami());
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
new file mode 100644
index 0000000..58a52fb
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.client.scanner.CellScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.RowColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.openrdf.query.BindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Deletes a Pre-computed Join (PCJ) from Fluo.
+ * <p>
+ * This is a two phase process.
+ * <ol>
+ * <li>Delete metadata about each node of the query using a single Fluo
+ * transaction. This prevents new {@link BindingSet}s from being created when
+ * new triples are inserted.</li>
+ * <li>Delete BindingSets associated with each node of the query. This is done
+ * in a batch fashion to guard against large delete transactions that don't fit
+ * into memory.</li>
+ * </ol>
+ */
+@DefaultAnnotation(NonNull.class)
+public class DeleteFluoPcj {
+
+ private final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+ private final int batchSize;
+
+ /**
+  * Constructs an instance of {@link DeleteFluoPcj}.
+  *
+  * @param batchSize - The number of entries that will be deleted at a time. (> 0)
+  * @throws IllegalArgumentException {@code batchSize} is not greater than 0.
+  */
+ public DeleteFluoPcj(final int batchSize) {
+     // Report the offending value so a misconfigured caller can see what was passed in.
+     checkArgument(batchSize > 0, "The batch size '" + batchSize + "' must be greater than 0.");
+     this.batchSize = batchSize;
+ }
+
+ /**
+  * Deletes all metadata and {@link BindingSet}s associated with a Rya
+  * Precomputed Join Index from the Fluo application that is incrementally
+  * updating it.
+  *
+  * @param client - Connects to the Fluo application that is updating the PCJ
+  *   Index. (not null)
+  * @param pcjId - The PCJ ID for the query that will be removed from the Fluo
+  *   application. (not null)
+  */
+ public void deletePcj(final FluoClient client, final String pcjId) {
+     requireNonNull(client);
+     requireNonNull(pcjId);
+
+     final Transaction tx = client.newTransaction();
+
+     // deleteMetadata(...) closes the transaction, but only once it is reached. If
+     // walking the query tree fails first, close it here so that it does not leak.
+     final List<String> nodeIds;
+     try {
+         nodeIds = getNodeIds(tx, pcjId);
+     } catch (final RuntimeException e) {
+         tx.close();
+         throw e;
+     }
+
+     // Delete the query's metadata. This halts input.
+     deleteMetadata(tx, nodeIds, pcjId);
+
+     // Delete the binding sets associated with the query's nodes.
+     for (final String nodeId : nodeIds) {
+         deleteData(client, nodeId);
+     }
+ }
+
+ /**
+  * Collects the Node ID of every node that belongs to the query registered
+  * under the given PCJ ID. The query's own (root) Node ID is the first entry.
+  *
+  * @param tx - Transaction of a given Fluo table. (not null)
+  * @param pcjId - Id of query. (not null)
+  * @return list of Node IDs associated with the query {@code pcjId}.
+  */
+ private List<String> getNodeIds(Transaction tx, String pcjId) {
+     requireNonNull(tx);
+     requireNonNull(pcjId);
+
+     // The root of the query tree is the ID Fluo stores for this PCJ ID.
+     final String rootNodeId = getQueryIdFromPcjId(tx, pcjId);
+
+     // Start with the root, then let the recursive walk append its descendants.
+     final List<String> collected = new ArrayList<>();
+     collected.add(rootNodeId);
+     getChildNodeIds(tx, rootNodeId, collected);
+     return collected;
+ }
+
+ /**
+  * Walks the query tree below {@code nodeId} and appends the Node ID of every
+  * descendant to {@code nodeIds}. A node's direct children are appended
+  * before any of their own descendants.
+  *
+  * @param tx - Transaction of a given Fluo table. (not null)
+  * @param nodeId - Current node in query tree. (not null)
+  * @param nodeIds - The Node IDs extracted from query tree. (not null)
+  */
+ private void getChildNodeIds(final Transaction tx, final String nodeId, final List<String> nodeIds) {
+     requireNonNull(tx);
+     requireNonNull(nodeId);
+     requireNonNull(nodeIds);
+
+     // Step 1: read this node's metadata to find its direct children. Which
+     // metadata is read depends on the type encoded in the Node ID.
+     final List<String> children = new ArrayList<>();
+     final NodeType type = NodeType.fromNodeId(nodeId).get();
+     switch (type) {
+         case QUERY:
+             final QueryMetadata queryMeta = dao.readQueryMetadata(tx, nodeId);
+             children.add(queryMeta.getChildNodeId());
+             break;
+         case CONSTRUCT:
+             final ConstructQueryMetadata constructMeta = dao.readConstructQueryMetadata(tx, nodeId);
+             children.add(constructMeta.getChildNodeId());
+             break;
+         case JOIN:
+             // Joins are the only node type handled here that has two children.
+             final JoinMetadata joinMeta = dao.readJoinMetadata(tx, nodeId);
+             children.add(joinMeta.getLeftChildNodeId());
+             children.add(joinMeta.getRightChildNodeId());
+             break;
+         case FILTER:
+             final FilterMetadata filterMeta = dao.readFilterMetadata(tx, nodeId);
+             children.add(filterMeta.getChildNodeId());
+             break;
+         case AGGREGATION:
+             final AggregationMetadata aggMeta = dao.readAggregationMetadata(tx, nodeId);
+             children.add(aggMeta.getChildNodeId());
+             break;
+         case PERIODIC_QUERY:
+             final PeriodicQueryMetadata periodicMeta = dao.readPeriodicQueryMetadata(tx, nodeId);
+             children.add(periodicMeta.getChildNodeId());
+             break;
+         case PROJECTION:
+             final ProjectionMetadata projectionMeta = dao.readProjectionMetadata(tx, nodeId);
+             children.add(projectionMeta.getChildNodeId());
+             break;
+         case STATEMENT_PATTERN:
+             // No children are read for a statement pattern; the walk stops here.
+             break;
+     }
+
+     // Step 2: record all direct children first, then descend into each in turn.
+     nodeIds.addAll(children);
+     for (final String child : children) {
+         getChildNodeIds(tx, child, nodeIds);
+     }
+ }
+
+ /**
+  * Deletes, in a single transaction, the PCJ ID / SPARQL lookup entries of a
+  * query along with the metadata columns of each of its nodes. Prevents
+  * additional BindingSets from being created as new triples are added.
+  * The supplied transaction is committed and closed by this method.
+  *
+  * @param tx - Transaction of a given Fluo table. (not null)
+  * @param nodeIds - Nodes whose metadata will be deleted. (not null)
+  * @param pcjId - The PCJ ID of the query whose metadata will be deleted. (not null)
+  */
+ private void deleteMetadata(final Transaction tx, final List<String> nodeIds, final String pcjId) {
+     requireNonNull(tx);
+     requireNonNull(nodeIds);
+     requireNonNull(pcjId);
+
+     // Take ownership of the caller's transaction so it is closed when we are done.
+     try (final Transaction metadataTx = tx) {
+         // Remove the query-level lookup entries first.
+         deletePcjIdAndSparqlMetadata(metadataTx, pcjId);
+
+         // Then remove each node's metadata; the columns depend on the node's type.
+         for (final String nodeId : nodeIds) {
+             final List<Column> metadataColumns = NodeType.fromNodeId(nodeId).get().getMetaDataColumns();
+             deleteMetadataColumns(metadataTx, nodeId, metadataColumns);
+         }
+
+         metadataTx.commit();
+     }
+ }
+
+ /**
+  * Deletes the given columns from the row keyed by a query node's ID.
+  *
+  * @param tx - Transaction the deletes will be performed with. (not null)
+  * @param nodeId - The Node ID of the query node to delete. (not null)
+  * @param columns - The columns that will be deleted. (not null)
+  */
+ private void deleteMetadataColumns(final Transaction tx, final String nodeId, final List<Column> columns) {
+     requireNonNull(tx);
+     requireNonNull(columns);
+     requireNonNull(nodeId);
+
+     // A node's metadata lives in the row whose key is the Node ID itself.
+     final Bytes nodeRow = Bytes.of(nodeId);
+     columns.forEach(column -> tx.delete(nodeRow, column));
+ }
+
+ /**
+  * Deletes the query-level lookup entries: the Query ID to PCJ ID entry, the
+  * PCJ ID to Query ID entry, and the SPARQL to Query ID entry.
+  *
+  * @param tx - Transaction the deletes will be performed with. (not null)
+  * @param pcjId - The PCJ whose metadata will be deleted. (not null)
+  */
+ private void deletePcjIdAndSparqlMetadata(final Transaction tx, final String pcjId) {
+     requireNonNull(tx);
+     requireNonNull(pcjId);
+
+     // Resolve both row keys up front, before issuing any delete.
+     final String fluoQueryId = getQueryIdFromPcjId(tx, pcjId);
+     final String sparqlText = getSparqlFromQueryId(tx, fluoQueryId);
+
+     // Each entry lives in a different row, keyed by the value it maps from.
+     tx.delete(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID);
+     tx.delete(fluoQueryId, FluoQueryColumns.RYA_PCJ_ID);
+     tx.delete(sparqlText, FluoQueryColumns.QUERY_ID);
+ }
+
+ /**
+  * Deletes all results (BindingSets or Statements) associated with the specified
+  * nodeId. The work is done one batch at a time, each batch in a transaction of
+  * its own, until a batch reports that nothing is left.
+  *
+  * @param client - Used to delete the data. (not null)
+  * @param nodeId - nodeId whose {@link BindingSet}s will be deleted. (not null)
+  */
+ private void deleteData(final FluoClient client, final String nodeId) {
+     requireNonNull(client);
+     requireNonNull(nodeId);
+
+     final NodeType type = NodeType.fromNodeId(nodeId).get();
+
+     boolean moreToDelete;
+     do {
+         // deleteDataBatch(...) commits and closes the transaction it is handed,
+         // so every pass needs a fresh one.
+         final Transaction batchTx = client.newTransaction();
+         final Column resultColumn = type.getResultColumn();
+         final CellScanner results = getIterator(batchTx, nodeId, resultColumn);
+         moreToDelete = deleteDataBatch(batchTx, results, resultColumn);
+     } while (moreToDelete);
+ }
+
+ private CellScanner getIterator(final Transaction tx, final String nodeId, final Column column) {
+ requireNonNull(tx);
+ requireNonNull(nodeId);
+ requireNonNull(column);
+
+ return tx.scanner().fetch(column).over(Span.prefix(nodeId)).build();
+ }
+
+ private boolean deleteDataBatch(final Transaction tx, final CellScanner scanner, final Column column) {
+ requireNonNull(tx);
+ requireNonNull(scanner);
+ requireNonNull(column);
+
+ try (Transaction ntx = tx) {
+ int count = 0;
+ final Iterator<RowColumnValue> iter = scanner.iterator();
+ while (iter.hasNext() && count < batchSize) {
+ final Bytes row = iter.next().getRow();
+ count++;
+ tx.delete(row, column);
+ }
+
+ final boolean hasNext = iter.hasNext();
+ tx.commit();
+ return hasNext;
+ }
+ }
+
+ /**
+  * Reads the Fluo Query ID that is stored for a PCJ ID.
+  *
+  * @param tx - Transaction the read will be performed with. (not null)
+  * @param pcjId - The PCJ ID to look up. (not null)
+  * @return The Query ID stored in the {@code PCJ_ID_QUERY_ID} column of the {@code pcjId} row.
+  * @throws NullPointerException No Query ID is stored for {@code pcjId}.
+  */
+ private String getQueryIdFromPcjId(final Transaction tx, final String pcjId) {
+     requireNonNull(tx);
+     requireNonNull(pcjId);
+
+     final Bytes queryIdBytes = tx.get(Bytes.of(pcjId), FluoQueryColumns.PCJ_ID_QUERY_ID);
+
+     // An unknown PCJ ID leaves this null. Fail with the same exception type the
+     // bare toString() call would have produced, but say which PCJ ID was the problem.
+     requireNonNull(queryIdBytes, "No Fluo Query ID is stored for PCJ ID '" + pcjId + "'.");
+     return queryIdBytes.toString();
+ }
+
+ /**
+  * Reads the SPARQL text recorded in the Query metadata of a Fluo Query ID.
+  *
+  * @param tx - Transaction the read will be performed with. (not null)
+  * @param queryId - The Node ID of the Query node whose metadata is read. (not null)
+  * @return The SPARQL held by that node's {@link QueryMetadata}.
+  */
+ private String getSparqlFromQueryId(final Transaction tx, final String queryId) {
+     requireNonNull(tx);
+     requireNonNull(queryId);
+
+     return dao.readQueryMetadata(tx, queryId).getSparql();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
deleted file mode 100644
index 3052c1d..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.api;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static java.util.Objects.requireNonNull;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.Transaction;
-import org.apache.fluo.api.client.scanner.CellScanner;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.RowColumnValue;
-import org.apache.fluo.api.data.Span;
-import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
-import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
-import org.openrdf.query.BindingSet;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-/**
- * Deletes a Pre-computed Join (PCJ) from Fluo.
- * <p>
- * This is a two phase process.
- * <ol>
- * <li>Delete metadata about each node of the query using a single Fluo
- * transaction. This prevents new {@link BindingSet}s from being created when
- * new triples are inserted.</li>
- * <li>Delete BindingSets associated with each node of the query. This is done
- * in a batch fashion to guard against large delete transactions that don't fit
- * into memory.</li>
- * </ol>
- */
-@DefaultAnnotation(NonNull.class)
-public class DeletePcj {
-
- private final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
- private final int batchSize;
-
- /**
- * Constructs an instance of {@link DeletePcj}.
- *
- * @param batchSize - The number of entries that will be deleted at a time. (> 0)
- */
- public DeletePcj(final int batchSize) {
- checkArgument(batchSize > 0);
- this.batchSize = batchSize;
- }
-
- /**
- * Deletes all metadata and {@link BindingSet}s associated with a Rya
- * Precomputed Join Index from the Fluo application that is incrementally
- * updating it.
- *
- * @param client - Connects to the Fluo application that is updating the PCJ
- * Index. (not null)
- * @param pcjId - The PCJ ID for the query that will removed from the Fluo
- * application. (not null)
- */
- public void deletePcj(final FluoClient client, final String pcjId) {
- requireNonNull(client);
- requireNonNull(pcjId);
-
- final Transaction tx = client.newTransaction();
-
- // Delete the query's metadata. This halts input.
- final List<String> nodeIds = getNodeIds(tx, pcjId);
- deleteMetadata(tx, nodeIds, pcjId);
-
- // Delete the binding sets associated with the query's nodes.
- for (final String nodeId : nodeIds) {
- deleteData(client, nodeId);
- }
- }
-
- /**
- * This method retrieves all of the nodeIds that are part of the query with
- * specified pcjId.
- *
- * @param tx - Transaction of a given Fluo table. (not null)
- * @param pcjId - Id of query. (not null)
- * @return list of Node IDs associated with the query {@code pcjId}.
- */
- private List<String> getNodeIds(Transaction tx, String pcjId) {
- requireNonNull(tx);
- requireNonNull(pcjId);
-
- // Get the ID that tracks the query within the Fluo application.
- final String queryId = getQueryIdFromPcjId(tx, pcjId);
-
- // Get the query's children nodes.
- final List<String> nodeIds = new ArrayList<>();
- nodeIds.add(queryId);
- getChildNodeIds(tx, queryId, nodeIds);
- return nodeIds;
- }
-
- /**
- * Recursively navigate query tree to extract all of the nodeIds.
- *
- * @param tx - Transaction of a given Fluo table. (not null)
- * @param nodeId - Current node in query tree. (not null)
- * @param nodeIds - The Node IDs extracted from query tree. (not null)
- */
- private void getChildNodeIds(final Transaction tx, final String nodeId, final List<String> nodeIds) {
- requireNonNull(tx);
- requireNonNull(nodeId);
- requireNonNull(nodeIds);
-
- final NodeType type = NodeType.fromNodeId(nodeId).get();
- switch (type) {
- case QUERY:
- final QueryMetadata queryMeta = dao.readQueryMetadata(tx, nodeId);
- final String queryChild = queryMeta.getChildNodeId();
- nodeIds.add(queryChild);
- getChildNodeIds(tx, queryChild, nodeIds);
- break;
- case CONSTRUCT:
- final ConstructQueryMetadata constructMeta = dao.readConstructQueryMetadata(tx, nodeId);
- final String constructChild = constructMeta.getChildNodeId();
- nodeIds.add(constructChild);
- getChildNodeIds(tx, constructChild, nodeIds);
- break;
- case JOIN:
- final JoinMetadata joinMeta = dao.readJoinMetadata(tx, nodeId);
- final String lchild = joinMeta.getLeftChildNodeId();
- final String rchild = joinMeta.getRightChildNodeId();
- nodeIds.add(lchild);
- nodeIds.add(rchild);
- getChildNodeIds(tx, lchild, nodeIds);
- getChildNodeIds(tx, rchild, nodeIds);
- break;
- case FILTER:
- final FilterMetadata filterMeta = dao.readFilterMetadata(tx, nodeId);
- final String filterChild = filterMeta.getChildNodeId();
- nodeIds.add(filterChild);
- getChildNodeIds(tx, filterChild, nodeIds);
- break;
- case AGGREGATION:
- final AggregationMetadata aggMeta = dao.readAggregationMetadata(tx, nodeId);
- final String aggChild = aggMeta.getChildNodeId();
- nodeIds.add(aggChild);
- getChildNodeIds(tx, aggChild, nodeIds);
- break;
- case PERIODIC_QUERY:
- final PeriodicQueryMetadata periodicMeta = dao.readPeriodicQueryMetadata(tx, nodeId);
- final String periodicChild = periodicMeta.getChildNodeId();
- nodeIds.add(periodicChild);
- getChildNodeIds(tx, periodicChild, nodeIds);
- break;
- case STATEMENT_PATTERN:
- break;
- }
- }
-
- /**
- * Deletes metadata for all nodeIds associated with a given queryId in a
- * single transaction. Prevents additional BindingSets from being created as
- * new triples are added.
- *
- * @param tx - Transaction of a given Fluo table. (not null)
- * @param nodeIds - Nodes whose metatdata will be deleted. (not null)
- * @param pcjId - The PCJ ID of the query whose will be deleted. (not null)
- */
- private void deleteMetadata(final Transaction tx, final List<String> nodeIds, final String pcjId) {
- requireNonNull(tx);
- requireNonNull(nodeIds);
- requireNonNull(pcjId);
-
- try (final Transaction typeTx = tx) {
- deletePcjIdAndSparqlMetadata(typeTx, pcjId);
-
- for (final String nodeId : nodeIds) {
- final NodeType type = NodeType.fromNodeId(nodeId).get();
- deleteMetadataColumns(typeTx, nodeId, type.getMetaDataColumns());
- }
- typeTx.commit();
- }
- }
-
- /**
- * Deletes all metadata for a Query Node.
- *
- * @param tx - Transaction the deletes will be performed with. (not null)
- * @param nodeId - The Node ID of the query node to delete. (not null)
- * @param columns - The columns that will be deleted. (not null)
- */
- private void deleteMetadataColumns(final Transaction tx, final String nodeId, final List<Column> columns) {
- requireNonNull(tx);
- requireNonNull(columns);
- requireNonNull(nodeId);
-
- final Bytes row = Bytes.of(nodeId);
- for (final Column column : columns) {
- tx.delete(row, column);
- }
- }
-
- /**
- * Deletes high level query meta for converting from queryId to pcjId and
- * vice versa, as well as converting from sparql to queryId.
- *
- * @param tx - Transaction the deletes will be performed with. (not null)
- * @param pcjId - The PCJ whose metadata will be deleted. (not null)
- */
- private void deletePcjIdAndSparqlMetadata(final Transaction tx, final String pcjId) {
- requireNonNull(tx);
- requireNonNull(pcjId);
-
- final String queryId = getQueryIdFromPcjId(tx, pcjId);
- final String sparql = getSparqlFromQueryId(tx, queryId);
- tx.delete(queryId, FluoQueryColumns.RYA_PCJ_ID);
- tx.delete(sparql, FluoQueryColumns.QUERY_ID);
- tx.delete(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID);
- }
-
- /**
- * Deletes all results (BindingSets or Statements) associated with the specified nodeId.
- *
- * @param nodeId - nodeId whose {@link BindingSet}s will be deleted. (not null)
- * @param client - Used to delete the data. (not null)
- */
- private void deleteData(final FluoClient client, final String nodeId) {
- requireNonNull(client);
- requireNonNull(nodeId);
-
- final NodeType type = NodeType.fromNodeId(nodeId).get();
- Transaction tx = client.newTransaction();
- while (deleteDataBatch(tx, getIterator(tx, nodeId, type.getResultColumn()), type.getResultColumn())) {
- tx = client.newTransaction();
- }
- }
-
- private CellScanner getIterator(final Transaction tx, final String nodeId, final Column column) {
- requireNonNull(tx);
- requireNonNull(nodeId);
- requireNonNull(column);
-
- return tx.scanner().fetch(column).over(Span.prefix(nodeId)).build();
- }
-
- private boolean deleteDataBatch(final Transaction tx, final CellScanner scanner, final Column column) {
- requireNonNull(tx);
- requireNonNull(scanner);
- requireNonNull(column);
-
- try (Transaction ntx = tx) {
- int count = 0;
- final Iterator<RowColumnValue> iter = scanner.iterator();
- while (iter.hasNext() && count < batchSize) {
- final Bytes row = iter.next().getRow();
- count++;
- tx.delete(row, column);
- }
-
- final boolean hasNext = iter.hasNext();
- tx.commit();
- return hasNext;
- }
- }
-
- private String getQueryIdFromPcjId(final Transaction tx, final String pcjId) {
- requireNonNull(tx);
- requireNonNull(pcjId);
-
- final Bytes queryIdBytes = tx.get(Bytes.of(pcjId), FluoQueryColumns.PCJ_ID_QUERY_ID);
- return queryIdBytes.toString();
- }
-
- private String getSparqlFromQueryId(final Transaction tx, final String queryId) {
- requireNonNull(tx);
- requireNonNull(queryId);
-
- final QueryMetadata metadata = dao.readQueryMetadata(tx, queryId);
- return metadata.getSparql();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java
index 6c1aa01..76b62d8 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructProjection.java
@@ -19,12 +19,8 @@ package org.apache.rya.indexing.pcj.fluo.app;
* under the License.
*/
import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.UUID;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
index d8d60b5..6642780 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java
@@ -23,12 +23,13 @@ import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.log4j.Logger;
-import org.apache.rya.api.domain.RyaSchema;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaSubGraph;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
/**
@@ -53,39 +54,26 @@ public class ConstructQueryResultUpdater {
public void updateConstructQueryResults(TransactionBase tx, VisibilityBindingSet bs, ConstructQueryMetadata metadata) {
String nodeId = metadata.getNodeId();
+ VariableOrder varOrder = metadata.getVariableOrder();
Column column = FluoQueryColumns.CONSTRUCT_STATEMENTS;
ConstructGraph graph = metadata.getConstructGraph();
+ String parentId = metadata.getParentNodeId();
- try {
+ // Create the Row Key for the emitted binding set. It does not contain visibilities.
+ final Bytes resultRow = RowKeyUtil.makeRowKey(nodeId, varOrder, bs);
+
+ // If this is a new binding set, then emit it.
+ if(tx.get(resultRow, column) == null || varOrder.getVariableOrders().size() < bs.size()) {
Set<RyaStatement> statements = graph.createGraphFromBindingSet(bs);
- RyaSubGraph subgraph = new RyaSubGraph(metadata.getNodeId(), statements);
- String resultId = nodeId + "_" + getSubGraphId(subgraph);
- tx.set(Bytes.of(resultId), column, Bytes.of(serializer.toBytes(subgraph)));
- } catch (Exception e) {
- log.trace("Unable to serialize RyaStatement generated by ConstructGraph: " + graph + " from BindingSet: " + bs );
- }
- }
-
- /**
- * Generates a simple hash used as an id for the subgraph. Id generated as hash as opposed
- * to UUID to avoid the same subgraph result being stored under multiple UUID.
- * @param subgraph - subgraph that an id is need for
- * @return - hash of subgraph used as an id
- */
- private int getSubGraphId(RyaSubGraph subgraph) {
- int id = 17;
- id = 31*id + subgraph.getId().hashCode();
- for(RyaStatement statement: subgraph.getStatements()) {
- int statementId = 7;
- if(!statement.getSubject().getData().startsWith(RyaSchema.BNODE_NAMESPACE)) {
- statementId = 17*statementId + statement.getSubject().hashCode();
- }
- statementId = 17*statementId + statement.getPredicate().hashCode();
- statementId = 17*statementId + statement.getObject().hashCode();
- id += statementId;
+ RyaSubGraph subgraph = new RyaSubGraph(parentId, statements);
+ final Bytes nodeValueBytes = Bytes.of(serializer.toBytes(subgraph));
+
+ log.trace(
+ "Transaction ID: " + tx.getStartTimestamp() + "\n" +
+ "New Binding Set: " + subgraph + "\n");
+
+ tx.set(resultRow, column, nodeValueBytes);
}
- return Math.abs(id);
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
index 1c99051..7cfa216 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
@@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes;
import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil;
import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer;
import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
@@ -46,8 +45,6 @@ import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl;
import org.openrdf.query.algebra.evaluation.util.QueryEvaluationUtil;
-import com.google.common.base.Optional;
-
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
import info.aduna.iteration.CloseableIteration;
@@ -114,24 +111,16 @@ public class FilterResultUpdater {
// Evaluate whether the child BindingSet satisfies the filter's condition.
final ValueExpr condition = filter.getCondition();
if (isTrue(condition, childBindingSet)) {
- // Create the Filter's binding set from the child's.
- final VariableOrder filterVarOrder = filterMetadata.getVariableOrder();
- final BindingSet filterBindingSet = BindingSetUtil.keepBindings(filterVarOrder, childBindingSet);
// Create the Row Key for the emitted binding set. It does not contain visibilities.
- final Bytes resultRow = RowKeyUtil.makeRowKey(filterMetadata.getNodeId(), filterVarOrder, filterBindingSet);
-
- // If this is a new binding set, then emit it.
- if(tx.get(resultRow, FluoQueryColumns.FILTER_BINDING_SET) == null) {
- final VisibilityBindingSet visBindingSet = new VisibilityBindingSet(filterBindingSet, childBindingSet.getVisibility());
- final Bytes nodeValueBytes = BS_SERDE.serialize(visBindingSet);
+ final VariableOrder filterVarOrder = filterMetadata.getVariableOrder();
+ final Bytes resultRow = RowKeyUtil.makeRowKey(filterMetadata.getNodeId(), filterVarOrder, childBindingSet);
- log.trace(
- "Transaction ID: " + tx.getStartTimestamp() + "\n" +
- "New Binding Set: " + visBindingSet + "\n");
+ // Serialize and emit BindingSet
+ final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet);
+ log.trace("Transaction ID: " + tx.getStartTimestamp() + "\n" + "New Binding Set: " + childBindingSet + "\n");
- tx.set(resultRow, FluoQueryColumns.FILTER_BINDING_SET, nodeValueBytes);
- }
+ tx.set(resultRow, FluoQueryColumns.FILTER_BINDING_SET, nodeValueBytes);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
index 05a8d1c..43a36de 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java
@@ -23,8 +23,6 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DE
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TYPE_DELIM;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.URI_TYPE;
-import java.util.UUID;
-
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;