You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rya.apache.org by pu...@apache.org on 2017/04/04 12:56:39 UTC
incubator-rya git commit: RYA-128 closes #121;
closes RYA-128 trigger service to Kafka.
Repository: incubator-rya
Updated Branches:
refs/heads/master 529f2d595 -> b03b18938
RYA-128 closes #121; closes RYA-128 trigger service to Kafka.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/b03b1893
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/b03b1893
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/b03b1893
Branch: refs/heads/master
Commit: b03b18938c55ff3e896b3d9969a7b5dc54753129
Parents: 529f2d5
Author: David W. Lotts <da...@parsons.com>
Authored: Tue Nov 1 17:37:07 2016 -0400
Committer: pujav65 <pu...@gmail.com>
Committed: Tue Apr 4 08:55:38 2017 -0400
----------------------------------------------------------------------
.../rya/indexing/pcj/fluo/api/CreatePcj.java | 18 +-
extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 48 ++-
.../app/export/rya/BindingSetSerializer.java | 137 +++++++++
.../app/export/rya/KafkaExportParameters.java | 84 ++++++
.../app/export/rya/KafkaResultExporter.java | 75 +++++
.../export/rya/KafkaResultExporterFactory.java | 64 ++++
.../fluo/app/observers/QueryResultObserver.java | 14 +-
.../export/rya/KafkaExportParametersTest.java | 97 +++++++
.../rya.pcj.fluo/pcj.fluo.integration/pom.xml | 34 +++
.../apache/rya/indexing/pcj/fluo/ITBase.java | 30 +-
.../pcj/fluo/integration/KafkaExportIT.java | 290 +++++++++++++++++++
extras/rya.prospector/pom.xml | 39 +++
12 files changed, 905 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
index 6567371..d29191d 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java
@@ -132,12 +132,12 @@ public class CreatePcj {
* @throws SailException Historic PCJ results could not be loaded because of a problem with {@code rya}.
* @throws QueryEvaluationException Historic PCJ results could not be loaded because of a problem with {@code rya}.
*/
- public void withRyaIntegration(final String pcjId, final PrecomputedJoinStorage pcjStorage, final FluoClient fluo,
- final Connector accumulo, String ryaInstance )
- throws MalformedQueryException, PcjException, SailException, QueryEvaluationException, RyaDAOException {
- requireNonNull(pcjId);
- requireNonNull(pcjStorage);
- requireNonNull(fluo);
+ public String withRyaIntegration(final String pcjId, final PrecomputedJoinStorage pcjStorage, final FluoClient fluo,
+ final Connector accumulo, String ryaInstance )
+ throws MalformedQueryException, PcjException, SailException, QueryEvaluationException, RyaDAOException {
+ requireNonNull(pcjId);
+ requireNonNull(pcjStorage);
+ requireNonNull(fluo);
requireNonNull(accumulo);
requireNonNull(ryaInstance);
@@ -162,13 +162,16 @@ public class CreatePcj {
final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null);
final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds);
+ // return queryId to the caller for later monitoring from the export.
+ String queryId = null;
+
try (Transaction tx = fluo.newTransaction()) {
// Write the query's structure to Fluo.
new FluoQueryMetadataDAO().write(tx, fluoQuery);
// The results of the query are eventually exported to an instance
// of Rya, so store the Rya ID for the PCJ.
- final String queryId = fluoQuery.getQueryMetadata().getNodeId();
+ queryId = fluoQuery.getQueryMetadata().getNodeId();
tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId);
tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId);
@@ -206,6 +209,7 @@ public class CreatePcj {
writeBatch(fluo, triplesBatch);
triplesBatch.clear();
}
+ return queryId;
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
index fd2e582..de51008 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
@@ -33,7 +33,9 @@ under the License.
A Fluo implementation of Rya Precomputed Join Indexing. This module produces
a jar that may be executed by the 'fluo' command line tool as a YARN job.
</description>
-
+ <properties>
+ <kryo.version>3.0.3</kryo.version>
+ </properties>
<dependencies>
<!-- Rya Runtime Dependencies. -->
<dependency>
@@ -62,6 +64,50 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.10.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>0.10.1.0</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo</artifactId>
+ <version>${kryo.version}</version>
+ </dependency>
+
+ <!-- Testing dependencies. -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.10.1.0</version>
+ <classifier>test</classifier>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>0.10.1.0</version>
+ <classifier>test</classifier>
+<!-- <scope>test</scope> -->
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<!-- Testing dependencies. -->
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java
new file mode 100644
index 0000000..7b35fec
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.ListBindingSet;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * A Kafka {@link Serializer} and {@link Deserializer} for {@link VisibilityBindingSet}s.
+ * <p>
+ * The Kryo-based wire format is: the visibility string, the number of bindings, and then, for
+ * each binding, its name, the string form of its value, and the value's datatype URI. On read,
+ * a datatype of {@link XMLSchema#ANYURI} is rebuilt as a {@link URI}; any other datatype is
+ * rebuilt as a typed literal.
+ * <p>
+ * Following the Kafka convention, a {@code null} record is serialized to {@code null} and
+ * {@code null} bytes are deserialized to {@code null}.
+ */
+public class BindingSetSerializer implements Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> {
+    /** One Kryo per thread, because Kryo instances are not thread-safe. */
+    private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
+        @Override
+        protected Kryo initialValue() {
+            return new Kryo();
+        }
+    };
+
+    /** The wire-format serializer holds no per-call state, so a single instance is shared. */
+    private static final KryoInternalSerializer INTERNAL_SERIALIZER = new KryoInternalSerializer();
+
+    /**
+     * Decodes a binding set that was encoded by {@link #serialize(String, VisibilityBindingSet)}.
+     *
+     * @param topic - The topic the record was read from. (unused)
+     * @param data - The encoded bytes; may be {@code null}.
+     * @return The decoded binding set, or {@code null} if {@code data} is {@code null}.
+     */
+    @Override
+    public VisibilityBindingSet deserialize(final String topic, final byte[] data) {
+        if (data == null) {
+            return null;
+        }
+        try (Input input = new Input(new ByteArrayInputStream(data))) {
+            return INTERNAL_SERIALIZER.read(kryos.get(), input, VisibilityBindingSet.class);
+        }
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // Nothing to configure.
+    }
+
+    /**
+     * Encodes a binding set, including its visibility, using Kryo.
+     *
+     * @param topic - The topic the record will be sent to. (unused)
+     * @param data - The binding set to encode; may be {@code null}.
+     * @return The encoded bytes, or {@code null} if {@code data} is {@code null}.
+     */
+    @Override
+    public byte[] serialize(final String topic, final VisibilityBindingSet data) {
+        if (data == null) {
+            return null;
+        }
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (Output output = new Output(baos)) {
+            INTERNAL_SERIALIZER.write(kryos.get(), output, data);
+            // Push Kryo's internal buffer into baos before its bytes are read below.
+            output.flush();
+        }
+        return baos.toByteArray();
+    }
+
+    @Override
+    public void close() {
+        // Nothing to release.
+    }
+
+    /**
+     * Rebuilds an RDF {@link Value} from its string form and datatype.
+     *
+     * @param valueString - The string form of the value. (not null)
+     * @param typeURI - The datatype of the value. (not null)
+     * @return A {@link URI} if {@code typeURI} is {@link XMLSchema#ANYURI}; otherwise a literal of that datatype.
+     */
+    private static Value makeValue(final String valueString, final URI typeURI) {
+        final ValueFactory valueFactory = ValueFactoryImpl.getInstance();
+        if (typeURI.equals(XMLSchema.ANYURI)) {
+            return valueFactory.createURI(valueString);
+        }
+        return valueFactory.createLiteral(valueString, typeURI);
+    }
+
+    /**
+     * Reads and writes the Kryo wire format of a {@link VisibilityBindingSet}.
+     * TODO rename this KryoSomething and change the package.
+     */
+    private static class KryoInternalSerializer extends com.esotericsoftware.kryo.Serializer<VisibilityBindingSet> {
+        private static final Logger log = Logger.getLogger(BindingSetSerializer.class);
+
+        @Override
+        public void write(final Kryo kryo, final Output output, final VisibilityBindingSet visBindingSet) {
+            // Guarded so the binding set is only rendered to a String when debug logging is on.
+            if (log.isDebugEnabled()) {
+                log.debug("Serializer writing visBindingSet" + visBindingSet);
+            }
+            output.writeString(visBindingSet.getVisibility());
+            // Write the binding count so the reader knows how many bindings follow.
+            output.writeInt(visBindingSet.size());
+            for (final Binding binding : visBindingSet) {
+                output.writeString(binding.getName());
+                final RyaType ryaValue = RdfToRyaConversions.convertValue(binding.getValue());
+                output.writeString(ryaValue.getData());
+                output.writeString(ryaValue.getDataType().toString());
+            }
+        }
+
+        @Override
+        public VisibilityBindingSet read(final Kryo kryo, final Input input, final Class<VisibilityBindingSet> aClass) {
+            log.debug("Serializer reading visBindingSet");
+            final String visibility = input.readString();
+            final int bindingCount = input.readInt();
+            final ArrayList<String> names = new ArrayList<>(bindingCount);
+            final ArrayList<Value> values = new ArrayList<>(bindingCount);
+            for (int i = 0; i < bindingCount; i++) {
+                names.add(input.readString());
+                final String valueString = input.readString();
+                final URI type = new URIImpl(input.readString());
+                values.add(makeValue(valueString, type));
+            }
+            final BindingSet bindingSet = new ListBindingSet(names, values);
+            return new VisibilityBindingSet(bindingSet, visibility);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java
new file mode 100644
index 0000000..3dbb1d8
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.fluo.api.observer.Observer;
+import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase;
+
+/**
+ * Provides read/write functions to the parameters map that is passed into an
+ * {@link Observer#init(org.apache.fluo.api.observer.Observer.Context)} method related
+ * to PCJ exporting to a Kafka topic.
+ * <p>
+ * Remember: a setting has no effect unless it is added to the params map.
+ */
+public class KafkaExportParameters extends ParametersBase {
+
+    public static final String CONF_EXPORT_TO_KAFKA = "pcj.fluo.export.kafka.enabled";
+
+    /**
+     * @param params - The parameters map that this object reads from and writes to.
+     */
+    public KafkaExportParameters(final Map<String, String> params) {
+        super(params);
+    }
+
+    /**
+     * @param isExportToKafka
+     *            - {@code True} if the Fluo application should export
+     *            to Kafka; otherwise {@code false}.
+     */
+    public void setExportToKafka(final boolean isExportToKafka) {
+        setBoolean(params, CONF_EXPORT_TO_KAFKA, isExportToKafka);
+    }
+
+    /**
+     * @return {@code True} if the Fluo application should export to Kafka; otherwise
+     *         {@code false}. Defaults to {@code false} if no value is present.
+     */
+    public boolean isExportToKafka() {
+        return getBoolean(params, CONF_EXPORT_TO_KAFKA, false);
+    }
+
+    /**
+     * Adds every entry of {@code producerConfig} to the params, NOT keeping them separate
+     * from the other params: an existing param with the same key is overwritten.
+     * <p>
+     * {@link Properties} may hold non-String values (for example an {@link Integer} added
+     * with {@code put}), so each key and value is stored in its {@link String#valueOf(Object)}
+     * form. Entries that only exist in the {@link Properties} defaults are not copied.
+     *
+     * @param producerConfig - The Kafka producer configuration to add. (not null)
+     */
+    public void setProducerConfig(final Properties producerConfig) {
+        for (final Map.Entry<Object, Object> entry : producerConfig.entrySet()) {
+            params.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
+        }
+    }
+
+    /**
+     * @return All the params (not just the Kafka producer configuration) as {@link Properties}.
+     *         Params with a {@code null} key or value are skipped because {@link Properties}
+     *         cannot hold them.
+     */
+    public Properties getProducerConfig() {
+        final Properties props = new Properties();
+        for (final Map.Entry<String, String> entry : params.entrySet()) {
+            if (entry.getKey() != null && entry.getValue() != null) {
+                props.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return props;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java
new file mode 100644
index 0000000..362efa7
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+/**
+ * Incrementally exports SPARQL query results to Kafka topics.
+ * <p>
+ * Each result is sent to the topic whose name is the query ID of the query that produced it.
+ */
+public class KafkaResultExporter implements IncrementalResultExporter {
+    private static final Logger log = Logger.getLogger(KafkaResultExporter.class);
+
+    private final KafkaProducer<String, VisibilityBindingSet> producer;
+
+    /**
+     * Constructs an instance given a Kafka producer.
+     *
+     * @param producer
+     *            - For sending result set alerts to a broker; created and configured by
+     *            {@link KafkaResultExporterFactory}. (not null)
+     */
+    public KafkaResultExporter(final KafkaProducer<String, VisibilityBindingSet> producer) {
+        this.producer = checkNotNull(producer, "Producer is required.");
+    }
+
+    /**
+     * Sends the result to the topic named after {@code queryId} and waits for the send to
+     * complete, so a failed send surfaces as a {@link ResultExportException} instead of the
+     * result being silently dropped.
+     */
+    @Override
+    public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException {
+        checkNotNull(fluoTx);
+        checkNotNull(queryId);
+        checkNotNull(result);
+        try {
+            final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
+            // Debug rather than info: the result carries a visibility label, so its contents
+            // should not routinely end up in the logs.
+            if (log.isDebugEnabled()) {
+                log.debug("out to kafka topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result);
+            }
+
+            // Send the result on the topic named after the query.
+            final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(queryId, result);
+            // KafkaProducer.send() is asynchronous; block on its future so that a failed
+            // send is reported instead of being lost.
+            producer.send(rec).get();
+            log.debug("producer.send(rec) completed");
+        } catch (final InterruptedException e) {
+            // Preserve the interrupt for the caller.
+            Thread.currentThread().interrupt();
+            throw new ResultExportException("A result could not be exported to Kafka.", e);
+        } catch (final Exception e) {
+            throw new ResultExportException("A result could not be exported to Kafka.", e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java
new file mode 100644
index 0000000..9418720
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import org.apache.fluo.api.observer.Observer.Context;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
+import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+
+import com.google.common.base.Optional;
+
+/**
+ * Creates instances of {@link KafkaResultExporter}.
+ * <p>
+ * Export is enabled by {@link KafkaExportParameters#CONF_EXPORT_TO_KAFKA}. All of the observer's
+ * parameters are then handed to the {@link KafkaProducer} as its configuration, so the required
+ * producer Key/values described here must be added to them (see
+ * {@link KafkaExportParameters#setProducerConfig(java.util.Properties)}):
+ * http://kafka.apache.org/documentation.html#producerconfigs
+ * <p>
+ * The producer sends {@code String} keys and {@link VisibilityBindingSet} values, so the
+ * configured serializers must handle those types. Here is a simple example:
+ * <pre>
+ * Properties producerConfig = new Properties();
+ * producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ * producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ * producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer");
+ * </pre>
+ *
+ * @see ProducerConfig
+ */
+public class KafkaResultExporterFactory implements IncrementalResultExporterFactory {
+    private static final Logger log = Logger.getLogger(KafkaResultExporterFactory.class);
+
+    /**
+     * @return A {@link KafkaResultExporter} if Kafka export is enabled; otherwise absent.
+     */
+    @Override
+    public Optional<IncrementalResultExporter> build(final Context context) throws IncrementalExporterFactoryException, ConfigurationException {
+        final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap());
+        log.debug("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka());
+        if (!exportParams.isExportToKafka()) {
+            return Optional.absent();
+        }
+
+        // Set up the Kafka connection, using every observer parameter as producer configuration.
+        final KafkaProducer<String, VisibilityBindingSet> producer = new KafkaProducer<>(exportParams.getProducerConfig());
+        return Optional.<IncrementalResultExporter>of(new KafkaResultExporter(producer));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index bbca128..a8fc6d9 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -23,11 +23,17 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NO
import java.util.HashMap;
import java.util.Map;
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.observer.AbstractObserver;
import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.utils.VisibilitySimplifier;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter.ResultExportException;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory.IncrementalExporterFactoryException;
+import org.apache.rya.indexing.pcj.fluo.app.export.rya.KafkaResultExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaResultExporterFactory;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
@@ -38,11 +44,6 @@ import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringCo
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
-import org.apache.fluo.api.client.TransactionBase;
-import org.apache.fluo.api.data.Bytes;
-import org.apache.fluo.api.data.Column;
-import org.apache.fluo.api.observer.AbstractObserver;
-import org.apache.rya.accumulo.utils.VisibilitySimplifier;
/**
* Performs incremental result exporting to the configured destinations.
@@ -69,6 +70,7 @@ public class QueryResultObserver extends AbstractObserver {
private static final ImmutableSet<IncrementalResultExporterFactory> factories =
ImmutableSet.<IncrementalResultExporterFactory>builder()
.add(new RyaResultExporterFactory())
+ .add(new KafkaResultExporterFactory())
.build();
/**
@@ -90,6 +92,8 @@ public class QueryResultObserver extends AbstractObserver {
for(final IncrementalResultExporterFactory builder : factories) {
try {
+ log.debug("QueryResultObserver.init(): for each exportersBuilder=" + builder);
+
final Optional<IncrementalResultExporter> exporter = builder.build(context);
if(exporter.isPresent()) {
exportersBuilder.add(exporter.get());
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
new file mode 100644
index 0000000..1e5adbf
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.export.rya;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.Test;
+
+/**
+ * Tests the methods of {@link KafkaExportParameters}.
+ */
+public class KafkaExportParametersTest {
+
+    @Test
+    public void writeParams() {
+        final Map<String, String> params = new HashMap<>();
+
+        // Load some values into the params using the wrapper.
+        final KafkaExportParameters kafkaParams = new KafkaExportParameters(params);
+        kafkaParams.setExportToKafka(true);
+
+        // Ensure the params map has the expected values.
+        final Map<String, String> expectedParams = new HashMap<>();
+        expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "true");
+        assertTrue(kafkaParams.isExportToKafka());
+        assertEquals(expectedParams, params);
+
+        // Now go the other way.
+        expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "false");
+        kafkaParams.setExportToKafka(false);
+        assertFalse(kafkaParams.isExportToKafka());
+        assertEquals(expectedParams, params);
+    }
+
+    @Test
+    public void writeParamsProps() {
+        final String key1 = "key1";
+        final String value1First = "value1-preserve-this";
+        final String value1Second = "value1prop";
+        // Non-ASCII key and value, from http://generator.lorem-ipsum.info/_chinese
+        final String key2 = "\u6b4c\u53e4\u4e8b\u5b66\u9031\u6587\u539f\u554f\u696d\u9593\u9769\u793e\u3002";
+        final String value2 = "\u826f\u6cbb\u9bae\u733f\u6027\u793e\u8cbb\u8457\u4f75\u75c5\u6975\u9a13\u3002";
+
+        final Map<String, String> params = new HashMap<>();
+        // Make sure export key1 is NOT kept separate from producer config key1:
+        // the producer config value must replace it. (Originally they were kept separate.)
+        params.put(key1, value1First);
+        final KafkaExportParameters kafkaParams = new KafkaExportParameters(params);
+
+        // Load some values into the properties using the wrapper.
+        final Properties props = new Properties();
+        props.put(key1, value1Second);
+        props.put(key2, value2);
+        kafkaParams.setProducerConfig(props);
+
+        final Properties propsAfter = kafkaParams.getProducerConfig();
+        assertEquals(props, propsAfter);
+        assertEquals("Should change identical parameters key", value1Second, params.get(key1));
+        assertEquals("Props should have params's key", value1Second, propsAfter.get(key1));
+        assertEquals("Should have props key", value2, params.get(key2));
+        assertEquals("Should hold exactly the producer config keys", 2, params.size());
+    }
+
+    @Test
+    public void notConfigured() {
+        final Map<String, String> params = new HashMap<>();
+
+        // Ensure an unconfigured parameters map will say kafka export is disabled.
+        final KafkaExportParameters kafkaParams = new KafkaExportParameters(params);
+        assertFalse(kafkaParams.isExportToKafka());
+    }
+
+    @Test
+    public void testKafkaResultExporterFactory() {
+        final KafkaResultExporterFactory factory = new KafkaResultExporterFactory();
+        assertNotNull(factory);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
index 6bb7105..b7adad6 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml
@@ -56,5 +56,39 @@
<groupId>org.apache.fluo</groupId>
<artifactId>fluo-api</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>0.10.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>0.10.1.0</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- Testing dependencies. -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.11</artifactId>
+ <version>0.10.1.0</version>
+ <classifier>test</classifier>
+<!-- <scope>test</scope> -->
+ <exclusions>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+
+
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
index 293426f..fa9a10e 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/ITBase.java
@@ -390,11 +390,6 @@ public abstract class ITBase {
return conf;
}
- /**
- * Setup a Mini Fluo cluster that uses a temporary directory to store its data.
- *
- * @return A Mini Fluo cluster.
- */
protected MiniFluo startMiniFluo() throws AlreadyInitializedException, TableExistsException {
// Setup the observers that will be used by the Fluo PCJ Application.
final List<ObserverSpecification> observers = new ArrayList<>();
@@ -403,14 +398,9 @@ public abstract class ITBase {
observers.add(new ObserverSpecification(JoinObserver.class.getName()));
observers.add(new ObserverSpecification(FilterObserver.class.getName()));
+ // Set export details for exporting from Fluo to a Rya repository and a subscriber queue.
final HashMap<String, String> exportParams = new HashMap<>();
- final RyaExportParameters ryaParams = new RyaExportParameters(exportParams);
- ryaParams.setExportToRya(true);
- ryaParams.setRyaInstanceName(RYA_INSTANCE_NAME);
- ryaParams.setAccumuloInstanceName(instanceName);
- ryaParams.setZookeeperServers(zookeepers);
- ryaParams.setExporterUsername(ITBase.ACCUMULO_USER);
- ryaParams.setExporterPassword(ITBase.ACCUMULO_PASSWORD);
+ setExportParameters(exportParams);
// Configure the export observer to export new PCJ results to the mini accumulo cluster.
final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
@@ -433,4 +423,20 @@ public abstract class ITBase {
FluoFactory.newAdmin(config).initialize(new FluoAdmin.InitializationOptions().setClearTable(true).setClearZookeeper(true) );
return FluoFactory.newMiniFluo(config);
}
+
+    /**
+     * Fills in the parameters handed to the {@link QueryResultObserver} so that new PCJ
+     * results are exported to the Rya instance backed by the mini Accumulo cluster.
+     * Subclasses that need additional export destinations may override this; such
+     * overrides should call {@code super.setExportParameters(exportParams)} to keep the
+     * Rya export enabled.
+     *
+     * @param exportParams - The mutable parameter map that the export settings are written into. (not null)
+     */
+    protected void setExportParameters(final HashMap<String, String> exportParams) {
+        // The wrapper object is discarded afterwards; its setters are expected to
+        // record each setting in exportParams, which the caller then hands to Fluo.
+        final RyaExportParameters rya = new RyaExportParameters(exportParams);
+        rya.setExportToRya(true);
+        rya.setRyaInstanceName(RYA_INSTANCE_NAME);
+        rya.setAccumuloInstanceName(instanceName);
+        rya.setZookeeperServers(zookeepers);
+        rya.setExporterUsername(ACCUMULO_USER);
+        rya.setExporterPassword(ACCUMULO_PASSWORD);
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
new file mode 100644
index 0000000..10d2530
--- /dev/null
+++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.integration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Set;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.indexing.pcj.fluo.ITBase;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.fluo.app.export.rya.KafkaExportParameters;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.junit.Test;
+import org.openrdf.model.impl.URIImpl;
+import org.openrdf.query.Binding;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.impl.BindingImpl;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Sets;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.Time;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+
+/**
+ * Performs integration tests over the Fluo application geared towards Kafka PCJ exporting.
+ * <p>
+ * These tests might be ignored so that they will not run as unit tests while building the application.
+ * Run this test from Maven command line:
+ * $ cd rya/extras/rya.pcj.fluo/pcj.fluo.integration
+ * $ mvn surefire:test -Dtest=KafkaExportIT
+ */
+public class KafkaExportIT extends ITBase {
+
+ private static final String ZKHOST = "127.0.0.1";
+ private static final String BROKERHOST = "127.0.0.1";
+ private static final String BROKERPORT = "9092";
+ private static final String TOPIC = "testTopic";
+ private ZkUtils zkUtils;
+ private KafkaServer kafkaServer;
+ private EmbeddedZookeeper zkServer;
+ private ZkClient zkClient;
+
+
+ /**
+ * setup mini kafka and call the super to setup mini fluo
+ *
+ * @see org.apache.rya.indexing.pcj.fluo.ITBase#setupMiniResources()
+ */
+ @Override
+ public void setupMiniResources() throws Exception {
+ super.setupMiniResources();
+
+ zkServer = new EmbeddedZookeeper();
+ String zkConnect = ZKHOST + ":" + zkServer.port();
+ zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+ zkUtils = ZkUtils.apply(zkClient, false);
+
+ // setup Broker
+ Properties brokerProps = new Properties();
+ brokerProps.setProperty("zookeeper.connect", zkConnect);
+ brokerProps.setProperty("broker.id", "0");
+ brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+ brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
+ KafkaConfig config = new KafkaConfig(brokerProps);
+ Time mock = new MockTime();
+ kafkaServer = TestUtils.createServer(config, mock);
+
+ System.out.println("setup kafka and fluo.");
+ }
+
+ /**
+ * Test kafka without rya code to make sure kafka works in this environment.
+ * If this test fails then its a testing environment issue, not with Rya.
+ * Source: https://github.com/asmaier/mini-kafka
+ *
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ @Test
+ public void embeddedKafkaTest() throws InterruptedException, IOException {
+
+ // create topic
+ AdminUtils.createTopic(zkUtils, TOPIC, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+
+ // setup producer
+ Properties producerProps = new Properties();
+ producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
+ producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
+ producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
+ KafkaProducer<Integer, byte[]> producer = new KafkaProducer<Integer, byte[]>(producerProps);
+
+ // setup consumer
+ Properties consumerProps = new Properties();
+ consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
+ consumerProps.setProperty("group.id", "group0");
+ consumerProps.setProperty("client.id", "consumer0");
+ consumerProps.setProperty("key.deserializer","org.apache.kafka.common.serialization.IntegerDeserializer");
+ consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ consumerProps.put("auto.offset.reset", "earliest"); // to make sure the consumer starts from the beginning of the topic
+ KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
+ consumer.subscribe(Arrays.asList(TOPIC));
+
+ // send message
+ ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(TOPIC, 42, "test-message".getBytes(StandardCharsets.UTF_8));
+ producer.send(data);
+ producer.close();
+
+ // starting consumer
+ ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
+ assertEquals(1, records.count());
+ Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
+ ConsumerRecord<Integer, byte[]> record = recordIterator.next();
+ System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
+ assertEquals(42, (int) record.key());
+ assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8));
+ consumer.close();
+ }
+
+ @Test
+ public void newResultsExportedTest() throws Exception {
+ final String sparql = "SELECT ?customer ?worker ?city " + "{ " + "FILTER(?customer = <http://Alice>) " + "FILTER(?city = <http://London>) " + "?customer <http://talksTo> ?worker. " + "?worker <http://livesIn> ?city. " + "?worker <http://worksAt> <http://Chipotle>. " + "}";
+
+ // Triples that will be streamed into Fluo after the PCJ has been created.
+ final Set<RyaStatement> streamedTriples = Sets.newHashSet(makeRyaStatement("http://Alice", "http://talksTo", "http://Bob"), makeRyaStatement("http://Bob", "http://livesIn", "http://London"), makeRyaStatement("http://Bob", "http://worksAt", "http://Chipotle"),
+ makeRyaStatement("http://Alice", "http://talksTo", "http://Charlie"), makeRyaStatement("http://Charlie", "http://livesIn", "http://London"), makeRyaStatement("http://Charlie", "http://worksAt", "http://Chipotle"),
+ makeRyaStatement("http://Alice", "http://talksTo", "http://David"), makeRyaStatement("http://David", "http://livesIn", "http://London"), makeRyaStatement("http://David", "http://worksAt", "http://Chipotle"),
+ makeRyaStatement("http://Alice", "http://talksTo", "http://Eve"), makeRyaStatement("http://Eve", "http://livesIn", "http://Leeds"), makeRyaStatement("http://Eve", "http://worksAt", "http://Chipotle"),
+ makeRyaStatement("http://Frank", "http://talksTo", "http://Alice"), makeRyaStatement("http://Frank", "http://livesIn", "http://London"), makeRyaStatement("http://Frank", "http://worksAt", "http://Chipotle"));
+
+ // The expected results of the SPARQL query once the PCJ has been computed.
+ final Set<BindingSet> expected = new HashSet<>();
+ expected.add(makeBindingSet(new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://Bob")), new BindingImpl("city", new URIImpl("http://London"))));
+ expected.add(makeBindingSet(new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://Charlie")), new BindingImpl("city", new URIImpl("http://London"))));
+ expected.add(makeBindingSet(new BindingImpl("customer", new URIImpl("http://Alice")), new BindingImpl("worker", new URIImpl("http://David")), new BindingImpl("city", new URIImpl("http://London"))));
+
+ // Create the PCJ table.
+ final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME);
+ final String pcjId = pcjStorage.createPcj(sparql);
+
+ // Tell the Fluo app to maintain the PCJ.
+ CreatePcj createPcj = new CreatePcj();
+ String QueryIdIsTopicName = createPcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME);
+
+ // Stream the data into Fluo.
+ new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String> absent());
+
+ // Fetch the exported results from Accumulo once the observers finish working.
+ fluo.waitForObservers();
+
+ /// KafkaConsumer<Integer, byte[]> consumer = makeConsumer(QueryIdIsTopicName);
+ KafkaConsumer<Integer, VisibilityBindingSet> consumer = makeConsumer(QueryIdIsTopicName);
+
+ // starting consumer polling for messages
+ /// ConsumerRecords<Integer, byte[]> records = consumer.poll(3000);
+ ConsumerRecords<Integer, VisibilityBindingSet> records = consumer.poll(3000);
+ /// Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator();
+ Iterator<ConsumerRecord<Integer, VisibilityBindingSet>> recordIterator = records.iterator();
+ boolean allExpected = true;
+ ConsumerRecord<Integer, VisibilityBindingSet> unexpectedRecord = null;
+ while (recordIterator.hasNext()) {
+ ConsumerRecord<Integer, VisibilityBindingSet> record = recordIterator.next();
+ System.out.printf("Consumed offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value().toString());
+ boolean expectedThis = expected.contains(record.value());
+ if (!expectedThis) {
+ System.out.println("This consumed record is not expected.");
+ unexpectedRecord = record;
+ }
+ allExpected = allExpected && expectedThis;
+ }
+ assertTrue("Must consume expected record: not expected:" + unexpectedRecord, allExpected);
+ assertNotEquals("Should get some results", 0, records.count());
+ // assertEquals(42, (int) record.key());
+ // assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8));
+
+ }
+
+ /**
+ * A helper function for creating a {@link BindingSet} from an array of
+ * {@link Binding}s.
+ *
+ * @param bindings
+ * - The bindings to include in the set. (not null)
+ * @return A {@link BindingSet} holding the bindings.
+ */
+ protected static BindingSet makeBindingSet(final Binding... bindings) {
+ return new VisibilityBindingSet(ITBase.makeBindingSet(bindings));
+ }
+
+ /**
+ * @param TopicName
+ * @return
+ */
+ protected KafkaConsumer<Integer, VisibilityBindingSet> makeConsumer(String TopicName) {
+ // setup consumer
+ Properties consumerProps = new Properties();
+ consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
+ consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0");
+ consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
+ consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
+ consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer");
+ // "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // to make sure the consumer starts from the beginning of the topic
+ /// KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
+ KafkaConsumer<Integer, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps);
+ consumer.subscribe(Arrays.asList(TopicName));
+ return consumer;
+ }
+
+ /**
+ * Add info about the kafka queue/topic to receive the export.
+ * Call super to get the Rya parameters.
+ *
+ * @see org.apache.rya.indexing.pcj.fluo.ITBase#setExportParameters(java.util.HashMap)
+ */
+ @Override
+ protected void setExportParameters(HashMap<String, String> exportParams) {
+ // Get the defaults
+ super.setExportParameters(exportParams);
+ // Add the kafka parameters
+ final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams);
+ kafkaParams.setExportToKafka(true);
+ // Configure the Producer
+ Properties producerConfig = new Properties();
+ producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT);
+ producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.rya.indexing.pcj.fluo.app.export.rya.BindingSetSerializer");
+ // "org.apache.kafka.common.serialization.StringSerializer");
+ kafkaParams.setProducerConfig(producerConfig);
+ }
+
+ /**
+ * Close all the Kafka mini server and mini-zookeeper
+ *
+ * @see org.apache.rya.indexing.pcj.fluo.ITBase#shutdownMiniResources()
+ */
+ @Override
+ public void shutdownMiniResources() {
+ super.shutdownMiniResources();
+ kafkaServer.shutdown();
+ zkClient.close();
+ zkServer.shutdown();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b03b1893/extras/rya.prospector/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.prospector/pom.xml b/extras/rya.prospector/pom.xml
index 0a3b6cf..952ab94 100644
--- a/extras/rya.prospector/pom.xml
+++ b/extras/rya.prospector/pom.xml
@@ -75,6 +75,45 @@ under the License.
</excludes>
</configuration>
</plugin>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <versionRange>[3.2,)</versionRange>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.codehaus.groovy</groupId>
+ <artifactId>groovy-eclipse-compiler</artifactId>
+ <versionRange>[2.9.1-01,)</versionRange>
+ <goals>
+ <goal>add-groovy-build-paths</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
</plugins>
</pluginManagement>
<plugins>