You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/09/04 13:14:08 UTC
[4/4] incubator-rya git commit: RYA-319-Integration of Periodic Query
with CLI. Closes #220.
RYA-319-Integration of Periodic Query with CLI. Closes #220.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/63f87b86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/63f87b86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/63f87b86
Branch: refs/heads/master
Commit: 63f87b868f33c718f085f5a7907d22b823dcd5d3
Parents: ad6ab01
Author: Caleb Meier <ca...@parsons.com>
Authored: Mon Aug 7 21:22:00 2017 -0700
Committer: Caleb Meier <ca...@parsons.com>
Committed: Mon Sep 4 06:11:12 2017 -0700
----------------------------------------------------------------------
.../org/apache/rya/api/client/CreatePCJ.java | 4 +-
.../rya/api/client/CreatePeriodicPCJ.java | 40 ++++
.../rya/api/client/DeletePeriodicPCJ.java | 38 ++++
.../rya/api/client/ListIncrementalQueries.java | 38 ++++
.../org/apache/rya/api/client/RyaClient.java | 31 +++
extras/indexing/pom.xml | 5 +-
.../accumulo/AccumuloCreatePeriodicPCJ.java | 145 ++++++++++++
.../accumulo/AccumuloDeletePeriodicPCJ.java | 135 +++++++++++
.../AccumuloListIncrementalQueries.java | 101 +++++++++
.../accumulo/AccumuloRyaClientFactory.java | 3 +
extras/rya.pcj.fluo/pcj.fluo.api/pom.xml | 4 +
.../indexing/pcj/fluo/api/CreateFluoPcj.java | 24 +-
.../pcj/fluo/api/CreatePeriodicQuery.java | 215 ++++++++++++++++++
.../pcj/fluo/api/DeletePeriodicQuery.java | 92 ++++++++
.../indexing/pcj/fluo/api/ListFluoQueries.java | 149 ++++++++++++
.../fluo/app/IncrementalUpdateConstants.java | 1 +
.../pcj/fluo/app/export/ExporterManager.java | 23 +-
.../pcj/fluo/app/export/NoOpExporter.java | 59 -----
.../KafkaBindingSetExporterParameters.java | 1 -
.../export/rya/PeriodicBindingSetExporter.java | 2 +-
.../fluo/app/observers/QueryResultObserver.java | 4 +-
.../indexing/pcj/fluo/app/query/FluoQuery.java | 6 +-
.../pcj/fluo/app/query/FluoQueryColumns.java | 1 +
.../fluo/app/query/FluoQueryMetadataDAO.java | 6 +-
.../pcj/fluo/api/ListFluoQueriesIT.java | 96 ++++++++
.../indexing/pcj/fluo/integration/BatchIT.java | 12 +-
.../pcj/fluo/integration/CreateDeleteIT.java | 3 +-
.../integration/CreateDeletePeriodicPCJ.java | 227 +++++++++++++++++++
.../pcj/fluo/integration/KafkaExportIT.java | 24 +-
.../indexing/pcj/fluo/integration/QueryIT.java | 40 ++--
.../pcj/fluo/test/base/KafkaExportITBase.java | 17 +-
.../periodic.service.api/.gitignore | 1 +
.../periodic.service.api/pom.xml | 52 +++++
.../periodic/notification/api/BinPruner.java | 40 ++++
.../notification/api/BindingSetExporter.java | 37 +++
.../notification/api/BindingSetRecord.java | 80 +++++++
.../api/BindingSetRecordExportException.java | 45 ++++
.../periodic/notification/api/LifeCycle.java | 45 ++++
.../rya/periodic/notification/api/NodeBin.java | 77 +++++++
.../periodic/notification/api/Notification.java | 34 +++
.../api/NotificationCoordinatorExecutor.java | 41 ++++
.../notification/api/NotificationProcessor.java | 41 ++++
.../api/PeriodicNotificationClient.java | 64 ++++++
.../notification/BasicNotification.java | 76 +++++++
.../notification/CommandNotification.java | 99 ++++++++
.../notification/PeriodicNotification.java | 178 +++++++++++++++
.../notification/TimestampedNotification.java | 69 ++++++
.../KafkaNotificationRegistrationClient.java | 80 +++++++
.../BasicNotificationTypeAdapter.java | 55 +++++
.../serialization/BindingSetSerDe.java | 105 +++++++++
.../CommandNotificationSerializer.java | 76 +++++++
.../CommandNotificationTypeAdapter.java | 89 ++++++++
.../PeriodicNotificationTypeAdapter.java | 73 ++++++
.../periodic.service.integration.tests/pom.xml | 29 +--
.../PeriodicNotificationApplicationIT.java | 102 ++++-----
.../PeriodicNotificationProviderIT.java | 5 +-
.../PeriodicNotificationExporterIT.java | 1 +
.../PeriodicNotificationProcessorIT.java | 2 +-
.../pruner/PeriodicNotificationBinPrunerIT.java | 7 +-
.../PeriodicCommandNotificationConsumerIT.java | 31 ++-
.../periodic.service.notification/pom.xml | 201 ++++++++--------
.../periodic/notification/api/BinPruner.java | 40 ----
.../notification/api/BindingSetExporter.java | 38 ----
.../notification/api/CreatePeriodicQuery.java | 124 ----------
.../periodic/notification/api/LifeCycle.java | 45 ----
.../rya/periodic/notification/api/NodeBin.java | 77 -------
.../periodic/notification/api/Notification.java | 34 ---
.../api/NotificationCoordinatorExecutor.java | 41 ----
.../notification/api/NotificationProcessor.java | 41 ----
.../api/PeriodicNotificationClient.java | 64 ------
.../PeriodicNotificationApplication.java | 2 +-
.../PeriodicNotificationApplicationFactory.java | 2 +-
.../notification/exporter/BindingSetRecord.java | 80 -------
.../exporter/KafkaExporterExecutor.java | 1 +
.../KafkaPeriodicBindingSetExporter.java | 9 +-
.../notification/BasicNotification.java | 76 -------
.../notification/CommandNotification.java | 99 --------
.../notification/PeriodicNotification.java | 178 ---------------
.../notification/TimestampedNotification.java | 69 ------
.../NotificationProcessorExecutor.java | 2 +-
.../TimestampedNotificationProcessor.java | 2 +-
.../KafkaNotificationRegistrationClient.java | 80 -------
.../BasicNotificationTypeAdapter.java | 55 -----
.../serialization/BindingSetSerDe.java | 105 ---------
.../CommandNotificationSerializer.java | 76 -------
.../CommandNotificationTypeAdapter.java | 89 --------
.../PeriodicNotificationTypeAdapter.java | 73 ------
extras/rya.periodic.service/pom.xml | 1 +
.../org/apache/rya/shell/RyaAdminCommands.java | 81 ++++++-
.../apache/rya/shell/RyaAdminCommandsTest.java | 65 ++++++
pom.xml | 20 ++
91 files changed, 3235 insertions(+), 1815 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java
index 6e92b28..3c369d8 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java
@@ -28,7 +28,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
*/
@DefaultAnnotation(NonNull.class)
public interface CreatePCJ {
-
+
/**
* Metadata enum used to indicate the type of query that is registered. If
* the topmost node is a Construct QueryNode, then the type is Construct. If the
@@ -44,7 +44,7 @@ public interface CreatePCJ {
* Application.
*
*/
- public static enum ExportStrategy{RYA, KAFKA, NO_OP_EXPORT};
+ public static enum ExportStrategy{RYA, KAFKA, PERIODIC};
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java
new file mode 100644
index 0000000..7c006d0
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+/**
+ * Registers new Periodic PCJs with an instance of Rya.
+ */
+public interface CreatePeriodicPCJ {
+
+    /**
+     * Registers a SPARQL query as a new Periodic PCJ for the named Rya instance. The supplied periodicTopic and
+     * bootStrapServers are used for registering new PeriodicNotifications with the underlying notification
+     * registration service. Typically, the bootStrapServers are the addresses of the Kafka brokers.
+     *
+     * @param instanceName - Rya instance to connect to
+     * @param sparql - SPARQL query registered with the Periodic Service
+     * @param periodicTopic - Kafka topic that new PeriodicNotifications are exported to for registration
+     * @param bootStrapServers - Connection string for Kafka brokers
+     * @return Fluo Query Id of the registered Periodic Query
+     * @throws RyaClientException The Periodic PCJ could not be created
+     */
+    String createPeriodicPCJ(String instanceName, String sparql, String periodicTopic, String bootStrapServers) throws RyaClientException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/common/rya.api/src/main/java/org/apache/rya/api/client/DeletePeriodicPCJ.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/DeletePeriodicPCJ.java b/common/rya.api/src/main/java/org/apache/rya/api/client/DeletePeriodicPCJ.java
new file mode 100644
index 0000000..c30afd2
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/DeletePeriodicPCJ.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+/**
+ * Deletes an instance of a Periodic PCJ from Rya.
+ */
+public interface DeletePeriodicPCJ {
+
+    /**
+     * Removes a Periodic PCJ from an instance of Rya.
+     *
+     * @param instanceName - Name of the Rya instance that maintains the Periodic PCJ. (not null)
+     * @param pcjId - Identifier of the Periodic PCJ to delete. (not null)
+     * @param topic - Kafka topic used for deleting PeriodicNotifications
+     * @param brokers - Comma delimited host/port pairs used to connect to the Kafka brokers.
+     * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name.
+     * @throws RyaClientException Something caused the command to fail.
+     */
+    void deletePeriodicPCJ(String instanceName, String pcjId, String topic, String brokers) throws InstanceDoesNotExistException, RyaClientException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/common/rya.api/src/main/java/org/apache/rya/api/client/ListIncrementalQueries.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/ListIncrementalQueries.java b/common/rya.api/src/main/java/org/apache/rya/api/client/ListIncrementalQueries.java
new file mode 100644
index 0000000..75e1297
--- /dev/null
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/ListIncrementalQueries.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client;
+
+/**
+ * Verifies that a Rya instance has a Fluo application enabled and lists
+ * all SPARQL queries maintained by that application.
+ */
+public interface ListIncrementalQueries {
+
+    /**
+     * Reports every SPARQL query maintained by the Fluo Application for the given Rya instance, together with
+     * its Fluo Query Id, QueryType, ExportStrategy, and pretty-printed SPARQL query.
+     *
+     * @param ryaInstance - Rya instance whose queries are incrementally maintained by Fluo
+     * @return String comprised of new line delimited Strings that describe each query registered in Fluo,
+     *         including the query Id, the query type, the export strategies, and the SPARQL query
+     * @throws RyaClientException The queries could not be listed
+     */
+    String listIncrementalQueries(String ryaInstance) throws RyaClientException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
----------------------------------------------------------------------
diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
index d1481dc..c04bd86 100644
--- a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
+++ b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java
@@ -34,6 +34,9 @@ public class RyaClient {
private final Install install;
private final CreatePCJ createPcj;
private final DeletePCJ deletePcj;
+ private final CreatePeriodicPCJ createPeriodicPcj;
+ private final DeletePeriodicPCJ deletePeriodicPcj;
+ private final ListIncrementalQueries listIncrementalQueries;
private final BatchUpdatePCJ bactchUpdatePCJ;
private final GetInstanceDetails getInstanceDetails;
private final InstanceExists instanceExists;
@@ -51,6 +54,9 @@ public class RyaClient {
final Install install,
final CreatePCJ createPcj,
final DeletePCJ deletePcj,
+ final CreatePeriodicPCJ createPeriodicPcj,
+ final DeletePeriodicPCJ deletePeriodicPcj,
+ final ListIncrementalQueries listIncrementalQueries,
final BatchUpdatePCJ batchUpdatePcj,
final GetInstanceDetails getInstanceDetails,
final InstanceExists instanceExists,
@@ -63,6 +69,9 @@ public class RyaClient {
this.install = requireNonNull(install);
this.createPcj = requireNonNull(createPcj);
this.deletePcj = requireNonNull(deletePcj);
+ this.createPeriodicPcj = createPeriodicPcj;
+ this.deletePeriodicPcj = deletePeriodicPcj;
+ this.listIncrementalQueries = listIncrementalQueries;
this.bactchUpdatePCJ = requireNonNull(batchUpdatePcj);
this.getInstanceDetails = requireNonNull(getInstanceDetails);
this.instanceExists = requireNonNull(instanceExists);
@@ -96,8 +105,30 @@ public class RyaClient {
public DeletePCJ getDeletePCJ() {
return deletePcj;
}
+
+ /**
+ * @return An instance of {@link CreatePeriodicPCJ} that is connected to a Rya Periodic Storage
+ */
+ public CreatePeriodicPCJ getCreatePeriodicPCJ() {
+ return createPeriodicPcj;
+ }
/**
+ * @return An instance of {@link DeletePeriodicPCJ} that is connected to a Rya Periodic Storage
+ */
+ public DeletePeriodicPCJ getDeletePeriodicPCJ() {
+ return deletePeriodicPcj;
+ }
+
+ /**
+ * @return An instance of {@link ListIncrementalQueries} for displaying queries that are incrementally
+ * maintained by the Rya instance
+ */
+ public ListIncrementalQueries getListIncrementalQueries() {
+ return listIncrementalQueries;
+ }
+
+ /**
* @return An instance of {@link BatchUpdatePCJ} that is connect to a Rya storage
* if the Rya instance supports PCJ indexing.
*/
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/indexing/pom.xml
----------------------------------------------------------------------
diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml
index 7961b9f..16a205f 100644
--- a/extras/indexing/pom.xml
+++ b/extras/indexing/pom.xml
@@ -81,7 +81,10 @@
<groupId>org.apache.rya</groupId>
<artifactId>rya.pcj.fluo.api</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.periodic.service.api</artifactId>
+ </dependency>
<!-- OpenRDF -->
<dependency>
<groupId>org.openrdf.sesame</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.java
new file mode 100644
index 0000000..26a25da
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.api.client.CreatePeriodicPCJ;
+import org.apache.rya.api.client.GetInstanceDetails;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery.PeriodicQueryCreationException;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
+import org.apache.rya.indexing.pcj.storage.PcjException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
+import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.sail.SailException;
+
+import com.google.common.base.Optional;
+
+/**
+ * Accumulo implementation of {@link CreatePeriodicPCJ} used by the RyaClient for creating Periodic PCJs.
+ */
+public class AccumuloCreatePeriodicPCJ extends AccumuloCommand implements CreatePeriodicPCJ {
+
+    private final GetInstanceDetails getInstanceDetails;
+
+    /**
+     * Constructs an instance of {@link AccumuloCreatePeriodicPCJ}.
+     *
+     * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null)
+     * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
+     */
+    public AccumuloCreatePeriodicPCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
+        super(connectionDetails, connector);
+        getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector);
+    }
+
+    /**
+     * Checks that the Rya instance exists, has PCJ indexing enabled and has Fluo details, then creates the
+     * Periodic Query through the Fluo application named in those details. Every failure is a RyaClientException.
+     */
+    @Override
+    public String createPeriodicPCJ(String instanceName, String sparql, String periodicTopic, String bootStrapServers) throws RyaClientException {
+        requireNonNull(instanceName);
+        requireNonNull(sparql);
+
+        final Optional<RyaDetails> ryaDetailsHolder = getInstanceDetails.getDetails(instanceName);
+        if (!ryaDetailsHolder.isPresent()) {
+            throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName));
+        }
+
+        final PCJIndexDetails pcjIndexDetails = ryaDetailsHolder.get().getPCJIndexDetails();
+        if (!pcjIndexDetails.isEnabled()) {
+            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
+        }
+
+        // PCJ indexing is enabled at this point, so a missing holder means no Fluo application is configured.
+        final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
+        if (!fluoDetailsHolder.isPresent()) {
+            throw new RyaClientException(String.format("The '%s' instance of Rya does not have a Fluo application configured to maintain its PCJs.", instanceName));
+        }
+
+        final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
+        try {
+            return updateFluoAppAndRegisterWithKafka(instanceName, fluoAppName, sparql, periodicTopic, bootStrapServers);
+        } catch (final RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException | PeriodicQueryCreationException e) {
+            throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
+        } catch (final UnsupportedQueryException e) {
+            // Keep the cause so the rejected query node or export strategy is not lost to the caller.
+            throw new RyaClientException("The new PCJ could not be initialized because it either contains an unsupported query node "
+                    + "or an invalid ExportStrategy for the given QueryType. Projection queries can be exported to either Rya or Kafka, "
+                    + "unless they contain an aggregation, in which case they can only be exported to Kafka. Construct queries can be exported "
+                    + "to Rya and Kafka, and Periodic queries can only be exported to Rya.", e);
+        }
+    }
+
+    /**
+     * Connects to the Fluo application and creates the Periodic Query using a Kafka backed notification client.
+     *
+     * @return Fluo Query Id of the newly created Periodic Query
+     */
+    private String updateFluoAppAndRegisterWithKafka(final String ryaInstance, final String fluoAppName, String sparql, String periodicTopic, String bootStrapServers) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException, UnsupportedQueryException, PeriodicQueryCreationException {
+        requireNonNull(sparql);
+        requireNonNull(periodicTopic);
+        requireNonNull(bootStrapServers);
+
+        final PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(getConnector(), ryaInstance);
+
+        // Connect to the Fluo application that is updating this instance's PCJs. The producer shares the
+        // try-with-resources so it is always closed, which also waits for records it has already been handed.
+        final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
+        try (final FluoClient fluoClient = new FluoClientFactory().connect(cd.getUsername(), new String(cd.getPassword()),
+                cd.getInstanceName(), cd.getZookeepers(), fluoAppName);
+                final KafkaProducer<String, CommandNotification> producer = createProducer(bootStrapServers)) {
+            final CreatePeriodicQuery periodicPcj = new CreatePeriodicQuery(fluoClient, periodicStorage);
+            final PeriodicNotificationClient periodicClient = new KafkaNotificationRegistrationClient(periodicTopic, producer);
+            return periodicPcj.withRyaIntegration(sparql, periodicClient, getConnector(), ryaInstance).getQueryId();
+        }
+    }
+
+    /** Builds a producer of String keys and {@link CommandNotification} values for the given Kafka brokers. */
+    private static KafkaProducer<String, CommandNotification> createProducer(final String bootStrapServers) {
+        final Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName());
+        return new KafkaProducer<>(props);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java
new file mode 100644
index 0000000..18e49dc
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.api.client.DeletePeriodicPCJ;
+import org.apache.rya.api.client.GetInstanceDetails;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
+import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery;
+import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery.QueryDeletionException;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
+import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.openrdf.query.MalformedQueryException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+
+/**
+ * Class used by the RyaClient and Rya Shell for deleting Periodic PCJ.
+ */
+public class AccumuloDeletePeriodicPCJ extends AccumuloCommand implements DeletePeriodicPCJ {
+    private static final Logger log = LoggerFactory.getLogger(AccumuloDeletePeriodicPCJ.class);
+
+    private final GetInstanceDetails getInstanceDetails;
+
+    /**
+     * Constructs an instance of {@link AccumuloDeletePeriodicPCJ}.
+     *
+     * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null)
+     * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
+     */
+    public AccumuloDeletePeriodicPCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
+        super(connectionDetails, connector);
+        getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector);
+    }
+
+    /**
+     * Checks that the Rya instance exists and has PCJ indexing enabled, then deletes the Periodic Query through
+     * the instance's Fluo application. If the instance has no Fluo details, an error is logged and nothing is deleted.
+     */
+    @Override
+    public void deletePeriodicPCJ(final String instanceName, final String pcjId, String topic, String brokers) throws InstanceDoesNotExistException, RyaClientException {
+        requireNonNull(instanceName);
+        requireNonNull(pcjId);
+
+        final Optional<RyaDetails> originalDetails = getInstanceDetails.getDetails(instanceName);
+        if (!originalDetails.isPresent()) {
+            throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName));
+        }
+
+        final PCJIndexDetails pcjIndexDetails = originalDetails.get().getPCJIndexDetails();
+        if (!pcjIndexDetails.isEnabled()) {
+            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
+        }
+
+        // If the PCJ was being maintained by a Fluo application, then stop that process.
+        final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
+        if (!fluoDetailsHolder.isPresent()) {
+            log.error("Could not stop the Fluo application from updating the PCJ because the Fluo Details are missing for the Rya instance named '{}'.", instanceName);
+            return;
+        }
+
+        final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
+        try {
+            stopUpdatingPCJ(instanceName, fluoAppName, pcjId, topic, brokers);
+        } catch (final MalformedQueryException | UnsupportedQueryException | QueryDeletionException e) {
+            throw new RyaClientException(String.format("Unable to delete Periodic Query with id: %s", pcjId), e);
+        }
+    }
+
+    /**
+     * Connects to the Fluo application and deletes the Periodic Query using a Kafka backed notification client.
+     */
+    private void stopUpdatingPCJ(final String ryaInstance, final String fluoAppName, final String pcjId, final String topic, final String brokers) throws UnsupportedQueryException, MalformedQueryException, QueryDeletionException {
+        requireNonNull(fluoAppName);
+        requireNonNull(pcjId);
+        requireNonNull(topic);
+        requireNonNull(brokers);
+
+        // Connect to the Fluo application that is updating this instance's PCJs. The producer shares the
+        // try-with-resources so it is always closed, which also waits for records it has already been handed.
+        final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
+        try (final FluoClient fluoClient = new FluoClientFactory().connect(cd.getUsername(), new String(cd.getPassword()),
+                cd.getInstanceName(), cd.getZookeepers(), fluoAppName);
+                final KafkaProducer<String, CommandNotification> producer = createProducer(brokers)) {
+            final PeriodicQueryResultStorage periodic = new AccumuloPeriodicQueryResultStorage(getConnector(), ryaInstance);
+            final DeletePeriodicQuery deletePeriodic = new DeletePeriodicQuery(fluoClient, periodic);
+            deletePeriodic.deletePeriodicQuery(pcjId, new KafkaNotificationRegistrationClient(topic, producer));
+        }
+    }
+
+    /** Builds a producer of String keys and {@link CommandNotification} values for the given Kafka brokers. */
+    private static KafkaProducer<String, CommandNotification> createProducer(final String brokers) {
+        final Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName());
+        return new KafkaProducer<>(props);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListIncrementalQueries.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListIncrementalQueries.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListIncrementalQueries.java
new file mode 100644
index 0000000..51e7d6a
--- /dev/null
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListIncrementalQueries.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.List;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.rya.api.client.GetInstanceDetails;
+import org.apache.rya.api.client.InstanceDoesNotExistException;
+import org.apache.rya.api.client.ListIncrementalQueries;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.instance.RyaDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
+import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
+import org.apache.rya.indexing.pcj.fluo.api.ListFluoQueries;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * An Accumulo implementation of the {@link ListIncrementalQueries} command. It looks up the
+ * Fluo application recorded in the Rya instance's PCJ index details and returns a String
+ * listing the queries that {@link ListFluoQueries} reports for that application.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloListIncrementalQueries extends AccumuloCommand implements ListIncrementalQueries {
+
+    private final GetInstanceDetails getInstanceDetails;
+
+    /**
+     * Constructs an instance of {@link AccumuloListIncrementalQueries}.
+     *
+     * @param connectionDetails - connection details passed to the parent command and used later
+     *            to connect to Fluo
+     * @param connector - Accumulo connector passed to the parent command and to the instance
+     *            details lookup
+     */
+    public AccumuloListIncrementalQueries(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
+        super(connectionDetails, connector);
+        this.getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector);
+    }
+
+    /**
+     * Lists the queries maintained by the Fluo application associated with a Rya instance.
+     *
+     * @param instanceName - name of the Rya instance whose queries are listed (not null)
+     * @return the String representations of the queries, joined by newline characters
+     * @throws InstanceDoesNotExistException the named Rya instance does not exist
+     * @throws RyaClientException PCJ indexing or Fluo updating is not enabled for the instance,
+     *             or the queries could not be read from Fluo
+     */
+    @Override
+    public String listIncrementalQueries(String instanceName) throws RyaClientException {
+        requireNonNull(instanceName);
+
+        // Nothing can be listed for an instance that is not installed.
+        final Optional<RyaDetails> details = getInstanceDetails.getDetails(instanceName);
+        if (!details.isPresent()) {
+            throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName));
+        }
+
+        // Incremental queries only exist when the PCJ index is turned on.
+        final PCJIndexDetails pcjDetails = details.get().getPCJIndexDetails();
+        if (!pcjDetails.isEnabled()) {
+            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
+        }
+
+        // The queries live in the Fluo application, so one has to be configured.
+        final Optional<FluoDetails> fluoDetails = pcjDetails.getFluoDetails();
+        if (!fluoDetails.isPresent()) {
+            throw new RyaClientException(String.format("The '%s' instance of Rya does not have Fluo incremental updating enabled.", instanceName));
+        }
+
+        final String fluoAppName = fluoDetails.get().getUpdateAppName();
+        try {
+            return getFluoQueryString(instanceName, fluoAppName);
+        } catch (Exception e) {
+            throw new RyaClientException("Problem while creating Fluo Query Strings.", e);
+        }
+    }
+
+    /**
+     * Connects to the Fluo application and renders the queries it maintains.
+     *
+     * @param ryaInstance - name of the Rya instance; not used by the current implementation
+     * @param fluoAppName - name of the Fluo application to connect to
+     * @return the query descriptions returned by {@link ListFluoQueries}, joined by newlines
+     * @throws Exception if connecting to Fluo or listing the queries fails
+     */
+    private String getFluoQueryString(final String ryaInstance, final String fluoAppName) throws Exception {
+        final AccumuloConnectionDetails connection = super.getAccumuloConnectionDetails();
+        final FluoClientFactory clientFactory = new FluoClientFactory();
+
+        // The client is closed as soon as the listing has been rendered.
+        try (final FluoClient fluoClient = clientFactory.connect(
+                connection.getUsername(),
+                new String(connection.getPassword()),
+                connection.getInstanceName(),
+                connection.getZookeepers(),
+                fluoAppName)) {
+            final List<String> queryDescriptions = new ListFluoQueries().listFluoQueries(fluoClient);
+            return Joiner.on("\n").join(queryDescriptions);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
----------------------------------------------------------------------
diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
index 5ee02f9..d9bf644 100644
--- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java
@@ -52,6 +52,9 @@ public class AccumuloRyaClientFactory {
new AccumuloInstall(connectionDetails, connector),
new AccumuloCreatePCJ(connectionDetails, connector),
new AccumuloDeletePCJ(connectionDetails, connector),
+ new AccumuloCreatePeriodicPCJ(connectionDetails, connector),
+ new AccumuloDeletePeriodicPCJ(connectionDetails, connector),
+ new AccumuloListIncrementalQueries(connectionDetails, connector),
new AccumuloBatchUpdatePCJ(connectionDetails, connector),
new AccumuloGetInstanceDetails(connectionDetails, connector),
new AccumuloInstanceExists(connectionDetails, connector),
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml
index 758c481..16d33b2 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml
@@ -42,6 +42,10 @@ under the License.
<artifactId>rya.pcj.fluo.app</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.periodic.service.api</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.rya</groupId>
<artifactId>rya.sail</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
index 501f1f5..a988bc7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java
@@ -38,6 +38,7 @@ import org.apache.log4j.Logger;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine;
import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaType;
import org.apache.rya.api.domain.RyaURI;
@@ -135,6 +136,7 @@ public class CreateFluoPcj {
* according to the Kafka {@link ExportStrategy}.
*
* @param sparql - sparql query String to be registered with Fluo
+ * @param strategies - ExportStrategies used to specify how final results will be handled
* @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
* @return The metadata that was written to the Fluo application for the PCJ.
* @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
@@ -218,7 +220,17 @@ public class CreateFluoPcj {
.setSparql(sparql)
.setJoinBatchSize(joinBatchSize);
- return builder.build();
+ FluoQuery query = builder.build();
+
+ if(query.getQueryType() == QueryType.PERIODIC && !Sets.newHashSet(ExportStrategy.PERIODIC).containsAll(strategies)) {
+ throw new UnsupportedQueryException("Periodic Queries must only utilize the PeriodicExport or the NoOpExport ExportStrategy.");
+ }
+
+ if(query.getQueryType() != QueryType.PERIODIC && strategies.contains(ExportStrategy.PERIODIC)) {
+ throw new UnsupportedQueryException("Only Periodic Queries can utilize the PeriodicExport ExportStrategy.");
+ }
+
+ return query;
}
private void writeFluoQuery(FluoClient fluo, FluoQuery fluoQuery, String pcjId) {
@@ -283,13 +295,13 @@ public class CreateFluoPcj {
* @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
* @param accumulo - Accumulo connector for connecting with Accumulo
* @param ryaInstance - name of Rya instance to connect to
- * @return The Fluo application's Query ID of the query that was created.
+ * @return FluoQuery containing the metadata for the newly registered SPARQL query
* @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
* @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
* @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
* @throws UnsupportedQueryException
*/
- public String withRyaIntegration(
+ public FluoQuery withRyaIntegration(
final String pcjId,
final String sparql,
final Set<ExportStrategy> strategies,
@@ -308,7 +320,7 @@ public class CreateFluoPcj {
//import results already ingested into Rya that match query
importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance);
// return queryId to the caller for later monitoring from the export.
- return fluoQuery.getQueryMetadata().getNodeId();
+ return fluoQuery;
}
/**
@@ -326,13 +338,13 @@ public class CreateFluoPcj {
* @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
* @param accumulo - Accumuo connector for connecting to Accumulo
* @param ryaInstance - name of Rya instance to connect to
- * @return The Fluo application's Query ID of the query that was created.
+ * @return FluoQuery containing the metadata for the newly registered SPARQL query
* @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed.
* @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}.
* @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}.
* @throws UnsupportedQueryException
*/
- public String withRyaIntegration(
+ public FluoQuery withRyaIntegration(
final String pcjId,
final PrecomputedJoinStorage pcjStorage,
final FluoClient fluo,
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePeriodicQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePeriodicQuery.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePeriodicQuery.java
new file mode 100644
index 0000000..24adde9
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePeriodicQuery.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.evaluation.function.Function;
+
+import com.google.common.collect.Sets;
+
+
+/**
+ * Object that creates a Periodic Query. A Periodic Query is any query
+ * requesting periodic updates about events that occurred within a given
+ * window of time of this instant. This is also known as a rolling window
+ * query. Periodic Queries can be expressed using SPARQL by including the
+ * {@link Function} indicated by the URI {@link PeriodicQueryUtil#PeriodicQueryURI}
+ * in the query. The user must provide this Function with the following arguments:
+ * the temporal variable in the query that will be filtered on, the window of time
+ * that events must occur within, the period at which the user wants to receive updates,
+ * and the time unit. The following query requests all observations that occurred
+ * within the last minute and requests updates every 15 seconds. It also performs
+ * a count on those observations.
+ * <p>
+ * <pre>
+ * prefix function: http://org.apache.rya/function#
+ * prefix time: http://www.w3.org/2006/time#
+ * select (count(?obs) as ?total) where {
+ *     Filter(function:periodic(?time, 1, .25, time:minutes))
+ *     ?obs uri:hasTime ?time.
+ *     ?obs uri:hasId ?id }
+ * </pre>
+ * <p>
+ * This class is responsible for taking a Periodic Query expressed as a SPARQL query
+ * and adding it to Fluo and Kafka so that it can be processed by the
+ * {@code PeriodicNotificationApplication}.
+ */
+public class CreatePeriodicQuery {
+
+    /** Message reported when the SPARQL query contains no periodic filter. */
+    private static final String MISSING_PERIODIC_FILTER = "Invalid PeriodicQuery. Query must possess a PeriodicQuery Filter.";
+
+    private final FluoClient fluoClient;
+    private final PeriodicQueryResultStorage periodicStorage;
+
+
+    /**
+     * Constructs an instance of CreatePeriodicQuery for creating periodic queries. Whether a
+     * PeriodicNotification is registered depends on the method used: only the methods that
+     * accept a {@link PeriodicNotificationClient} register one.
+     *
+     * @param fluoClient - Fluo client for interacting with Fluo
+     * @param periodicStorage - PeriodicQueryResultStorage storing periodic query results
+     */
+    public CreatePeriodicQuery(FluoClient fluoClient, PeriodicQueryResultStorage periodicStorage) {
+        this.fluoClient = fluoClient;
+        this.periodicStorage = periodicStorage;
+    }
+
+
+    /**
+     * Creates a Periodic Query by adding the query to Fluo and using the newly generated
+     * PCJ id to create a {@link PeriodicQueryResultStorage} table. No PeriodicNotification
+     * is registered by this method.
+     *
+     * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table
+     * @return FluoQuery indicating the metadata of the registered SPARQL query
+     * @throws PeriodicQueryCreationException if the query has no periodic filter, is malformed
+     *             or unsupported, or the storage table could not be created
+     */
+    public FluoQuery createPeriodicQuery(String sparql) throws PeriodicQueryCreationException {
+        try {
+            Optional<PeriodicQueryNode> optNode = PeriodicQueryUtil.getPeriodicNode(sparql);
+            if (!optNode.isPresent()) {
+                // Report through the declared checked exception instead of a raw RuntimeException.
+                throw new PeriodicQueryCreationException(MISSING_PERIODIC_FILTER);
+            }
+            String pcjId = FluoQueryUtils.createNewPcjId();
+
+            // register query with Fluo
+            CreateFluoPcj createPcj = new CreateFluoPcj();
+            FluoQuery fluoQuery = createPcj.createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.PERIODIC), fluoClient);
+
+            // register query with PeriodicResultStorage table
+            periodicStorage.createPeriodicQuery(pcjId, sparql);
+
+            return fluoQuery;
+        } catch (MalformedQueryException | PeriodicQueryStorageException | UnsupportedQueryException e) {
+            throw new PeriodicQueryCreationException(e);
+        }
+    }
+
+
+    /**
+     * Creates a Periodic Query by adding the query to Fluo and using the newly generated
+     * PCJ id to create a {@link PeriodicQueryResultStorage} table. Additionally,
+     * the associated PeriodicNotification is registered with the Periodic Query Service.
+     *
+     * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table
+     * @param notificationClient - {@link PeriodicNotificationClient} for registering new PeriodicNotifications
+     * @return FluoQuery indicating the metadata of the registered SPARQL query
+     * @throws PeriodicQueryCreationException if the query has no periodic filter, is malformed
+     *             or unsupported, or the storage table could not be created
+     */
+    public FluoQuery createPeriodicQuery(String sparql, PeriodicNotificationClient notificationClient) throws PeriodicQueryCreationException {
+        try {
+            Optional<PeriodicQueryNode> optNode = PeriodicQueryUtil.getPeriodicNode(sparql);
+            if (!optNode.isPresent()) {
+                // Report through the declared checked exception instead of a raw RuntimeException.
+                throw new PeriodicQueryCreationException(MISSING_PERIODIC_FILTER);
+            }
+            PeriodicQueryNode periodicNode = optNode.get();
+            String pcjId = FluoQueryUtils.createNewPcjId();
+
+            // register query with Fluo
+            CreateFluoPcj createPcj = new CreateFluoPcj();
+            FluoQuery fluoQuery = createPcj.createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.PERIODIC), fluoClient);
+
+            // register query with PeriodicResultStorage table
+            periodicStorage.createPeriodicQuery(pcjId, sparql);
+            // create notification from the period and time unit found in the query
+            PeriodicNotification notification = PeriodicNotification.builder().id(pcjId).period(periodicNode.getPeriod())
+                    .timeUnit(periodicNode.getUnit()).build();
+            // register notification with periodic notification app
+            notificationClient.addNotification(notification);
+
+            return fluoQuery;
+        } catch (MalformedQueryException | PeriodicQueryStorageException | UnsupportedQueryException e) {
+            throw new PeriodicQueryCreationException(e);
+        }
+    }
+
+
+    /**
+     * Creates a Periodic Query by adding the query to Fluo through
+     * {@link CreateFluoPcj#withRyaIntegration}, creating a {@link PeriodicQueryResultStorage}
+     * table for the newly generated PCJ id, and registering the associated PeriodicNotification.
+     *
+     * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table
+     * @param notificationClient - {@link PeriodicNotificationClient} for registering new PeriodicNotifications
+     * @param conn - Accumulo connector for connecting to the Rya instance
+     * @param ryaInstance - name of the Accumulo-backed Rya instance
+     * @return FluoQuery indicating the metadata of the registered SPARQL query
+     * @throws PeriodicQueryCreationException if the query has no periodic filter or any step of
+     *             the registration fails
+     */
+    public FluoQuery withRyaIntegration(String sparql, PeriodicNotificationClient notificationClient, Connector conn, String ryaInstance)
+            throws PeriodicQueryCreationException {
+        try {
+            Optional<PeriodicQueryNode> optNode = PeriodicQueryUtil.getPeriodicNode(sparql);
+            if (!optNode.isPresent()) {
+                throw new PeriodicQueryCreationException(MISSING_PERIODIC_FILTER);
+            }
+            PeriodicQueryNode periodicNode = optNode.get();
+            String pcjId = FluoQueryUtils.createNewPcjId();
+
+            // register query with Fluo
+            CreateFluoPcj createPcj = new CreateFluoPcj();
+            FluoQuery fluoQuery = createPcj.withRyaIntegration(pcjId, sparql, Sets.newHashSet(ExportStrategy.PERIODIC),
+                    fluoClient, conn, ryaInstance);
+
+            // register query with PeriodicResultStorage table
+            periodicStorage.createPeriodicQuery(pcjId, sparql);
+            // create notification from the period and time unit found in the query
+            PeriodicNotification notification = PeriodicNotification.builder().id(pcjId).period(periodicNode.getPeriod())
+                    .timeUnit(periodicNode.getUnit()).build();
+            // register notification with periodic notification app
+            notificationClient.addNotification(notification);
+
+            return fluoQuery;
+        } catch (PeriodicQueryCreationException e) {
+            // Already the type this method reports; do not wrap it a second time.
+            throw e;
+        } catch (Exception e) {
+            throw new PeriodicQueryCreationException(e);
+        }
+    }
+
+    /**
+     * This Exception gets thrown whenever there is an issue creating a PeriodicQuery.
+     */
+    public static class PeriodicQueryCreationException extends Exception {
+
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * @param e - the underlying cause of the failure
+         */
+        public PeriodicQueryCreationException(Exception e) {
+            super(e);
+        }
+
+        /**
+         * @param message - description of the failure
+         * @param e - the underlying cause of the failure
+         */
+        public PeriodicQueryCreationException(String message, Exception e) {
+            super(message, e);
+        }
+
+        /**
+         * @param message - description of the failure
+         */
+        public PeriodicQueryCreationException(String message) {
+            super(message);
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePeriodicQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePeriodicQuery.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePeriodicQuery.java
new file mode 100644
index 0000000..4ff88da
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePeriodicQuery.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Deletes a Periodic Query from the Fluo application and from the
+ * {@link PeriodicQueryResultStorage}, optionally telling the Periodic Notification Service
+ * to stop producing notifications for it.
+ */
+public class DeletePeriodicQuery {
+
+    private final FluoClient fluo;
+    private final PeriodicQueryResultStorage periodicStorage;
+
+    /**
+     * @param fluo - client for the Fluo application that holds the query
+     * @param periodicStorage - storage layer that holds the query's results
+     */
+    public DeletePeriodicQuery(FluoClient fluo, PeriodicQueryResultStorage periodicStorage) {
+        this.fluo = fluo;
+        this.periodicStorage = periodicStorage;
+    }
+
+    /**
+     * Deletes the Periodic Query with the indicated pcjId from Fluo and {@link PeriodicQueryResultStorage}.
+     *
+     * @param pcjId - Id of the Periodic Query to be deleted (not null)
+     * @throws QueryDeletionException if Fluo or the storage layer fails to delete the query
+     */
+    public void deletePeriodicQuery(String pcjId) throws QueryDeletionException {
+        Preconditions.checkNotNull(pcjId);
+
+        // NOTE(review): 1000 is presumably a delete batch size - confirm against DeleteFluoPcj.
+        final DeleteFluoPcj fluoDeleter = new DeleteFluoPcj(1000);
+        try {
+            // Fluo first, then the result storage.
+            fluoDeleter.deletePcj(fluo, pcjId);
+            periodicStorage.deletePeriodicQuery(pcjId);
+        } catch (UnsupportedQueryException | PeriodicQueryStorageException e) {
+            final String message = String.format("Unable to delete the Periodic Query with Id: %s", pcjId);
+            throw new QueryDeletionException(message, e);
+        }
+    }
+
+    /**
+     * Deletes the Periodic Query with the indicated pcjId from Fluo and {@link PeriodicQueryResultStorage}. In
+     * addition, this method also informs the Periodic Notification Service to stop generating PeriodicNotifications
+     * associated with the Periodic Query.
+     *
+     * @param pcjId - Id of the Periodic Query to be deleted (not null)
+     * @param periodicClient - Client used to inform the Periodic Notification Service to stop generating notifications (not null)
+     * @throws QueryDeletionException if Fluo or the storage layer fails to delete the query
+     */
+    public void deletePeriodicQuery(String pcjId, PeriodicNotificationClient periodicClient) throws QueryDeletionException {
+        Preconditions.checkNotNull(periodicClient);
+
+        // Remove the query itself before cancelling its notifications.
+        deletePeriodicQuery(pcjId);
+
+        final BasicNotification cancelled = new BasicNotification(pcjId);
+        periodicClient.deleteNotification(cancelled);
+    }
+
+    /**
+     * This Exception is thrown when a problem is encountered while deleting a
+     * query from the Fluo Application or the underlying storage layer.
+     */
+    public static class QueryDeletionException extends Exception {
+
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * @param message - description of the failure
+         */
+        public QueryDeletionException(String message) {
+            super(message);
+        }
+
+        /**
+         * @param message - description of the failure
+         * @param e - the underlying cause of the failure
+         */
+        public QueryDeletionException(String message, Exception e) {
+            super(message, e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueries.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueries.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueries.java
new file mode 100644
index 0000000..8f5bbfe
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueries.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.api;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.queryrender.sparql.SPARQLQueryRenderer;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Class for retrieving a List containing a String representation of each query maintained by Fluo.
+ */
+public class ListFluoQueries {
+
+    private static final FluoQueryMetadataDAO DAO = new FluoQueryMetadataDAO();
+
+    /**
+     * Retrieve a list of String representations of each query maintained by Fluo
+     *
+     * @param fluo - FluoClient for interacting with Fluo
+     * @return - List of String representations of queries maintained by Fluo.
+     * @throws Exception if the metadata cannot be read or a stored query cannot be rendered
+     */
+    public List<String> listFluoQueries(FluoClient fluo) throws Exception {
+
+        List<String> queryStrings = new ArrayList<>();
+
+        // The snapshot holds resources, so release it even when rendering a query fails.
+        try (Snapshot sx = fluo.newSnapshot()) {
+            List<String> ids = new ListQueryIds().listQueryIds(fluo);
+            for (String id : ids) {
+                queryStrings.add(extractString(DAO.readQueryMetadata(sx, id)));
+            }
+        }
+
+        return queryStrings;
+    }
+
+    /**
+     * Renders the id, type, export strategies and SPARQL of a single query.
+     *
+     * @param metadata - metadata of the query to render
+     * @return the formatted description of the query
+     * @throws Exception if the SPARQL cannot be parsed or rendered
+     */
+    private static String extractString(QueryMetadata metadata) throws Exception {
+        FluoQueryStringBuilder builder = new FluoQueryStringBuilder();
+        return builder.setQueryId(metadata.getNodeId()).setQueryType(metadata.getQueryType())
+                .setExportStrategies(metadata.getExportStrategies()).setQuery(metadata.getSparql()).build();
+    }
+
+    /**
+     * Parses and re-renders the SPARQL, then indents every line after the first so the query
+     * lines up beneath the value column of the listing.
+     *
+     * @param sparql - SPARQL query to format
+     * @param indent - number of spaces placed before each line after the first
+     * @return the formatted query; every line, including the last, ends with a newline
+     * @throws Exception if the SPARQL cannot be parsed or rendered
+     */
+    private static String getPrettyPrintSparql(String sparql, int indent) throws Exception {
+        SPARQLParser parser = new SPARQLParser();
+        ParsedQuery pq = parser.parseQuery(sparql, null);
+        SPARQLQueryRenderer render = new SPARQLQueryRenderer();
+        String renderedQuery = render.render(pq);
+
+        // Collapse the triple quotes generated by the query renderer into single quotes.
+        // A plain replace also keeps a quote at the very end of the text, which
+        // splitting on the triple quote would silently drop.
+        String unquoted = renderedQuery.replace("\"\"\"", "\"");
+
+        // add indent to all lines following newline char
+        String[] lines = unquoted.split("\n");
+        StringBuilder builder = new StringBuilder();
+        String prefix = getVariableIndent(indent);
+        for (int i = 0; i < lines.length; i++) {
+            if (i != 0) {
+                builder.append(prefix);
+            }
+            builder.append(lines[i]).append("\n");
+        }
+
+        return builder.toString();
+    }
+
+    /**
+     * @param len - number of spaces requested; must not be negative
+     * @return a String made of {@code len} space characters
+     */
+    private static String getVariableIndent(int len) {
+        return new String(new char[len]).replace('\0', ' ');
+    }
+
+    /**
+     * Builder that formats a query's id, type, export strategies and SPARQL as labelled lines,
+     * with the values aligned in a common column.
+     */
+    public static class FluoQueryStringBuilder {
+
+        private String queryId;
+        private String sparql;
+        private QueryType queryType;
+        private Set<ExportStrategy> strategies;
+
+        /**
+         * @param queryId - id of the query (not null)
+         * @return this builder
+         */
+        public FluoQueryStringBuilder setQueryId(String queryId) {
+            this.queryId = Preconditions.checkNotNull(queryId);
+            return this;
+        }
+
+        /**
+         * @param query - SPARQL of the query (not null)
+         * @return this builder
+         */
+        public FluoQueryStringBuilder setQuery(String query) {
+            this.sparql = Preconditions.checkNotNull(query);
+            return this;
+        }
+
+        /**
+         * @param strategies - export strategies of the query (not null)
+         * @return this builder
+         */
+        public FluoQueryStringBuilder setExportStrategies(Set<ExportStrategy> strategies) {
+            this.strategies = Preconditions.checkNotNull(strategies);
+            return this;
+        }
+
+        /**
+         * @param queryType - type of the query (not null)
+         * @return this builder
+         */
+        public FluoQueryStringBuilder setQueryType(QueryType queryType) {
+            this.queryType = Preconditions.checkNotNull(queryType);
+            return this;
+        }
+
+        /**
+         * Builds the formatted description from the values set on this builder.
+         *
+         * @return the description, one labelled line per value followed by the formatted SPARQL
+         * @throws Exception if the SPARQL cannot be parsed or rendered
+         */
+        public String build() throws Exception {
+
+            // Column at which every value starts; each header is padded up to this width.
+            int valueAlign = 20;
+            String sparqlHeader = "SPARQL: ";
+            String idHeader = "QUERY ID: ";
+            String typeHeader = "QUERY TYPE: ";
+            String strategiesHeader = "EXPORT STRATEGIES: ";
+
+            StringBuilder builder = new StringBuilder();
+            builder.append(idHeader).append(getVariableIndent(valueAlign - idHeader.length())).append(queryId).append("\n")
+                    .append(typeHeader).append(getVariableIndent(valueAlign - typeHeader.length())).append(queryType).append("\n")
+                    .append(strategiesHeader).append(getVariableIndent(valueAlign - strategiesHeader.length())).append(strategies).append("\n")
+                    .append(sparqlHeader).append(getVariableIndent(valueAlign - sparqlHeader.length())).append(getPrettyPrintSparql(sparql, valueAlign)).append("\n");
+
+            return builder.toString();
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
index c090d37..5405837 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java
@@ -39,6 +39,7 @@ public class IncrementalUpdateConstants {
public static final String CONSTRUCT_PREFIX = "CONSTRUCT";
public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY";
+ //binding name reserved for periodic bin id for periodic query results
public static final String PERIODIC_BIN_ID = PeriodicQueryResultStorage.PeriodicBinId;
public static final String URI_TYPE = "http://www.w3.org/2001/XMLSchema#anyURI";
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
index 62f1271..2cb7eff 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java
@@ -30,7 +30,6 @@ import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
import org.apache.rya.api.client.CreatePCJ.QueryType;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaSubGraph;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
@@ -95,16 +94,21 @@ public class ExporterManager implements AutoCloseable {
* @throws ResultExportException
*/
private void exportBindingSet(Map<ExportStrategy, IncrementalResultExporter> exporters, Set<ExportStrategy> strategies, String pcjId, Bytes data) throws ResultExportException {
+ VisibilityBindingSet bs; // declared before the first try so the export loop below can still reference it
try {
- VisibilityBindingSet bs = BS_SERDE.deserialize(data);
+ bs = BS_SERDE.deserialize(data);
simplifyVisibilities(bs);
+ } catch (Exception e) { // reached when deserialize() or simplifyVisibilities() throws
+ throw new ResultExportException("Unable to deserialize the given BindingSet.", e);
+ }
+ try{ // export failures are caught separately so they are no longer reported as deserialization errors
for(ExportStrategy strategy: strategies) {
IncrementalBindingSetExporter exporter = (IncrementalBindingSetExporter) exporters.get(strategy);
exporter.export(pcjId, bs);
}
} catch (Exception e) {
- throw new ResultExportException("Unable to deserialize the provided BindingSet", e);
+ throw new ResultExportException("Unable to export the given BindingSet " + bs + " with the given set of ExportStrategies " + strategies, e);
}
}
@@ -125,9 +129,14 @@ public class ExporterManager implements AutoCloseable {
throw new ResultExportException("Undable to deserialize provided RyaSubgraph", e);
}
- for(ExportStrategy strategy: strategies) {
- IncrementalRyaSubGraphExporter exporter = (IncrementalRyaSubGraphExporter) exporters.get(strategy);
- exporter.export(pcjId, subGraph);
+ try {
+ for (ExportStrategy strategy : strategies) {
+ IncrementalRyaSubGraphExporter exporter = (IncrementalRyaSubGraphExporter) exporters.get(strategy);
+ exporter.export(pcjId, subGraph);
+ }
+ } catch (Exception e) { // pass e along as the cause so the failing exporter's stack trace is not lost
+ throw new ResultExportException(
+ "Unable to export the given subgraph " + subGraph + " using all of the ExportStrategies " + strategies, e);
}
}
@@ -195,8 +204,6 @@ public class ExporterManager implements AutoCloseable {
* @return - ExporterManager for managing IncrementalResultExporters and exporting results
*/
public ExporterManager build() {
- //adds NoOpExporter in the event that users does not want to Export results
- addIncrementalResultExporter(new NoOpExporter());
return new ExporterManager(exporters);
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java
deleted file mode 100644
index ab7f2ed..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export;
-
-import java.util.Set;
-
-import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
-import org.apache.rya.api.client.CreatePCJ.QueryType;
-import org.apache.rya.api.domain.RyaSubGraph;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
-import com.google.common.collect.Sets;
-
-/**
- * This class is a NoOpExporter that can be specified if a user does not
- * want their results exported from Fluo.
- *
- */
-public class NoOpExporter implements IncrementalBindingSetExporter, IncrementalRyaSubGraphExporter {
-
- @Override
- public Set<QueryType> getQueryTypes() {
- return Sets.newHashSet(QueryType.CONSTRUCT, QueryType.PROJECTION);
- }
-
- @Override
- public ExportStrategy getExportStrategy() {
- return ExportStrategy.NO_OP_EXPORT;
- }
-
- @Override
- public void close() throws Exception {
- }
-
- @Override
- public void export(String constructID, RyaSubGraph subgraph) throws ResultExportException {
- }
-
- @Override
- public void export(String queryId, VisibilityBindingSet result) throws ResultExportException {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java
index 4550a50..3687c9f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
import com.google.common.base.Preconditions;
-
public class KafkaBindingSetExporterParameters extends KafkaExportParameterBase {
public static final String CONF_USE_KAFKA_BINDING_SET_EXPORTER = "pcj.fluo.export.kafka.bindingset.enabled";
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java
index 604462b..5a8f01c 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java
@@ -52,7 +52,7 @@ public class PeriodicBindingSetExporter implements IncrementalBindingSetExporter
@Override
public ExportStrategy getExportStrategy() {
- return ExportStrategy.RYA;
+ return ExportStrategy.PERIODIC; // NOTE(review): exporters are looked up by ExportStrategy; presumably this stops the periodic exporter sharing the RYA key - confirm
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index ba7beee..e07c514 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -19,6 +19,7 @@
package org.apache.rya.indexing.pcj.fluo.app.observers;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
+import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QUERY_BINDING_SET;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
@@ -35,7 +36,6 @@ import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporte
import org.apache.rya.indexing.pcj.fluo.app.export.rya.PeriodicBindingSetExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaSubGraphExporterFactory;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
@@ -66,7 +66,7 @@ public class QueryResultObserver extends AbstractObserver {
@Override
public ObservedColumn getObservedColumn() {
- return new ObservedColumn(FluoQueryColumns.QUERY_BINDING_SET, NotificationType.STRONG);
+ return new ObservedColumn(QUERY_BINDING_SET, NotificationType.STRONG); // QUERY_BINDING_SET is now a static import from FluoQueryColumns; same column as before
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
index 17ab14f..a1c7c00 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
@@ -92,11 +92,7 @@ public class FluoQuery {
this.statementPatternMetadata = requireNonNull(statementPatternMetadata);
this.filterMetadata = requireNonNull(filterMetadata);
this.joinMetadata = requireNonNull(joinMetadata);
- if(constructMetadata.isPresent()) {
- this.type = QueryType.CONSTRUCT;
- } else {
- this.type = QueryType.PROJECTION;
- }
+ this.type = queryMetadata.getQueryType();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
index 8569a48..6ca0e8d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.fluo.api.data.Column;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationState;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
index d5d9fe7..1cf2825 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java
@@ -113,8 +113,10 @@ public class FluoQueryMetadataDAO {
final String[] exportStrategies = values.get(FluoQueryColumns.QUERY_EXPORT_STRATEGIES).split(IncrementalUpdateConstants.VAR_DELIM);
Set<ExportStrategy> strategies = new HashSet<>();
- for(String strategy: exportStrategies) {
- strategies.add(ExportStrategy.valueOf(strategy));
+ for (String strategy : exportStrategies) {
+ if (!strategy.isEmpty()) {
+ strategies.add(ExportStrategy.valueOf(strategy));
+ }
}
return QueryMetadata.builder(nodeId)