You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/06/22 18:05:15 UTC
[3/4] incubator-rya git commit: RYA-273-Construct Query Support.
Closes #161.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
new file mode 100644
index 0000000..797502c
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java
@@ -0,0 +1,39 @@
+package org.apache.rya.indexing.pcj.fluo.app.export;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+
+/**
+ * Incrementally exports the {@link RyaSubGraph}s generated by SPARQL Construct Queries
+ * out of the Rya-Fluo application; where they are written is up to the implementation.
+ * Extends {@link AutoCloseable}, so every exporter also exposes {@code close()}.
+ */
+public interface IncrementalRyaSubGraphExporter extends AutoCloseable {
+
+    /**
+     * Exports a single RyaSubGraph that resulted from a SPARQL Construct Query.
+     *
+     * @param constructID - The Fluo Id of the construct query that created the RyaSubGraph
+     * @param subgraph - The RyaSubGraph to export (non-null)
+     * @throws ResultExportException The result could not be exported.
+     */
+    void export(String constructID, RyaSubGraph subgraph) throws ResultExportException;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java
new file mode 100644
index 0000000..ecbec09
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java
@@ -0,0 +1,47 @@
+package org.apache.rya.indexing.pcj.fluo.app.export;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.ConfigurationException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+
+import com.google.common.base.Optional;
+
+/**
+ * Factory for {@link IncrementalRyaSubGraphExporter}s that are configured from the
+ * values supplied through an observer {@link Context}.
+ */
+public interface IncrementalRyaSubGraphExporterFactory {
+
+    /**
+     * Creates an {@link IncrementalRyaSubGraphExporter} from the configuration
+     * carried by the supplied context.
+     *
+     * @param context - Contains the host application's configuration values
+     *   and any parameters that were provided at initialization. (not null)
+     * @return An exporter if configurations were found in the context; otherwise absent.
+     * @throws IncrementalExporterFactoryException A non-configuration related
+     *   problem has occurred and the exporter could not be created as a result.
+     * @throws ConfigurationException Thrown if configuration values were
+     *   provided, but an instance of the exporter could not be initialized
+     *   using them. This could be because they were improperly formatted,
+     *   a required field was missing, or some other configuration based problem.
+     */
+    Optional<IncrementalRyaSubGraphExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
new file mode 100644
index 0000000..152d156
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+/**
+ * Incrementally exports SPARQL query results to Kafka topics.
+ */
+public class KafkaBindingSetExporter implements IncrementalBindingSetExporter {
+    private static final Logger log = Logger.getLogger(KafkaBindingSetExporter.class);
+
+    private final KafkaProducer<String, VisibilityBindingSet> producer;
+
+    /**
+     * Constructs an instance given a Kafka producer.
+     * @param producer - Sends the exported results to a broker. Can be created and configured by {@link KafkaBindingSetExporterFactory}. (not null)
+     */
+    public KafkaBindingSetExporter(KafkaProducer<String, VisibilityBindingSet> producer) {
+        this.producer = checkNotNull(producer, "Producer is required.");
+    }
+
+    /**
+     * Reads the PCJ ID recorded for {@code queryId}, sends {@code result} to the Kafka topic
+     * named after that PCJ ID (not after the query ID), and waits for the send to complete.
+     * @param fluoTx - The transaction the query's PCJ ID is read through. (not null)
+     * @param queryId - Identifies the query whose result is exported; used to look up its PCJ ID. (not null)
+     * @param result - The binding set that is sent as the record's value. (not null)
+     * @throws ResultExportException Any failure while reading the PCJ ID, sending, or waiting for the send.
+     */
+    @Override
+    public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException {
+        checkNotNull(fluoTx);
+        checkNotNull(queryId);
+        checkNotNull(result);
+        try {
+            final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
+            if (log.isTraceEnabled()) {
+                log.trace("out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result);
+            }
+            // The topic name is the PCJ ID. Block on the future so the export does not return before the write; otherwise we may lose results.
+            final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(pcjId, result);
+            final Future<RecordMetadata> future = producer.send(rec);
+            future.get();
+            log.debug("producer.send(rec) completed");
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt(); // Restore the flag so the calling thread still sees the interruption.
+            throw new ResultExportException("A result could not be exported to Kafka.", e);
+        } catch (final Throwable e) {
+            throw new ResultExportException("A result could not be exported to Kafka.", e);
+        }
+    }
+
+    /** Closes the Kafka producer, passing it a 5 second timeout. */
+    @Override
+    public void close() throws Exception {
+        producer.close(5, TimeUnit.SECONDS);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
new file mode 100644
index 0000000..5507037
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+
+import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+import com.google.common.base.Optional;
+
+/**
+ * Creates instances of {@link KafkaBindingSetExporter}.
+ * <p>
+ * Configure a Kafka producer by adding several required Key/values as described here:
+ * http://kafka.apache.org/documentation.html#producerconfigs
+ * <p>
+ * Here is a simple example:
+ * <pre>
+ *     Properties producerConfig = new Properties();
+ *     producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ *     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ *     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ * </pre>
+ * NOTE(review): these serializers do not obviously fit the {@code KafkaProducer<String, VisibilityBindingSet>} built below - confirm.
+ * @see ProducerConfig
+ */
+public class KafkaBindingSetExporterFactory implements IncrementalBindingSetExporterFactory {
+    private static final Logger log = Logger.getLogger(KafkaBindingSetExporterFactory.class);
+    /**
+     * Builds a {@link KafkaBindingSetExporter} around a new {@link KafkaProducer} when the observer configuration enables the Kafka export.
+     * @param context - Supplies the observer configuration the Kafka export parameters are read from. (not null)
+     * @return The exporter if {@code isExportToKafka()} reports true; otherwise absent.
+     */
+    @Override
+    public Optional<IncrementalBindingSetExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
+        final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap());
+        log.debug("KafkaBindingSetExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka());
+        if (!exportParams.isExportToKafka()) {
+            return Optional.absent();
+        }
+        final KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<String, VisibilityBindingSet>(exportParams.listAllConfig());
+        return Optional.<IncrementalBindingSetExporter>of(new KafkaBindingSetExporter(producer));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
deleted file mode 100644
index 72ec947..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporter.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
-/**
- * Incrementally exports SPARQL query results to Kafka topics.
- */
-public class KafkaResultExporter implements IncrementalResultExporter {
- private static final Logger log = Logger.getLogger(KafkaResultExporter.class);
-
- private final KafkaProducer<String, VisibilityBindingSet> producer;
-
- /**
- * Constructs an instance given a Kafka producer.
- *
- * @param producer
- * for sending result set alerts to a broker. (not null)
- * Can be created and configured by {@link KafkaResultExporterFactory}
- */
- public KafkaResultExporter(final KafkaProducer<String, VisibilityBindingSet> producer) {
- super();
- checkNotNull(producer, "Producer is required.");
- this.producer = producer;
- }
-
- /**
- * Send the results to the topic using the queryID as the topicname
- */
- @Override
- public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException {
- checkNotNull(fluoTx);
- checkNotNull(queryId);
- checkNotNull(result);
- try {
- final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
- final String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result;
- log.trace(msg);
-
- // Send the result to the topic whose name matches the PCJ ID.
- final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(pcjId, result);
- final Future<RecordMetadata> future = producer.send(rec);
-
- // Don't let the export return until the result has been written to the topic. Otherwise we may lose results.
- future.get();
-
- log.debug("producer.send(rec) completed");
-
- } catch (final Throwable e) {
- throw new ResultExportException("A result could not be exported to Kafka.", e);
- }
- }
-
- @Override
- public void close() throws Exception {
- producer.close(5, TimeUnit.SECONDS);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java
deleted file mode 100644
index 995e9d9..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaResultExporterFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
-
-import org.apache.fluo.api.observer.Observer.Context;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.log4j.Logger;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
-import com.google.common.base.Optional;
-
-/**
- * Creates instances of {@link KafkaResultExporter}.
- * <p/>
- * Configure a Kafka producer by adding several required Key/values as described here:
- * http://kafka.apache.org/documentation.html#producerconfigs
- * <p/>
- * Here is a simple example:
- * <pre>
- * Properties producerConfig = new Properties();
- * producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- * producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- * producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- * </pre>
- *
- * @see ProducerConfig
- */
-public class KafkaResultExporterFactory implements IncrementalResultExporterFactory {
- private static final Logger log = Logger.getLogger(KafkaResultExporterFactory.class);
- @Override
- public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
- final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap());
- log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka());
- if (exportParams.isExportToKafka()) {
- // Setup Kafka connection
- KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<String, VisibilityBindingSet>(exportParams.listAllConfig());
- // Create the exporter
- final IncrementalResultExporter exporter = new KafkaResultExporter(producer);
- return Optional.of(exporter);
- } else {
- return Optional.absent();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
new file mode 100644
index 0000000..a15743f
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
@@ -0,0 +1,81 @@
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Exports {@link RyaSubGraph}s to Kafka from the Rya Fluo Application.
+ */
+public class KafkaRyaSubGraphExporter implements IncrementalRyaSubGraphExporter {
+    private final KafkaProducer<String, RyaSubGraph> producer;
+    private static final Logger log = Logger.getLogger(KafkaRyaSubGraphExporter.class);
+
+    /**
+     * Constructs an exporter that publishes through the supplied Kafka producer.
+     * @param producer - Sends the exported subgraphs to a broker. (not null)
+     */
+    public KafkaRyaSubGraphExporter(KafkaProducer<String, RyaSubGraph> producer) {
+        this.producer = checkNotNull(producer);
+    }
+
+    /**
+     * Sends the RyaSubGraph to the Kafka topic named by {@link RyaSubGraph#getId()} and waits for the send to complete.
+     * @param constructID - rowID of the result that is exported. Only used for logging. (not null)
+     * @param subGraph - RyaSubGraph exported to Kafka (not null)
+     * @throws ResultExportException Any failure while sending the record or waiting for the send.
+     */
+    @Override
+    public void export(String constructID, RyaSubGraph subGraph) throws ResultExportException {
+        checkNotNull(constructID);
+        checkNotNull(subGraph);
+        try {
+            // The topic is the subgraph's own ID. Block on the future so the export does not return before the write; otherwise we may lose results.
+            final ProducerRecord<String, RyaSubGraph> rec = new ProducerRecord<>(subGraph.getId(), subGraph);
+            final Future<RecordMetadata> future = producer.send(rec);
+            future.get();
+            if (log.isDebugEnabled()) {
+                log.debug("Producer successfully sent record with id: " + constructID + " and statements: " + subGraph.getStatements());
+            }
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt(); // Restore the flag so the calling thread still sees the interruption.
+            throw new ResultExportException("A result could not be exported to Kafka.", e);
+        } catch (final Throwable e) {
+            throw new ResultExportException("A result could not be exported to Kafka.", e);
+        }
+    }
+
+    /** Closes the Kafka producer, passing it a 5 second timeout. */
+    @Override
+    public void close() throws Exception {
+        producer.close(5, TimeUnit.SECONDS);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
new file mode 100644
index 0000000..2c1e4c0
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
@@ -0,0 +1,62 @@
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.ConfigurationException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporterFactory;
+
+import com.google.common.base.Optional;
+
+/**
+ * Builds the {@link KafkaRyaSubGraphExporter}s that the Rya Fluo application uses to
+ * export {@link RyaSubGraph}s to Kafka.
+ */
+public class KafkaRyaSubGraphExporterFactory implements IncrementalRyaSubGraphExporterFactory {
+
+    private static final Logger log = Logger.getLogger(KafkaRyaSubGraphExporterFactory.class);
+
+    /**
+     * Creates a {@link KafkaRyaSubGraphExporter} backed by a new {@link KafkaProducer}
+     * when the observer configuration enables the Kafka export.
+     *
+     * @param context - {@link Context} whose observer configuration supplies the Kafka export parameters
+     * @return an Optional holding the exporter if {@code isExportToKafka()} reports true; otherwise absent
+     * @throws IncrementalExporterFactoryException Declared by the factory interface; not thrown directly by this method.
+     * @throws ConfigurationException Declared by the factory interface; not thrown directly by this method.
+     */
+    @Override
+    public Optional<IncrementalRyaSubGraphExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
+        final KafkaExportParameters kafkaParams = new KafkaExportParameters(context.getObserverConfiguration().toMap());
+        log.debug("KafkaRyaSubGraphExporterFactory.build(): params.isExportToKafka()=" + kafkaParams.isExportToKafka());
+        if (!kafkaParams.isExportToKafka()) {
+            return Optional.absent();
+        }
+
+        // Connect to Kafka with every configured value, then wrap the producer in the exporter.
+        final KafkaProducer<String, RyaSubGraph> kafkaProducer = new KafkaProducer<String, RyaSubGraph>(kafkaParams.listAllConfig());
+        final IncrementalRyaSubGraphExporter subGraphExporter = new KafkaRyaSubGraphExporter(kafkaProducer);
+        return Optional.of(subGraphExporter);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/RyaSubGraphKafkaSerDe.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/RyaSubGraphKafkaSerDe.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/RyaSubGraphKafkaSerDe.java
new file mode 100644
index 0000000..ed20e8a
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/RyaSubGraphKafkaSerDe.java
@@ -0,0 +1,100 @@
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.domain.serialization.kryo.RyaSubGraphSerializer;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kafka {@link Serializer} and {@link Deserializer} for {@link RyaSubGraph}s, backed by Kryo.
+ * NOTE(review): one Kryo instance serves every call and Kryo documents its instances as not thread safe - confirm how this SerDe is shared.
+ */
+public class RyaSubGraphKafkaSerDe implements Serializer<RyaSubGraph>, Deserializer<RyaSubGraph> {
+
+    private final Kryo kryo;
+
+    /** Creates a SerDe whose Kryo instance has a {@link RyaSubGraphSerializer} registered for {@link RyaSubGraph}. */
+    public RyaSubGraphKafkaSerDe() {
+        kryo = new Kryo();
+        kryo.register(RyaSubGraph.class, new RyaSubGraphSerializer());
+    }
+
+    /**
+     * Deserializes from bytes to a RyaSubGraph
+     * @param bundleBytes - byte representation of RyaSubGraph
+     * @return - Deserialized RyaSubGraph
+     */
+    public RyaSubGraph fromBytes(byte[] bundleBytes) {
+        return kryo.readObject(new Input(bundleBytes), RyaSubGraph.class);
+    }
+
+    /**
+     * Serializes RyaSubGraph to bytes
+     * @param bundle - RyaSubGraph to be serialized
+     * @return - exactly the bytes Kryo wrote for the RyaSubGraph, with no trailing buffer padding
+     */
+    public byte[] toBytes(RyaSubGraph bundle) {
+        final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+        final Output output = new Output(stream);
+        kryo.writeObject(output, bundle, new RyaSubGraphSerializer());
+        // Output keeps its own buffer in front of the stream, so push everything through before copying the bytes out.
+        output.close();
+        return stream.toByteArray();
+    }
+
+    /**
+     * Deserializes RyaSubGraph
+     * @param topic - topic that data is associated with (no effect)
+     * @param bundleBytes - bytes to be deserialized
+     * @return - deserialized RyaSubGraph
+     */
+    @Override
+    public RyaSubGraph deserialize(String topic, byte[] bundleBytes) {
+        return fromBytes(bundleBytes);
+    }
+
+    /**
+     * Serializes RyaSubGraph
+     * @param topic - topic that data is associated with (no effect)
+     * @param subgraph - subgraph to be serialized
+     * @return - serialized bytes from subgraph
+     */
+    @Override
+    public byte[] serialize(String topic, RyaSubGraph subgraph) {
+        return toBytes(subgraph);
+    }
+
+    /** Closes serializer (no effect) */
+    @Override
+    public void close() {
+    }
+
+    /** Configures serializer (no effect) */
+    @Override
+    public void configure(Map<String, ?> arg0, boolean arg1) {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
new file mode 100644
index 0000000..84d3ce6
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collections;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+/**
+ * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya.
+ */
+public class RyaBindingSetExporter implements IncrementalBindingSetExporter {
+
+ private final PrecomputedJoinStorage pcjStorage;
+
+ /**
+ * Constructs an instance of {@link RyaBindingSetExporter}.
+ *
+ * @param pcjStorage - The PCJ storage the new results will be exported to. (not null)
+ */
+ public RyaBindingSetExporter(final PrecomputedJoinStorage pcjStorage) {
+ this.pcjStorage = checkNotNull(pcjStorage);
+ }
+
+ @Override
+ public void export(
+ final TransactionBase fluoTx,
+ final String queryId,
+ final VisibilityBindingSet result) throws ResultExportException {
+ requireNonNull(fluoTx);
+ requireNonNull(queryId);
+ requireNonNull(result);
+
+ // Look up the ID the PCJ represents within the PCJ Storage.
+ final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
+
+ try {
+ pcjStorage.addResults(pcjId, Collections.singleton(result));
+ } catch (final PCJStorageException e) {
+ throw new ResultExportException("A result could not be exported to Rya.", e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ pcjStorage.close();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
new file mode 100644
index 0000000..86d593f
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+
+import com.google.common.base.Optional;
+
+import org.apache.fluo.api.observer.Observer.Context;
+
+/**
+ * Creates instances of {@link RyaBindingSetExporter}.
+ */
+public class RyaBindingSetExporterFactory implements IncrementalBindingSetExporterFactory {
+
+ @Override
+ public Optional<IncrementalBindingSetExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException {
+ checkNotNull(context);
+
+ // Wrap the context's parameters for parsing.
+ final RyaExportParameters params = new RyaExportParameters( context.getObserverConfiguration().toMap() );
+
+ if(params.isExportToRya()) {
+ // Setup Zookeeper connection info.
+ final String accumuloInstance = params.getAccumuloInstanceName().get();
+ final String zookeeperServers = params.getZookeeperServers().get().replaceAll(";", ",");
+ final Instance inst = new ZooKeeperInstance(accumuloInstance, zookeeperServers);
+
+ try {
+ // Setup Accumulo connection info.
+ final String exporterUsername = params.getExporterUsername().get();
+ final String exporterPassword = params.getExporterPassword().get();
+ final Connector accumuloConn = inst.getConnector(exporterUsername, new PasswordToken(exporterPassword));
+
+ // Setup Rya PCJ Storage.
+ final String ryaInstanceName = params.getRyaInstanceName().get();
+ final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, ryaInstanceName);
+
+ // Make the exporter.
+ final IncrementalBindingSetExporter exporter = new RyaBindingSetExporter(pcjStorage);
+ return Optional.of(exporter);
+
+ } catch (final AccumuloException | AccumuloSecurityException e) {
+ throw new IncrementalExporterFactoryException("Could not initialize the Accumulo connector using the provided configuration.", e);
+ }
+ } else {
+ return Optional.absent();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
index cba6a43..a1ba5b8 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
@@ -45,6 +45,7 @@ public class RyaExportParameters extends ParametersBase {
public static final String CONF_EXPORTER_PASSWORD = "pcj.fluo.export.rya.exporterPassword";
public static final String CONF_RYA_INSTANCE_NAME = "pcj.fluo.export.rya.ryaInstanceName";
+ public static final String CONF_FLUO_APP_NAME = "pcj.fluo.export.rya.fluo.application.name";
/**
* Constructs an instance of {@link RyaExportParameters}.
@@ -147,4 +148,18 @@ public class RyaExportParameters extends ParametersBase {
public Optional<String> getExporterPassword() {
return Optional.fromNullable( params.get(CONF_EXPORTER_PASSWORD) );
}
+
+ /**
+ * Stores the Fluo application name in the parameters under {@link #CONF_FLUO_APP_NAME}.
+ *
+ * @param fluoApplicationName - The name of the Rya Fluo application
+ */
+ public void setFluoApplicationName(@Nullable final String fluoApplicationName) {
+ params.put(CONF_FLUO_APP_NAME, fluoApplicationName);
+ }
+
+ /**
+ * Reads the value stored under {@link #CONF_FLUO_APP_NAME}.
+ *
+ * @return The name of the Rya Fluo application, or absent if the stored value is null
+ */
+ public Optional<String> getFluoApplicationName() {
+ return Optional.fromNullable(params.get(CONF_FLUO_APP_NAME));
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
deleted file mode 100644
index b8b3c45..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporter.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.rya;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.Objects.requireNonNull;
-
-import java.util.Collections;
-
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-
-/**
- * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya.
- */
-public class RyaResultExporter implements IncrementalResultExporter {
-
- private final PrecomputedJoinStorage pcjStorage;
-
- /**
- * Constructs an instance of {@link RyaResultExporter}.
- *
- * @param pcjStorage - The PCJ storage the new results will be exported to. (not null)
- */
- public RyaResultExporter(final PrecomputedJoinStorage pcjStorage) {
- this.pcjStorage = checkNotNull(pcjStorage);
- }
-
- @Override
- public void export(
- final TransactionBase fluoTx,
- final String queryId,
- final VisibilityBindingSet result) throws ResultExportException {
- requireNonNull(fluoTx);
- requireNonNull(queryId);
- requireNonNull(result);
-
- // Look up the ID the PCJ represents within the PCJ Storage.
- final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
-
- try {
- pcjStorage.addResults(pcjId, Collections.singleton(result));
- } catch (final PCJStorageException e) {
- throw new ResultExportException("A result could not be exported to Rya.", e);
- }
- }
-
- @Override
- public void close() throws Exception {
- pcjStorage.close();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java
deleted file mode 100644
index c695272..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaResultExporterFactory.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.rya;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.accumulo.core.client.AccumuloException;
-import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.ZooKeeperInstance;
-import org.apache.accumulo.core.client.security.tokens.PasswordToken;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
-import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
-import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-
-import com.google.common.base.Optional;
-
-import org.apache.fluo.api.observer.Observer.Context;
-
-/**
- * Creates instances of {@link RyaResultExporter}.
- */
-public class RyaResultExporterFactory implements IncrementalResultExporterFactory {
-
- @Override
- public Optional<IncrementalResultExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException {
- checkNotNull(context);
-
- // Wrap the context's parameters for parsing.
- final RyaExportParameters params = new RyaExportParameters( context.getObserverConfiguration().toMap() );
-
- if(params.isExportToRya()) {
- // Setup Zookeeper connection info.
- final String accumuloInstance = params.getAccumuloInstanceName().get();
- final String zookeeperServers = params.getZookeeperServers().get().replaceAll(";", ",");
- final Instance inst = new ZooKeeperInstance(accumuloInstance, zookeeperServers);
-
- try {
- // Setup Accumulo connection info.
- final String exporterUsername = params.getExporterUsername().get();
- final String exporterPassword = params.getExporterPassword().get();
- final Connector accumuloConn = inst.getConnector(exporterUsername, new PasswordToken(exporterPassword));
-
- // Setup Rya PCJ Storage.
- final String ryaInstanceName = params.getRyaInstanceName().get();
- final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, ryaInstanceName);
-
- // Make the exporter.
- final IncrementalResultExporter exporter = new RyaResultExporter(pcjStorage);
- return Optional.of(exporter);
-
- } catch (final AccumuloException | AccumuloSecurityException e) {
- throw new IncrementalExporterFactoryException("Could not initialize the Accumulo connector using the provided configuration.", e);
- }
- } else {
- return Optional.absent();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index dc4b3b4..ac131e3 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -26,11 +26,13 @@ import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructQueryResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.FilterResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.QueryResultUpdater;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
+import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
@@ -57,6 +59,7 @@ public abstract class BindingSetUpdater extends AbstractObserver {
private final FilterResultUpdater filterUpdater = new FilterResultUpdater();
private final QueryResultUpdater queryUpdater = new QueryResultUpdater();
private final AggregationResultUpdater aggregationUpdater = new AggregationResultUpdater();
+ private final ConstructQueryResultUpdater constructUpdater = new ConstructQueryResultUpdater();
@Override
public abstract ObservedColumn getObservedColumn();
@@ -102,6 +105,15 @@ public abstract class BindingSetUpdater extends AbstractObserver {
}
break;
+ case CONSTRUCT:
+ final ConstructQueryMetadata constructQuery = queryDao.readConstructQueryMetadata(tx, parentNodeId);
+ try{
+ constructUpdater.updateConstructQueryResults(tx, observedBindingSet, constructQuery);
+ } catch (final Exception e) {
+ throw new RuntimeException("Could not process a Query node.", e);
+ }
+ break;
+
case FILTER:
final FilterMetadata parentFilter = queryDao.readFilterMetadata(tx, parentNodeId);
try {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
new file mode 100644
index 0000000..f0fef07
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
@@ -0,0 +1,198 @@
+package org.apache.rya.indexing.pcj.fluo.app.observers;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.io.UnsupportedEncodingException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.AbstractObserver;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.utils.VisibilitySimplifier;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.resolver.triple.TripleRow;
+import org.apache.rya.api.resolver.triple.TripleRowResolverException;
+import org.apache.rya.api.resolver.triple.impl.WholeRowTripleResolver;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Monitors the Column {@link FluoQueryColumns#CONSTRUCT_STATEMENTS} for new
+ * Construct Query {@link RyaStatement}s and exports the results using the
+ * {@link IncrementalRyaSubGraphExporter}s that are registered with this
+ * Observer.
+ *
+ */
+public class ConstructQueryResultObserver extends AbstractObserver {
+
+ private static final WholeRowTripleResolver TRIPLE_RESOLVER = new WholeRowTripleResolver();
+ private static final Logger log = Logger.getLogger(ConstructQueryResultObserver.class);
+ private static final RyaSubGraphKafkaSerDe serializer = new RyaSubGraphKafkaSerDe();
+
+ /**
+ * We expect to see the same expressions a lot, so we cache the simplified
+ * forms.
+ */
+ private final Map<String, String> simplifiedVisibilities = new HashMap<>();
+
+ /**
+ * Builders for each type of result exporter we support.
+ */
+ private static final ImmutableSet<IncrementalRyaSubGraphExporterFactory> factories = ImmutableSet
+ .<IncrementalRyaSubGraphExporterFactory> builder().add(new KafkaRyaSubGraphExporterFactory()).build();
+
+ /**
+ * The exporters that are configured.
+ */
+ private ImmutableSet<IncrementalRyaSubGraphExporter> exporters = null;
+
+ /**
+ * Before running, determine which exporters are configured and set them up.
+ */
+ @Override
+ public void init(final Context context) {
+ final ImmutableSet.Builder<IncrementalRyaSubGraphExporter> exportersBuilder = ImmutableSet.builder();
+
+ for (final IncrementalRyaSubGraphExporterFactory builder : factories) {
+ try {
+ log.debug("ConstructQueryResultObserver.init(): for each exportersBuilder=" + builder);
+
+ final Optional<IncrementalRyaSubGraphExporter> exporter = builder.build(context);
+ if (exporter.isPresent()) {
+ exportersBuilder.add(exporter.get());
+ }
+ } catch (final IncrementalExporterFactoryException e) {
+ log.error("Could not initialize a result exporter.", e);
+ }
+ }
+
+ exporters = exportersBuilder.build();
+ }
+
+ @Override
+ public ObservedColumn getObservedColumn() {
+ return new ObservedColumn(FluoQueryColumns.CONSTRUCT_STATEMENTS, NotificationType.STRONG);
+ }
+
+ @Override
+ public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
+ Bytes bytes = tx.get(row, col);
+ RyaSubGraph subgraph = serializer.fromBytes(bytes.toArray());
+ Set<RyaStatement> statements = subgraph.getStatements();
+ if (statements.size() > 0) {
+ byte[] visibility = statements.iterator().next().getColumnVisibility();
+ visibility = simplifyVisibilities(visibility);
+ for(RyaStatement statement: statements) {
+ statement.setColumnVisibility(visibility);
+ }
+ subgraph.setStatements(statements);
+
+ for (IncrementalRyaSubGraphExporter exporter : exporters) {
+ exporter.export(row.toString(), subgraph);
+ }
+ }
+ //add generated triples back into Fluo for chaining queries together
+ insertTriples(tx, subgraph.getStatements());
+ }
+
+ @Override
+ public void close() {
+ if(exporters != null) {
+ for(final IncrementalRyaSubGraphExporter exporter : exporters) {
+ try {
+ exporter.close();
+ } catch(final Exception e) {
+ log.warn("Problem encountered while closing one of the exporters.", e);
+ }
+ }
+ }
+ }
+
+ private byte[] simplifyVisibilities(byte[] visibilityBytes) throws UnsupportedEncodingException {
+ // Simplify the result's visibilities and cache new simplified
+ // visibilities
+ String visibility = new String(visibilityBytes, "UTF-8");
+ if (!simplifiedVisibilities.containsKey(visibility)) {
+ String simplified = VisibilitySimplifier.simplify(visibility);
+ simplifiedVisibilities.put(visibility, simplified);
+ }
+ return simplifiedVisibilities.get(visibility).getBytes("UTF-8");
+ }
+
+ private void insertTriples(TransactionBase tx, final Collection<RyaStatement> triples) {
+
+ for (final RyaStatement triple : triples) {
+ Optional<byte[]> visibility = Optional.fromNullable(triple.getColumnVisibility());
+ try {
+ tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0])));
+ } catch (final TripleRowResolverException e) {
+ log.error("Could not convert a Triple into the SPO format: " + triple);
+ }
+ }
+ }
+
+
+ /**
+ * Converts a triple into a byte[] holding the Rya SPO representation of it.
+ *
+ * @param triple - The triple to convert. (not null)
+ * @return The Rya SPO representation of the triple.
+ * @throws TripleRowResolverException The triple could not be converted.
+ */
+ public static byte[] spoFormat(final RyaStatement triple) throws TripleRowResolverException {
+ checkNotNull(triple);
+ final Map<TABLE_LAYOUT, TripleRow> serialized = TRIPLE_RESOLVER.serialize(triple);
+ final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO);
+ return spoRow.getRow();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index 28c92af..b675ba7 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -30,12 +30,12 @@ import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.utils.VisibilitySimplifier;
import org.apache.rya.indexing.pcj.fluo.app.VisibilityBindingSetSerDe;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory.IncrementalExporterFactoryException;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaResultExporterFactory;
-import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
@@ -58,16 +58,16 @@ public class QueryResultObserver extends AbstractObserver {
/**
* Builders for each type of result exporter we support.
*/
- private static final ImmutableSet<IncrementalResultExporterFactory> factories =
- ImmutableSet.<IncrementalResultExporterFactory>builder()
- .add(new RyaResultExporterFactory())
- .add(new KafkaResultExporterFactory())
+ private static final ImmutableSet<IncrementalBindingSetExporterFactory> factories =
+ ImmutableSet.<IncrementalBindingSetExporterFactory>builder()
+ .add(new RyaBindingSetExporterFactory())
+ .add(new KafkaBindingSetExporterFactory())
.build();
/**
* The exporters that are configured.
*/
- private ImmutableSet<IncrementalResultExporter> exporters = null;
+ private ImmutableSet<IncrementalBindingSetExporter> exporters = null;
@Override
public ObservedColumn getObservedColumn() {
@@ -79,13 +79,13 @@ public class QueryResultObserver extends AbstractObserver {
*/
@Override
public void init(final Context context) {
- final ImmutableSet.Builder<IncrementalResultExporter> exportersBuilder = ImmutableSet.builder();
-
- for(final IncrementalResultExporterFactory builder : factories) {
- log.debug("QueryResultObserver.init(): for each exportersBuilder=" + builder);
+ final ImmutableSet.Builder<IncrementalBindingSetExporter> exportersBuilder = ImmutableSet.builder();
+ for(final IncrementalBindingSetExporterFactory builder : factories) {
try {
- final Optional<IncrementalResultExporter> exporter = builder.build(context);
+ log.debug("QueryResultObserver.init(): for each exportersBuilder=" + builder);
+
+ final Optional<IncrementalBindingSetExporter> exporter = builder.build(context);
if(exporter.isPresent()) {
exportersBuilder.add(exporter.get());
}
@@ -117,7 +117,7 @@ public class QueryResultObserver extends AbstractObserver {
result.setVisibility( simplifiedVisibilities.get(visibility) );
// Export the result using each of the provided exporters.
- for(final IncrementalResultExporter exporter : exporters) {
+ for(final IncrementalBindingSetExporter exporter : exporters) {
try {
exporter.export(tx, queryId, result);
} catch (final ResultExportException e) {
@@ -129,7 +129,7 @@ public class QueryResultObserver extends AbstractObserver {
@Override
public void close() {
if(exporters != null) {
- for(final IncrementalResultExporter exporter : exporters) {
+ for(final IncrementalBindingSetExporter exporter : exporters) {
try {
exporter.close();
} catch(final Exception e) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
new file mode 100644
index 0000000..e836c5d
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/ConstructQueryMetadata.java
@@ -0,0 +1,192 @@
+package org.apache.rya.indexing.pcj.fluo.app.query;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Metadata object used to store metadata for Construct Query Nodes found in
+ * SPARQL queries.
+ *
+ */
+public class ConstructQueryMetadata extends CommonNodeMetadata {
+
+ private String childNodeId;
+ private ConstructGraph graph;
+ private String sparql;
+
+ /**
+ * Creates ConstructQueryMetadata object from the provided metadata arguments.
+ * @param nodeId - id for the ConstructQueryNode
+ * @param childNodeId - id for the child of the ConstructQueryNode
+ * @param graph - {@link ConstructGraph} used to project {@link BindingSet}s onto sets of statements representing the construct graph
+ * @param sparql - SPARQL query containing construct graph
+ */
+ public ConstructQueryMetadata(String nodeId, String childNodeId, ConstructGraph graph, String sparql) {
+ super(nodeId, new VariableOrder("subject", "predicate", "object"));
+ Preconditions.checkNotNull(childNodeId);
+ Preconditions.checkNotNull(graph);
+ Preconditions.checkNotNull(sparql);
+ this.childNodeId = childNodeId;
+ this.graph = graph;
+ this.sparql = sparql;
+ }
+
+ /**
+ * @return sparql query string representing this construct query
+ */
+ public String getSparql() {
+ return sparql;
+ }
+
+ /**
+ * @return The node whose results are projected onto the given
+ * {@link ConstructGraph}.
+ */
+ public String getChildNodeId() {
+ return childNodeId;
+ }
+
+ /**
+ * @return The ConstructGraph used to form statement {@link BindingSet}s for
+ * this Construct Query
+ */
+ public ConstructGraph getConstructGraph() {
+ return graph;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(super.getNodeId(), super.getVariableOrder(), childNodeId, graph, sparql);
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o instanceof ConstructQueryMetadata) {
+ ConstructQueryMetadata queryMetadata = (ConstructQueryMetadata) o;
+ if (super.equals(queryMetadata)) {
+ return new EqualsBuilder().append(childNodeId, queryMetadata.childNodeId).append(graph, queryMetadata.graph)
+ .append(sparql, queryMetadata.sparql).isEquals();
+ }
+ return false;
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().append("Construct Query Metadata {\n").append(" Node ID: " + super.getNodeId() + "\n")
+ .append(" SPARQL QUERY: " + sparql + "\n").append(" Variable Order: " + super.getVariableOrder() + "\n")
+ .append(" Child Node ID: " + childNodeId + "\n").append(" Construct Graph: " + graph.getProjections() + "\n")
+ .append("}").toString();
+ }
+
+ /**
+ * Creates a new {@link Builder} for this class.
+ *
+ * @return A new {@link Builder} for this class.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builds instances of {@link ConstructQueryMetadata}.
+ */
+ @DefaultAnnotation(NonNull.class)
+ public static final class Builder {
+
+
+ private String nodeId;
+ private ConstructGraph graph;
+ private String childNodeId;
+ private String sparql;
+
+ /**
+ * Set the node Id that identifies this Construct Query Node
+ *
+ * @param nodeId
+ * id for this node
+ * @return This builder so that method invocations may be chained.
+ */
+ public Builder setNodeId(String nodeId) {
+ this.nodeId = nodeId;
+ return this;
+ }
+
+ /**
+ * Set the SPARQL String representing this construct query
+ * @param sparql - SPARQL string representing this construct query
+ */
+ public Builder setSparql(String sparql) {
+ this.sparql = sparql;
+ return this;
+ }
+
+ /**
+ * Set the ConstructGraph used to form statement {@link BindingSet}s for
+ * this Construct Query
+ *
+ * @param graph
+ * - ConstructGraph to project {@link BindingSet}s onto RDF
+ * statements
+ * @return This builder so that method invocations may be chained.
+ */
+ public Builder setConstructGraph(ConstructGraph graph) {
+ this.graph = graph;
+ return this;
+ }
+
+ /**
+ * Set the node whose results are projected onto the given
+ * {@link ConstructGraph}.
+ *
+ * @param childNodeId
+ * - The node whose results are projected onto the given
+ * {@link ConstructGraph}.
+ * @return This builder so that method invocations may be chained.
+ */
+ public Builder setChildNodeId(String childNodeId) {
+ this.childNodeId = childNodeId;
+ return this;
+ }
+
+ /**
+ * @return An instance of {@link ConstructQueryMetadata} built using
+ * this builder's values.
+ */
+ public ConstructQueryMetadata build() {
+ return new ConstructQueryMetadata(nodeId, childNodeId, graph, sparql);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/60090ad5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
index 3230a5d..a701052 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
@@ -29,6 +29,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
@@ -43,11 +44,14 @@ import net.jcip.annotations.Immutable;
@DefaultAnnotation(NonNull.class)
public class FluoQuery {
- private final QueryMetadata queryMetadata;
+ private final Optional<QueryMetadata> queryMetadata;
+ private final Optional<ConstructQueryMetadata> constructMetadata;
private final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata;
private final ImmutableMap<String, FilterMetadata> filterMetadata;
private final ImmutableMap<String, JoinMetadata> joinMetadata;
private final ImmutableMap<String, AggregationMetadata> aggregationMetadata;
+ private final QueryType type;
+ public static enum QueryType {Projection, Construct};
/**
* Constructs an instance of {@link FluoQuery}. Private because applications
@@ -67,21 +71,65 @@ public class FluoQuery {
final QueryMetadata queryMetadata,
final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata,
final ImmutableMap<String, FilterMetadata> filterMetadata,
+ final ImmutableMap<String, JoinMetadata> joinMetadata,
+ final ImmutableMap<String, AggregationMetadata> aggregationMetadata) {
+ this.aggregationMetadata = requireNonNull(aggregationMetadata);
+ this.queryMetadata = Optional.of(requireNonNull(queryMetadata));
+ this.constructMetadata = Optional.absent();
+ this.statementPatternMetadata = requireNonNull(statementPatternMetadata);
+ this.filterMetadata = requireNonNull(filterMetadata);
+ this.joinMetadata = requireNonNull(joinMetadata);
+ this.type = QueryType.Projection;
+ }
+
+
+ /**
+ * Constructs an instance of {@link FluoQuery}. Private because applications
+ * must use {@link Builder} instead.
+ *
+ * @param constructMetadata - The root node of a construct query that is updated in Fluo. (not null)
+ * @param statementPatternMetadata - A map from Node ID to Statement Pattern metadata as
+ * it is represented within the Fluo app. (not null)
+ * @param filterMetadata A map from Node ID to Filter metadata as it is represented
+ * within the Fluo app. (not null)
+ * @param joinMetadata A map from Node ID to Join metadata as it is represented
+ * within the Fluo app. (not null)
+ * @param aggregationMetadata - A map from Node ID to Aggregation metadata as it is
+ * represented within the Fluo app. (not null)
+ */
+ private FluoQuery(
+ final ConstructQueryMetadata constructMetadata,
+ final ImmutableMap<String, StatementPatternMetadata> statementPatternMetadata,
+ final ImmutableMap<String, FilterMetadata> filterMetadata,
final ImmutableMap<String, JoinMetadata> joinMetadata,
final ImmutableMap<String, AggregationMetadata> aggregationMetadata) {
- this.queryMetadata = requireNonNull(queryMetadata);
+ this.constructMetadata = Optional.of(requireNonNull(constructMetadata));
+ this.queryMetadata = Optional.absent();
this.statementPatternMetadata = requireNonNull(statementPatternMetadata);
this.filterMetadata = requireNonNull(filterMetadata);
this.joinMetadata = requireNonNull(joinMetadata);
- this.aggregationMetadata = requireNonNull(aggregationMetadata);
+ this.aggregationMetadata = requireNonNull(aggregationMetadata);
+ this.type = QueryType.Construct;
+ }
+
+ /**
+ * Returns the {@link QueryType} of this query
+ * @return the QueryType of this query (either Construct or Projection)
+ */
+ public QueryType getQueryType() {
+ return type;
}
/**
* @return Metadata about the root node of a query that is updated within the Fluo app.
*/
- public QueryMetadata getQueryMetadata() {
+ public Optional<QueryMetadata> getQueryMetadata() {
return queryMetadata;
}
+
+ public Optional<ConstructQueryMetadata> getConstructQueryMetadata() {
+ return constructMetadata;
+ }
/**
* Get a Statement Pattern node's metadata.
@@ -175,6 +223,7 @@ public class FluoQuery {
final FluoQuery fluoQuery = (FluoQuery)o;
return new EqualsBuilder()
.append(queryMetadata, fluoQuery.queryMetadata)
+ .append(constructMetadata, fluoQuery.constructMetadata)
.append(statementPatternMetadata, fluoQuery.statementPatternMetadata)
.append(filterMetadata, fluoQuery.filterMetadata)
.append(joinMetadata, fluoQuery.joinMetadata)
@@ -189,8 +238,13 @@ public class FluoQuery {
public String toString() {
final StringBuilder builder = new StringBuilder();
- if(queryMetadata != null) {
- builder.append( queryMetadata.toString() );
+ if(queryMetadata.isPresent()) {
+ builder.append( queryMetadata.get().toString() );
+ builder.append("\n");
+ }
+
+ if(constructMetadata.isPresent()) {
+ builder.append( constructMetadata.get().toString() );
builder.append("\n");
}
@@ -231,6 +285,7 @@ public class FluoQuery {
public static final class Builder {
private QueryMetadata.Builder queryBuilder = null;
+ private ConstructQueryMetadata.Builder constructBuilder = null;
private final Map<String, StatementPatternMetadata.Builder> spBuilders = new HashMap<>();
private final Map<String, FilterMetadata.Builder> filterBuilders = new HashMap<>();
private final Map<String, JoinMetadata.Builder> joinBuilders = new HashMap<>();
@@ -239,11 +294,11 @@ public class FluoQuery {
/**
* Sets the {@link QueryMetadata.Builder} that is used by this builder.
*
- * @param queryMetadata - The builder representing the query's results.
+ * @param queryBuilder - The builder representing the query's results.
* @return This builder so that method invocation may be chained.
*/
- public Builder setQueryMetadata(@Nullable final QueryMetadata.Builder queryMetadata) {
- this.queryBuilder = queryMetadata;
+ public Builder setQueryMetadata(@Nullable final QueryMetadata.Builder queryBuilder) {
+ this.queryBuilder = queryBuilder;
return this;
}
@@ -253,6 +308,26 @@ public class FluoQuery {
public Optional<QueryMetadata.Builder> getQueryBuilder() {
return Optional.fromNullable( queryBuilder );
}
+
+ /**
+ * Sets the {@link ConstructQueryMetadata.Builder} that is used by this builder.
+ *
+ * @param constructBuilder
+ * - The builder representing the query's results.
+ * @return This builder so that method invocation may be chained.
+ */
+ public Builder setConstructQueryMetadata(@Nullable final ConstructQueryMetadata.Builder constructBuilder) {
+ this.constructBuilder = constructBuilder;
+ return this;
+ }
+
+ /**
+ * @return The Construct Query metadata builder if one has been set.
+ */
+ public Optional<ConstructQueryMetadata.Builder> getConstructQueryBuilder() {
+ return Optional.fromNullable( constructBuilder );
+ }
+
/**
* Adds a new {@link StatementPatternMetadata.Builder} to this builder.
@@ -345,12 +420,14 @@ public class FluoQuery {
requireNonNull(nodeId);
return Optional.fromNullable( joinBuilders.get(nodeId) );
}
+
/**
* @return Creates a {@link FluoQuery} using the values that have been supplied to this builder.
*/
public FluoQuery build() {
- final QueryMetadata queryMetadata = queryBuilder.build();
+ Preconditions.checkArgument(
+ (queryBuilder != null && constructBuilder == null) || (queryBuilder == null && constructBuilder != null));
final ImmutableMap.Builder<String, StatementPatternMetadata> spMetadata = ImmutableMap.builder();
for(final Entry<String, StatementPatternMetadata.Builder> entry : spBuilders.entrySet()) {
@@ -372,7 +449,14 @@ public class FluoQuery {
aggregateMetadata.put(entry.getKey(), entry.getValue().build());
}
- return new FluoQuery(queryMetadata, spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+ if(queryBuilder != null) {
+ return new FluoQuery(queryBuilder.build(), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+ }
+ //constructBuilder non-null in this case, but no need to check
+ else {
+ return new FluoQuery(constructBuilder.build(), spMetadata.build(), filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+ }
+
}
}
}
\ No newline at end of file