You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by ca...@apache.org on 2017/08/25 19:35:35 UTC
[4/5] incubator-rya git commit: RYA-246-Query-Export-Strategy. Closes
#213.
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
index 5507037..b796a6f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
@@ -23,7 +23,8 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import com.google.common.base.Optional;
@@ -44,13 +45,13 @@ import com.google.common.base.Optional;
*
* @see ProducerConfig
*/
-public class KafkaBindingSetExporterFactory implements IncrementalBindingSetExporterFactory {
+public class KafkaBindingSetExporterFactory implements IncrementalResultExporterFactory {
private static final Logger log = Logger.getLogger(KafkaBindingSetExporterFactory.class);
@Override
- public Optional<IncrementalBindingSetExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
- final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap());
- log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka());
- if (exportParams.isExportToKafka()) {
+ public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
+ final KafkaBindingSetExporterParameters exportParams = new KafkaBindingSetExporterParameters(context.getObserverConfiguration().toMap());
+ log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.getUseKafkaBindingSetExporter());
+ if (exportParams.getUseKafkaBindingSetExporter()) {
// Setup Kafka connection
KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<String, VisibilityBindingSet>(exportParams.listAllConfig());
// Create the exporter
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java
new file mode 100644
index 0000000..4550a50
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import com.google.common.base.Preconditions;
+
+/** Parameters that enable and configure the {@link KafkaBindingSetExporter}. */
+public class KafkaBindingSetExporterParameters extends KafkaExportParameterBase {
+
+ public static final String CONF_USE_KAFKA_BINDING_SET_EXPORTER = "pcj.fluo.export.kafka.bindingset.enabled";
+ public static final String CONF_KAFKA_BINDING_SET_SERIALIZER = "pcj.fluo.export.kafka.bindingset.serializer";
+
+ public KafkaBindingSetExporterParameters(final Map<String, String> params) {
+ super(params);
+ }
+
+ /**
+ * Instructs the Fluo application whether to use the Kafka Binding Set Exporter.
+ * The Key/Value Serializer settings are not touched here; {@link #listAllConfig()} supplies them.
+ * @param useExporter
+ * - {@code True} if the Fluo application should use the
+ * {@link KafkaBindingSetExporter}; otherwise {@code false}.
+ */
+ public void setUseKafkaBindingSetExporter(final boolean useExporter) {
+ setBoolean(params, CONF_USE_KAFKA_BINDING_SET_EXPORTER, useExporter);
+ }
+
+ /**
+ * @return {@code True} if the Fluo application should use the {@link KafkaBindingSetExporter}; otherwise
+ * {@code false}. Defaults to {@code false} if no value is present.
+ */
+ public boolean getUseKafkaBindingSetExporter() {
+ return getBoolean(params, CONF_USE_KAFKA_BINDING_SET_EXPORTER, false);
+ }
+
+ /**
+ * Stores the class name of the Kafka value serializer to use for BindingSets.
+ * @param serializer - Used for Serializing BindingSets pushed to Kafka (not null)
+ */
+ public void setKafkaBindingSetSerializer(String serializer) {
+ params.put(CONF_KAFKA_BINDING_SET_SERIALIZER, Preconditions.checkNotNull(serializer));
+ }
+
+ /**
+ * @return - Serializer used for Serializing BindingSets to Kafka; the KryoVisibilityBindingSetSerializer class name if none was set
+ */
+ public String getKafkaBindingSetSerializer() {
+ return params.getOrDefault(CONF_KAFKA_BINDING_SET_SERIALIZER, KryoVisibilityBindingSetSerializer.class.getName());
+ }
+ /** Returns the superclass config plus a String key serializer and the configured BindingSet value serializer. */
+ @Override
+ public Properties listAllConfig() {
+ Properties props = super.listAllConfig();
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getKafkaBindingSetSerializer());
+ return props;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
new file mode 100644
index 0000000..aab3929
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.fluo.api.observer.Observer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase;
+
+import jline.internal.Preconditions;
+
+/**
+ * Provides read/write functions to the parameters map that is passed into an
+ * {@link Observer#init(Observer.Context)} method related
+ * to PCJ exporting to a kafka topic.
+ * Remember: it doesn't count unless it is added to params
+ */
+
+public class KafkaExportParameterBase extends ParametersBase {
+
+ public KafkaExportParameterBase(final Map<String, String> params) {
+ super(params);
+ }
+ // NOTE(review): Preconditions in this file is imported from jline.internal, not Guava - confirm this is intended.
+ /**
+ * Sets the bootstrap servers for reading from and writing to Kafka
+ * @param bootstrapServers - connect string for Kafka brokers (not null)
+ */
+ public void setKafkaBootStrapServers(String bootstrapServers) {
+ params.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Preconditions.checkNotNull(bootstrapServers));
+ }
+
+ /**
+ * @return Connect string for Kafka servers, or empty if none has been set
+ */
+ public Optional<String> getKafkaBootStrapServers() {
+ return Optional.ofNullable(params.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+ }
+
+ /**
+ * Add the properties to the params, NOT keeping them separate from the other params.
+ * Guaranteed by Properties: Each key and its corresponding value in the property list is a string.
+ * NOTE(review): getProperty returns null for a non-String value, so value.toString() would then throw NullPointerException.
+ * @param producerConfig - the properties whose entries are copied into the params as Strings
+ */
+ public void addAllProducerConfig(final Properties producerConfig) {
+ for (Object key : producerConfig.keySet().toArray()) {
+ Object value = producerConfig.getProperty(key.toString());
+ this.params.put(key.toString(), value.toString());
+ }
+ }
+
+ /**
+ * Collect all the properties into a newly created {@link Properties}
+ *
+ * @return all the params (not just kafka producer Configuration) as a {@link Properties}
+ */
+ public Properties listAllConfig() {
+ Properties props = new Properties();
+ for (Object key : params.keySet().toArray()) {
+ Object value = params.get(key.toString());
+ props.put(key.toString(), value.toString());
+ }
+ return props;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java
deleted file mode 100644
index 347a2e2..0000000
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameters.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
-
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.fluo.api.observer.Observer;
-import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase;
-
-/**
- * Provides read/write functions to the parameters map that is passed into an
- * {@link Observer#init(io.fluo.api.observer.Observer.Context)} method related
- * to PCJ exporting to a kafka topic.
- * Remember: if doesn't count unless it is added to params
- */
-
-public class KafkaExportParameters extends ParametersBase {
-
- public static final String CONF_EXPORT_TO_KAFKA = "pcj.fluo.export.kafka.enabled";
-
- public KafkaExportParameters(final Map<String, String> params) {
- super(params);
- }
-
- /**
- * @param isExportToKafka
- * - {@code True} if the Fluo application should export
- * to Kafka; otherwise {@code false}.
- */
- public void setExportToKafka(final boolean isExportToKafka) {
- setBoolean(params, CONF_EXPORT_TO_KAFKA, isExportToKafka);
- }
-
- /**
- * @return {@code True} if the Fluo application should export to Kafka; otherwise
- * {@code false}. Defaults to {@code false} if no value is present.
- */
- public boolean isExportToKafka() {
- return getBoolean(params, CONF_EXPORT_TO_KAFKA, false);
- }
-
- /**
- * Add the properties to the params, NOT keeping them separate from the other params.
- * Guaranteed by Properties: Each key and its corresponding value in the property list is a string.
- *
- * @param producerConfig
- */
- public void addAllProducerConfig(final Properties producerConfig) {
- for (Object key : producerConfig.keySet().toArray()) {
- Object value = producerConfig.getProperty(key.toString());
- this.params.put(key.toString(), value.toString());
- }
- }
-
- /**
- * Collect all the properties
- *
- * @return all the params (not just kafka producer Configuration) as a {@link Properties}
- */
- public Properties listAllConfig() {
- Properties props = new Properties();
- for (Object key : params.keySet().toArray()) {
- Object value = params.get(key.toString());
- props.put(key.toString(), value.toString());
- }
- return props;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
index fa27b46..da26329 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
@@ -19,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
*/
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -26,8 +27,13 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.log4j.Logger;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
import org.apache.rya.api.domain.RyaSubGraph;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+
+import com.google.common.collect.Sets;
+
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
/**
@@ -76,4 +82,14 @@ public class KafkaRyaSubGraphExporter implements IncrementalRyaSubGraphExporter
producer.close(5, TimeUnit.SECONDS);
}
+ @Override
+ public Set<QueryType> getQueryTypes() {
+ return Sets.newHashSet(QueryType.CONSTRUCT);
+ }
+
+ @Override
+ public ExportStrategy getExportStrategy() {
+ return ExportStrategy.KAFKA;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
index 2c1e4c0..60e9294 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
@@ -21,10 +21,9 @@ import org.apache.fluo.api.observer.Observer.Context;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.log4j.Logger;
import org.apache.rya.api.domain.RyaSubGraph;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.ConfigurationException;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporterFactory;
import com.google.common.base.Optional;
@@ -33,9 +32,11 @@ import com.google.common.base.Optional;
* exporting {@link RyaSubGraph}s from the Rya Fluo application to Kafka.
*
*/
-public class KafkaRyaSubGraphExporterFactory implements IncrementalRyaSubGraphExporterFactory {
+public class KafkaRyaSubGraphExporterFactory implements IncrementalResultExporterFactory {
private static final Logger log = Logger.getLogger(KafkaRyaSubGraphExporterFactory.class);
+ public static final String CONF_USE_KAFKA_SUBGRAPH_EXPORTER = "pcj.fluo.export.kafka.subgraph.enabled";
+ public static final String CONF_KAFKA_SUBGRAPH_SERIALIZER = "pcj.fluo.export.kafka.subgraph.serializer";
/**
* Builds a {@link KafkaRyaSubGraphExporter}.
@@ -45,10 +46,10 @@ public class KafkaRyaSubGraphExporterFactory implements IncrementalRyaSubGraphEx
* @throws ConfigurationException
*/
@Override
- public Optional<IncrementalRyaSubGraphExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
- final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap());
- log.debug("KafkaRyaSubGraphExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka());
- if (exportParams.isExportToKafka()) {
+ public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
+ final KafkaSubGraphExporterParameters exportParams = new KafkaSubGraphExporterParameters(context.getObserverConfiguration().toMap());
+ log.debug("KafkaRyaSubGraphExporterFactory.build(): params.isExportToKafka()=" + exportParams.getUseKafkaSubgraphExporter());
+ if (exportParams.getUseKafkaSubgraphExporter()) {
// Setup Kafka connection
KafkaProducer<String, RyaSubGraph> producer = new KafkaProducer<String, RyaSubGraph>(exportParams.listAllConfig());
// Create the exporter
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaSubGraphExporterParameters.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaSubGraphExporterParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaSubGraphExporterParameters.java
new file mode 100644
index 0000000..1472fdd
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaSubGraphExporterParameters.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import com.google.common.base.Preconditions;
+
+/** Parameters that enable and configure the {@link KafkaRyaSubGraphExporter}. */
+public class KafkaSubGraphExporterParameters extends KafkaExportParameterBase {
+
+ public static final String CONF_USE_KAFKA_SUBGRAPH_EXPORTER = "pcj.fluo.export.kafka.subgraph.enabled";
+ public static final String CONF_KAFKA_SUBGRAPH_SERIALIZER = "pcj.fluo.export.kafka.subgraph.serializer";
+
+ public KafkaSubGraphExporterParameters(final Map<String, String> params) {
+ super(params);
+ }
+
+ /**
+ * Instructs the Fluo application whether to use the Kafka RyaSubGraph Exporter.
+ * The Key/Value Serializer settings are not touched here; {@link #listAllConfig()} supplies them.
+ * @param useExporter
+ * - {@code True} if the Fluo application should use the
+ * {@link KafkaRyaSubGraphExporter}; otherwise {@code false}.
+ */
+ public void setUseKafkaSubgraphExporter(final boolean useExporter) {
+ setBoolean(params, CONF_USE_KAFKA_SUBGRAPH_EXPORTER, useExporter);
+ }
+
+ /**
+ * @return {@code True} if the Fluo application should use the {@link KafkaRyaSubGraphExporter}; otherwise
+ * {@code false}. Defaults to {@code false} if no value is present.
+ */
+ public boolean getUseKafkaSubgraphExporter() {
+ return getBoolean(params, CONF_USE_KAFKA_SUBGRAPH_EXPORTER, false);
+ }
+
+ /**
+ * Stores the class name of the Kafka value serializer to use for RyaSubGraphs.
+ * @param serializer - Used for Serializing RyaSubGraphs pushed to Kafka (not null)
+ */
+ public void setKafkaSubGraphSerializer(String serializer) {
+ params.put(CONF_KAFKA_SUBGRAPH_SERIALIZER, Preconditions.checkNotNull(serializer));
+ }
+
+ /**
+ * @return - Serializer used for Serializing RyaSubGraphs to Kafka; the RyaSubGraphKafkaSerDe class name if none was set
+ */
+ public String getKafkaSubGraphSerializer() {
+ return params.getOrDefault(CONF_KAFKA_SUBGRAPH_SERIALIZER, RyaSubGraphKafkaSerDe.class.getName());
+ }
+ /** Returns the superclass config plus a String key serializer and the configured RyaSubGraph value serializer. */
+ @Override
+ public Properties listAllConfig() {
+ Properties props = super.listAllConfig();
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getKafkaSubGraphSerializer());
+ return props;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java
new file mode 100644
index 0000000..604462b
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collections;
+import java.util.Set;
+
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+import com.google.common.collect.Sets;
+
+public class PeriodicBindingSetExporter implements IncrementalBindingSetExporter {
+ /** Storage that every exported result is added to. */
+ private PeriodicQueryResultStorage periodicStorage;
+
+ /**
+ * Constructs an instance of {@link PeriodicBindingSetExporter}.
+ *
+ * @param periodicStorage - The periodic query result storage the new results will be exported to. (not null)
+ */
+ public PeriodicBindingSetExporter(PeriodicQueryResultStorage periodicStorage) {
+ this.periodicStorage = checkNotNull(periodicStorage);
+ }
+
+ @Override
+ public Set<QueryType> getQueryTypes() {
+ return Sets.newHashSet(QueryType.PERIODIC);
+ }
+
+ @Override
+ public ExportStrategy getExportStrategy() {
+ return ExportStrategy.RYA;
+ }
+
+ @Override
+ public void close() throws Exception { // Intentionally empty: periodicStorage is not closed by this exporter.
+ }
+
+ @Override
+ public void export(String queryId, VisibilityBindingSet result) throws ResultExportException {
+ try {
+ periodicStorage.addPeriodicQueryResults(queryId, Collections.singleton(result));
+ } catch (PeriodicQueryStorageException e) {
+ throw new ResultExportException("Could not successfully export the BindingSet: " + result, e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporterFactory.java
new file mode 100644
index 0000000..0a0b767
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporterFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+
+import com.google.common.base.Optional;
+
+public class PeriodicBindingSetExporterFactory implements IncrementalResultExporterFactory {
+ /** Builds a {@link PeriodicBindingSetExporter} when the periodic exporter flag is set in the observer configuration; otherwise returns absent. */
+ @Override
+ public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
+ checkNotNull(context);
+
+ // Wrap the context's parameters for parsing.
+ final RyaExportParameters params = new RyaExportParameters( context.getObserverConfiguration().toMap() );
+
+ if(params.getUsePeriodicBindingSetExporter()) {
+ // Setup Zookeeper connection info (';' separators become ','). NOTE(review): the .get() calls below are unchecked - presumably they throw if a parameter is missing; confirm.
+ final String accumuloInstance = params.getAccumuloInstanceName().get();
+ final String zookeeperServers = params.getZookeeperServers().get().replaceAll(";", ",");
+ final Instance inst = new ZooKeeperInstance(accumuloInstance, zookeeperServers);
+
+ try {
+ // Setup Accumulo connection info.
+ final String exporterUsername = params.getExporterUsername().get();
+ final String exporterPassword = params.getExporterPassword().get();
+ final Connector accumuloConn = inst.getConnector(exporterUsername, new PasswordToken(exporterPassword));
+
+ // Setup the periodic query result storage for the Rya instance.
+ final String ryaInstanceName = params.getRyaInstanceName().get();
+ final PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, ryaInstanceName);
+
+ // Make the exporter.
+ final IncrementalBindingSetExporter exporter = new PeriodicBindingSetExporter(periodicStorage);
+ return Optional.of(exporter);
+
+ } catch (final AccumuloException | AccumuloSecurityException e) {
+ throw new IncrementalExporterFactoryException("Could not initialize the Accumulo connector using the provided configuration.", e);
+ }
+ } else {
+ return Optional.absent();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
index 54c39b7..8a9dbe4 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporter.java
@@ -22,55 +22,42 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Objects.requireNonNull;
import java.util.Collections;
+import java.util.Set;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import com.google.common.collect.Sets;
+
/**
* Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya.
*/
public class RyaBindingSetExporter implements IncrementalBindingSetExporter {
private final PrecomputedJoinStorage pcjStorage;
- private final PeriodicQueryResultStorage periodicStorage;
/**
* Constructs an instance of {@link RyaBindingSetExporter}.
*
* @param pcjStorage - The PCJ storage the new results will be exported to. (not null)
*/
- public RyaBindingSetExporter(final PrecomputedJoinStorage pcjStorage, PeriodicQueryResultStorage periodicStorage) {
+ public RyaBindingSetExporter(final PrecomputedJoinStorage pcjStorage) {
this.pcjStorage = checkNotNull(pcjStorage);
- this.periodicStorage = checkNotNull(periodicStorage);
}
@Override
- public void export(
- final TransactionBase fluoTx,
- final String queryId,
- final VisibilityBindingSet result) throws ResultExportException {
- requireNonNull(fluoTx);
+ public void export(final String queryId, final VisibilityBindingSet result) throws ResultExportException {
requireNonNull(queryId);
requireNonNull(result);
- // Look up the ID the PCJ represents within the PCJ Storage.
- final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
-
try {
- if (result.hasBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID)) {
- periodicStorage.addPeriodicQueryResults(pcjId, Collections.singleton(result));
- } else {
- pcjStorage.addResults(pcjId, Collections.singleton(result));
- }
- } catch (final PCJStorageException | PeriodicQueryStorageException e) {
- throw new ResultExportException("A result could not be exported to Rya.", e);
+ pcjStorage.addResults(queryId, Collections.singleton(result));
+ } catch (PCJStorageException e) {
+ throw new ResultExportException("Unable to successfully export the result: " + result, e);
}
}
@@ -78,4 +65,14 @@ public class RyaBindingSetExporter implements IncrementalBindingSetExporter {
public void close() throws Exception {
pcjStorage.close();
}
+
+ @Override
+ public Set<QueryType> getQueryTypes() {
+ return Sets.newHashSet(QueryType.PROJECTION);
+ }
+
+ @Override
+ public ExportStrategy getExportStrategy() {
+ return ExportStrategy.RYA;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
index 82ce9c6..a87243e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
@@ -26,8 +26,10 @@ import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.fluo.api.observer.Observer.Context;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
@@ -35,21 +37,19 @@ import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultS
import com.google.common.base.Optional;
-import org.apache.fluo.api.observer.Observer.Context;
-
/**
* Creates instances of {@link RyaBindingSetExporter}.
*/
-public class RyaBindingSetExporterFactory implements IncrementalBindingSetExporterFactory {
+public class RyaBindingSetExporterFactory implements IncrementalResultExporterFactory {
@Override
- public Optional<IncrementalBindingSetExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException {
+ public Optional<IncrementalResultExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException {
checkNotNull(context);
// Wrap the context's parameters for parsing.
final RyaExportParameters params = new RyaExportParameters( context.getObserverConfiguration().toMap() );
- if(params.isExportToRya()) {
+ if(params.getUseRyaBindingSetExporter()) {
// Setup Zookeeper connection info.
final String accumuloInstance = params.getAccumuloInstanceName().get();
final String zookeeperServers = params.getZookeeperServers().get().replaceAll(";", ",");
@@ -64,10 +64,9 @@ public class RyaBindingSetExporterFactory implements IncrementalBindingSetExport
// Setup Rya PCJ Storage.
final String ryaInstanceName = params.getRyaInstanceName().get();
final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, ryaInstanceName);
- final PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(accumuloConn, ryaInstanceName);
// Make the exporter.
- final IncrementalBindingSetExporter exporter = new RyaBindingSetExporter(pcjStorage, periodicStorage);
+ final IncrementalBindingSetExporter exporter = new RyaBindingSetExporter(pcjStorage);
return Optional.of(exporter);
} catch (final AccumuloException | AccumuloSecurityException e) {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
index a1ba5b8..aa5d3cd 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaExportParameters.java
@@ -38,7 +38,8 @@ import org.apache.fluo.api.observer.Observer;
@DefaultAnnotation(NonNull.class)
public class RyaExportParameters extends ParametersBase {
- public static final String CONF_EXPORT_TO_RYA = "pcj.fluo.export.rya.enabled";
+ public static final String CONF_USE_RYA_BINDING_SET_EXPORTER = "pcj.fluo.export.rya.bindingset.enabled";
+ public static final String CONF_USE_PERIODIC_BINDING_SET_EXPORTER = "pcj.fluo.export.periodic.bindingset.enabled";
public static final String CONF_ACCUMULO_INSTANCE_NAME = "pcj.fluo.export.rya.accumuloInstanceName";
public static final String CONF_ZOOKEEPER_SERVERS = "pcj.fluo.export.rya.zookeeperServers";
public static final String CONF_EXPORTER_USERNAME = "pcj.fluo.export.rya.exporterUsername";
@@ -57,19 +58,35 @@ public class RyaExportParameters extends ParametersBase {
}
/**
- * @param isExportToRya - {@code True} if the Fluo application should export
- * to Rya; otherwise {@code false}.
+ * @param useExporter - {@code True} if the Fluo application should use the {@link RyaBindingSetExporter}; otherwise
+ * {@code false}.
*/
- public void setExportToRya(final boolean isExportToRya) {
- setBoolean(params, CONF_EXPORT_TO_RYA, isExportToRya);
+ public void setUseRyaBindingSetExporter(final boolean useExporter) {
+ setBoolean(params, CONF_USE_RYA_BINDING_SET_EXPORTER, useExporter);
}
/**
- * @return {@code True} if the Fluo application should export to Rya; otherwise
+ * @return {@code True} if the Fluo application should use the {@link RyaBindingSetExporter}; otherwise
* {@code false}. Defaults to {@code false} if no value is present.
*/
- public boolean isExportToRya() {
- return getBoolean(params, CONF_EXPORT_TO_RYA, false);
+ public boolean getUseRyaBindingSetExporter() {
+ return getBoolean(params, CONF_USE_RYA_BINDING_SET_EXPORTER, false);
+ }
+
+ /**
+ * @param useExporter - {@code True} if the Fluo application should use the
+ * {@link PeriodicBindingSetExporter}; otherwise {@code false}.
+ */
+ public void setUsePeriodicBindingSetExporter(final boolean useExporter) {
+ setBoolean(params, CONF_USE_PERIODIC_BINDING_SET_EXPORTER, useExporter);
+ }
+
+ /**
+ * @return {@code True} if the Fluo application should use the {@link PeriodicBindingSetExporter}; otherwise
+ * {@code false}. Defaults to {@code false} if no value is present.
+ */
+ public boolean getUsePeriodicBindingSetExporter() {
+ return getBoolean(params, CONF_USE_PERIODIC_BINDING_SET_EXPORTER, false);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExportParameters.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExportParameters.java
new file mode 100644
index 0000000..6a99a7e
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExportParameters.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.fluo.api.config.FluoConfiguration;
+
+import com.google.common.base.Preconditions;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Manages the parameters used to construct a {@link RyaSubGraphExporter}.
+ * <p>
+ * Values are read from and written to the inherited {@code params} map using the
+ * {@code CONF_*} keys declared here and in the parent classes.
+ */
+public class RyaSubGraphExportParameters extends RyaExportParameters {
+
+    public static final String CONF_FLUO_INSTANCE = "pcj.fluo.export.rya.fluo.instance";
+    public static final String CONF_FLUO_INSTANCE_ZOOKEEPERS = "pcj.fluo.export.rya.fluo.instance.zookeepers";
+    public static final String CONF_FLUO_TABLE_NAME = "pcj.fluo.export.rya.fluo.table.name";
+    public static final String CONF_USE_RYA_SUBGRAPH_EXPORTER = "pcj.fluo.export.rya.subgraph.enabled";
+
+
+    /**
+     * Constructs an instance of {@link RyaSubGraphExportParameters}.
+     *
+     * @param params - The parameter map that backs this object; it is handed to the parent class.
+     */
+    public RyaSubGraphExportParameters(final Map<String, String> params) {
+        super(params);
+    }
+
+    /**
+     * @param useExporter - indicates whether to use the {@link RyaSubGraphExporter}
+     */
+    public void setUseRyaSubGraphExporter(final boolean useExporter) {
+        setBoolean(params, CONF_USE_RYA_SUBGRAPH_EXPORTER, useExporter);
+    }
+
+    /**
+     * @return boolean indicating whether to use the {@link RyaSubGraphExporter}.
+     *   Defaults to {@code false} if no value is present.
+     */
+    public boolean getUseRyaSubGraphExporter() {
+        return getBoolean(params, CONF_USE_RYA_SUBGRAPH_EXPORTER, false);
+    }
+
+    /**
+     * @param fluoInstance - the Accumulo instance that Fluo is running on (not null)
+     */
+    public void setFluoInstanceName(final String fluoInstance) {
+        params.put(CONF_FLUO_INSTANCE, Preconditions.checkNotNull(fluoInstance));
+    }
+
+    /**
+     * @return the Accumulo instance that Fluo is running on, or empty if it has not been set
+     */
+    public Optional<String> getFluoInstanceName() {
+        return Optional.ofNullable(params.get(CONF_FLUO_INSTANCE));
+    }
+
+    /**
+     * @param fluoTable - the name of the Accumulo Fluo table
+     */
+    public void setFluoTable(@Nullable final String fluoTable) {
+        params.put(CONF_FLUO_TABLE_NAME, fluoTable);
+    }
+
+    /**
+     * @return the name of the Accumulo Fluo table, or empty if it has not been set
+     */
+    public Optional<String> getFluoTable() {
+        return Optional.ofNullable(params.get(CONF_FLUO_TABLE_NAME));
+    }
+
+    /**
+     * @param zookeepers - the zookeepers for the Fluo instance
+     */
+    public void setFluoZookeepers(@Nullable final String zookeepers) {
+        params.put(CONF_FLUO_INSTANCE_ZOOKEEPERS, zookeepers);
+    }
+
+    /**
+     * @return the zookeepers for the Fluo instance, or empty if they have not been set
+     */
+    public Optional<String> getFluoZookeepers() {
+        return Optional.ofNullable(params.get(CONF_FLUO_INSTANCE_ZOOKEEPERS));
+    }
+
+    /**
+     * Misspelled predecessor of {@link #getFluoZookeepers()}, kept so existing callers still compile.
+     *
+     * @return the zookeepers for the Fluo instance, or empty if they have not been set
+     * @deprecated use {@link #getFluoZookeepers()} instead.
+     */
+    @Deprecated
+    public Optional<String> getFLuoZookeepers() {
+        return getFluoZookeepers();
+    }
+
+    /**
+     * Uses the underlying parameter map to build a {@link FluoConfiguration} object.
+     * <p>
+     * Each value is passed to the configuration exactly as stored in the map.
+     * NOTE(review): a key that is absent from the map is passed on as {@code null};
+     * confirm how {@link FluoConfiguration} treats that before relying on partial configurations.
+     *
+     * @return a FluoConfiguration for creating a FluoClient
+     */
+    public FluoConfiguration getFluoConfiguration() {
+        final FluoConfiguration config = new FluoConfiguration();
+        config.setMiniStartAccumulo(false);
+
+        // Accumulo connection details.
+        config.setAccumuloInstance(params.get(CONF_ACCUMULO_INSTANCE_NAME));
+        config.setAccumuloUser(params.get(CONF_EXPORTER_USERNAME));
+        config.setAccumuloPassword(params.get(CONF_EXPORTER_PASSWORD));
+        config.setInstanceZookeepers(params.get(CONF_FLUO_INSTANCE_ZOOKEEPERS));
+        config.setAccumuloZookeepers(params.get(CONF_ZOOKEEPER_SERVERS));
+
+        // Fluo application details.
+        config.setApplicationName(params.get(CONF_FLUO_APP_NAME));
+        config.setAccumuloTable(params.get(CONF_FLUO_TABLE_NAME));
+        return config;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
new file mode 100644
index 0000000..e33ea97
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporter.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
+import org.apache.rya.api.client.CreatePCJ.QueryType;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.RyaSubGraph;
+import org.apache.rya.api.resolver.triple.TripleRow;
+import org.apache.rya.api.resolver.triple.TripleRowResolverException;
+import org.apache.rya.api.resolver.triple.impl.WholeRowTripleResolver;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+/**
+ * This exporter is used to import {@link RyaSubGraph}s back into Fluo. By ingesting
+ * RyaSubGraphs back into Fluo, queries can be chained together.
+ */
+public class RyaSubGraphExporter implements IncrementalRyaSubGraphExporter {
+
+    private static final Logger log = Logger.getLogger(RyaSubGraphExporter.class);
+    private static final WholeRowTripleResolver TRIPLE_RESOLVER = new WholeRowTripleResolver();
+    private final FluoClient fluo;
+
+    /**
+     * Constructs an instance of {@link RyaSubGraphExporter}.
+     *
+     * @param fluo - The client used to write the exported statements into Fluo. (not null)
+     *   It is closed when {@link #close()} is called.
+     */
+    public RyaSubGraphExporter(FluoClient fluo) {
+        this.fluo = Preconditions.checkNotNull(fluo);
+    }
+
+    @Override
+    public Set<QueryType> getQueryTypes() {
+        return Sets.newHashSet(QueryType.CONSTRUCT);
+    }
+
+    @Override
+    public ExportStrategy getExportStrategy() {
+        return ExportStrategy.RYA;
+    }
+
+    @Override
+    public void close() throws Exception {
+        fluo.close();
+    }
+
+    /**
+     * Writes every statement of the subgraph into Fluo's triples column within a single transaction.
+     *
+     * @param constructID - The ID of the construct query that produced the subgraph; only used in error messages.
+     * @param subgraph - The subgraph whose statements are written. (not null)
+     * @throws ResultExportException The transaction holding the statements could not be committed.
+     */
+    @Override
+    public void export(String constructID, RyaSubGraph subgraph) throws ResultExportException {
+        checkNotNull(subgraph);
+
+        // A Fluo transaction only takes effect once it is committed, and it has to be closed
+        // to release its resources, so do both here instead of abandoning the transaction.
+        try (org.apache.fluo.api.client.Transaction tx = fluo.newTransaction()) {
+            insertTriples(tx, subgraph.getStatements());
+            tx.commit();
+        } catch (final org.apache.fluo.api.exceptions.CommitException e) {
+            throw new ResultExportException("Unable to commit the exported subgraph for construct query: " + constructID, e);
+        }
+    }
+
+    /**
+     * Sets one row per statement in the triples column, using the statement's visibility
+     * (or an empty value when it has none) as the cell value. Statements that cannot be
+     * serialized are logged and skipped so the remaining statements are still written.
+     *
+     * @param tx - The transaction the statements are written with.
+     * @param triples - The statements to write.
+     */
+    private void insertTriples(TransactionBase tx, final Collection<RyaStatement> triples) {
+        for (final RyaStatement triple : triples) {
+            Optional<byte[]> visibility = Optional.fromNullable(triple.getColumnVisibility());
+            try {
+                tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0])));
+            } catch (final TripleRowResolverException e) {
+                // Keep the cause so the reason for the failed conversion is not lost.
+                log.error("Could not convert a Triple into the SPO format: " + triple, e);
+            }
+        }
+    }
+
+    /**
+     * Converts a triple into a byte[] holding the Rya SPO representation of it.
+     *
+     * @param triple - The triple to convert. (not null)
+     * @return The Rya SPO representation of the triple.
+     * @throws TripleRowResolverException The triple could not be converted.
+     */
+    private static byte[] spoFormat(final RyaStatement triple) throws TripleRowResolverException {
+        checkNotNull(triple);
+        final Map<TABLE_LAYOUT, TripleRow> serialized = TRIPLE_RESOLVER.serialize(triple);
+        final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO);
+        return spoRow.getRow();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporterFactory.java
new file mode 100644
index 0000000..25f60a5
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaSubGraphExporterFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.FluoFactory;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+/**
+ * Builds {@link RyaSubGraphExporter}s from the configuration of a Fluo observer.
+ */
+public class RyaSubGraphExporterFactory implements IncrementalResultExporterFactory {
+
+    /**
+     * Creates a {@link RyaSubGraphExporter} when the observer configuration enables it.
+     *
+     * @param context - The observer context whose configuration holds the export parameters. (not null)
+     * @return The exporter, or absent when the subgraph exporter is not enabled.
+     * @throws IncrementalExporterFactoryException The Fluo client or the exporter could not be created.
+     */
+    @Override
+    public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
+        Preconditions.checkNotNull(context);
+
+        final RyaSubGraphExportParameters exportParams = new RyaSubGraphExportParameters(context.getObserverConfiguration().toMap());
+
+        // Nothing to build unless the exporter has been switched on.
+        if (!exportParams.getUseRyaSubGraphExporter()) {
+            return Optional.absent();
+        }
+
+        try {
+            // Connect to Fluo using the configuration derived from the parameters.
+            final FluoConfiguration fluoConf = exportParams.getFluoConfiguration();
+            final FluoClient fluoClient = FluoFactory.newClient(fluoConf);
+
+            final RyaSubGraphExporter subGraphExporter = new RyaSubGraphExporter(fluoClient);
+            return Optional.of(subGraphExporter);
+        } catch (Exception e) {
+            throw new IncrementalExporterFactoryException("Could not initialize the RyaSubGraphExporter", e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
index 1cb1594..6147fa8 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java
@@ -28,7 +28,6 @@ import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.ObjectSeria
import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.openrdf.query.BindingSet;
@@ -45,8 +44,6 @@ public class AggregationObserver extends BindingSetUpdater {
private static final AggregationStateSerDe STATE_SERDE = new ObjectSerializationAggregationStateSerDe();
- private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
-
@Override
public ObservedColumn getObservedColumn() {
return new ObservedColumn(FluoQueryColumns.AGGREGATION_BINDING_SET, NotificationType.STRONG);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
index 7d0fd5e..c0cfa1d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/BindingSetUpdater.java
@@ -55,7 +55,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
public abstract class BindingSetUpdater extends AbstractObserver {
private static final Logger log = Logger.getLogger(BindingSetUpdater.class);
// DAO
- private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
+ protected final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
// Updaters
private final JoinResultUpdater joinUpdater = new JoinResultUpdater();
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
index f0fef07..61e7244 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ConstructQueryResultObserver.java
@@ -1,4 +1,3 @@
-package org.apache.rya.indexing.pcj.fluo.app.observers;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -17,54 +16,20 @@ package org.apache.rya.indexing.pcj.fluo.app.observers;
* specific language governing permissions and limitations
* under the License.
*/
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-import java.io.UnsupportedEncodingException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
+package org.apache.rya.indexing.pcj.fluo.app.observers;
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.utils.VisibilitySimplifier;
-import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import org.apache.rya.api.domain.RyaStatement;
-import org.apache.rya.api.domain.RyaSubGraph;
-import org.apache.rya.api.resolver.triple.TripleRow;
-import org.apache.rya.api.resolver.triple.TripleRowResolverException;
-import org.apache.rya.api.resolver.triple.impl.WholeRowTripleResolver;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporterFactory;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporterFactory;
-import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-
/**
* Monitors the Column {@link FluoQueryColumns#CONSTRUCT_STATEMENTS} for new
* Construct Query {@link RyaStatement}s and exports the results using the
@@ -74,49 +39,7 @@ import com.google.common.collect.ImmutableSet;
*/
public class ConstructQueryResultObserver extends AbstractObserver {
- private static final WholeRowTripleResolver TRIPLE_RESOLVER = new WholeRowTripleResolver();
private static final Logger log = Logger.getLogger(ConstructQueryResultObserver.class);
- private static final RyaSubGraphKafkaSerDe serializer = new RyaSubGraphKafkaSerDe();
-
- /**
- * We expect to see the same expressions a lot, so we cache the simplified
- * forms.
- */
- private final Map<String, String> simplifiedVisibilities = new HashMap<>();
-
- /**
- * Builders for each type of result exporter we support.
- */
- private static final ImmutableSet<IncrementalRyaSubGraphExporterFactory> factories = ImmutableSet
- .<IncrementalRyaSubGraphExporterFactory> builder().add(new KafkaRyaSubGraphExporterFactory()).build();
-
- /**
- * The exporters that are configured.
- */
- private ImmutableSet<IncrementalRyaSubGraphExporter> exporters = null;
-
- /**
- * Before running, determine which exporters are configured and set them up.
- */
- @Override
- public void init(final Context context) {
- final ImmutableSet.Builder<IncrementalRyaSubGraphExporter> exportersBuilder = ImmutableSet.builder();
-
- for (final IncrementalRyaSubGraphExporterFactory builder : factories) {
- try {
- log.debug("ConstructQueryResultObserver.init(): for each exportersBuilder=" + builder);
-
- final Optional<IncrementalRyaSubGraphExporter> exporter = builder.build(context);
- if (exporter.isPresent()) {
- exportersBuilder.add(exporter.get());
- }
- } catch (final IncrementalExporterFactoryException e) {
- log.error("Could not initialize a result exporter.", e);
- }
- }
-
- exporters = exportersBuilder.build();
- }
@Override
public ObservedColumn getObservedColumn() {
@@ -125,74 +48,20 @@ public class ConstructQueryResultObserver extends AbstractObserver {
@Override
public void process(TransactionBase tx, Bytes row, Column col) throws Exception {
+
+ //Build row for parent that result will be written to
+ BindingSetRow bsRow = BindingSetRow.make(row);
+ String constructNodeId = bsRow.getNodeId();
+ String bsString= bsRow.getBindingSetString();
+ String parentNodeId = tx.get(Bytes.of(constructNodeId), FluoQueryColumns.CONSTRUCT_PARENT_NODE_ID).toString();
+ String rowString = parentNodeId + IncrementalUpdateConstants.NODEID_BS_DELIM + bsString;
+
+ //Get NodeType of the parent node
+ NodeType parentType = NodeType.fromNodeId(parentNodeId).get();
+ //Get data for the ConstructQuery result
Bytes bytes = tx.get(row, col);
- RyaSubGraph subgraph = serializer.fromBytes(bytes.toArray());
- Set<RyaStatement> statements = subgraph.getStatements();
- if (statements.size() > 0) {
- byte[] visibility = statements.iterator().next().getColumnVisibility();
- visibility = simplifyVisibilities(visibility);
- for(RyaStatement statement: statements) {
- statement.setColumnVisibility(visibility);
- }
- subgraph.setStatements(statements);
-
- for (IncrementalRyaSubGraphExporter exporter : exporters) {
- exporter.export(row.toString(), subgraph);
- }
- }
- //add generated triples back into Fluo for chaining queries together
- insertTriples(tx, subgraph.getStatements());
- }
-
- @Override
- public void close() {
- if(exporters != null) {
- for(final IncrementalRyaSubGraphExporter exporter : exporters) {
- try {
- exporter.close();
- } catch(final Exception e) {
- log.warn("Problem encountered while closing one of the exporters.", e);
- }
- }
- }
- }
-
- private byte[] simplifyVisibilities(byte[] visibilityBytes) throws UnsupportedEncodingException {
- // Simplify the result's visibilities and cache new simplified
- // visibilities
- String visibility = new String(visibilityBytes, "UTF-8");
- if (!simplifiedVisibilities.containsKey(visibility)) {
- String simplified = VisibilitySimplifier.simplify(visibility);
- simplifiedVisibilities.put(visibility, simplified);
- }
- return simplifiedVisibilities.get(visibility).getBytes("UTF-8");
+ //Write result to parent
+ tx.set(Bytes.of(rowString), parentType.getResultColumn(), bytes);
}
-
- private void insertTriples(TransactionBase tx, final Collection<RyaStatement> triples) {
-
- for (final RyaStatement triple : triples) {
- Optional<byte[]> visibility = Optional.fromNullable(triple.getColumnVisibility());
- try {
- tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0])));
- } catch (final TripleRowResolverException e) {
- log.error("Could not convert a Triple into the SPO format: " + triple);
- }
- }
- }
-
-
- /**
- * Converts a triple into a byte[] holding the Rya SPO representation of it.
- *
- * @param triple - The triple to convert. (not null)
- * @return The Rya SPO representation of the triple.
- * @throws TripleRowResolverException The triple could not be converted.
- */
- public static byte[] spoFormat(final RyaStatement triple) throws TripleRowResolverException {
- checkNotNull(triple);
- final Map<TABLE_LAYOUT, TripleRow> serialized = TRIPLE_RESOLVER.serialize(triple);
- final TripleRow spoRow = serialized.get(TABLE_LAYOUT.SPO);
- return spoRow.getRow();
- }
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
index ee03334..b4edfea 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/FilterObserver.java
@@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes;
import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
import org.openrdf.query.BindingSet;
@@ -39,8 +38,6 @@ public class FilterObserver extends BindingSetUpdater {
private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
- private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
-
@Override
public ObservedColumn getObservedColumn() {
return new ObservedColumn(FluoQueryColumns.FILTER_BINDING_SET, NotificationType.STRONG);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
index 28e31d8..c56a98f 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/JoinObserver.java
@@ -24,7 +24,6 @@ import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -39,8 +38,6 @@ public class JoinObserver extends BindingSetUpdater {
private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
- private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
-
@Override
public ObservedColumn getObservedColumn() {
return new ObservedColumn(FluoQueryColumns.JOIN_BINDING_SET, NotificationType.STRONG);
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
index e7072e7..7d96baa 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java
@@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes;
import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
import org.apache.rya.indexing.pcj.fluo.app.PeriodicQueryUpdater;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -42,7 +41,6 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
public class PeriodicQueryObserver extends BindingSetUpdater {
private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
- private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
@Override
public ObservedColumn getObservedColumn() {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
index b712606..5d73b2e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/ProjectionObserver.java
@@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes;
import org.apache.log4j.Logger;
import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -37,7 +36,6 @@ public class ProjectionObserver extends BindingSetUpdater {
private static final Logger log = Logger.getLogger(ProjectionObserver.class);
private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
- private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
@Override
public ObservedColumn getObservedColumn() {
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index e6368ba..ba7beee 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -20,24 +20,24 @@ package org.apache.rya.indexing.pcj.fluo.app.observers;
import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.log4j.Logger;
-import org.apache.rya.accumulo.utils.VisibilitySimplifier;
+import org.apache.rya.indexing.pcj.fluo.app.export.ExporterManager;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory;
-import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory.IncrementalExporterFactoryException;
import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.rya.PeriodicBindingSetExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory;
+import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaSubGraphExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
-import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
@@ -46,28 +46,23 @@ import com.google.common.collect.ImmutableSet;
* Performs incremental result exporting to the configured destinations.
*/
public class QueryResultObserver extends AbstractObserver {
+
private static final Logger log = Logger.getLogger(QueryResultObserver.class);
-
- private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
-
- /**
- * We expect to see the same expressions a lot, so we cache the simplified forms.
- */
- private final Map<String, String> simplifiedVisibilities = new HashMap<>();
-
+ private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
+
/**
- * Builders for each type of result exporter we support.
+ * Builders for each type of {@link IncrementalResultExporter} we support.
*/
- private static final ImmutableSet<IncrementalBindingSetExporterFactory> factories =
- ImmutableSet.<IncrementalBindingSetExporterFactory>builder()
+ private static final ImmutableSet<IncrementalResultExporterFactory> factories =
+ ImmutableSet.<IncrementalResultExporterFactory>builder()
.add(new RyaBindingSetExporterFactory())
.add(new KafkaBindingSetExporterFactory())
+ .add(new KafkaRyaSubGraphExporterFactory())
+ .add(new RyaSubGraphExporterFactory())
+ .add(new PeriodicBindingSetExporterFactory())
.build();
-
- /**
- * The exporters that are configured.
- */
- private ImmutableSet<IncrementalBindingSetExporter> exporters = null;
+
+ private ExporterManager exporterManager;
@Override
public ObservedColumn getObservedColumn() {
@@ -79,63 +74,46 @@ public class QueryResultObserver extends AbstractObserver {
*/
@Override
public void init(final Context context) {
- final ImmutableSet.Builder<IncrementalBindingSetExporter> exportersBuilder = ImmutableSet.builder();
-
- for(final IncrementalBindingSetExporterFactory builder : factories) {
+
+ ExporterManager.Builder managerBuilder = ExporterManager.builder();
+
+ for(final IncrementalResultExporterFactory builder : factories) {
try {
log.debug("QueryResultObserver.init(): for each exportersBuilder=" + builder);
- final Optional<IncrementalBindingSetExporter> exporter = builder.build(context);
+ final Optional<IncrementalResultExporter> exporter = builder.build(context);
if(exporter.isPresent()) {
- exportersBuilder.add(exporter.get());
+ managerBuilder.addIncrementalResultExporter(exporter.get());
}
} catch (final IncrementalExporterFactoryException e) {
log.error("Could not initialize a result exporter.", e);
}
}
-
- exporters = exportersBuilder.build();
+
+ exporterManager = managerBuilder.build();
}
+
@Override
public void process(final TransactionBase tx, final Bytes brow, final Column col) throws Exception {
final String row = brow.toString();
- // Read the SPARQL query and it Binding Set from the row id.
+ // Read the queryId from the row and get the QueryMetadata.
final String queryId = row.split(NODEID_BS_DELIM)[0];
+ final QueryMetadata metadata = dao.readQueryMetadata(tx, queryId);
// Read the Child Binding Set that will be exported.
final Bytes valueBytes = tx.get(brow, col);
- final VisibilityBindingSet result = BS_SERDE.deserialize(valueBytes);
- // Simplify the result's visibilities.
- final String visibility = result.getVisibility();
- if(!simplifiedVisibilities.containsKey(visibility)) {
- final String simplified = VisibilitySimplifier.simplify( visibility );
- simplifiedVisibilities.put(visibility, simplified);
- }
- result.setVisibility( simplifiedVisibilities.get(visibility) );
-
- // Export the result using each of the provided exporters.
- for(final IncrementalBindingSetExporter exporter : exporters) {
- try {
- exporter.export(tx, queryId, result);
- } catch (final ResultExportException e) {
- log.error("Could not export a binding set for query '" + queryId + "'. Binding Set: " + result, e);
- }
- }
+ exporterManager.export(metadata.getQueryType(), metadata.getExportStrategies(), queryId, valueBytes);
}
@Override
public void close() {
- if(exporters != null) {
- for(final IncrementalBindingSetExporter exporter : exporters) {
- try {
- exporter.close();
- } catch(final Exception e) {
- log.warn("Problem encountered while closing one of the exporters.", e);
- }
- }
+ try {
+ exporterManager.close();
+ } catch (Exception e) { // best-effort shutdown: report the failure together with its cause and carry on
+ log.warn("Encountered problems closing the ExporterManager.", e);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
index 69a651e..607267a 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/StatementPatternObserver.java
@@ -24,7 +24,6 @@ import org.apache.fluo.api.client.TransactionBase;
import org.apache.fluo.api.data.Bytes;
import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
-import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
@@ -39,9 +38,6 @@ public class StatementPatternObserver extends BindingSetUpdater {
private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe();
- // DAO
- private final FluoQueryMetadataDAO queryDao = new FluoQueryMetadataDAO();
-
@Override
public ObservedColumn getObservedColumn() {
return new ObservedColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, NotificationType.STRONG);