You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/25 19:35:36 UTC
[5/5] incubator-rya git commit: RYA-246-Query-Export-Strategy. Closes
#213.
RYA-246-Query-Export-Strategy. Closes #213.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/05147266
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/05147266
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/05147266
Branch: refs/heads/master
Commit: 0514726604757c03e66015edee742b0fbdcf1ca2
Parents: 82df3ad
Author: Caleb Meier <ca...@parsons.com>
Authored: Mon Aug 7 21:22:00 2017 -0700
Committer: Caleb Meier <ca...@parsons.com>
Committed: Fri Aug 25 12:34:42 2017 -0700
----------------------------------------------------------------------
.../org/apache/rya/api/client/CreatePCJ.java | 37 ++++
.../api/client/accumulo/AccumuloCreatePCJ.java | 27 ++-
.../api/client/accumulo/AccumuloDeletePCJ.java | 6 +-
.../rya/api/client/accumulo/FluoITBase.java | 2 +-
.../src/main/java/RyaClientExample.java | 2 +-
.../storage/accumulo/ShiftVarOrderFactory.java | 1 +
.../indexing/pcj/fluo/api/CreateFluoPcj.java | 146 +++++++++----
.../indexing/pcj/fluo/api/DeleteFluoPcj.java | 127 +----------
.../indexing/pcj/fluo/api/GetPcjMetadata.java | 10 +-
.../indexing/pcj/fluo/api/GetQueryReport.java | 30 +--
.../rya/indexing/pcj/fluo/api/ListQueryIds.java | 2 +-
.../fluo/app/IncrementalUpdateConstants.java | 3 -
.../pcj/fluo/app/export/ExporterManager.java | 216 +++++++++++++++++++
.../export/IncrementalBindingSetExporter.java | 8 +-
.../IncrementalBindingSetExporterFactory.java | 104 ---------
.../app/export/IncrementalResultExporter.java | 42 ++++
.../IncrementalResultExporterFactory.java | 104 +++++++++
.../export/IncrementalRyaSubGraphExporter.java | 2 +-
.../IncrementalRyaSubGraphExporterFactory.java | 47 ----
.../pcj/fluo/app/export/NoOpExporter.java | 59 +++++
.../export/kafka/KafkaBindingSetExporter.java | 25 ++-
.../kafka/KafkaBindingSetExporterFactory.java | 13 +-
.../KafkaBindingSetExporterParameters.java | 80 +++++++
.../export/kafka/KafkaExportParameterBase.java | 86 ++++++++
.../app/export/kafka/KafkaExportParameters.java | 86 --------
.../export/kafka/KafkaRyaSubGraphExporter.java | 16 ++
.../kafka/KafkaRyaSubGraphExporterFactory.java | 17 +-
.../kafka/KafkaSubGraphExporterParameters.java | 81 +++++++
.../export/rya/PeriodicBindingSetExporter.java | 71 ++++++
.../rya/PeriodicBindingSetExporterFactory.java | 74 +++++++
.../app/export/rya/RyaBindingSetExporter.java | 43 ++--
.../rya/RyaBindingSetExporterFactory.java | 15 +-
.../app/export/rya/RyaExportParameters.java | 33 ++-
.../export/rya/RyaSubGraphExportParameters.java | 120 +++++++++++
.../app/export/rya/RyaSubGraphExporter.java | 106 +++++++++
.../export/rya/RyaSubGraphExporterFactory.java | 58 +++++
.../fluo/app/observers/AggregationObserver.java | 3 -
.../fluo/app/observers/BindingSetUpdater.java | 2 +-
.../observers/ConstructQueryResultObserver.java | 167 ++------------
.../pcj/fluo/app/observers/FilterObserver.java | 3 -
.../pcj/fluo/app/observers/JoinObserver.java | 3 -
.../app/observers/PeriodicQueryObserver.java | 2 -
.../fluo/app/observers/ProjectionObserver.java | 2 -
.../fluo/app/observers/QueryResultObserver.java | 94 ++++----
.../app/observers/StatementPatternObserver.java | 4 -
.../indexing/pcj/fluo/app/query/FluoQuery.java | 20 +-
.../pcj/fluo/app/query/FluoQueryColumns.java | 26 ---
.../fluo/app/query/FluoQueryMetadataDAO.java | 19 +-
.../pcj/fluo/app/query/QueryMetadata.java | 5 +-
.../fluo/app/query/SparqlFluoQueryBuilder.java | 24 ++-
.../app/query/UnsupportedQueryException.java | 41 ++++
.../pcj/fluo/app/util/FluoQueryUtils.java | 8 +
.../export/rya/KafkaExportParametersTest.java | 25 +--
.../app/export/rya/RyaExportParametersTest.java | 6 +-
.../fluo/app/query/PeriodicQueryUtilTest.java | 2 +-
.../app/query/QueryMetadataVisitorTest.java | 2 +-
.../pcj/fluo/client/PcjAdminClient.java | 4 +
.../pcj/fluo/client/PcjAdminClientCommand.java | 4 +-
.../fluo/client/command/NewQueryCommand.java | 3 +-
.../fluo/client/command/QueryReportCommand.java | 3 +-
.../fluo/client/util/QueryReportRenderer.java | 10 +-
.../rya/indexing/pcj/fluo/demo/DemoDriver.java | 2 +-
.../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java | 7 +-
.../indexing/pcj/fluo/api/GetPcjMetadataIT.java | 5 +-
.../indexing/pcj/fluo/api/ListQueryIdsIT.java | 10 +-
.../fluo/app/query/FluoQueryMetadataDAOIT.java | 24 +--
.../indexing/pcj/fluo/integration/BatchIT.java | 3 +-
.../pcj/fluo/integration/CreateDeleteIT.java | 7 +-
.../pcj/fluo/integration/KafkaExportIT.java | 20 +-
.../integration/KafkaRyaSubGraphExportIT.java | 22 +-
.../indexing/pcj/fluo/integration/QueryIT.java | 6 +-
.../pcj/fluo/test/base/KafkaExportITBase.java | 59 ++---
.../rya/pcj/fluo/test/base/RyaExportITBase.java | 3 +-
.../PeriodicNotificationProviderIT.java | 3 +-
.../pruner/PeriodicNotificationBinPrunerIT.java | 6 +-
.../notification/api/CreatePeriodicQuery.java | 19 +-
.../pruner/PeriodicQueryPruner.java | 9 +-
.../recovery/PeriodicNotificationProvider.java | 3 +-
.../org/apache/rya/shell/RyaAdminCommands.java | 21 +-
.../apache/rya/shell/RyaAdminCommandsTest.java | 12 +-
80 files changed, 1698 insertions(+), 924 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java
index e03a1f1..6e92b28 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java
@@ -18,6 +18,8 @@
*/
package org.apache.rya.api.client;
+import java.util.Set;
+
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -28,7 +30,41 @@ import edu.umd.cs.findbugs.annotations.NonNull;
public interface CreatePCJ {
/**
+ * Metadata enum used to indicate the type of query that is registered. If
+ * the topmost node is a Construct QueryNode, then the type is Construct. If the
+ * topmost node is a Projection QueryNode, then the type is Projection. If the
+ * query contains a PeriodicQuery Filter anywhere within the query, then it is of type
+ * Periodic.
+ *
+ */
+ public static enum QueryType{CONSTRUCT, PROJECTION, PERIODIC};
+
+ /**
+ * Specifies how results will be exported from the Rya Fluo
+ * Application.
+ *
+ */
+ public static enum ExportStrategy{RYA, KAFKA, NO_OP_EXPORT};
+
+
+ /**
+ * Designate a new PCJ that will be maintained by the target instance of Rya.
+ * Results will be exported according to the specified export strategies.
+ *
+ * @param instanceName - Indicates which Rya instance will create and maintain
+ * the PCJ. (not null)
+ * @param sparql - The SPARQL query that will be maintained. (not null)
+ * @param strategies - The export strategies used to export results for this query
+ * @return The ID that was assigned to this newly created PCJ.
+ * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name.
+ * @throws RyaClientException Something caused the command to fail.
+ */
+ public String createPCJ(final String instanceName, String sparql, Set<ExportStrategy> strategies) throws InstanceDoesNotExistException, RyaClientException;
+
+
+ /**
* Designate a new PCJ that will be maintained by the target instance of Rya.
+ * Results will be exported to a Rya PCJ table.
*
* @param instanceName - Indicates which Rya instance will create and maintain
* the PCJ. (not null)
@@ -38,4 +74,5 @@ public interface CreatePCJ {
* @throws RyaClientException Something caused the command to fail.
*/
public String createPCJ(final String instanceName, String sparql) throws InstanceDoesNotExistException, RyaClientException;
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
index 644189a..6aef33c 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java
@@ -20,6 +20,8 @@ package org.apache.rya.api.client.accumulo;
import static java.util.Objects.requireNonNull;
+import java.util.Set;
+
import org.apache.accumulo.core.client.Connector;
import org.apache.fluo.api.client.FluoClient;
import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
@@ -39,6 +41,7 @@ import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
@@ -49,6 +52,7 @@ import org.openrdf.repository.RepositoryException;
import org.openrdf.sail.SailException;
import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -73,7 +77,7 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
}
@Override
- public String createPCJ(final String instanceName, final String sparql) throws InstanceDoesNotExistException, RyaClientException {
+ public String createPCJ(final String instanceName, final String sparql, Set<ExportStrategy> strategies) throws InstanceDoesNotExistException, RyaClientException {
requireNonNull(instanceName);
requireNonNull(sparql);
@@ -99,9 +103,14 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
if(fluoDetailsHolder.isPresent()) {
final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
try {
- updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId);
+ updateFluoApp(instanceName, fluoAppName, pcjId, sparql, strategies);
} catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) {
throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
+ } catch (UnsupportedQueryException e) {
+ throw new RyaClientException("The new PCJ could not be initialized because it either contains an unsupported query node "
+ + "or an invalid ExportStrategy for the given QueryType. Projection queries can be exported to either Rya or Kafka,"
+ + "unless they contain an aggregation, in which case they can only be exported to Kafka. Construct queries can be exported"
+ + "to Rya and Kafka, and Periodic queries can only be exported to Rya.");
}
// Update the Rya Details to indicate the PCJ is being updated incrementally.
@@ -133,9 +142,16 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
}
}
- private void updateFluoApp(final String ryaInstance, final String fluoAppName, final PrecomputedJoinStorage pcjStorage, final String pcjId) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException {
- requireNonNull(pcjStorage);
+ @Override
+ public String createPCJ(String instanceName, String sparql) throws InstanceDoesNotExistException, RyaClientException {
+ return createPCJ(instanceName, sparql, Sets.newHashSet(ExportStrategy.RYA));
+ }
+
+
+ private void updateFluoApp(final String ryaInstance, final String fluoAppName, final String pcjId, String sparql, Set<ExportStrategy> strategies) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException, UnsupportedQueryException {
+ requireNonNull(sparql);
requireNonNull(pcjId);
+ requireNonNull(strategies);
// Connect to the Fluo application that is updating this instance's PCJs.
final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
@@ -147,7 +163,8 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ {
fluoAppName);) {
// Initialize the PCJ within the Fluo application.
final CreateFluoPcj fluoCreatePcj = new CreateFluoPcj();
- fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, getConnector(), ryaInstance);
+ fluoCreatePcj.withRyaIntegration(pcjId, sparql, strategies, fluoClient, getConnector(), ryaInstance);
}
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
index eb2b2d7..547254d 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java
@@ -123,7 +123,11 @@ public class AccumuloDeletePCJ extends AccumuloCommand implements DeletePCJ {
cd.getZookeepers(),
fluoAppName)) {
// Delete the PCJ from the Fluo App.
- new DeleteFluoPcj(1000).deletePcj(fluoClient, pcjId);
+ try {
+ new DeleteFluoPcj(1000).deletePcj(fluoClient, pcjId);
+ } catch (Exception e) {
+ log.warn("PcjId corresponds to an invalid PCJ. The query cannot be deleted.");
+ }
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java
index 113b397..695704b 100644
--- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java
+++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java
@@ -199,7 +199,7 @@ public abstract class FluoITBase {
final HashMap<String, String> params = new HashMap<>();
final RyaExportParameters ryaParams = new RyaExportParameters(params);
- ryaParams.setExportToRya(true);
+ ryaParams.setUseRyaBindingSetExporter(true);
ryaParams.setAccumuloInstanceName(instanceName);
ryaParams.setZookeeperServers(zookeepers);
ryaParams.setExporterUsername(clusterInstance.getUsername());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/indexingExample/src/main/java/RyaClientExample.java
----------------------------------------------------------------------
diff --git a/extras/indexingExample/src/main/java/RyaClientExample.java b/extras/indexingExample/src/main/java/RyaClientExample.java
index 1b0450f..b0afd5a 100644
--- a/extras/indexingExample/src/main/java/RyaClientExample.java
+++ b/extras/indexingExample/src/main/java/RyaClientExample.java
@@ -249,7 +249,7 @@ public class RyaClientExample {
// export observer.
final HashMap<String, String> params = new HashMap<>();
final RyaExportParameters ryaParams = new RyaExportParameters(params);
- ryaParams.setExportToRya(true);
+ ryaParams.setUseRyaBindingSetExporter(true);
ryaParams.setAccumuloInstanceName(instanceName);
ryaParams.setZookeeperServers(zookeepers);
ryaParams.setExporterUsername(username);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
index 26c4339..e297ec9 100644
--- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
+++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java
@@ -46,6 +46,7 @@ public class ShiftVarOrderFactory implements PcjVarOrderFactory {
final Set<String> bindingNames = new SPARQLParser().parseQuery(sparql, null)
.getTupleExpr()
.getBindingNames();
+
return makeVarOrders( new VariableOrder(bindingNames) );
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
index 150a256..501f1f5 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
@@ -27,7 +27,6 @@ import java.io.UnsupportedEncodingException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.UUID;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -38,6 +37,7 @@ import org.apache.fluo.api.client.Transaction;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaType;
import org.apache.rya.api.domain.RyaURI;
@@ -50,6 +50,8 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
@@ -61,6 +63,7 @@ import org.openrdf.query.MalformedQueryException;
import org.openrdf.query.algebra.StatementPattern;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;
@@ -128,60 +131,92 @@ public class CreateFluoPcj {
/**
* Tells the Fluo PCJ Updater application to maintain a new PCJ. This method
* creates the FluoQuery (metadata) inside of Fluo so that results can be incrementally generated
- * inside of Fluo. This method assumes that the user will export the results to Kafka or
- * some other external resource. The export id is equivalent to the queryId that is returned,
- * which is in contrast to the other createPcj methods in this class which accept an external pcjId
- * that is used to identify the Accumulo table or Kafka topic for exporting results.
+ * inside of Fluo. This method assumes that the user will export the results to Kafka
+ * according to the Kafka {@link ExportStrategy}.
*
* @param sparql - sparql query String to be registered with Fluo
* @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
* @return The metadata that was written to the Fluo application for the PCJ.
* @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
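+ * @throws UnsupportedQueryException The query contains an unsupported query node or an ExportStrategy that is invalid for its QueryType.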
* @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
*/
- public FluoQuery createPcj(String sparql, FluoClient fluo) throws MalformedQueryException {
+ public FluoQuery createPcj(String sparql, FluoClient fluo) throws MalformedQueryException, UnsupportedQueryException {
Preconditions.checkNotNull(sparql);
Preconditions.checkNotNull(fluo);
- String pcjId = UUID.randomUUID().toString().replaceAll("-", "");
- return createPcj(pcjId, sparql, fluo);
+ String pcjId = FluoQueryUtils.createNewPcjId();
+ return createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.KAFKA), fluo);
}
/**
* Tells the Fluo PCJ Updater application to maintain a new PCJ. This method provides
* no guarantees that a PCJ with the given pcjId exists outside of Fluo. This method merely
- * creates the FluoQuery (metadata) inside of Fluo so that results and be incrementally generated
- * inside of Fluo. This method assumes that the user will export the results to Kafka or
- * some other external resource.
+ * creates the FluoQuery (metadata) inside of Fluo so that results can be incrementally generated
+ * inside of Fluo. Results are exported according to the Set of {@link ExportStrategy} enums. If
+ * the Rya ExportStrategy is specified, care should be taken to verify that the PCJ table exists.
*
* @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
* @param sparql - sparql query String to be registered with Fluo
+ * @param strategies - ExportStrategies used to specify how final results will be handled
* @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
* @return The metadata that was written to the Fluo application for the PCJ.
- * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
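+ * @throws UnsupportedQueryException The query contains an unsupported query node or an ExportStrategy that is invalid for its QueryType.
+ * @throws MalformedQueryException The provided SPARQL query is malformed.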
*/
public FluoQuery createPcj(
final String pcjId,
final String sparql,
- final FluoClient fluo) throws MalformedQueryException {
+ final Set<ExportStrategy> strategies,
+ final FluoClient fluo) throws MalformedQueryException, UnsupportedQueryException {
requireNonNull(pcjId);
requireNonNull(sparql);
+ requireNonNull(strategies);
requireNonNull(fluo);
- FluoQuery fluoQuery = makeFluoQuery(sparql, pcjId);
+ FluoQuery fluoQuery = makeFluoQuery(sparql, pcjId, strategies);
writeFluoQuery(fluo, fluoQuery, pcjId);
return fluoQuery;
}
- private FluoQuery makeFluoQuery(String sparql, String pcjId) throws MalformedQueryException {
+ /**
+ * Tells the Fluo PCJ Updater application to maintain a new PCJ. The method takes in an
+ * instance of {@link PrecomputedJoinStorage} to verify that a PCJ with the given pcjId exists.
+ * Results are exported to a PCJ table with the provided pcjId according to the Rya
+ * {@link ExportStrategy}.
+ *
+ * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
+ * @param pcjStorage - Provides access to the PCJ index. (not null)
+ * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
+ * @return The metadata that was written to the Fluo application for the PCJ.
+ * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
+ * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
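+ * @throws UnsupportedQueryException The query contains an unsupported query node or an ExportStrategy that is invalid for its QueryType.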
+ */
+ public FluoQuery createPcj(
+ final String pcjId,
+ final PrecomputedJoinStorage pcjStorage,
+ final FluoClient fluo) throws MalformedQueryException, PcjException, UnsupportedQueryException {
+ requireNonNull(pcjId);
+ requireNonNull(pcjStorage);
+ requireNonNull(fluo);
+
+ // Parse the query's structure for the metadata that will be written to fluo.
+ final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
+ final String sparql = pcjMetadata.getSparql();
+ return createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluo);
+ }
+
+ private FluoQuery makeFluoQuery(String sparql, String pcjId, Set<ExportStrategy> strategies) throws MalformedQueryException, UnsupportedQueryException {
String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
- SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
- builder.setFluoQueryId(queryId);
- builder.setSparql(sparql);
- builder.setJoinBatchSize(joinBatchSize);
+ SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder()
+ .setExportStrategies(strategies)
+ .setFluoQueryId(queryId)
+ .setSparql(sparql)
+ .setJoinBatchSize(joinBatchSize);
return builder.build();
}
@@ -195,56 +230,72 @@ public class CreateFluoPcj {
tx.commit();
}
}
-
/**
- * Tells the Fluo PCJ Updater application to maintain a new PCJ. The method takes in an
- * instance of {@link PrecomputedJoinStorage} to verify that a PCJ with the given pcjId exists.
+ * Tells the Fluo PCJ Updater application to maintain a new PCJ.
+ * <p>
+ * This call scans Rya for Statement Pattern matches and inserts them into
+ * the Fluo application. It is assumed that results for any query registered
+ * using this method will be exported to Kafka according to the Kafka {@link ExportStrategy}.
*
- * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
- * @param pcjStorage - Provides access to the PCJ index. (not null)
+ * @param sparql - sparql query that will be registered with Fluo. (not null)
* @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
- * @return The metadata that was written to the Fluo application for the PCJ.
+ * @param accumulo - Accumulo connector for connecting with Accumulo
+ * @param ryaInstance - Name of Rya instance to connect to
+ * @return The Fluo application's Query ID of the query that was created.
* @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
* @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
+ * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
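+ * @throws UnsupportedQueryException The query contains an unsupported query node or an ExportStrategy that is invalid for its QueryType.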
*/
- public FluoQuery createPcj(
- final String pcjId,
- final PrecomputedJoinStorage pcjStorage,
- final FluoClient fluo) throws MalformedQueryException, PcjException {
- requireNonNull(pcjId);
- requireNonNull(pcjStorage);
+ public String withRyaIntegration(
+ final String sparql,
+ final FluoClient fluo,
+ final Connector accumulo,
+ final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException, UnsupportedQueryException {
+ requireNonNull(sparql);
requireNonNull(fluo);
+ requireNonNull(accumulo);
+ requireNonNull(ryaInstance);
- // Parse the query's structure for the metadata that will be written to fluo.
- final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
- final String sparql = pcjMetadata.getSparql();
- return createPcj(pcjId, sparql, fluo);
+
+ // Write the SPARQL query's structure to the Fluo Application.
+ final FluoQuery fluoQuery = createPcj(sparql, fluo);
+ //import results already ingested into Rya that match query
+ importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance);
+ // return queryId to the caller for later monitoring from the export.
+ return fluoQuery.getQueryMetadata().getNodeId();
}
+
/**
* Tells the Fluo PCJ Updater application to maintain a new PCJ.
* <p>
* This call scans Rya for Statement Pattern matches and inserts them into
* the Fluo application. This method does not verify that a PcjTable with the
- * the given pcjId actually exists. It is assumed that results for any query registered
- * using this method will be exported to Kafka or some other external service.
+ * the given pcjId actually exists, so one should verify that the table exists before
+ * using the Rya ExportStrategy. Results will be exported according to the Set of
+ * {@link ExportStrategy} enums.
*
* @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
* @param sparql - sparql query that will registered with Fluo. (not null)
+ * @param strategies - ExportStrategies used to specify how final results will be handled
* @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
- * @param queryEngine - QueryEngine for a given Rya Instance, (not null)
+ * @param accumulo - Accumulo connector for connecting with Accumulo
+ * @param ryaInstance - name of Rya instance to connect to
* @return The Fluo application's Query ID of the query that was created.
* @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
* @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
* @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
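+ * @throws UnsupportedQueryException The query contains an unsupported query node or an ExportStrategy that is invalid for its QueryType.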
*/
public String withRyaIntegration(
final String pcjId,
final String sparql,
+ final Set<ExportStrategy> strategies,
final FluoClient fluo,
final Connector accumulo,
- final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException {
+ final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException, UnsupportedQueryException {
requireNonNull(pcjId);
requireNonNull(sparql);
requireNonNull(fluo);
@@ -253,14 +304,13 @@ public class CreateFluoPcj {
// Write the SPARQL query's structure to the Fluo Application.
- final FluoQuery fluoQuery = createPcj(pcjId, sparql, fluo);
+ final FluoQuery fluoQuery = createPcj(pcjId, sparql, strategies, fluo);
//import results already ingested into Rya that match query
importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance);
// return queryId to the caller for later monitoring from the export.
return fluoQuery.getQueryMetadata().getNodeId();
}
-
/**
* Tells the Fluo PCJ Updater application to maintain a new PCJ.
* <p>
@@ -268,24 +318,26 @@ public class CreateFluoPcj {
* the Fluo application. The Fluo application will then maintain the intermediate
* results as new triples are inserted and export any new query results to the
* {@code pcjId} within the provided {@code pcjStorage}. This method requires that a
- * PCJ table already exist for the query corresponding to the pcjId. Results will be exported
- * to this table.
+ * PCJ table already exist for the query corresponding to the pcjId. By default, results will be exported
+ * to this table according to the Rya {@link ExportStrategy}.
*
* @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null)
* @param pcjStorage - Provides access to the PCJ index. (not null)
* @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
- * @param queryEngine - QueryEngine for a given Rya Instance, (not null)
+ * @param accumulo - Accumulo connector for connecting to Accumulo
+ * @param ryaInstance - name of Rya instance to connect to
* @return The Fluo application's Query ID of the query that was created.
* @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
* @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
* @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
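+ * @throws UnsupportedQueryException The query contains an unsupported query node or an ExportStrategy that is invalid for its QueryType.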
*/
public String withRyaIntegration(
final String pcjId,
final PrecomputedJoinStorage pcjStorage,
final FluoClient fluo,
final Connector accumulo,
- final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException {
+ final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException, UnsupportedQueryException {
requireNonNull(pcjId);
requireNonNull(pcjStorage);
requireNonNull(fluo);
@@ -296,9 +348,11 @@ public class CreateFluoPcj {
final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId);
final String sparql = pcjMetadata.getSparql();
- return withRyaIntegration(pcjId, sparql, fluo, accumulo, ryaInstance);
+ return withRyaIntegration(pcjId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluo, accumulo, ryaInstance);
}
+
+
private void importHistoricResultsIntoFluo(FluoClient fluo, FluoQuery fluoQuery, Connector accumulo, String ryaInstance)
throws RyaDAOException {
// Reuse the same set object while performing batch inserts.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
index 58a52fb..0d97b2f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java
@@ -21,7 +21,6 @@ package org.apache.rya.indexing.pcj.fluo.api;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
-import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -33,15 +32,10 @@ import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
-import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
-import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
-import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.openrdf.query.BindingSet;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -85,8 +79,9 @@ public class DeleteFluoPcj {
* Index. (not null)
* @param pcjId - The PCJ ID for the query that will removed from the Fluo
* application. (not null)
+ * @throws UnsupportedQueryException
*/
- public void deletePcj(final FluoClient client, final String pcjId) {
+ public void deletePcj(final FluoClient client, final String pcjId) throws UnsupportedQueryException {
requireNonNull(client);
requireNonNull(pcjId);
@@ -109,84 +104,17 @@ public class DeleteFluoPcj {
* @param tx - Transaction of a given Fluo table. (not null)
* @param pcjId - Id of query. (not null)
* @return list of Node IDs associated with the query {@code pcjId}.
+ * @throws UnsupportedQueryException
*/
- private List<String> getNodeIds(Transaction tx, String pcjId) {
+ private List<String> getNodeIds(Transaction tx, String pcjId) throws UnsupportedQueryException {
requireNonNull(tx);
requireNonNull(pcjId);
- // Get the ID that tracks the query within the Fluo application.
- final String queryId = getQueryIdFromPcjId(tx, pcjId);
-
- // Get the query's children nodes.
- final List<String> nodeIds = new ArrayList<>();
- nodeIds.add(queryId);
- getChildNodeIds(tx, queryId, nodeIds);
- return nodeIds;
+ String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
+ FluoQuery fluoQuery = dao.readFluoQuery(tx, queryId);
+ return FluoQueryUtils.collectNodeIds(fluoQuery);
}
- /**
- * Recursively navigate query tree to extract all of the nodeIds.
- *
- * @param tx - Transaction of a given Fluo table. (not null)
- * @param nodeId - Current node in query tree. (not null)
- * @param nodeIds - The Node IDs extracted from query tree. (not null)
- */
- private void getChildNodeIds(final Transaction tx, final String nodeId, final List<String> nodeIds) {
- requireNonNull(tx);
- requireNonNull(nodeId);
- requireNonNull(nodeIds);
-
- final NodeType type = NodeType.fromNodeId(nodeId).get();
- switch (type) {
- case QUERY:
- final QueryMetadata queryMeta = dao.readQueryMetadata(tx, nodeId);
- final String queryChild = queryMeta.getChildNodeId();
- nodeIds.add(queryChild);
- getChildNodeIds(tx, queryChild, nodeIds);
- break;
- case CONSTRUCT:
- final ConstructQueryMetadata constructMeta = dao.readConstructQueryMetadata(tx, nodeId);
- final String constructChild = constructMeta.getChildNodeId();
- nodeIds.add(constructChild);
- getChildNodeIds(tx, constructChild, nodeIds);
- break;
- case JOIN:
- final JoinMetadata joinMeta = dao.readJoinMetadata(tx, nodeId);
- final String lchild = joinMeta.getLeftChildNodeId();
- final String rchild = joinMeta.getRightChildNodeId();
- nodeIds.add(lchild);
- nodeIds.add(rchild);
- getChildNodeIds(tx, lchild, nodeIds);
- getChildNodeIds(tx, rchild, nodeIds);
- break;
- case FILTER:
- final FilterMetadata filterMeta = dao.readFilterMetadata(tx, nodeId);
- final String filterChild = filterMeta.getChildNodeId();
- nodeIds.add(filterChild);
- getChildNodeIds(tx, filterChild, nodeIds);
- break;
- case AGGREGATION:
- final AggregationMetadata aggMeta = dao.readAggregationMetadata(tx, nodeId);
- final String aggChild = aggMeta.getChildNodeId();
- nodeIds.add(aggChild);
- getChildNodeIds(tx, aggChild, nodeIds);
- break;
- case PERIODIC_QUERY:
- final PeriodicQueryMetadata periodicMeta = dao.readPeriodicQueryMetadata(tx, nodeId);
- final String periodicChild = periodicMeta.getChildNodeId();
- nodeIds.add(periodicChild);
- getChildNodeIds(tx, periodicChild, nodeIds);
- break;
- case PROJECTION:
- final ProjectionMetadata projectionMetadata = dao.readProjectionMetadata(tx, nodeId);
- final String projectionChild = projectionMetadata.getChildNodeId();
- nodeIds.add(projectionChild);
- getChildNodeIds(tx, projectionChild, nodeIds);
- break;
- case STATEMENT_PATTERN:
- break;
- }
- }
/**
* Deletes metadata for all nodeIds associated with a given queryId in a
@@ -203,8 +131,6 @@ public class DeleteFluoPcj {
requireNonNull(pcjId);
try (final Transaction typeTx = tx) {
- deletePcjIdAndSparqlMetadata(typeTx, pcjId);
-
for (final String nodeId : nodeIds) {
final NodeType type = NodeType.fromNodeId(nodeId).get();
deleteMetadataColumns(typeTx, nodeId, type.getMetaDataColumns());
@@ -232,24 +158,6 @@ public class DeleteFluoPcj {
}
/**
- * Deletes high level query meta for converting from queryId to pcjId and
- * vice versa, as well as converting from sparql to queryId.
- *
- * @param tx - Transaction the deletes will be performed with. (not null)
- * @param pcjId - The PCJ whose metadata will be deleted. (not null)
- */
- private void deletePcjIdAndSparqlMetadata(final Transaction tx, final String pcjId) {
- requireNonNull(tx);
- requireNonNull(pcjId);
-
- final String queryId = getQueryIdFromPcjId(tx, pcjId);
- final String sparql = getSparqlFromQueryId(tx, queryId);
- tx.delete(queryId, FluoQueryColumns.RYA_PCJ_ID);
- tx.delete(sparql, FluoQueryColumns.QUERY_ID);
- tx.delete(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID);
- }
-
- /**
* Deletes all results (BindingSets or Statements) associated with the specified nodeId.
*
* @param nodeId - nodeId whose {@link BindingSet}s will be deleted. (not null)
@@ -294,19 +202,4 @@ public class DeleteFluoPcj {
}
}
- private String getQueryIdFromPcjId(final Transaction tx, final String pcjId) {
- requireNonNull(tx);
- requireNonNull(pcjId);
-
- final Bytes queryIdBytes = tx.get(Bytes.of(pcjId), FluoQueryColumns.PCJ_ID_QUERY_ID);
- return queryIdBytes.toString();
- }
-
- private String getSparqlFromQueryId(final Transaction tx, final String queryId) {
- requireNonNull(tx);
- requireNonNull(queryId);
-
- final QueryMetadata metadata = dao.readQueryMetadata(tx, queryId);
- return metadata.getSparql();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java
index 061a1d5..d08cb73 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java
@@ -24,15 +24,13 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.storage.PcjException;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.data.Bytes;
-
/**
* Get {@link PcjMetadata} for queries that are managed by the Fluo app.
*/
@@ -87,7 +85,7 @@ public class GetPcjMetadata {
// Lookup the Rya PCJ ID associated with the query.
String pcjId = null;
try(Snapshot snap = fluo.newSnapshot() ) {
- pcjId = snap.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
+ pcjId = FluoQueryUtils.convertFluoQueryIdToPcjId(queryId);
if(pcjId == null) {
throw new NotInFluoException("Could not get the PcjMetadata for queryId '" + queryId +
"' because a Rya PCJ ID not stored in the Fluo table.");
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
index 1fb1485..ddbaaaf 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java
@@ -25,27 +25,27 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import edu.umd.cs.findbugs.annotations.Nullable;
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-import net.jcip.annotations.Immutable;
-
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
import com.google.common.collect.ImmutableMap;
-import org.apache.fluo.api.client.FluoClient;
-import org.apache.fluo.api.client.Snapshot;
-import org.apache.fluo.api.client.SnapshotBase;
-import org.apache.fluo.api.client.scanner.ColumnScanner;
-import org.apache.fluo.api.client.scanner.RowScanner;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.data.Span;
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+import net.jcip.annotations.Immutable;
/**
* Get a reports that indicates how many binding sets have been emitted for
@@ -63,8 +63,9 @@ public class GetQueryReport {
* @param fluo - The connection to Fluo that will be used to fetch the metadata. (not null)
* @return A map from Query ID to QueryReport that holds a report for all of
* the queries that are being managed within the fluo app.
+ * @throws UnsupportedQueryException
*/
- public Map<String, QueryReport> getAllQueryReports(final FluoClient fluo) {
+ public Map<String, QueryReport> getAllQueryReports(final FluoClient fluo) throws UnsupportedQueryException {
checkNotNull(fluo);
// Fetch the queries that are being managed by the Fluo.
@@ -85,8 +86,9 @@ public class GetQueryReport {
* @param fluo - The connection to Fluo that will be used to fetch the metadata. (not null)
* @param queryId - The ID of the query to fetch. (not null)
* @return A report that was built for the query.
+ * @throws UnsupportedQueryException
*/
- public QueryReport getReport(final FluoClient fluo, final String queryId) {
+ public QueryReport getReport(final FluoClient fluo, final String queryId) throws UnsupportedQueryException {
checkNotNull(fluo);
checkNotNull(queryId);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java
index df1648b..e09d0c6 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java
@@ -52,7 +52,7 @@ public class ListQueryIds {
try(Snapshot snap = fluo.newSnapshot() ) {
// Create an iterator that iterates over the QUERY_ID column.
- final CellScanner cellScanner = snap.scanner().fetch( FluoQueryColumns.QUERY_ID).build();
+ final CellScanner cellScanner = snap.scanner().fetch( FluoQueryColumns.QUERY_NODE_ID).build();
for (RowColumnValue rcv : cellScanner) {
queryIds.add(rcv.getsValue());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
index 4b6f44e..c090d37 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
@@ -39,9 +39,6 @@ public class IncrementalUpdateConstants {
public static final String CONSTRUCT_PREFIX = "CONSTRUCT";
public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY";
- public static enum QueryType{Construct, Projection, Periodic};
- public static enum ExportStrategy{Rya, Kafka};
-
public static final String PERIODIC_BIN_ID = PeriodicQueryResultStorage.PeriodicBinId;
public static final String URI_TYPE = "http://www.w3.org/2001/XMLSchema#anyURI";
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
new file mode 100644
index 0000000..62f1271
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.data.Bytes;
+import org.apache.rya.accumulo.utils.VisibilitySimplifier;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class manages all of the {@link IncrementalResultExporter}s for the Rya Fluo Application.
+ * It maps the {@link FluoQuery}'s {@link QueryType} and Set of {@link ExportStrategy} objects
+ * to the correct IncrementalResultExporter.
+ *
+ */
+public class ExporterManager implements AutoCloseable {
+
+ private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
+ private static final RyaSubGraphKafkaSerDe SG_SERDE = new RyaSubGraphKafkaSerDe();
+ private Map<String, String> simplifiedVisibilities = new HashMap<>();
+
+ private Map<QueryType, Map<ExportStrategy, IncrementalResultExporter>> exporters;
+
+ private ExporterManager(Map<QueryType, Map<ExportStrategy, IncrementalResultExporter>> exporters) {
+ this.exporters = Preconditions.checkNotNull(exporters);
+ }
+
+ /**
+ * @return {@link Builder} for constructing an instance of an ExporterManager.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Maps the data to the correct {@link IncrementalResultExporter} using the provided
+ * QueryType and ExportStrategies to be exported.
+ * @param type - QueryType that produced the result
+ * @param strategies - ExportStrategies used to export the result
+ * @param queryId - Fluo Query Id for the query that produced the result
+ * @param data - Serialized result to be exported
+ * @throws ResultExportException
+ */
+ public void export(QueryType type, Set<ExportStrategy> strategies, String queryId, Bytes data) throws ResultExportException {
+
+ String pcjId = FluoQueryUtils.convertFluoQueryIdToPcjId(queryId);
+
+ if(type == QueryType.CONSTRUCT) {
+ exportSubGraph(exporters.get(type), strategies, pcjId, data);
+ } else {
+ exportBindingSet(exporters.get(type), strategies, pcjId, data);
+ }
+
+ }
+
+ /**
+ * Deserializes a BindingSet result and exports it with the exporter registered for each requested strategy.
+ * @param exporters - exporters corresponding to a given queryType
+ * @param strategies - export strategies used to export results (possibly a subset of those in the exporters map)
+ * @param pcjId - id of the query whose results are being exported
+ * @param data - serialized BindingSet result
+ * @throws ResultExportException if the result cannot be deserialized or exported
+ */
+ private void exportBindingSet(Map<ExportStrategy, IncrementalResultExporter> exporters, Set<ExportStrategy> strategies, String pcjId, Bytes data) throws ResultExportException {
+ try {
+ VisibilityBindingSet bs = BS_SERDE.deserialize(data);
+ simplifyVisibilities(bs);
+ // Failures below (including a strategy with no registered exporter) also reach the catch block.
+ for(ExportStrategy strategy: strategies) {
+ IncrementalBindingSetExporter exporter = (IncrementalBindingSetExporter) exporters.get(strategy);
+ exporter.export(pcjId, bs);
+ }
+ } catch (Exception e) {
+ throw new ResultExportException("Unable to deserialize or export the provided BindingSet for PCJ " + pcjId, e);
+ }
+ }
+
+ /**
+ * Deserializes a RyaSubGraph result and exports it with the exporter registered for each requested strategy.
+ * @param exporters - exporters corresponding to a given queryType
+ * @param strategies - export strategies used to export results (possibly a subset of those in the exporters map)
+ * @param pcjId - id of the query whose results are being exported
+ * @param data - serialized RyaSubGraph result
+ * @throws ResultExportException if the result's visibilities cannot be simplified or it cannot be exported
+ */
+ private void exportSubGraph(Map<ExportStrategy, IncrementalResultExporter> exporters, Set<ExportStrategy> strategies, String pcjId, Bytes data) throws ResultExportException {
+ RyaSubGraph subGraph = SG_SERDE.fromBytes(data.toArray());
+
+ try {
+ simplifyVisibilities(subGraph);
+ } catch (UnsupportedEncodingException e) {
+ throw new ResultExportException("Unable to simplify the visibilities of the provided RyaSubGraph", e);
+ }
+
+ for(ExportStrategy strategy: strategies) {
+ IncrementalRyaSubGraphExporter exporter = (IncrementalRyaSubGraphExporter) exporters.get(strategy);
+ exporter.export(pcjId, subGraph);
+ }
+ }
+
+ private void simplifyVisibilities(VisibilityBindingSet result) {
+ // Simplify the result's visibilities.
+ final String visibility = result.getVisibility();
+ if(!simplifiedVisibilities.containsKey(visibility)) {
+ final String simplified = VisibilitySimplifier.simplify( visibility );
+ simplifiedVisibilities.put(visibility, simplified);
+ }
+ result.setVisibility( simplifiedVisibilities.get(visibility) );
+ }
+
+ private void simplifyVisibilities(RyaSubGraph subgraph) throws UnsupportedEncodingException {
+ Set<RyaStatement> statements = subgraph.getStatements();
+ if (statements.size() > 0) {
+ byte[] visibilityBytes = statements.iterator().next().getColumnVisibility();
+ // Simplify the result's visibilities and cache new simplified
+ // visibilities
+ String visibility = new String(visibilityBytes, "UTF-8");
+ if (!simplifiedVisibilities.containsKey(visibility)) {
+ String simplified = VisibilitySimplifier.simplify(visibility);
+ simplifiedVisibilities.put(visibility, simplified);
+ }
+
+ for (RyaStatement statement : statements) {
+ statement.setColumnVisibility(simplifiedVisibilities.get(visibility).getBytes("UTF-8"));
+ }
+
+ subgraph.setStatements(statements);
+ }
+ }
+
+ public static class Builder {
+
+ private Map<QueryType, Map<ExportStrategy, IncrementalResultExporter>> exporters = new HashMap<>();
+
+ /**
+ * Add an {@link IncrementalResultExporter} to be used by this ExporterManager for exporting results
+ * @param exporter - IncrementalResultExporter for exporting query results
+ * @return - Builder for chaining method calls
+ */
+ public Builder addIncrementalResultExporter(IncrementalResultExporter exporter) {
+
+ Set<QueryType> types = exporter.getQueryTypes();
+ ExportStrategy strategy = exporter.getExportStrategy();
+
+ for (QueryType type : types) {
+ if (!exporters.containsKey(type)) {
+ Map<ExportStrategy, IncrementalResultExporter> exportMap = new HashMap<>();
+ exportMap.put(strategy, exporter);
+ exporters.put(type, exportMap);
+ } else {
+ Map<ExportStrategy, IncrementalResultExporter> exportMap = exporters.get(type);
+ if (!exportMap.containsKey(strategy)) {
+ exportMap.put(strategy, exporter);
+ }
+ }
+ }
+
+ return this;
+ }
+
+ /**
+ * @return - ExporterManager for managing IncrementalResultExporters and exporting results
+ */
+ public ExporterManager build() {
+ //adds NoOpExporter in the event that the user does not want to export results
+ addIncrementalResultExporter(new NoOpExporter());
+ return new ExporterManager(exporters);
+ }
+
+ }
+
+ @Override
+ public void close() throws Exception {
+ Set<IncrementalResultExporter> closed = java.util.Collections.newSetFromMap(new java.util.IdentityHashMap<IncrementalResultExporter, Boolean>()); // one instance may be registered under several QueryTypes
+ for(Map<ExportStrategy, IncrementalResultExporter> map: exporters.values()) {
+ for(IncrementalResultExporter exporter: map.values()) {
+ if(closed.add(exporter)) {
+ exporter.close();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java
index c2f4cb4..9877671 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java
@@ -18,7 +18,6 @@
*/
package org.apache.rya.indexing.pcj.fluo.app.export;
-import org.apache.fluo.api.client.TransactionBase;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -29,17 +28,16 @@ import edu.umd.cs.findbugs.annotations.NonNull;
* other location.
*/
@DefaultAnnotation(NonNull.class)
-public interface IncrementalBindingSetExporter extends AutoCloseable {
+public interface IncrementalBindingSetExporter extends IncrementalResultExporter {
/**
* Export a Binding Set that is a result of a SPARQL query that does not include a Group By clause.
*
- * @param tx - The Fluo transaction this export is a part of. (not null)
- * @param queryId - The Fluo ID of the SPARQL query the binding set is a result of. (not null)
+ * @param queryId - The PCJ ID of the SPARQL query the binding set is a result of. (not null)
* @param bindingSetString - The Binding Set as it was represented within the Fluo application. (not null)
* @throws ResultExportException The result could not be exported.
*/
- public void export(TransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException;
+ public void export(String queryId, VisibilityBindingSet result) throws ResultExportException;
/**
* A result could not be exported.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java
deleted file mode 100644
index 1bf492a..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
-
-import com.google.common.base.Optional;
-
-import org.apache.fluo.api.observer.Observer.Context;
-
-/**
- * Builds instances of {@link IncrementalBindingSetExporter} using the provided
- * configurations.
- */
-@DefaultAnnotation(NonNull.class)
-public interface IncrementalBindingSetExporterFactory {
-
- /**
- * Builds an instance of {@link IncrementalBindingSetExporter} using the
- * configurations that are provided.
- *
- * @param context - Contains the host application's configuration values
- * and any parameters that were provided at initialization. (not null)
- * @return An exporter if configurations were found in the context; otherwise absent.
- * @throws IncrementalExporterFactoryException A non-configuration related
- * problem has occurred and the exporter could not be created as a result.
- * @throws ConfigurationException Thrown if configuration values were
- * provided, but an instance of the exporter could not be initialized
- * using them. This could be because they were improperly formatted,
- * a required field was missing, or some other configuration based problem.
- */
- public Optional<IncrementalBindingSetExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException;
-
- /**
- * Indicates a {@link IncrementalBindingSetExporter} could not be created by a
- * {@link IncrementalBindingSetExporterFactory}.
- */
- public static class IncrementalExporterFactoryException extends Exception {
- private static final long serialVersionUID = 1L;
-
- /**
- * Constructs an instance of {@link }.
- *
- * @param message - Explains why this exception is being thrown.
- */
- public IncrementalExporterFactoryException(final String message) {
- super(message);
- }
-
- /**
- * Constructs an instance of {@link }.
- *
- * @param message - Explains why this exception is being thrown.
- * @param cause - The exception that caused this one to be thrown.
- */
- public IncrementalExporterFactoryException(final String message, final Throwable t) {
- super(message, t);
- }
- }
-
- /**
- * The configuration could not be interpreted because required fields were
- * missing or a value wasn't properly formatted.
- */
- public static class ConfigurationException extends IncrementalExporterFactoryException {
- private static final long serialVersionUID = 1L;
-
- /**
- * Constructs an instance of {@link ConfigurationException}.
- *
- * @param message - Explains why this exception is being thrown.
- */
- public ConfigurationException(final String message) {
- super(message);
- }
-
- /**
- * Constructs an instance of {@link ConfigurationException}.
- *
- * @param message - Explains why this exception is being thrown.
- * @param cause - The exception that caused this one to be thrown.
- */
- public ConfigurationException(final String message, final Throwable cause) {
- super(message, cause);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
new file mode 100644
index 0000000..e49a777
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export;
+
+import java.util.Set;
+
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
+
+/**
+ * Common interface for the different incremental exporters used in the Rya Fluo Application.
+ *
+ */
+public interface IncrementalResultExporter extends AutoCloseable {
+
+ /**
+ * @return - A Set of {@link QueryType}s whose results this exporter handles
+ */
+ public Set<QueryType> getQueryTypes();
+
+ /**
+ * @return - The {@link ExportStrategy} indicating where results are exported
+ */
+ public ExportStrategy getExportStrategy();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java
new file mode 100644
index 0000000..5bba4ab
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+import com.google.common.base.Optional;
+
+import org.apache.fluo.api.observer.Observer.Context;
+
+/**
+ * Builds instances of {@link IncrementalResultExporter} using the provided
+ * configurations.
+ */
+@DefaultAnnotation(NonNull.class)
+public interface IncrementalResultExporterFactory {
+
+ /**
+ * Builds an instance of {@link IncrementalResultExporter} using the
+ * configurations that are provided.
+ *
+ * @param context - Contains the host application's configuration values
+ * and any parameters that were provided at initialization. (not null)
+ * @return An exporter if configurations were found in the context; otherwise absent.
+ * @throws IncrementalExporterFactoryException A non-configuration related
+ * problem has occurred and the exporter could not be created as a result.
+ * @throws ConfigurationException Thrown if configuration values were
+ * provided, but an instance of the exporter could not be initialized
+ * using them. This could be because they were improperly formatted,
+ * a required field was missing, or some other configuration based problem.
+ */
+ public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException;
+
+ /**
+ * Indicates a {@link IncrementalResultExporter} could not be created by a
+ * {@link IncrementalResultExporterFactory}.
+ */
+ public static class IncrementalExporterFactoryException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs an instance of {@link IncrementalExporterFactoryException}.
+ *
+ * @param message - Explains why this exception is being thrown.
+ */
+ public IncrementalExporterFactoryException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs an instance of {@link IncrementalExporterFactoryException}.
+ *
+ * @param message - Explains why this exception is being thrown.
+ * @param t - The exception that caused this one to be thrown.
+ */
+ public IncrementalExporterFactoryException(final String message, final Throwable t) {
+ super(message, t);
+ }
+ }
+
+ /**
+ * The configuration could not be interpreted because required fields were
+ * missing or a value wasn't properly formatted.
+ */
+ public static class ConfigurationException extends IncrementalExporterFactoryException {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs an instance of {@link ConfigurationException}.
+ *
+ * @param message - Explains why this exception is being thrown.
+ */
+ public ConfigurationException(final String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs an instance of {@link ConfigurationException}.
+ *
+ * @param message - Explains why this exception is being thrown.
+ * @param cause - The exception that caused this one to be thrown.
+ */
+ public ConfigurationException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
index 797502c..7b7f084 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
@@ -25,7 +25,7 @@ import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter
* from the Rya-Fluo application to the core Rya tables.
*
*/
-public interface IncrementalRyaSubGraphExporter extends AutoCloseable {
+public interface IncrementalRyaSubGraphExporter extends IncrementalResultExporter {
/**
* Export a RyaSubGraph that is the result of SPARQL Construct Query.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java
deleted file mode 100644
index ecbec09..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package org.apache.rya.indexing.pcj.fluo.app.export;
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import org.apache.fluo.api.observer.Observer.Context;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.ConfigurationException;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
-
-import com.google.common.base.Optional;
-
-/**
- * Builds instances of {@link IncrementalRyaSubGraphExporter} using the provided
- * configurations.
- */
-public interface IncrementalRyaSubGraphExporterFactory {
-
- /**
- * Builds an instance of {@link IncrementalRyaSubGraphExporter} using the
- * configurations that are provided.
- *
- * @param context - Contains the host application's configuration values
- * and any parameters that were provided at initialization. (not null)
- * @return An exporter if configurations were found in the context; otherwise absent.
- * @throws IncrementalExporterFactoryException A non-configuration related
- * problem has occurred and the exporter could not be created as a result.
- * @throws ConfigurationException Thrown if configuration values were
- * provided, but an instance of the exporter could not be initialized
- * using them. This could be because they were improperly formatted,
- * a required field was missing, or some other configuration based problem.
- */
- public Optional<IncrementalRyaSubGraphExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java
new file mode 100644
index 0000000..ab7f2ed
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export;
+
+import java.util.Set;
+
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+import com.google.common.collect.Sets;
+
+/**
+ * This class is a NoOpExporter that can be specified if a user does not
+ * want their results exported from Fluo.
+ *
+ */
+public class NoOpExporter implements IncrementalBindingSetExporter, IncrementalRyaSubGraphExporter {
+
+ @Override
+ public Set<QueryType> getQueryTypes() {
+ return Sets.newHashSet(QueryType.CONSTRUCT, QueryType.PROJECTION);
+ }
+
+ @Override
+ public ExportStrategy getExportStrategy() {
+ return ExportStrategy.NO_OP_EXPORT;
+ }
+
+ @Override
+ public void close() throws Exception {
+ }
+
+ @Override
+ public void export(String constructID, RyaSubGraph subgraph) throws ResultExportException {
+ }
+
+ @Override
+ public void export(String queryId, VisibilityBindingSet result) throws ResultExportException {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
index 7c4b3cc..0c26d65 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
@@ -20,18 +20,21 @@ package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.fluo.api.client.TransactionBase;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import com.google.common.collect.Sets;
+
/**
* Incrementally exports SPARQL query results to Kafka topics.
*/
@@ -57,17 +60,15 @@ public class KafkaBindingSetExporter implements IncrementalBindingSetExporter {
* Send the results to the topic using the queryID as the topicname
*/
@Override
- public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException {
- checkNotNull(fluoTx);
+ public void export(final String queryId, final VisibilityBindingSet result) throws ResultExportException {
checkNotNull(queryId);
checkNotNull(result);
try {
- final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
- final String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result;
+ final String msg = "Out to Kafka topic: " + queryId + ", Result: " + result;
log.trace(msg);
// Send the result to the topic whose name matches the PCJ ID.
- final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(pcjId, result);
+ final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(queryId, result);
final Future<RecordMetadata> future = producer.send(rec);
// Don't let the export return until the result has been written to the topic. Otherwise we may lose results.
@@ -84,4 +85,14 @@ public class KafkaBindingSetExporter implements IncrementalBindingSetExporter {
public void close() throws Exception {
producer.close(5, TimeUnit.SECONDS);
}
+
+ @Override
+ public Set<QueryType> getQueryTypes() {
+ return Sets.newHashSet(QueryType.PROJECTION);
+ }
+
+ @Override
+ public ExportStrategy getExportStrategy() {
+ return ExportStrategy.KAFKA;
+ }
}
\ No newline at end of file