You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@rya.apache.org by DLotts <gi...@git.apache.org> on 2016/11/03 17:26:50 UTC

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

GitHub user DLotts opened a pull request:

    https://github.com/apache/incubator-rya/pull/121

    [WIP] Rya-128 trigger service to Kafka

    ## Description
    Added an additional exporter that sends a message thru Kafka whenever a PreComputed Join exports new results.
    
    ### Tests
    yep, several new tests
    
    ### Links
    [Jira](https://issues.apache.org/jira/browse/RYA-128)
    
    ### Checklist
    - [ ] Code Review
    - [ ] Squash Commits
    
    #### People To Reivew


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/DLotts/incubator-rya rya-128_trigger

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-rya/pull/121.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #121
    
----

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109483968
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.util.ArrayList;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.apache.log4j.Logger;
    +import org.apache.rya.api.domain.RyaType;
    +import org.apache.rya.api.resolver.RdfToRyaConversions;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +import org.openrdf.model.URI;
    +import org.openrdf.model.Value;
    +import org.openrdf.model.ValueFactory;
    +import org.openrdf.model.impl.URIImpl;
    +import org.openrdf.model.impl.ValueFactoryImpl;
    +import org.openrdf.model.vocabulary.XMLSchema;
    +import org.openrdf.query.Binding;
    +import org.openrdf.query.BindingSet;
    +import org.openrdf.query.impl.ListBindingSet;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +
    +public class BindingSetSerializer implements Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> {
    +    private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
    +        @Override
    +        protected Kryo initialValue() {
    +            Kryo kryo = new Kryo();
    +            return kryo;
    +        };
    +    };
    +
    +    @Override
    +    public VisibilityBindingSet deserialize(String topic, byte[] data) {
    +        KryoInternalSerializer internalSerializer = new KryoInternalSerializer();
    +        Input input = new Input(new ByteArrayInputStream(data));
    +        return internalSerializer.read(kryos.get(), input, VisibilityBindingSet.class);
    +        // this is an alternative, or perhaps replace it:
    +        // return (new VisibilityBindingSetStringConverter()).convert(new String(data, StandardCharsets.UTF_8), null);
    +    }
    +
    +    @Override
    +    public void configure(Map<String, ?> configs, boolean isKey) {
    +        // Do nothing.
    +    }
    +
    +    @Override
    +    public byte[] serialize(String topic, VisibilityBindingSet data) {
    +        KryoInternalSerializer internalSerializer = new KryoInternalSerializer();
    +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +        Output output = new Output(baos);
    +        internalSerializer.write(kryos.get(), output, data);
    +        output.flush();
    +        byte[] array = baos.toByteArray();
    +        return array;
    +        // this is an alternative, or perhaps replace it:
    +        // return (new VisibilityBindingSetStringConverter()).convert(data, null).getBytes(StandardCharsets.UTF_8);
    +    }
    +
    +    @Override
    +    public void close() {
    +        // Do nothing.
    +    }
    +
    +    private static Value makeValue(final String valueString, final URI typeURI) {
    --- End diff --
    
    This method would no longer be needed if you don't do the RyaType conversion for values.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92825695
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +import org.apache.fluo.api.client.TransactionBase;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
    +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +
    +/**
    + * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya.
    + */
    +public class KafkaResultExporter implements IncrementalResultExporter {
    +    private final KafkaProducer<String, VisibilityBindingSet> producer;
    +
    +    /**
    +     * Constructor
    +     * 
    +     * @param producer
    +     *            created by {@link KafkaResultExporterFactory}
    +     */
    +    public KafkaResultExporter(KafkaProducer<String, VisibilityBindingSet> producer) {
    +        super();
    +        checkNotNull(producer, "Producer is required.");
    +        this.producer = producer;
    +    }
    +
    +    /**
    +     * Send the results to the topic using the queryID as the topicname
    +     */
    +    @Override
    +    public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException {
    +        checkNotNull(fluoTx);
    +        checkNotNull(queryId);
    +        checkNotNull(result);
    +        try {
    +        final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
    +            String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result;
    +            System.out.println(msg);
    --- End diff --
    
    Use a logger.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109487077
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import java.util.Map;
    +import java.util.Properties;
    +
    +import org.apache.fluo.api.observer.Observer;
    +import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase;
    +
    +/**
    + * Provides read/write functions to the parameters map that is passed into an
    + * {@link Observer#init(io.fluo.api.observer.Observer.Context)} method related
    + * to PCJ exporting to a kafka topic.
    + * Remember: if doesn't count unless it is added to params
    + */
    +
    +public class KafkaExportParameters extends ParametersBase {
    +
    +    public static final String CONF_EXPORT_TO_KAFKA = "pcj.fluo.export.kafka.enabled";
    +
    +    public KafkaExportParameters(final Map<String, String> params) {
    +        super(params);
    +    }
    +
    +    /**
    +     * @param isExportToKafka
    +     *            - {@code True} if the Fluo application should export
    +     *            to Kafka; otherwise {@code false}.
    +     */
    +    public void setExportToKafka(final boolean isExportToKafka) {
    +        setBoolean(params, CONF_EXPORT_TO_KAFKA, isExportToKafka);
    +    }
    +
    +    /**
    +     * @return {@code True} if the Fluo application should export to Kafka; otherwise
    +     *         {@code false}. Defaults to {@code false} if no value is present.
    +     */
    +    public boolean isExportToKafka() {
    +        return getBoolean(params, CONF_EXPORT_TO_KAFKA, false);
    +    }
    +
    +    /**
    +     * Add the properties to the params, NOT keeping them separate from the other params.
    +     * Guaranteed by Properties: Each key and its corresponding value in the property list is a string.
    +     * 
    +     * @param producerConfig
    +     */
    +    public void setProducerConfig(final Properties producerConfig) {
    +        for (Object key : producerConfig.keySet().toArray()) {
    +            Object value = producerConfig.getProperty(key.toString());
    +            this.params.put(key.toString(), value.toString());
    +        }
    +    }
    +
    +    /**
    +     * @return all the params (not just kafka producer Configuration) as a {@link Properties}
    +     */
    +    public Properties getProducerConfig() {
    --- End diff --
    
    Same comment about the 'set' functions having special meanings. Maybe this should be 'make' instead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-rya/pull/121


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109480089
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    --- End diff --
    
    I don't think this should be packaged within org.apache.rya.indexing.pcj.fluo.app.export.rya because that package is for the export code that writes back to a Rya instance. I don't think that's what this is for, since this writes to a Kafka topic.
    
    Suggested new package:
    org.apache.rya.indexing.pcj.fluo.app.export.kafka


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92827071
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/pom.xml ---
    @@ -62,6 +64,50 @@ under the License.
                 	</exclusion>
                 </exclusions>
             </dependency>
    +        
    +        <dependency>
    +          <groupId>org.apache.kafka</groupId>
    +          <artifactId>kafka-clients</artifactId>
    +          <version>0.10.1.0</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.kafka</groupId>
    +            <artifactId>kafka_2.11</artifactId>
    +            <version>0.10.1.0</version>
    +            <exclusions>
    +                <exclusion>
    +                    <artifactId>slf4j-log4j12</artifactId>
    +                    <groupId>org.slf4j</groupId>
    +                </exclusion>
    +            </exclusions>
    +        </dependency>
    +                <dependency>
    +            <groupId>com.esotericsoftware</groupId>
    +            <artifactId>kryo</artifactId>
    +            <version>${kryo.version}</version>
    +        </dependency>
    +        
    +        <!-- Testing dependencies. -->
    +        <dependency>
    +          <groupId>org.apache.kafka</groupId>
    +          <artifactId>kafka-clients</artifactId>
    +          <version>0.10.1.0</version>
    +          <classifier>test</classifier>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.kafka</groupId>
    +            <artifactId>kafka_2.11</artifactId>
    +            <version>0.10.1.0</version>
    +            <classifier>test</classifier>
    +<!--             <scope>test</scope> -->
    --- End diff --
    
    Remove commented out code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92827431
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java ---
    @@ -0,0 +1,112 @@
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.util.ArrayList;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.apache.rya.api.domain.RyaType;
    +import org.apache.rya.api.resolver.RdfToRyaConversions;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +import org.openrdf.model.URI;
    +import org.openrdf.model.Value;
    +import org.openrdf.model.ValueFactory;
    +import org.openrdf.model.impl.URIImpl;
    +import org.openrdf.model.impl.ValueFactoryImpl;
    +import org.openrdf.model.vocabulary.XMLSchema;
    +import org.openrdf.query.Binding;
    +import org.openrdf.query.BindingSet;
    +import org.openrdf.query.impl.ListBindingSet;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +
    +public class BindingSetSerializer implements Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> {
    --- End diff --
    
    Also, maybe rename the class or repackage it or something. It specifically serializes to Kryo, but that is not apparent from the name or package.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92825442
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +import org.apache.fluo.api.client.TransactionBase;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
    +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +
    +/**
    + * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya.
    + */
    +public class KafkaResultExporter implements IncrementalResultExporter {
    +    private final KafkaProducer<String, VisibilityBindingSet> producer;
    +
    +    /**
    +     * Constructor
    +     * 
    +     * @param producer
    +     *            created by {@link KafkaResultExporterFactory}
    --- End diff --
    
    Document the nullness contract. Also, this doc doesn't really explain what the producer is for.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109487664
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +import org.apache.fluo.api.client.TransactionBase;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.log4j.Logger;
    +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
    +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +
    +/**
    + * Incrementally exports SPARQL query results to Kafka topics.
    + */
    +public class KafkaResultExporter implements IncrementalResultExporter {
    +    private final KafkaProducer<String, VisibilityBindingSet> producer;
    +    private static final Logger log = Logger.getLogger(KafkaResultExporter.class);
    +
    +    /**
    +     * Constructs an instance given a Kafka producer.
    +     * 
    +     * @param producer
    +     *            for sending result set alerts to a broker. (not null)
    +     *            created and configured by {@link KafkaResultExporterFactory}
    +     */
    +    public KafkaResultExporter(KafkaProducer<String, VisibilityBindingSet> producer) {
    +        super();
    +        checkNotNull(producer, "Producer is required.");
    +        this.producer = producer;
    +    }
    +
    +    /**
    +     * Send the results to the topic using the queryID as the topicname
    +     */
    +    @Override
    +    public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException {
    +        checkNotNull(fluoTx);
    +        checkNotNull(queryId);
    +        checkNotNull(result);
    +        try {
    +            final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
    +            String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result;
    +            log.info(msg);
    --- End diff --
    
    This should be a trace log.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109488282
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml ---
    @@ -56,5 +56,39 @@
                 <groupId>org.apache.fluo</groupId>
                 <artifactId>fluo-api</artifactId>
             </dependency>
    +
    +        <dependency>
    +          <groupId>org.apache.kafka</groupId>
    +          <artifactId>kafka-clients</artifactId>
    +          <version>0.10.1.0</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.kafka</groupId>
    +            <artifactId>kafka_2.11</artifactId>
    +            <version>0.10.1.0</version>
    +            <exclusions>
    +                <exclusion>
    +                    <artifactId>slf4j-log4j12</artifactId>
    +                    <groupId>org.slf4j</groupId>
    +                </exclusion>
    +            </exclusions>
    +        </dependency>
    +        <!-- Testing dependencies. -->
    +        <dependency>
    +            <groupId>org.apache.kafka</groupId>
    +            <artifactId>kafka_2.11</artifactId>
    +            <version>0.10.1.0</version>
    +            <classifier>test</classifier>
    +<!--             <scope>test</scope> -->
    --- End diff --
    
    Still applies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92825742
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +import org.apache.fluo.api.client.TransactionBase;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
    +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +
    +/**
    + * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya.
    + */
    +public class KafkaResultExporter implements IncrementalResultExporter {
    +    private final KafkaProducer<String, VisibilityBindingSet> producer;
    +
    +    /**
    +     * Constructor
    +     * 
    +     * @param producer
    +     *            created by {@link KafkaResultExporterFactory}
    +     */
    +    public KafkaResultExporter(KafkaProducer<String, VisibilityBindingSet> producer) {
    +        super();
    +        checkNotNull(producer, "Producer is required.");
    +        this.producer = producer;
    +    }
    +
    +    /**
    +     * Send the results to the topic using the queryID as the topicname
    +     */
    +    @Override
    +    public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException {
    +        checkNotNull(fluoTx);
    +        checkNotNull(queryId);
    +        checkNotNull(result);
    +        try {
    +        final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
    +            String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result;
    +            System.out.println(msg);
    +
    +            // Send result on topic
    +            ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<String, VisibilityBindingSet>(/* topicname= */ queryId, /* value= */ result);
    --- End diff --
    
    Delete commented out code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109488441
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/KafkaExportIT.java ---
    @@ -0,0 +1,290 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.integration;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertNotEquals;
    +import static org.junit.Assert.assertTrue;
    +
    +import java.io.IOException;
    +import java.nio.charset.StandardCharsets;
    +import java.nio.file.Files;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.Iterator;
    +import java.util.Properties;
    +import java.util.Set;
    +
    +import org.I0Itec.zkclient.ZkClient;
    +import org.apache.kafka.clients.consumer.ConsumerConfig;
    +import org.apache.kafka.clients.consumer.ConsumerRecord;
    +import org.apache.kafka.clients.consumer.ConsumerRecords;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.rya.api.domain.RyaStatement;
    +import org.apache.rya.indexing.pcj.fluo.ITBase;
    +import org.apache.rya.indexing.pcj.fluo.api.CreatePcj;
    +import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
    +import org.apache.rya.indexing.pcj.fluo.app.export.rya.KafkaExportParameters;
    +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
    +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +import org.junit.Test;
    +import org.openrdf.model.impl.URIImpl;
    +import org.openrdf.query.Binding;
    +import org.openrdf.query.BindingSet;
    +import org.openrdf.query.impl.BindingImpl;
    +
    +import com.google.common.base.Optional;
    +import com.google.common.collect.Sets;
    +
    +import kafka.admin.AdminUtils;
    +import kafka.admin.RackAwareMode;
    +import kafka.server.KafkaConfig;
    +import kafka.server.KafkaServer;
    +import kafka.utils.MockTime;
    +import kafka.utils.TestUtils;
    +import kafka.utils.Time;
    +import kafka.utils.ZKStringSerializer$;
    +import kafka.utils.ZkUtils;
    +import kafka.zk.EmbeddedZookeeper;
    +
    +/**
    + * Performs integration tests over the Fluo application geared towards Kafka PCJ exporting.
    + * <p>
    + * These tests might be ignored so that they will not run as unit tests while building the application.
    + * Run this test from Maven command line:
    + * $ cd rya/extras/rya.pcj.fluo/pcj.fluo.integration
    + * $ mvn surefire:test -Dtest=KafkaExportIT
    + */
    +public class KafkaExportIT extends ITBase {
    +
    +    private static final String ZKHOST = "127.0.0.1";
    +    private static final String BROKERHOST = "127.0.0.1";
    +    private static final String BROKERPORT = "9092";
    +    private static final String TOPIC = "testTopic";
    +    private ZkUtils zkUtils;
    +    private KafkaServer kafkaServer;
    +    private EmbeddedZookeeper zkServer;
    +    private ZkClient zkClient;
    +
    +
    +        /**
    +     * setup mini kafka and call the super to setup mini fluo
    +     * 
    +     * @see org.apache.rya.indexing.pcj.fluo.ITBase#setupMiniResources()
    +     */
    +    @Override
    +    public void setupMiniResources() throws Exception {
    +        super.setupMiniResources();
    +
    +        zkServer = new EmbeddedZookeeper();
    +        String zkConnect = ZKHOST + ":" + zkServer.port();
    +        zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
    +        zkUtils = ZkUtils.apply(zkClient, false);
    +
    +        // setup Broker
    +        Properties brokerProps = new Properties();
    +        brokerProps.setProperty("zookeeper.connect", zkConnect);
    +        brokerProps.setProperty("broker.id", "0");
    +        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
    +        brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
    +        KafkaConfig config = new KafkaConfig(brokerProps);
    +        Time mock = new MockTime();
    +        kafkaServer = TestUtils.createServer(config, mock);
    +
    +        System.out.println("setup kafka and fluo.");
    --- End diff --
    
    Shouldn't have any SOPs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya issue #121: Rya-128 trigger service to Kafka

Posted by amihalik <gi...@git.apache.org>.
Github user amihalik commented on the issue:

    https://github.com/apache/incubator-rya/pull/121
  
    asfbot build


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92826178
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---
    @@ -90,6 +92,8 @@ public void init(final Context context) {
     
             for(final IncrementalResultExporterFactory builder : factories) {
                 try {
    +                System.out.println("QueryResultObserver.init(): for each exportersBuilder=" + builder);
    --- End diff --
    
    log.debug


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by amihalik <gi...@git.apache.org>.
Github user amihalik commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r91190749
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java ---
    @@ -0,0 +1,112 @@
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    --- End diff --
    
    This will fail RAT check.  add license.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by DLotts <gi...@git.apache.org>.
Github user DLotts commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r99260012
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +import org.apache.fluo.api.client.TransactionBase;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
    +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +
    +/**
    + * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya.
    --- End diff --
    
    fixed many of these  for KafkaResultExporter.java


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya issue #121: Rya-128 trigger service to Kafka

Posted by pujav65 <gi...@git.apache.org>.
Github user pujav65 commented on the issue:

    https://github.com/apache/incubator-rya/pull/121
  
    hey @DLotts can you fix this conflict so we can see if it builds?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109488860
  
    --- Diff: extras/rya.prospector/pom.xml ---
    @@ -75,6 +75,45 @@ under the License.
                             </excludes>
                         </configuration>
                     </plugin>
    +                <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
    --- End diff --
    
    You shouldn't be touching the rya.prospector project in this ticket.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109478050
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.util.ArrayList;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.apache.log4j.Logger;
    +import org.apache.rya.api.domain.RyaType;
    +import org.apache.rya.api.resolver.RdfToRyaConversions;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +import org.openrdf.model.URI;
    +import org.openrdf.model.Value;
    +import org.openrdf.model.ValueFactory;
    +import org.openrdf.model.impl.URIImpl;
    +import org.openrdf.model.impl.ValueFactoryImpl;
    +import org.openrdf.model.vocabulary.XMLSchema;
    +import org.openrdf.query.Binding;
    +import org.openrdf.query.BindingSet;
    +import org.openrdf.query.impl.ListBindingSet;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +
    +public class BindingSetSerializer implements Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> {
    +    private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
    +        @Override
    +        protected Kryo initialValue() {
    +            Kryo kryo = new Kryo();
    +            return kryo;
    +        };
    +    };
    +
    +    @Override
    +    public VisibilityBindingSet deserialize(String topic, byte[] data) {
    +        KryoInternalSerializer internalSerializer = new KryoInternalSerializer();
    +        Input input = new Input(new ByteArrayInputStream(data));
    +        return internalSerializer.read(kryos.get(), input, VisibilityBindingSet.class);
    +        // this is an alternative, or perhaps replace it:
    +        // return (new VisibilityBindingSetStringConverter()).convert(new String(data, StandardCharsets.UTF_8), null);
    --- End diff --
    
    Remove commented out code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109487518
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java ---
    @@ -0,0 +1,75 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +import org.apache.fluo.api.client.TransactionBase;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.log4j.Logger;
    +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
    +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +
    +/**
    + * Incrementally exports SPARQL query results to Kafka topics.
    + */
    +public class KafkaResultExporter implements IncrementalResultExporter {
    +    private final KafkaProducer<String, VisibilityBindingSet> producer;
    +    private static final Logger log = Logger.getLogger(KafkaResultExporter.class);
    +
    +    /**
    +     * Constructs an instance given a Kafka producer.
    +     * 
    +     * @param producer
    +     *            for sending result set alerts to a broker. (not null)
    +     *            created and configured by {@link KafkaResultExporterFactory}
    --- End diff --
    
    I don't think you should indicate the factory here since technically anything could make the KafkaProducer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya issue #121: Rya-128 trigger service to Kafka

Posted by pujav65 <gi...@git.apache.org>.
Github user pujav65 commented on the issue:

    https://github.com/apache/incubator-rya/pull/121
  
    merged


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109477891
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/pom.xml ---
    @@ -62,6 +64,50 @@ under the License.
                 	</exclusion>
                 </exclusions>
             </dependency>
    +        
    +        <dependency>
    +          <groupId>org.apache.kafka</groupId>
    +          <artifactId>kafka-clients</artifactId>
    +          <version>0.10.1.0</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.kafka</groupId>
    +            <artifactId>kafka_2.11</artifactId>
    +            <version>0.10.1.0</version>
    +            <exclusions>
    +                <exclusion>
    +                    <artifactId>slf4j-log4j12</artifactId>
    +                    <groupId>org.slf4j</groupId>
    +                </exclusion>
    +            </exclusions>
    +        </dependency>
    +                <dependency>
    +            <groupId>com.esotericsoftware</groupId>
    +            <artifactId>kryo</artifactId>
    +            <version>${kryo.version}</version>
    +        </dependency>
    +        
    +        <!-- Testing dependencies. -->
    +        <dependency>
    +          <groupId>org.apache.kafka</groupId>
    +          <artifactId>kafka-clients</artifactId>
    +          <version>0.10.1.0</version>
    +          <classifier>test</classifier>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.kafka</groupId>
    +            <artifactId>kafka_2.11</artifactId>
    +            <version>0.10.1.0</version>
    +            <classifier>test</classifier>
    +<!--             <scope>test</scope> -->
    --- End diff --
    
    This comment still applies.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92826438
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml ---
    @@ -56,5 +56,39 @@
                 <groupId>org.apache.fluo</groupId>
                 <artifactId>fluo-api</artifactId>
             </dependency>
    +
    +        <dependency>
    +          <groupId>org.apache.kafka</groupId>
    +          <artifactId>kafka-clients</artifactId>
    +          <version>0.10.1.0</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.kafka</groupId>
    +            <artifactId>kafka_2.11</artifactId>
    +            <version>0.10.1.0</version>
    +            <exclusions>
    +                <exclusion>
    +                    <artifactId>slf4j-log4j12</artifactId>
    +                    <groupId>org.slf4j</groupId>
    +                </exclusion>
    +            </exclusions>
    +        </dependency>
    +        <!-- Testing dependencies. -->
    +        <dependency>
    +            <groupId>org.apache.kafka</groupId>
    +            <artifactId>kafka_2.11</artifactId>
    +            <version>0.10.1.0</version>
    +            <classifier>test</classifier>
    +<!--             <scope>test</scope> -->
    --- End diff --
    
    Remove commented out code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya issue #121: Rya-128 trigger service to Kafka

Posted by amihalik <gi...@git.apache.org>.
Github user amihalik commented on the issue:

    https://github.com/apache/incubator-rya/pull/121
  
    asfbot build


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92825793
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +import org.apache.fluo.api.client.TransactionBase;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
    +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +
    +/**
    + * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya.
    + */
    +public class KafkaResultExporter implements IncrementalResultExporter {
    +    private final KafkaProducer<String, VisibilityBindingSet> producer;
    +
    +    /**
    +     * Constructor
    +     * 
    +     * @param producer
    +     *            created by {@link KafkaResultExporterFactory}
    +     */
    +    public KafkaResultExporter(KafkaProducer<String, VisibilityBindingSet> producer) {
    +        super();
    +        checkNotNull(producer, "Producer is required.");
    +        this.producer = producer;
    +    }
    +
    +    /**
    +     * Send the results to the topic using the queryID as the topicname
    +     */
    +    @Override
    +    public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException {
    +        checkNotNull(fluoTx);
    +        checkNotNull(queryId);
    +        checkNotNull(result);
    +        try {
    +        final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
    +            String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result;
    +            System.out.println(msg);
    +
    +            // Send result on topic
    +            ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<String, VisibilityBindingSet>(/* topicname= */ queryId, /* value= */ result);
    +            // Can add a key if you need to:
    +            // ProducerRecord(String topic, K key, V value)
    +            producer.send(rec);
    +            System.out.println("producer.send(rec) completed");
    --- End diff --
    
    Logger


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya issue #121: Rya-128 trigger service to Kafka

Posted by pujav65 <gi...@git.apache.org>.
Github user pujav65 commented on the issue:

    https://github.com/apache/incubator-rya/pull/121
  
    asfbot build


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92826060
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporterFactory.java ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import org.apache.fluo.api.observer.Observer.Context;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
    +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +
    +import com.google.common.base.Optional;
    +
    +/**
    + * Creates instances of {@link KafkaResultExporter}.
    + * <p/>
    + * Configure a Kafka producer by adding several required Key/values as described here:
    + * http://kafka.apache.org/documentation.html#producerconfigs
    + * <p/>
    + * Here is a simple example:
    + * <pre>
    + *     Properties producerConfig = new Properties();
    + *     producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    + *     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
    + *     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    + * </pre>
    + * 
    + * @see ProducerConfig
    + */
    +public class KafkaResultExporterFactory implements IncrementalResultExporterFactory {
    +    @Override
    +    public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException {
    +        final KafkaExportParameters exportParams = new KafkaExportParameters(context.getObserverConfiguration().toMap());
    +        System.out.println("KafkaResultExporterFactory.build(): params.isExportToKafka()=" + exportParams.isExportToKafka());
    --- End diff --
    
    log.debug


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92827152
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java ---
    @@ -0,0 +1,112 @@
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.util.ArrayList;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.apache.rya.api.domain.RyaType;
    +import org.apache.rya.api.resolver.RdfToRyaConversions;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +import org.openrdf.model.URI;
    +import org.openrdf.model.Value;
    +import org.openrdf.model.ValueFactory;
    +import org.openrdf.model.impl.URIImpl;
    +import org.openrdf.model.impl.ValueFactoryImpl;
    +import org.openrdf.model.vocabulary.XMLSchema;
    +import org.openrdf.query.Binding;
    +import org.openrdf.query.BindingSet;
    +import org.openrdf.query.impl.ListBindingSet;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +
    +public class BindingSetSerializer implements Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> {
    --- End diff --
    
    Missing documentation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya issue #121: Rya-128 trigger service to Kafka

Posted by pujav65 <gi...@git.apache.org>.
Github user pujav65 commented on the issue:

    https://github.com/apache/incubator-rya/pull/121
  
    asfbot build


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109477449
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java ---
    @@ -132,12 +132,12 @@ public CreatePcj(final int spInsertBatchSize) {
          * @throws SailException Historic PCJ results could not be loaded because of a problem with {@code rya}.
          * @throws QueryEvaluationException Historic PCJ results could not be loaded because of a problem with {@code rya}.
          */
    -	public void withRyaIntegration(final String pcjId, final PrecomputedJoinStorage pcjStorage, final FluoClient fluo,
    -			final Connector accumulo, String ryaInstance )
    -					throws MalformedQueryException, PcjException, SailException, QueryEvaluationException, RyaDAOException {
    -		requireNonNull(pcjId);
    -		requireNonNull(pcjStorage);
    -		requireNonNull(fluo);
    +    public String  withRyaIntegration(final String pcjId, final PrecomputedJoinStorage pcjStorage, final FluoClient fluo,
    --- End diff --
    
    You need to update the method's documentation since you updated its signature.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92825382
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +import org.apache.fluo.api.client.TransactionBase;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
    +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +
    +/**
    + * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya.
    + */
    +public class KafkaResultExporter implements IncrementalResultExporter {
    +    private final KafkaProducer<String, VisibilityBindingSet> producer;
    +
    +    /**
    +     * Constructor
    --- End diff --
    
    At least say "Constructs an instance of {@link KafkaResultExporter}". As is, this doc doesn't add anything to the class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92827536
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import java.util.Map;
    +import java.util.Properties;
    +
    +import org.apache.fluo.api.observer.Observer;
    +import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase;
    +
    +/**
    + * Provides read/write functions to the parameters map that is passed into an
    + * {@link Observer#init(io.fluo.api.observer.Observer.Context)} method related
    + * to PCJ exporting to a kafka topic.
    + * Remember: if doesn't count unless it is added to params
    + */
    +
    +public class KafkaExportParameters extends ParametersBase {
    +
    +    public static final String CONF_EXPORT_TO_KAFKA = "pcj.fluo.export.kafka.enabled";
    +
    +    public KafkaExportParameters(final Map<String, String> params) {
    +        super(params);
    +    }
    +
    +    /**
    +     * @param isExportToKafka
    +     *            - {@code True} if the Fluo application should export
    +     *            to Kafka; otherwise {@code false}.
    +     */
    +    public void setExportToKafka(final boolean isExportToKafka) {
    +        setBoolean(params, CONF_EXPORT_TO_KAFKA, isExportToKafka);
    +    }
    +
    +    /**
    +     * @return {@code True} if the Fluo application should export to Kafka; otherwise
    +     *         {@code false}. Defaults to {@code false} if no value is present.
    +     */
    +    public boolean isExportToKafka() {
    +        return getBoolean(params, CONF_EXPORT_TO_KAFKA, false);
    +    }
    +
    +    /**
    +     * Add the properties to the params, NOT keeping them separate from the other params.
    +     * Guaranteed by Properties: Each key and its corresponding value in the property list is a string.
    +     * 
    +     * @param producerConfig
    +     */
    +    public void setProducerConfig(final Properties producerConfig) {
    +        // params.put(CONF_KAFKA_PRODUCER_PROPS, propertiesIntoXmlString(producerConfig));
    +        for (Object key : producerConfig.keySet().toArray()) {
    +            Object value = producerConfig.getProperty(key.toString());
    +            this.params.put(key.toString(), value.toString());
    +        }
    +    }
    +
    +    /**
    +     * @return all the params (not just kafka producer Configuration) as a {@link Properties}
    +     */
    +    public Properties getProducerConfig() {
    +        //        return propertiesFromXmlString(params.get(CONF_KAFKA_PRODUCER_PROPS));
    --- End diff --
    
    Remove commented out code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92825232
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +import org.apache.fluo.api.client.TransactionBase;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
    +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +
    +/**
    + * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya.
    --- End diff --
    
    This documentation is inaccurate.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92826264
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Properties;
    +
    +import org.junit.Test;
    +
    +/**
    + * Tests the methods of {@link KafkaExportParameters}.
    + */
    +public class KafkaExportParametersTest {
    +
    +    @Test
    +    public void writeParams() {
    +        final Map<String, String> params = new HashMap<>();
    +
    +        // Load some values into the params using the wrapper.
    +        final KafkaExportParameters kafkaParams = new KafkaExportParameters(params);
    +        kafkaParams.setExportToKafka(true);
    +
    +        // Ensure the params map has the expected values.
    +        final Map<String, String> expectedParams = new HashMap<>();
    +        expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "true");
    +        assertTrue(kafkaParams.isExportToKafka());
    +        assertEquals(expectedParams, params);
    +
    +        // now go the other way.
    +        expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "false");
    +        kafkaParams.setExportToKafka(false);
    +        assertFalse(kafkaParams.isExportToKafka());
    +        assertEquals(expectedParams, params);
    +    }
    +    @Test
    +    public void writeParamsProps() {
    +        final String KEY1 = "key1";
    +        final String VALUE1FIRST = "value1-preserve-this";
    +        final String VALUE1SECOND = "value1prop";
    +        final String KEY2 = "\u6b4c\u53e4\u4e8b\u5b66\u9031\u6587\u539f\u554f\u696d\u9593\u9769\u793e\u3002"; // http://generator.lorem-ipsum.info/_chinese
    +        final String VALUE2 = "\u826f\u6cbb\u9bae\u733f\u6027\u793e\u8cbb\u8457\u4f75\u75c5\u6975\u9a13\u3002";
    +
    +        final Map<String, String> params = new HashMap<>();
    +        // Make sure export key1 is kept seperate from producer config key1
    +        params.put(KEY1, VALUE1FIRST);
    +        final KafkaExportParameters kafkaParams = new KafkaExportParameters(params);
    +        // Load some values into the properties using the wrapper.
    +        Properties props = new Properties();
    +        props.put(KEY1, VALUE1SECOND);
    +        props.put(KEY2, VALUE2);
    +        kafkaParams.setProducerConfig(props);
    +        Properties propsAfter = kafkaParams.getProducerConfig();
    +        assertEquals(props, propsAfter);
    +        assertEquals(params, params);
    +        assertEquals("Should not change identical parameters key", params.get(KEY1), VALUE1FIRST);
    +        assertEquals("Props should not have params's key", propsAfter.get(KEY1), VALUE1SECOND);
    +        assertNull("Should not have props key", params.get(KEY2));
    +    }
    +
    +    @Test
    +    public void notConfigured() {
    +        final Map<String, String> params = new HashMap<>();
    +
    +        // Ensure an unconfigured parameters map will say kafka export is disabled.
    +        final KafkaExportParameters kafkaParams = new KafkaExportParameters(params);
    +        assertFalse(kafkaParams.isExportToKafka());
    +    }
    +
    +    @Test
    +    public void testKafkaResultExporterFactory() {
    +        KafkaResultExporterFactory factory = new KafkaResultExporterFactory();
    +        assertNotNull(factory);
    +        // KafkaExportParameters params = new KafkaExportParameters(new HashMap<String, String>());
    --- End diff --
    
    Remove commented out code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya issue #121: Rya-128 trigger service to Kafka

Posted by pujav65 <gi...@git.apache.org>.
Github user pujav65 commented on the issue:

    https://github.com/apache/incubator-rya/pull/121
  
    asfbot built


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92826386
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParametersTest.java ---
    @@ -0,0 +1,99 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +import static org.junit.Assert.assertNotNull;
    +import static org.junit.Assert.assertNull;
    +import static org.junit.Assert.assertTrue;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Properties;
    +
    +import org.junit.Test;
    +
    +/**
    + * Tests the methods of {@link KafkaExportParameters}.
    + */
    +public class KafkaExportParametersTest {
    +
    +    @Test
    +    public void writeParams() {
    +        final Map<String, String> params = new HashMap<>();
    +
    +        // Load some values into the params using the wrapper.
    +        final KafkaExportParameters kafkaParams = new KafkaExportParameters(params);
    +        kafkaParams.setExportToKafka(true);
    +
    +        // Ensure the params map has the expected values.
    +        final Map<String, String> expectedParams = new HashMap<>();
    +        expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "true");
    +        assertTrue(kafkaParams.isExportToKafka());
    +        assertEquals(expectedParams, params);
    +
    +        // now go the other way.
    +        expectedParams.put(KafkaExportParameters.CONF_EXPORT_TO_KAFKA, "false");
    +        kafkaParams.setExportToKafka(false);
    +        assertFalse(kafkaParams.isExportToKafka());
    +        assertEquals(expectedParams, params);
    +    }
    +    @Test
    +    public void writeParamsProps() {
    +        final String KEY1 = "key1";
    --- End diff --
    
    All caps is usually reserved for public static final fields of a class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109477804
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/pom.xml ---
    @@ -62,6 +64,50 @@ under the License.
                 	</exclusion>
                 </exclusions>
             </dependency>
    +        
    +        <dependency>
    +          <groupId>org.apache.kafka</groupId>
    +          <artifactId>kafka-clients</artifactId>
    +          <version>0.10.1.0</version>
    --- End diff --
    
    The parent pom needs to have its dependency management section updated to include these jars so that you no longer need version numbers in the child pom file.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109488230
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml ---
    @@ -56,5 +56,39 @@
                 <groupId>org.apache.fluo</groupId>
                 <artifactId>fluo-api</artifactId>
             </dependency>
    +
    +        <dependency>
    +          <groupId>org.apache.kafka</groupId>
    +          <artifactId>kafka-clients</artifactId>
    +          <version>0.10.1.0</version>
    --- End diff --
    
    Shouldn't need versions in this pom file.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109478209
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java ---
    @@ -0,0 +1,112 @@
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.util.ArrayList;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.apache.rya.api.domain.RyaType;
    +import org.apache.rya.api.resolver.RdfToRyaConversions;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +import org.openrdf.model.URI;
    +import org.openrdf.model.Value;
    +import org.openrdf.model.ValueFactory;
    +import org.openrdf.model.impl.URIImpl;
    +import org.openrdf.model.impl.ValueFactoryImpl;
    +import org.openrdf.model.vocabulary.XMLSchema;
    +import org.openrdf.query.Binding;
    +import org.openrdf.query.BindingSet;
    +import org.openrdf.query.impl.ListBindingSet;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +
    +public class BindingSetSerializer implements Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> {
    --- End diff --
    
    Also need to rename this to "VisibilityBindingSetSerDe" or something. Maybe mention it's for Kafka integration in the name.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109479626
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.util.ArrayList;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.apache.log4j.Logger;
    +import org.apache.rya.api.domain.RyaType;
    +import org.apache.rya.api.resolver.RdfToRyaConversions;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +import org.openrdf.model.URI;
    +import org.openrdf.model.Value;
    +import org.openrdf.model.ValueFactory;
    +import org.openrdf.model.impl.URIImpl;
    +import org.openrdf.model.impl.ValueFactoryImpl;
    +import org.openrdf.model.vocabulary.XMLSchema;
    +import org.openrdf.query.Binding;
    +import org.openrdf.query.BindingSet;
    +import org.openrdf.query.impl.ListBindingSet;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +
    +public class BindingSetSerializer implements Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> {
    +    private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
    +        @Override
    +        protected Kryo initialValue() {
    +            Kryo kryo = new Kryo();
    +            return kryo;
    +        };
    +    };
    +
    +    @Override
    +    public VisibilityBindingSet deserialize(String topic, byte[] data) {
    +        KryoInternalSerializer internalSerializer = new KryoInternalSerializer();
    +        Input input = new Input(new ByteArrayInputStream(data));
    +        return internalSerializer.read(kryos.get(), input, VisibilityBindingSet.class);
    +        // this is an alternative, or perhaps replace it:
    +        // return (new VisibilityBindingSetStringConverter()).convert(new String(data, StandardCharsets.UTF_8), null);
    +    }
    +
    +    @Override
    +    public void configure(Map<String, ?> configs, boolean isKey) {
    +        // Do nothing.
    +    }
    +
    +    @Override
    +    public byte[] serialize(String topic, VisibilityBindingSet data) {
    +        KryoInternalSerializer internalSerializer = new KryoInternalSerializer();
    +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +        Output output = new Output(baos);
    +        internalSerializer.write(kryos.get(), output, data);
    +        output.flush();
    +        byte[] array = baos.toByteArray();
    +        return array;
    +        // this is an alternative, or perhaps replace it:
    +        // return (new VisibilityBindingSetStringConverter()).convert(data, null).getBytes(StandardCharsets.UTF_8);
    +    }
    +
    +    @Override
    +    public void close() {
    +        // Do nothing.
    +    }
    +
    +    private static Value makeValue(final String valueString, final URI typeURI) {
    +        // Convert the String Value into a Value.
    +        final ValueFactory valueFactory = ValueFactoryImpl.getInstance();
    +        if (typeURI.equals(XMLSchema.ANYURI)) {
    +            return valueFactory.createURI(valueString);
    +        } else {
    +            return valueFactory.createLiteral(valueString, typeURI);
    +        }
    +    }
    +
    +    /**
    +     * De/Serialize a visibility binding set using the Kryo library.
    +     * TODO rename this KryoSomething and change the package.
    --- End diff --
    
    Address the TODO.
    
    I think it should be named KryoVisibilityBindingSetSerializer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109485896
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    --- End diff --
    
    Same comment about packaging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109481110
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.util.ArrayList;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.apache.log4j.Logger;
    +import org.apache.rya.api.domain.RyaType;
    +import org.apache.rya.api.resolver.RdfToRyaConversions;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +import org.openrdf.model.URI;
    +import org.openrdf.model.Value;
    +import org.openrdf.model.ValueFactory;
    +import org.openrdf.model.impl.URIImpl;
    +import org.openrdf.model.impl.ValueFactoryImpl;
    +import org.openrdf.model.vocabulary.XMLSchema;
    +import org.openrdf.query.Binding;
    +import org.openrdf.query.BindingSet;
    +import org.openrdf.query.impl.ListBindingSet;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +
    +public class BindingSetSerializer implements Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> {
    +    private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
    +        @Override
    +        protected Kryo initialValue() {
    +            Kryo kryo = new Kryo();
    +            return kryo;
    +        };
    +    };
    +
    +    @Override
    +    public VisibilityBindingSet deserialize(String topic, byte[] data) {
    +        KryoInternalSerializer internalSerializer = new KryoInternalSerializer();
    +        Input input = new Input(new ByteArrayInputStream(data));
    +        return internalSerializer.read(kryos.get(), input, VisibilityBindingSet.class);
    +        // this is an alternative, or perhaps replace it:
    +        // return (new VisibilityBindingSetStringConverter()).convert(new String(data, StandardCharsets.UTF_8), null);
    +    }
    +
    +    @Override
    +    public void configure(Map<String, ?> configs, boolean isKey) {
    +        // Do nothing.
    +    }
    +
    +    @Override
    +    public byte[] serialize(String topic, VisibilityBindingSet data) {
    +        KryoInternalSerializer internalSerializer = new KryoInternalSerializer();
    +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +        Output output = new Output(baos);
    +        internalSerializer.write(kryos.get(), output, data);
    +        output.flush();
    +        byte[] array = baos.toByteArray();
    +        return array;
    +        // this is an alternative, or perhaps replace it:
    +        // return (new VisibilityBindingSetStringConverter()).convert(data, null).getBytes(StandardCharsets.UTF_8);
    +    }
    +
    +    @Override
    +    public void close() {
    +        // Do nothing.
    +    }
    +
    +    private static Value makeValue(final String valueString, final URI typeURI) {
    +        // Convert the String Value into a Value.
    +        final ValueFactory valueFactory = ValueFactoryImpl.getInstance();
    +        if (typeURI.equals(XMLSchema.ANYURI)) {
    +            return valueFactory.createURI(valueString);
    +        } else {
    +            return valueFactory.createLiteral(valueString, typeURI);
    +        }
    +    }
    +
    +    /**
    +     * De/Serialize a visibility binding set using the Kryo library.
    +     * TODO rename this KryoSomething and change the package.
    +     *
    +     */
    +    private static class KryoInternalSerializer extends com.esotericsoftware.kryo.Serializer<VisibilityBindingSet> {
    +        private static final Logger log = Logger.getLogger(BindingSetSerializer.class);
    +        @Override
    +        public void write(Kryo kryo, Output output, VisibilityBindingSet visBindingSet) {
    +            log.debug("Serializer writing visBindingSet" + visBindingSet);
    +            output.writeString(visBindingSet.getVisibility());
    +            // write the number count for the reader.
    +            output.writeInt(visBindingSet.size());
    +            for (Binding binding : visBindingSet) {
    +                output.writeString(binding.getName());
    +                final RyaType ryaValue = RdfToRyaConversions.convertValue(binding.getValue());
    --- End diff --
    
    Why convert the Value into a RyaType for serialization instead of just using the OpenRDF model that it is already in?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109486620
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import java.util.Map;
    +import java.util.Properties;
    +
    +import org.apache.fluo.api.observer.Observer;
    +import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase;
    +
    +/**
    + * Provides read/write functions to the parameters map that is passed into an
    + * {@link Observer#init(io.fluo.api.observer.Observer.Context)} method related
    + * to PCJ exporting to a kafka topic.
    + * Remember: if doesn't count unless it is added to params
    + */
    +
    +public class KafkaExportParameters extends ParametersBase {
    +
    +    public static final String CONF_EXPORT_TO_KAFKA = "pcj.fluo.export.kafka.enabled";
    +
    +    public KafkaExportParameters(final Map<String, String> params) {
    +        super(params);
    +    }
    +
    +    /**
    +     * @param isExportToKafka
    +     *            - {@code True} if the Fluo application should export
    +     *            to Kafka; otherwise {@code false}.
    +     */
    +    public void setExportToKafka(final boolean isExportToKafka) {
    +        setBoolean(params, CONF_EXPORT_TO_KAFKA, isExportToKafka);
    +    }
    +
    +    /**
    +     * @return {@code True} if the Fluo application should export to Kafka; otherwise
    +     *         {@code false}. Defaults to {@code false} if no value is present.
    +     */
    +    public boolean isExportToKafka() {
    +        return getBoolean(params, CONF_EXPORT_TO_KAFKA, false);
    +    }
    +
    +    /**
    +     * Add the properties to the params, NOT keeping them separate from the other params.
    +     * Guaranteed by Properties: Each key and its corresponding value in the property list is a string.
    +     * 
    +     * @param producerConfig
    +     */
    +    public void setProducerConfig(final Properties producerConfig) {
    --- End diff --
    
    This method name is kind of misleading because 'set' functions usually mean you're just setting a single value within the object the set function is on.
    
    Maybe rename this to "addProperties(Properties properties)"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by DLotts <gi...@git.apache.org>.
Github user DLotts commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r99259887
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java ---
    @@ -0,0 +1,112 @@
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    --- End diff --
    
    fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109480721
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.util.ArrayList;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.apache.log4j.Logger;
    +import org.apache.rya.api.domain.RyaType;
    +import org.apache.rya.api.resolver.RdfToRyaConversions;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +import org.openrdf.model.URI;
    +import org.openrdf.model.Value;
    +import org.openrdf.model.ValueFactory;
    +import org.openrdf.model.impl.URIImpl;
    +import org.openrdf.model.impl.ValueFactoryImpl;
    +import org.openrdf.model.vocabulary.XMLSchema;
    +import org.openrdf.query.Binding;
    +import org.openrdf.query.BindingSet;
    +import org.openrdf.query.impl.ListBindingSet;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +
    +public class BindingSetSerializer implements Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> {
    +    private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
    +        @Override
    +        protected Kryo initialValue() {
    +            Kryo kryo = new Kryo();
    +            return kryo;
    +        };
    +    };
    +
    +    @Override
    +    public VisibilityBindingSet deserialize(String topic, byte[] data) {
    +        KryoInternalSerializer internalSerializer = new KryoInternalSerializer();
    +        Input input = new Input(new ByteArrayInputStream(data));
    +        return internalSerializer.read(kryos.get(), input, VisibilityBindingSet.class);
    +        // this is an alternative, or perhaps replace it:
    +        // return (new VisibilityBindingSetStringConverter()).convert(new String(data, StandardCharsets.UTF_8), null);
    +    }
    +
    +    @Override
    +    public void configure(Map<String, ?> configs, boolean isKey) {
    +        // Do nothing.
    +    }
    +
    +    @Override
    +    public byte[] serialize(String topic, VisibilityBindingSet data) {
    +        KryoInternalSerializer internalSerializer = new KryoInternalSerializer();
    +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +        Output output = new Output(baos);
    +        internalSerializer.write(kryos.get(), output, data);
    +        output.flush();
    +        byte[] array = baos.toByteArray();
    +        return array;
    +        // this is an alternative, or perhaps replace it:
    +        // return (new VisibilityBindingSetStringConverter()).convert(data, null).getBytes(StandardCharsets.UTF_8);
    +    }
    +
    +    @Override
    +    public void close() {
    +        // Do nothing.
    +    }
    +
    +    private static Value makeValue(final String valueString, final URI typeURI) {
    +        // Convert the String Value into a Value.
    +        final ValueFactory valueFactory = ValueFactoryImpl.getInstance();
    +        if (typeURI.equals(XMLSchema.ANYURI)) {
    +            return valueFactory.createURI(valueString);
    +        } else {
    +            return valueFactory.createLiteral(valueString, typeURI);
    +        }
    +    }
    +
    +    /**
    +     * De/Serialize a visibility binding set using the Kryo library.
    +     * TODO rename this KryoSomething and change the package.
    +     *
    +     */
    +    private static class KryoInternalSerializer extends com.esotericsoftware.kryo.Serializer<VisibilityBindingSet> {
    +        private static final Logger log = Logger.getLogger(BindingSetSerializer.class);
    +        @Override
    +        public void write(Kryo kryo, Output output, VisibilityBindingSet visBindingSet) {
    +            log.debug("Serializer writing visBindingSet" + visBindingSet);
    +            output.writeString(visBindingSet.getVisibility());
    +            // write the number count for the reader.
    +            output.writeInt(visBindingSet.size());
    +            for (Binding binding : visBindingSet) {
    +                output.writeString(binding.getName());
    +                final RyaType ryaValue = RdfToRyaConversions.convertValue(binding.getValue());
    +                final String valueString = ryaValue.getData();
    +                final URI type = ryaValue.getDataType();
    +                output.writeString(valueString);
    +                output.writeString(type.toString());
    +            }
    +        }
    +
    +        @Override
    +        public VisibilityBindingSet read(Kryo kryo, Input input, Class<VisibilityBindingSet> aClass) {
    +            log.debug("Serializer reading visBindingSet");
    +            String visibility = input.readString();
    +            int bindingCount = input.readInt();
    +            ArrayList<String> namesList = new ArrayList<String>(bindingCount);
    --- End diff --
    
    This hunk of code could just be a MapBindingSet that you add each binding to instead of having independent lists for names and values. It would be a lot easier to read.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109478089
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.util.ArrayList;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.apache.log4j.Logger;
    +import org.apache.rya.api.domain.RyaType;
    +import org.apache.rya.api.resolver.RdfToRyaConversions;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +import org.openrdf.model.URI;
    +import org.openrdf.model.Value;
    +import org.openrdf.model.ValueFactory;
    +import org.openrdf.model.impl.URIImpl;
    +import org.openrdf.model.impl.ValueFactoryImpl;
    +import org.openrdf.model.vocabulary.XMLSchema;
    +import org.openrdf.query.Binding;
    +import org.openrdf.query.BindingSet;
    +import org.openrdf.query.impl.ListBindingSet;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +
    +public class BindingSetSerializer implements Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> {
    +    private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
    +        @Override
    +        protected Kryo initialValue() {
    +            Kryo kryo = new Kryo();
    +            return kryo;
    +        };
    +    };
    +
    +    @Override
    +    public VisibilityBindingSet deserialize(String topic, byte[] data) {
    +        KryoInternalSerializer internalSerializer = new KryoInternalSerializer();
    +        Input input = new Input(new ByteArrayInputStream(data));
    +        return internalSerializer.read(kryos.get(), input, VisibilityBindingSet.class);
    +        // this is an alternative, or perhaps replace it:
    +        // return (new VisibilityBindingSetStringConverter()).convert(new String(data, StandardCharsets.UTF_8), null);
    +    }
    +
    +    @Override
    +    public void configure(Map<String, ?> configs, boolean isKey) {
    +        // Do nothing.
    +    }
    +
    +    @Override
    +    public byte[] serialize(String topic, VisibilityBindingSet data) {
    +        KryoInternalSerializer internalSerializer = new KryoInternalSerializer();
    +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +        Output output = new Output(baos);
    +        internalSerializer.write(kryos.get(), output, data);
    +        output.flush();
    +        byte[] array = baos.toByteArray();
    +        return array;
    +        // this is an alternative, or perhaps replace it:
    +        // return (new VisibilityBindingSetStringConverter()).convert(data, null).getBytes(StandardCharsets.UTF_8);
    --- End diff --
    
    Remove commented out code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109477971
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/BindingSetSerializer.java ---
    @@ -0,0 +1,112 @@
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.util.ArrayList;
    +import java.util.Map;
    +
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.apache.rya.api.domain.RyaType;
    +import org.apache.rya.api.resolver.RdfToRyaConversions;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +import org.openrdf.model.URI;
    +import org.openrdf.model.Value;
    +import org.openrdf.model.ValueFactory;
    +import org.openrdf.model.impl.URIImpl;
    +import org.openrdf.model.impl.ValueFactoryImpl;
    +import org.openrdf.model.vocabulary.XMLSchema;
    +import org.openrdf.query.Binding;
    +import org.openrdf.query.BindingSet;
    +import org.openrdf.query.impl.ListBindingSet;
    +
    +import com.esotericsoftware.kryo.Kryo;
    +import com.esotericsoftware.kryo.io.Input;
    +import com.esotericsoftware.kryo.io.Output;
    +
    +public class BindingSetSerializer implements Serializer<VisibilityBindingSet>, Deserializer<VisibilityBindingSet> {
    --- End diff --
    
    Still missing documentation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r109485842
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaExportParameters.java ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import java.util.Map;
    +import java.util.Properties;
    +
    +import org.apache.fluo.api.observer.Observer;
    +import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase;
    +
    +/**
    + * Provides read/write functions to the parameters map that is passed into an
    + * {@link Observer#init(io.fluo.api.observer.Observer.Context)} method related
    + * to PCJ exporting to a kafka topic.
    + * Remember: if doesn't count unless it is added to params
    + */
    +
    +public class KafkaExportParameters extends ParametersBase {
    +
    +    public static final String CONF_EXPORT_TO_KAFKA = "pcj.fluo.export.kafka.enabled";
    +
    +    public KafkaExportParameters(final Map<String, String> params) {
    +        super(params);
    +    }
    +
    +    /**
    +     * @param isExportToKafka
    +     *            - {@code True} if the Fluo application should export
    +     *            to Kafka; otherwise {@code false}.
    +     */
    +    public void setExportToKafka(final boolean isExportToKafka) {
    +        setBoolean(params, CONF_EXPORT_TO_KAFKA, isExportToKafka);
    +    }
    +
    +    /**
    +     * @return {@code True} if the Fluo application should export to Kafka; otherwise
    +     *         {@code false}. Defaults to {@code false} if no value is present.
    +     */
    +    public boolean isExportToKafka() {
    +        return getBoolean(params, CONF_EXPORT_TO_KAFKA, false);
    +    }
    +
    +    /**
    +     * Add the properties to the params, NOT keeping them separate from the other params.
    +     * Guaranteed by Properties: Each key and its corresponding value in the property list is a string.
    +     * 
    +     * @param producerConfig
    --- End diff --
    
    Missing docs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92826806
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java ---
    @@ -123,7 +122,7 @@ public CreatePcj(final int spInsertBatchSize) {
          * @throws SailException Historic PCJ results could not be loaded because of a problem with {@code rya}.
          * @throws QueryEvaluationException Historic PCJ results could not be loaded because of a problem with {@code rya}.
          */
    -    public void withRyaIntegration(
    +    public String withRyaIntegration(
    --- End diff --
    
    Update this method's documentation to indicate what it returns.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #121: [WIP] Rya-128 trigger service to Kafka

Posted by kchilton2 <gi...@git.apache.org>.
Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/121#discussion_r92825647
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/KafkaResultExporter.java ---
    @@ -0,0 +1,72 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app.export.rya;
    +
    +import static com.google.common.base.Preconditions.checkNotNull;
    +
    +import org.apache.fluo.api.client.TransactionBase;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerRecord;
    +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
    +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +
    +/**
    + * Incrementally exports SPARQL query results to Accumulo PCJ tables as they are defined by Rya.
    + */
    +public class KafkaResultExporter implements IncrementalResultExporter {
    +    private final KafkaProducer<String, VisibilityBindingSet> producer;
    +
    +    /**
    +     * Constructor
    +     * 
    +     * @param producer
    +     *            created by {@link KafkaResultExporterFactory}
    +     */
    +    public KafkaResultExporter(KafkaProducer<String, VisibilityBindingSet> producer) {
    +        super();
    +        checkNotNull(producer, "Producer is required.");
    +        this.producer = producer;
    +    }
    +
    +    /**
    +     * Send the results to the topic using the queryID as the topicname
    +     */
    +    @Override
    +    public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException {
    +        checkNotNull(fluoTx);
    +        checkNotNull(queryId);
    +        checkNotNull(result);
    +        try {
    +        final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID);
    --- End diff --
    
    White space


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---