Posted to notifications@rya.apache.org by meiercaleb <gi...@git.apache.org> on 2017/08/31 04:19:44 UTC

[GitHub] incubator-rya pull request #220: Rya 319

GitHub user meiercaleb opened a pull request:

    https://github.com/apache/incubator-rya/pull/220

    Rya 319

    <!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at
    
      http://www.apache.org/licenses/LICENSE-2.0
    
    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
    -->
    ## Description
    Refactored the PeriodicNotification project to expose certain API components so that the Periodic Query capability could be integrated with the Rya Shell. In particular, APIs for CreatePeriodicPCJ, DeletePeriodicPCJ, and ListIncrementalQueries were created and integrated with the RyaClient for use in the Rya Shell.
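
As a rough illustration of how the three client APIs named in the description might fit together, here is a minimal sketch. Only the `deletePeriodicPCJ` parameter list follows the `AccumuloDeletePeriodicPCJ` diff quoted later in this thread. The `createPeriodicPCJ` method name and return type, the `listIncrementalQueries` signature, and the `InMemoryClient` stand-in are assumptions made for illustration and are not taken from the pull request.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PeriodicPcjClientSketch {

    /** Hypothetical shape: registers a periodic query and returns its id. */
    interface CreatePeriodicPCJ {
        String createPeriodicPCJ(String instanceName, String sparql,
                String periodicTopic, String bootStrapServers);
    }

    /** Parameter list mirrors the deletePeriodicPCJ override shown in the diff. */
    interface DeletePeriodicPCJ {
        void deletePeriodicPCJ(String instanceName, String pcjId, String topic, String brokers);
    }

    /** Hypothetical shape: returns one line per registered query. */
    interface ListIncrementalQueries {
        String listIncrementalQueries(String instanceName);
    }

    /** In-memory stand-in so the sketch runs without Accumulo, Fluo, or Kafka. */
    static class InMemoryClient
            implements CreatePeriodicPCJ, DeletePeriodicPCJ, ListIncrementalQueries {

        private final Map<String, String> sparqlById = new LinkedHashMap<>();
        private int nextId = 1;

        @Override
        public String createPeriodicPCJ(String instanceName, String sparql,
                String periodicTopic, String bootStrapServers) {
            String id = "query-" + nextId++;
            sparqlById.put(id, sparql);
            return id;
        }

        @Override
        public void deletePeriodicPCJ(String instanceName, String pcjId, String topic, String brokers) {
            sparqlById.remove(pcjId);
        }

        @Override
        public String listIncrementalQueries(String instanceName) {
            StringBuilder sb = new StringBuilder();
            for (Map.Entry<String, String> e : sparqlById.entrySet()) {
                sb.append(e.getKey()).append(": ").append(e.getValue()).append('\n');
            }
            return sb.toString();
        }
    }

    public static void main(String[] args) {
        InMemoryClient client = new InMemoryClient();
        String id = client.createPeriodicPCJ("rya_", "select ?s where { ?s ?p ?o }",
                "notifications", "localhost:9092");
        System.out.print(client.listIncrementalQueries("rya_"));
        client.deletePeriodicPCJ("rya_", id, "notifications", "localhost:9092");
    }
}
```

Keeping each operation behind its own small interface is what would let a shell command depend on one capability at a time, which is also what makes the Mockito-style shell tests mentioned below straightforward.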
    
    ### Tests
    Mockito tests were written to verify that the shell returns the appropriate String value, and integration tests were written to test all supporting classes.
    
    ### Links
    [Jira](https://issues.apache.org/jira/browse/RYA-319)
    
    ### Checklist
    - [ ] Code Review
    - [X] Squash Commits
    
    #### People To Review
    @isper3at @ejwhite922 @amihalik 


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/meiercaleb/incubator-rya RYA-319

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-rya/pull/220.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #220
    
----
commit 1660b850151ea8c7dd4be7b338edfc7865411798
Author: Caleb Meier <ca...@parsons.com>
Date:   2017-08-08T04:22:00Z

    RYA-246-Query-Export-Strategy

commit f63738a63c06c0d5a16b9517aa0694be41db961f
Author: Caleb Meier <ca...@parsons.com>
Date:   2017-08-21T23:41:10Z

    RYA-319-Integration of Periodic Query with CLI

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #220: Rya 319

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136431029
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java ---
    @@ -344,14 +351,18 @@ public String withRyaIntegration(
          * @param ryaInstance - name of Rya instance to connect to
     =======
          * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
    +<<<<<<< 1660b850151ea8c7dd4be7b338edfc7865411798
     >>>>>>> RYA-246-Query-Export-Strategy
          * @return The Fluo application's Query ID of the query that was created.
    +=======
    +     * @return FluoQuery containing the metadata for the newly registered SPARQL query
    +>>>>>>> RYA-319-Integration of Periodic Query with CLI
    --- End diff --
    
    Merge conflicts


---

[GitHub] incubator-rya pull request #220: Rya 319

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136431435
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueries.java ---
    @@ -0,0 +1,149 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.api;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Set;
    +
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.fluo.api.client.Snapshot;
    +import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
    +import org.apache.rya.api.client.CreatePCJ.QueryType;
    +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
    +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
    +import org.openrdf.query.parser.ParsedQuery;
    +import org.openrdf.query.parser.sparql.SPARQLParser;
    +import org.openrdf.queryrender.sparql.SPARQLQueryRenderer;
    +
    +import com.google.common.base.Preconditions;
    +
    +/**
    + * Class for retrieving a List containing a String representation of each query maintained by Fluo.
    + *
    + */
    +public class ListFluoQueries {
    +
    +    private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
    +
    +    /**
    +     * Retrieve a list of String representations of each query maintained by Fluo
    +     * 
    +     * @param fluo - FluoClient for interacting with Fluo
    +     * @return - List of String representations of queries maintained by Fluo.
    +     * @throws Exception 
    +     */
    +    public List<String> listFluoQueries(FluoClient fluo) throws Exception {
    +
    +        List<String> queryStrings = new ArrayList<>();
    +        Snapshot sx = fluo.newSnapshot();
    +
    +        List<String> ids = new ListQueryIds().listQueryIds(fluo);
    +        for (String id : ids) {
    +            queryStrings.add(extractString(dao.readQueryMetadata(sx, id)));
    +        }
    +
    +        return queryStrings;
    +    }
    +
    +    private String extractString(QueryMetadata metadata) throws Exception {
    --- End diff --
    
    static
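
A minimal sketch of what the one-word comment is asking for, using hypothetical names rather than the real `ListFluoQueries` code: a private helper that reads only its arguments and never touches `this` can be declared `static`. The class `StaticHelperSketch`, the method `format`, and the output format are all assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StaticHelperSketch {

    /** Public instance method, similar in shape to a listing method. */
    public List<String> listQueries(Map<String, String> sparqlById) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : sparqlById.entrySet()) {
            out.add(format(e.getKey(), e.getValue()));
        }
        return out;
    }

    /** Uses only its arguments and no instance state, so it is declared static. */
    private static String format(String queryId, String sparql) {
        return "Query ID: " + queryId + ", SPARQL: " + sparql;
    }

    public static void main(String[] args) {
        Map<String, String> queries = new LinkedHashMap<>();
        queries.put("q1", "select ?s where { ?s ?p ?o }");
        System.out.println(new StaticHelperSketch().listQueries(queries));
    }
}
```

Marking such helpers `static` documents that they have no hidden dependency on object state and lets the compiler reject an accidental use of instance fields later.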


---

[GitHub] incubator-rya issue #220: Rya 319

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/incubator-rya/pull/220
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/465/



---

[GitHub] incubator-rya pull request #220: Rya 319

Posted by jdasch <gi...@git.apache.org>.
Github user jdasch commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136567933
  
    --- Diff: common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java ---
    @@ -44,7 +44,7 @@
          * Application.
          *
          */
    -    public static enum ExportStrategy{RYA, KAFKA, NO_OP_EXPORT};
    +    public static enum ExportStrategy{RYA_EXPORT, KAFKA_EXPORT, NO_OP_EXPORT, PERIODIC_EXPORT};
    --- End diff --
    
    I feel like the `_EXPORT` suffix is a bit redundant given that the enum is `ExportStrategy`.  Also, the `NO_OP_EXPORT` could probably just be renamed to `NONE`.
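
To make the naming suggestion concrete, here is a small sketch of the enum with the suffix dropped and `NO_OP_EXPORT` renamed to `NONE`. The constant names `RYA`, `KAFKA`, `PERIODIC`, and `NONE` are one possible reading of the comment, not the names that were merged, and `exportsAnywhere` is a hypothetical helper added only to show the constants in use.

```java
import java.util.EnumSet;
import java.util.Set;

public class ExportStrategyNaming {

    /**
     * Hypothetical renaming: the enum type already says "export strategy",
     * so the constants do not repeat it. Nested enums are implicitly static,
     * so the 'static' modifier is omitted as well.
     */
    public enum ExportStrategy { RYA, KAFKA, PERIODIC, NONE }

    /** Returns true when at least one strategy actually exports results. */
    public static boolean exportsAnywhere(Set<ExportStrategy> strategies) {
        return strategies.stream().anyMatch(s -> s != ExportStrategy.NONE);
    }

    public static void main(String[] args) {
        Set<ExportStrategy> strategies =
                EnumSet.of(ExportStrategy.KAFKA, ExportStrategy.PERIODIC);
        System.out.println(strategies + " exports somewhere: " + exportsAnywhere(strategies));
    }
}
```

At a call site the qualified form `ExportStrategy.KAFKA` reads cleanly, whereas `ExportStrategy.KAFKA_EXPORT` states the same thing twice.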


---

[GitHub] incubator-rya pull request #220: Rya 319

Posted by jdasch <gi...@git.apache.org>.
Github user jdasch commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136645046
  
    --- Diff: common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java ---
    @@ -0,0 +1,39 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client;
    +
    +/**
    + * This class creates new PeriodicPCJ for a given Rya instance.  
    + */
    +public interface CreatePeriodicPCJ {
    +
    +    /**
    +     * Creates a new PeriodicPCJ for a given Rya instance. The provided periodicTopic and bootStrapServers are used for
    +     * registering new PeriodiNotifications with the underlying notification registration service. Typically, the
    +     * bootStrapServers are the IP for the KafkaBrokers.
    +     * 
    +     * @param instanceName - Rya instance to connect to
    +     * @param sparql - SPARQL query registered with the Periodic Service
    +     * @param periodicTopic - Kafka topic that new PeriodicNotifications are exported to for registration witht the
    --- End diff --
    
    Still seeing this on GitHub. Did it get fixed?


---

[GitHub] incubator-rya pull request #220: Rya 319

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-rya/pull/220


---

[GitHub] incubator-rya pull request #220: Rya 319

Posted by meiercaleb <gi...@git.apache.org>.
Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136615988
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java ---
    @@ -293,18 +296,22 @@ public String withRyaIntegration(
          * @param sparql - sparql query that will registered with Fluo. (not null)
          * @param strategies - ExportStrategies used to specify how final results will be handled
          * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
    +<<<<<<< 1660b850151ea8c7dd4be7b338edfc7865411798
     <<<<<<< d47190b1ab429f1a200ac0d9a0ae07b451db5027
          * @param accumulo - Accumulo connector for connecting with Accumulo
          * @param ryaInstance - name of Rya instance to connect to
     =======
     >>>>>>> RYA-246-Query-Export-Strategy
          * @return The Fluo application's Query ID of the query that was created.
    +=======
    +     * @return FluoQuery containing the metadata for the newly registered SPARQL query
    +>>>>>>> RYA-319-Integration of Periodic Query with CLI
    --- End diff --
    
    Oof! Done.


---

[GitHub] incubator-rya pull request #220: Rya 319

Posted by jdasch <gi...@git.apache.org>.
Github user jdasch commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136571851
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListIncrementalQueries.java ---
    @@ -0,0 +1,97 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.List;
    +
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.rya.api.client.GetInstanceDetails;
    +import org.apache.rya.api.client.InstanceDoesNotExistException;
    +import org.apache.rya.api.client.ListIncrementalQueries;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
    +import org.apache.rya.indexing.pcj.fluo.api.ListFluoQueries;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.base.Optional;
    +
    +public class AccumuloListIncrementalQueries extends AccumuloCommand implements ListIncrementalQueries {
    --- End diff --
    
    Did you want to add `@DefaultAnnotation(NonNull.class)`?
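
The annotation itself comes from the FindBugs annotations library, so it is not reproduced here. As a dependency-free sketch of the same contract, the snippet below enforces non-null arguments at runtime with `java.util.Objects.requireNonNull`, which the diff already imports. The class name `NonNullContractSketch` and its field are hypothetical.

```java
import static java.util.Objects.requireNonNull;

public class NonNullContractSketch {

    private final String instanceName;

    /**
     * @param instanceName - name of the instance to operate on. (not null)
     */
    public NonNullContractSketch(final String instanceName) {
        // Fails fast with a NullPointerException whose message names the argument.
        this.instanceName = requireNonNull(instanceName, "instanceName");
    }

    public String getInstanceName() {
        return instanceName;
    }

    public static void main(String[] args) {
        System.out.println(new NonNullContractSketch("rya_").getInstanceName());
    }
}
```

A class-level default annotation and explicit runtime checks complement each other: the first lets static analysis flag suspicious callers, the second catches whatever slips through.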


---

[GitHub] incubator-rya pull request #220: Rya 319

Posted by meiercaleb <gi...@git.apache.org>.
Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136632758
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Properties;
    +
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.api.client.DeletePeriodicPCJ;
    +import org.apache.rya.api.client.GetInstanceDetails;
    +import org.apache.rya.api.client.InstanceDoesNotExistException;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
    +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery;
    +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery.QueryDeletionException;
    +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
    +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
    +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
    +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
    +import org.apache.rya.periodic.notification.notification.CommandNotification;
    +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
    +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
    +import org.openrdf.query.MalformedQueryException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Optional;
    +
    --- End diff --
    
    Done.


---

[GitHub] incubator-rya pull request #220: Rya 319

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136430407
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Properties;
    +
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.api.client.DeletePeriodicPCJ;
    +import org.apache.rya.api.client.GetInstanceDetails;
    +import org.apache.rya.api.client.InstanceDoesNotExistException;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
    +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery;
    +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery.QueryDeletionException;
    +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
    +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
    +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
    +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
    +import org.apache.rya.periodic.notification.notification.CommandNotification;
    +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
    +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
    +import org.openrdf.query.MalformedQueryException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Optional;
    +
    +public class AccumuloDeletePeriodicPCJ extends AccumuloCommand implements DeletePeriodicPCJ {
    +    private static final Logger log = LoggerFactory.getLogger(AccumuloDeletePCJ.class);
    +
    +    private final GetInstanceDetails getInstanceDetails;
    +
    +    /**
    +     * Constructs an instance of {@link AccumuloDeletePeriodicPCJ}.
    +     *
    +     * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null)
    +     * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
    +     */
    +    public AccumuloDeletePeriodicPCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
    +        super(connectionDetails, connector);
    +        getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector);
    +    }
    +
    +    @Override
    +    public void deletePeriodicPCJ(final String instanceName, final String pcjId, String topic, String brokers) throws InstanceDoesNotExistException, RyaClientException {
    +        requireNonNull(instanceName);
    +        requireNonNull(pcjId);
    +
    +        final Optional<RyaDetails> originalDetails = getInstanceDetails.getDetails(instanceName);
    +        final boolean ryaInstanceExists = originalDetails.isPresent();
    +        if(!ryaInstanceExists) {
    +            throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName));
    +        }
    +
    +        final boolean pcjIndexingEnabled = originalDetails.get().getPCJIndexDetails().isEnabled();
    +        if(!pcjIndexingEnabled) {
    +            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
    +        }
    +
    +        // If the PCJ was being maintained by a Fluo application, then stop that process.
    +        final PCJIndexDetails pcjIndexDetails = originalDetails.get().getPCJIndexDetails();
    +        final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
    +
    +        if (fluoDetailsHolder.isPresent()) {
    +            final String fluoAppName = pcjIndexDetails.getFluoDetails().get().getUpdateAppName();
    +            try {
    +                stopUpdatingPCJ(instanceName, fluoAppName, pcjId, topic, brokers);
    +            } catch (MalformedQueryException | UnsupportedQueryException | QueryDeletionException e) {
    +                throw new RyaClientException(String.format("Unable to delete Periodic Query with id: %s", pcjId), e);
    +            }
    +        } else {
    +            log.error(String.format("Could not stop the Fluo application from updating the PCJ because the Fluo Details are "
    +                    + "missing for the Rya instance named '%s'.", instanceName));
    +        }
    +        
    +    }
    +
    +
    +    private void stopUpdatingPCJ(final String ryaInstance, final String fluoAppName, final String pcjId, final String topic, final String brokers) throws UnsupportedQueryException, MalformedQueryException, QueryDeletionException {
    +        requireNonNull(fluoAppName);
    +        requireNonNull(pcjId);
    +
    +        // Connect to the Fluo application that is updating this instance's PCJs.
    +        final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
    +        try (final FluoClient fluoClient = new FluoClientFactory().connect(cd.getUsername(), new String(cd.getPassword()),
    +                cd.getInstanceName(), cd.getZookeepers(), fluoAppName)) {
    +            // Delete the PCJ from the Fluo App.
    +            PeriodicQueryResultStorage periodic = new AccumuloPeriodicQueryResultStorage(getConnector(), ryaInstance);
    +            DeletePeriodicQuery deletePeriodic = new DeletePeriodicQuery(fluoClient, periodic);
    +            deletePeriodic.deletePeriodicQuery(pcjId, getPeriodicNotificationClient(topic, brokers));
    +        }
    +    }
    +    
    +    
    +    private PeriodicNotificationClient getPeriodicNotificationClient(String topic, String brokers) throws MalformedQueryException {
    --- End diff --
    
    static
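
The body of `getPeriodicNotificationClient` is not shown in the diff, so the following is only a guess at the kind of argument-only setup such a method performs, written as a static factory. It uses plain `java.util.Properties` with the standard Kafka producer keys `bootstrap.servers`, `key.serializer`, and `value.serializer`. The serializer class names are taken from the imports in the diff, and the class and method names are hypothetical.

```java
import java.util.Properties;

public class ProducerPropsSketch {

    /** Depends only on its argument, so it can be static. */
    static Properties producerProperties(String brokers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProperties("localhost:9092");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```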


---

[GitHub] incubator-rya pull request #220: Rya 319

Posted by jdasch <gi...@git.apache.org>.
Github user jdasch commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136646583
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePeriodicQuery.java ---
    @@ -0,0 +1,214 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.api;
    +
    +import java.util.Optional;
    +
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
    +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery;
    +import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode;
    +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
    +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
    +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
    +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
    +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
    +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
    +import org.apache.rya.periodic.notification.notification.PeriodicNotification;
    +import org.openrdf.query.MalformedQueryException;
    +import org.openrdf.query.algebra.evaluation.function.Function;
    +
    +import com.google.common.collect.Sets;
    +
    +
    +/**
    + * Object that creates a Periodic Query.  A Periodic Query is any query
    + * requesting periodic updates about events that occurred within a given
    + * window of time of this instant. This is also known as a rolling window
    + * query.  Period Queries can be expressed using SPARQL by including the
    + * {@link Function} indicated by the URI {@link PeriodicQueryUtil#PeriodicQueryURI}
    + * in the query.  The user must provide this Function with the following arguments:
    + * the temporal variable in the query that will be filtered on, the window of time
    + * that events must occur within, the period at which the user wants to receive updates,
    + * and the time unit.  The following query requests all observations that occurred
    + * within the last minute and requests updates every 15 seconds.  It also performs
    + * a count on those observations.
    + * <li>
    + * <li> prefix function: http://org.apache.rya/function#
    + * <li>               "prefix time: http://www.w3.org/2006/time# 
    + * <li>               "select (count(?obs) as ?total) where {
    + * <li>               "Filter(function:periodic(?time, 1, .25, time:minutes))
    + * <li>               "?obs uri:hasTime ?time.
    + * <li>               "?obs uri:hasId ?id }
    + * <li>
    --- End diff --
    
    Remove the `<li>` tags and replace with
    ```
    <pre>
    your preformatted content
    </pre>
    ```
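
Applied to the class comment in the diff, the suggestion might look like the sketch below. The query text is copied from the diff with the stray quotes and `<li>` tags removed and has not been checked against a SPARQL parser. The class name `PeriodicQueryJavadocSketch` and the `EXAMPLE_QUERY` constant are hypothetical.

```java
/**
 * Sketch of a class comment that keeps a multi-line query readable.
 * Inside the pre element, Javadoc preserves line breaks and indentation.
 *
 * <pre>
 * prefix function: http://org.apache.rya/function#
 * prefix time: http://www.w3.org/2006/time#
 * select (count(?obs) as ?total) where {
 *   Filter(function:periodic(?time, 1, .25, time:minutes))
 *   ?obs uri:hasTime ?time.
 *   ?obs uri:hasId ?id }
 * </pre>
 */
public class PeriodicQueryJavadocSketch {

    /** The same example query as a constant, one source line per query line. */
    public static final String EXAMPLE_QUERY = String.join("\n",
            "prefix function: http://org.apache.rya/function#",
            "prefix time: http://www.w3.org/2006/time#",
            "select (count(?obs) as ?total) where {",
            "  Filter(function:periodic(?time, 1, .25, time:minutes))",
            "  ?obs uri:hasTime ?time.",
            "  ?obs uri:hasId ?id }");

    public static void main(String[] args) {
        System.out.println(EXAMPLE_QUERY);
    }
}
```

Unlike a run of `<li>` items, the `<pre>` element needs no enclosing list and renders the query as a single monospaced block in the generated documentation.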


---

[GitHub] incubator-rya pull request #220: Rya 319

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136430440
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Properties;
    +
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.api.client.DeletePeriodicPCJ;
    +import org.apache.rya.api.client.GetInstanceDetails;
    +import org.apache.rya.api.client.InstanceDoesNotExistException;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
    +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery;
    +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery.QueryDeletionException;
    +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
    +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
    +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
    +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
    +import org.apache.rya.periodic.notification.notification.CommandNotification;
    +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
    +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
    +import org.openrdf.query.MalformedQueryException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Optional;
    +
    +public class AccumuloDeletePeriodicPCJ extends AccumuloCommand implements DeletePeriodicPCJ {
    +    private static final Logger log = LoggerFactory.getLogger(AccumuloDeletePeriodicPCJ.class);
    +
    +    private final GetInstanceDetails getInstanceDetails;
    +
    +    /**
    +     * Constructs an instance of {@link AccumuloDeletePeriodicPCJ}.
    +     *
    +     * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null)
    +     * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
    +     */
    +    public AccumuloDeletePeriodicPCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
    +        super(connectionDetails, connector);
    +        getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector);
    +    }
    +
    +    @Override
    +    public void deletePeriodicPCJ(final String instanceName, final String pcjId, String topic, String brokers) throws InstanceDoesNotExistException, RyaClientException {
    +        requireNonNull(instanceName);
    +        requireNonNull(pcjId);
    +
    +        final Optional<RyaDetails> originalDetails = getInstanceDetails.getDetails(instanceName);
    +        final boolean ryaInstanceExists = originalDetails.isPresent();
    +        if(!ryaInstanceExists) {
    +            throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName));
    +        }
    +
    +        final boolean pcjIndexingEnabled = originalDetails.get().getPCJIndexDetails().isEnabled();
    +        if(!pcjIndexingEnabled) {
    +            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
    +        }
    +
    +        // If the PCJ was being maintained by a Fluo application, then stop that process.
    +        final PCJIndexDetails pcjIndexDetails = originalDetails.get().getPCJIndexDetails();
    +        final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
    +
    +        if (fluoDetailsHolder.isPresent()) {
    +            final String fluoAppName = pcjIndexDetails.getFluoDetails().get().getUpdateAppName();
    +            try {
    +                stopUpdatingPCJ(instanceName, fluoAppName, pcjId, topic, brokers);
    +            } catch (MalformedQueryException | UnsupportedQueryException | QueryDeletionException e) {
    +                throw new RyaClientException(String.format("Unable to delete Periodic Query with id: %s", pcjId), e);
    +            }
    +        } else {
    +            log.error(String.format("Could not stop the Fluo application from updating the PCJ because the Fluo Details are "
    +                    + "missing for the Rya instance named '%s'.", instanceName));
    +        }
    +        
    +    }
    +
    +
    +    private void stopUpdatingPCJ(final String ryaInstance, final String fluoAppName, final String pcjId, final String topic, final String brokers) throws UnsupportedQueryException, MalformedQueryException, QueryDeletionException {
    +        requireNonNull(fluoAppName);
    +        requireNonNull(pcjId);
    +
    +        // Connect to the Fluo application that is updating this instance's PCJs.
    +        final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
    +        try (final FluoClient fluoClient = new FluoClientFactory().connect(cd.getUsername(), new String(cd.getPassword()),
    +                cd.getInstanceName(), cd.getZookeepers(), fluoAppName)) {
    +            // Delete the PCJ from the Fluo App.
    +            PeriodicQueryResultStorage periodic = new AccumuloPeriodicQueryResultStorage(getConnector(), ryaInstance);
    +            DeletePeriodicQuery deletePeriodic = new DeletePeriodicQuery(fluoClient, periodic);
    +            deletePeriodic.deletePeriodicQuery(pcjId, getPeriodicNotificationClient(topic, brokers));
    +        }
    +    }
    +    
    +    
    +    private PeriodicNotificationClient getPeriodicNotificationClient(String topic, String brokers) throws MalformedQueryException {
    +        return new KafkaNotificationRegistrationClient(topic, getProducer(brokers));
    +    }
    +
    +    private KafkaProducer<String, CommandNotification> getProducer(String brokers) {
    --- End diff --
    
    Make this method `static`.
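
A minimal sketch of the idea, assuming the method only builds configuration from its argument (the body of `getProducer` is not shown in the diff, so this is a guess). The class and method names are hypothetical; the property keys are the standard Kafka producer settings, and the serializer class names are taken from the imports in the diff.

```java
import java.util.Properties;

class ProducerConfigSketch {

    // Hypothetical helper. It reads no instance fields, so it can be static.
    static Properties producerProperties(final String brokers) {
        final Properties props = new Properties();
        // Kafka brokers to bootstrap from, e.g. "host1:9092,host2:9092".
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer");
        return props;
    }
}
```

Marking such a helper `static` documents that its result depends only on the `brokers` argument.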



[GitHub] incubator-rya pull request #220: Rya 319

Posted by meiercaleb <gi...@git.apache.org>.
Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136616422
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java ---
    @@ -39,7 +39,11 @@
         public static final String CONSTRUCT_PREFIX = "CONSTRUCT";
         public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY";
         
    +    //binding name reserved for periodic bin id for periodic query results
         public static final String PERIODIC_BIN_ID = PeriodicQueryResultStorage.PeriodicBinId;
    +    
    +    //constant used for storing Kafka connection information in Fluo
    +    public static final String KAFKA = "KAFKA";
    --- End diff --
    
    Realized I wasn't actually using this.  Deleted.  



[GitHub] incubator-rya pull request #220: Rya 319

Posted by meiercaleb <gi...@git.apache.org>.
Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136625082
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListIncrementalQueries.java ---
    @@ -0,0 +1,97 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.List;
    +
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.rya.api.client.GetInstanceDetails;
    +import org.apache.rya.api.client.InstanceDoesNotExistException;
    +import org.apache.rya.api.client.ListIncrementalQueries;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
    +import org.apache.rya.indexing.pcj.fluo.api.ListFluoQueries;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.base.Optional;
    +
    +public class AccumuloListIncrementalQueries extends AccumuloCommand implements ListIncrementalQueries {
    --- End diff --
    
    Done.



[GitHub] incubator-rya pull request #220: Rya 319

Posted by jdasch <gi...@git.apache.org>.
Github user jdasch commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136569934
  
    --- Diff: common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java ---
    @@ -44,7 +44,7 @@
          * Application.
          *
          */
    -    public static enum ExportStrategy{RYA, KAFKA, NO_OP_EXPORT};
    +    public static enum ExportStrategy{RYA_EXPORT, KAFKA_EXPORT, NO_OP_EXPORT, PERIODIC_EXPORT};
    --- End diff --
    
    Hmm, since you can have multiple export strategies, I don't see why we can't just remove the `NO_OP_EXPORT` enum and the `NoOpExporter` class entirely.  If you get an empty set of strategies, that means you never have to obtain an exporter that doesn't do anything.
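
To illustrate the point, here is a small sketch under stated assumptions: the enum values come from the diff (minus `NO_OP_EXPORT`), while `ExportStrategySketch`, the `Exporter` interface, and `exportersFor` are hypothetical stand-ins, not the actual Rya classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class ExportStrategySketch {

    // Enum values from the diff, without NO_OP_EXPORT.
    enum ExportStrategy { RYA_EXPORT, KAFKA_EXPORT, PERIODIC_EXPORT }

    // Hypothetical stand-in for an exporter.
    interface Exporter {
        String name();
    }

    // Builds one exporter per requested strategy.
    // An empty set yields an empty list, so no "do nothing" exporter is needed.
    static List<Exporter> exportersFor(final Set<ExportStrategy> strategies) {
        final List<Exporter> exporters = new ArrayList<>();
        for (final ExportStrategy strategy : strategies) {
            exporters.add(strategy::name);
        }
        return exporters;
    }
}
```

With this shape, "export nowhere" is expressed as `EnumSet.noneOf(ExportStrategy.class)` rather than as a dedicated enum value.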



[GitHub] incubator-rya pull request #220: Rya 319

Posted by meiercaleb <gi...@git.apache.org>.
Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136632517
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.java ---
    @@ -0,0 +1,141 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Properties;
    +
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.api.client.CreatePeriodicPCJ;
    +import org.apache.rya.api.client.GetInstanceDetails;
    +import org.apache.rya.api.client.InstanceDoesNotExistException;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
    +import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery.PeriodicQueryCreationException;
    +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
    +import org.apache.rya.indexing.pcj.storage.PcjException;
    +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
    +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
    +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
    +import org.apache.rya.periodic.notification.notification.CommandNotification;
    +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
    +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
    +import org.openrdf.query.MalformedQueryException;
    +import org.openrdf.query.QueryEvaluationException;
    +import org.openrdf.repository.RepositoryException;
    +import org.openrdf.sail.SailException;
    +
    +import com.google.common.base.Optional;
    +
    --- End diff --
    
    Done.



[GitHub] incubator-rya pull request #220: Rya 319

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136430958
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java ---
    @@ -293,18 +296,22 @@ public String withRyaIntegration(
          * @param sparql - sparql query that will registered with Fluo. (not null)
          * @param strategies - ExportStrategies used to specify how final results will be handled
          * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
    +<<<<<<< 1660b850151ea8c7dd4be7b338edfc7865411798
     <<<<<<< d47190b1ab429f1a200ac0d9a0ae07b451db5027
          * @param accumulo - Accumulo connector for connecting with Accumulo
          * @param ryaInstance - name of Rya instance to connect to
     =======
     >>>>>>> RYA-246-Query-Export-Strategy
          * @return The Fluo application's Query ID of the query that was created.
    +=======
    +     * @return FluoQuery containing the metadata for the newly registered SPARQL query
    +>>>>>>> RYA-319-Integration of Periodic Query with CLI
    --- End diff --
    
    Resolve the merge conflict markers left in this Javadoc.



[GitHub] incubator-rya pull request #220: Rya 319

Posted by jdasch <gi...@git.apache.org>.
Github user jdasch commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136359005
  
    --- Diff: extras/rya.periodic.service/periodic.service.api/pom.xml ---
    @@ -0,0 +1,51 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +  <!-- Licensed to the Apache Software Foundation (ASF) under one or more 
    +     contributor license agreements. See the NOTICE file distributed with this 
    +     work for additional information regarding copyright ownership. The ASF licenses  
    +     this file to you under the Apache License, Version 2.0 (the "License"); you 
    +     may not use this file except in compliance with the License. You may obtain 
    +     a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
    +     required by applicable law or agreed to in writing, software distributed 
    +     under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
    +     OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
    +     the specific language governing permissions and limitations under the License. -->
    +  
    +  <modelVersion>4.0.0</modelVersion>
    +  <parent>
    +    <groupId>org.apache.rya</groupId>
    +    <artifactId>rya.periodic.service</artifactId>
    +    <version>3.2.11-incubating-SNAPSHOT</version>
    +  </parent>
    +  
    +  <artifactId>rya.periodic.service.api</artifactId>
    +  
    +  <name>Apache Rya Periodic Service API</name>
    +  <description>API for Periodic Service Application</description>
    +  
    +  	<dependencies>
    +
    +		<dependency>
    +			<groupId>com.google.code.gson</groupId>
    +			<artifactId>gson</artifactId>
    +			<version>2.8.0</version>
    +			<scope>compile</scope>
    +		</dependency>
    +		<dependency>
    +			<groupId>junit</groupId>
    +			<artifactId>junit</artifactId>
    --- End diff --
    
    Should be test scope.  Also, this file has tab characters in it.



[GitHub] incubator-rya issue #220: Rya 319

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/incubator-rya/pull/220
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/470/




[GitHub] incubator-rya pull request #220: Rya 319

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136601325
  
    --- Diff: common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java ---
    @@ -0,0 +1,39 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client;
    +
    +/**
    + * This class creates new PeriodicPCJ for a given Rya instance.  
    + */
    +public interface CreatePeriodicPCJ {
    +
    +    /**
    +     * Creates a new PeriodicPCJ for a given Rya instance. The provided periodicTopic and bootStrapServers are used for
    +     * registering new PeriodicNotifications with the underlying notification registration service. Typically, the
    +     * bootStrapServers are the IP for the KafkaBrokers.
    +     * 
    +     * @param instanceName - Rya instance to connect to
    +     * @param sparql - SPARQL query registered with the Periodic Service
    +     * @param periodicTopic - Kafka topic that new PeriodicNotifications are exported to for registration witht the
    --- End diff --
    
    nit: typo, "witht" should be "with".



[GitHub] incubator-rya pull request #220: Rya 319

Posted by jdasch <gi...@git.apache.org>.
Github user jdasch commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136571283
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Properties;
    +
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.api.client.DeletePeriodicPCJ;
    +import org.apache.rya.api.client.GetInstanceDetails;
    +import org.apache.rya.api.client.InstanceDoesNotExistException;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
    +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery;
    +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery.QueryDeletionException;
    +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
    +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
    +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
    +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
    +import org.apache.rya.periodic.notification.notification.CommandNotification;
    +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
    +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
    +import org.openrdf.query.MalformedQueryException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Optional;
    +
    +public class AccumuloDeletePeriodicPCJ extends AccumuloCommand implements DeletePeriodicPCJ {
    +    private static final Logger log = LoggerFactory.getLogger(AccumuloDeletePeriodicPCJ.class);
    +
    +    private final GetInstanceDetails getInstanceDetails;
    +
    +    /**
    +     * Constructs an instance of {@link AccumuloDeletePeriodicPCJ}.
    +     *
    +     * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null)
    +     * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
    +     */
    +    public AccumuloDeletePeriodicPCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
    +        super(connectionDetails, connector);
    +        getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector);
    +    }
    +
    +    @Override
    +    public void deletePeriodicPCJ(final String instanceName, final String pcjId, String topic, String brokers) throws InstanceDoesNotExistException, RyaClientException {
    +        requireNonNull(instanceName);
    +        requireNonNull(pcjId);
    +
    +        final Optional<RyaDetails> originalDetails = getInstanceDetails.getDetails(instanceName);
    +        final boolean ryaInstanceExists = originalDetails.isPresent();
    +        if(!ryaInstanceExists) {
    +            throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName));
    +        }
    +
    +        final boolean pcjIndexingEnabled = originalDetails.get().getPCJIndexDetails().isEnabled();
    +        if(!pcjIndexingEnabled) {
    +            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
    +        }
    +
    +        // If the PCJ was being maintained by a Fluo application, then stop that process.
    +        final PCJIndexDetails pcjIndexDetails = originalDetails.get().getPCJIndexDetails();
    +        final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
    +
    +        if (fluoDetailsHolder.isPresent()) {
    +            final String fluoAppName = pcjIndexDetails.getFluoDetails().get().getUpdateAppName();
    +            try {
    +                stopUpdatingPCJ(instanceName, fluoAppName, pcjId, topic, brokers);
    +            } catch (MalformedQueryException | UnsupportedQueryException | QueryDeletionException e) {
    +                throw new RyaClientException(String.format("Unable to delete Periodic Query with id: %s", pcjId), e);
    +            }
    +        } else {
    +            log.error(String.format("Could not stop the Fluo application from updating the PCJ because the Fluo Details are "
    +                    + "missing for the Rya instance named '%s'.", instanceName));
    +        }
    +        
    +    }
    +
    +
    +    private void stopUpdatingPCJ(final String ryaInstance, final String fluoAppName, final String pcjId, final String topic, final String brokers) throws UnsupportedQueryException, MalformedQueryException, QueryDeletionException {
    +        requireNonNull(fluoAppName);
    +        requireNonNull(pcjId);
    +
    +        // Connect to the Fluo application that is updating this instance's PCJs.
    +        final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
    +        try (final FluoClient fluoClient = new FluoClientFactory().connect(cd.getUsername(), new String(cd.getPassword()),
    +                cd.getInstanceName(), cd.getZookeepers(), fluoAppName)) {
    +            // Delete the PCJ from the Fluo App.
    +            PeriodicQueryResultStorage periodic = new AccumuloPeriodicQueryResultStorage(getConnector(), ryaInstance);
    +            DeletePeriodicQuery deletePeriodic = new DeletePeriodicQuery(fluoClient, periodic);
    +            deletePeriodic.deletePeriodicQuery(pcjId, getPeriodicNotificationClient(topic, brokers));
    +        }
    +    }
    +    
    +    
    +    private PeriodicNotificationClient getPeriodicNotificationClient(String topic, String brokers) throws MalformedQueryException {
    +        return new KafkaNotificationRegistrationClient(topic, getProducer(brokers));
    +    }
    +
    +    private KafkaProducer<String, CommandNotification> getProducer(String brokers) {
    --- End diff --
    
    Do you guys have an opinion on whether this should be called createProducer instead of getProducer, since it makes a new object rather than accessing an existing one?
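
For reference, a small sketch of the naming distinction being asked about. Everything here is hypothetical (`ProducerNamingSketch`, `Client`, and both methods); it only illustrates the common convention that `create...` returns a fresh object on every call while `get...` returns an existing one.

```java
class ProducerNamingSketch {

    // Hypothetical stand-in for an expensive client such as a Kafka producer.
    static final class Client {
        final String brokers;

        Client(final String brokers) {
            this.brokers = brokers;
        }
    }

    private Client cached;

    // "create": every call builds a new object that the caller owns.
    static Client createClient(final String brokers) {
        return new Client(brokers);
    }

    // "get": returns the existing object, building it only on first use.
    // Later calls ignore the argument and return the cached instance.
    Client getClient(final String brokers) {
        if (cached == null) {
            cached = createClient(brokers);
        }
        return cached;
    }
}
```

Under that convention, a method that calls `new` each time reads more naturally as `createProducer`, and the name also hints that the caller is responsible for closing what it receives.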



[GitHub] incubator-rya pull request #220: Rya 319

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136429388
  
    --- Diff: common/rya.api/src/main/java/org/apache/rya/api/client/ListIncrementalQueries.java ---
    @@ -0,0 +1,37 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client;
    +
    +/**
    + * Verifies that Rya instance has Fluo application enabled and lists
    + * all SPARQL queries maintained by the application.
    + */
    +public interface ListIncrementalQueries {
    +
    +    /**
    +     * Lists all SPARQL queries maintained by the Fluo Application for a given rya instance and associated information,
    +     * including the Fluo Query Id, the QueryType, the ExportStrategy, and the pretty-printed SPARQL query.
    +     * 
    +     * @param ryaInstance - Rya instance whose queries are incrementally maintained by Fluo
    +     * @return
    --- End diff --
    
    What does this return?  It seems like it might be a comma-separated (or semicolon-separated) string of the queries.
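
For illustration, a minimal sketch of what a filled-in `@return` tag could look like. The `ListingSketch` class, the `render` helper, and the newline separator are all hypothetical; the diff above does not show the method's actual return type or format.

```java
import java.util.List;

// Hypothetical helper, not part of the pull request.
final class ListingSketch {
    private ListingSketch() {}

    /**
     * Renders query descriptions as a single String.
     *
     * @param queries - one description per maintained query (not null)
     * @return the descriptions joined by a newline character,
     *         or an empty String when no queries are maintained
     */
    static String render(final List<String> queries) {
        // String.join returns "" for an empty list.
        return String.join("\n", queries);
    }
}
```

Stating the separator and the empty case in the tag would let callers parse the result without reading the implementation.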



[GitHub] incubator-rya pull request #220: Rya 319

Posted by meiercaleb <gi...@git.apache.org>.
Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136616190
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueries.java ---
    @@ -0,0 +1,149 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.api;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Set;
    +
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.fluo.api.client.Snapshot;
    +import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
    +import org.apache.rya.api.client.CreatePCJ.QueryType;
    +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
    +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
    +import org.openrdf.query.parser.ParsedQuery;
    +import org.openrdf.query.parser.sparql.SPARQLParser;
    +import org.openrdf.queryrender.sparql.SPARQLQueryRenderer;
    +
    +import com.google.common.base.Preconditions;
    +
    +/**
    + * Class for retrieving a List containing a String representation of each query maintained by Fluo.
    + *
    + */
    +public class ListFluoQueries {
    +
    +    private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
    +
    +    /**
    +     * Retrieve a list of String representations of each query maintained by Fluo
    +     * 
    +     * @param fluo - FluoClient for interacting with Fluo
    +     * @return - List of String representations of queries maintained by Fluo.
    +     * @throws Exception 
    +     */
    +    public List<String> listFluoQueries(FluoClient fluo) throws Exception {
    +
    +        List<String> queryStrings = new ArrayList<>();
    +        Snapshot sx = fluo.newSnapshot();
    +
    +        List<String> ids = new ListQueryIds().listQueryIds(fluo);
    +        for (String id : ids) {
    +            queryStrings.add(extractString(dao.readQueryMetadata(sx, id)));
    +        }
    +
    +        return queryStrings;
    +    }
    +
    +    private String extractString(QueryMetadata metadata) throws Exception {
    --- End diff --
    
    Done



[GitHub] incubator-rya pull request #220: Rya 319

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136585280
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Properties;
    +
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.api.client.DeletePeriodicPCJ;
    +import org.apache.rya.api.client.GetInstanceDetails;
    +import org.apache.rya.api.client.InstanceDoesNotExistException;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
    +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery;
    +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery.QueryDeletionException;
    +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
    +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
    +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
    +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
    +import org.apache.rya.periodic.notification.notification.CommandNotification;
    +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
    +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
    +import org.openrdf.query.MalformedQueryException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Optional;
    +
    +public class AccumuloDeletePeriodicPCJ extends AccumuloCommand implements DeletePeriodicPCJ {
    +    private static final Logger log = LoggerFactory.getLogger(AccumuloDeletePeriodicPCJ.class);
    +
    +    private final GetInstanceDetails getInstanceDetails;
    +
    +    /**
    +     * Constructs an instance of {@link AccumuloDeletePeriodicPCJ}.
    +     *
    +     * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null)
    +     * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
    +     */
    +    public AccumuloDeletePeriodicPCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
    +        super(connectionDetails, connector);
    +        getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector);
    +    }
    +
    +    @Override
    +    public void deletePeriodicPCJ(final String instanceName, final String pcjId, String topic, String brokers) throws InstanceDoesNotExistException, RyaClientException {
    +        requireNonNull(instanceName);
    +        requireNonNull(pcjId);
    +
    +        final Optional<RyaDetails> originalDetails = getInstanceDetails.getDetails(instanceName);
    +        final boolean ryaInstanceExists = originalDetails.isPresent();
    +        if(!ryaInstanceExists) {
    +            throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName));
    +        }
    +
    +        final boolean pcjIndexingEnabled = originalDetails.get().getPCJIndexDetails().isEnabled();
    +        if(!pcjIndexingEnabled) {
    +            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
    +        }
    +
    +        // If the PCJ was being maintained by a Fluo application, then stop that process.
    +        final PCJIndexDetails pcjIndexDetails = originalDetails.get().getPCJIndexDetails();
    +        final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
    +
    +        if (fluoDetailsHolder.isPresent()) {
    +            final String fluoAppName = pcjIndexDetails.getFluoDetails().get().getUpdateAppName();
    +            try {
    +                stopUpdatingPCJ(instanceName, fluoAppName, pcjId, topic, brokers);
    +            } catch (MalformedQueryException | UnsupportedQueryException | QueryDeletionException e) {
    +                throw new RyaClientException(String.format("Unable to delete Periodic Query with id: %s", pcjId), e);
    +            }
    +        } else {
    +            log.error(String.format("Could not stop the Fluo application from updating the PCJ because the Fluo Details are "
    +                    + "missing for the Rya instance named '%s'.", instanceName));
    +        }
    +        
    +    }
    +
    +
    +    private void stopUpdatingPCJ(final String ryaInstance, final String fluoAppName, final String pcjId, final String topic, final String brokers) throws UnsupportedQueryException, MalformedQueryException, QueryDeletionException {
    +        requireNonNull(fluoAppName);
    +        requireNonNull(pcjId);
    +
    +        // Connect to the Fluo application that is updating this instance's PCJs.
    +        final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
    +        try (final FluoClient fluoClient = new FluoClientFactory().connect(cd.getUsername(), new String(cd.getPassword()),
    +                cd.getInstanceName(), cd.getZookeepers(), fluoAppName)) {
    +            // Delete the PCJ from the Fluo App.
    +            PeriodicQueryResultStorage periodic = new AccumuloPeriodicQueryResultStorage(getConnector(), ryaInstance);
    +            DeletePeriodicQuery deletePeriodic = new DeletePeriodicQuery(fluoClient, periodic);
    +            deletePeriodic.deletePeriodicQuery(pcjId, getPeriodicNotificationClient(topic, brokers));
    +        }
    +    }
    +    
    +    
    +    private PeriodicNotificationClient getPeriodicNotificationClient(String topic, String brokers) throws MalformedQueryException {
    +        return new KafkaNotificationRegistrationClient(topic, getProducer(brokers));
    +    }
    +
    +    private KafkaProducer<String, CommandNotification> getProducer(String brokers) {
    --- End diff --
    
    Yeah, it should probably be renamed to createProducer.
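
A small sketch of the naming distinction being discussed, assuming the usual convention that "get" returns something the object already holds while "create" builds a new object on each call. The `ProducerNamingSketch` class and its methods are hypothetical and use `java.util.Properties` as a stand-in for the producer.

```java
import java.util.Properties;

// Hypothetical class used only to contrast the two naming styles.
final class ProducerNamingSketch {
    private final Properties cached = new Properties();

    /** "get": returns the instance this object already holds. */
    Properties getConfig() {
        return cached;
    }

    /** "create": builds and returns a fresh instance on every call. */
    Properties createConfig() {
        return new Properties();
    }
}
```

Under that convention, a method that constructs a new KafkaProducer on each call reads more accurately as createProducer.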



[GitHub] incubator-rya pull request #220: Rya 319

Posted by meiercaleb <gi...@git.apache.org>.
Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136616038
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java ---
    @@ -344,14 +351,18 @@ public String withRyaIntegration(
          * @param ryaInstance - name of Rya instance to connect to
     =======
          * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null)
    +<<<<<<< 1660b850151ea8c7dd4be7b338edfc7865411798
     >>>>>>> RYA-246-Query-Export-Strategy
          * @return The Fluo application's Query ID of the query that was created.
    +=======
    +     * @return FluoQuery containing the metadata for the newly registered SPARQL query
    +>>>>>>> RYA-319-Integration of Periodic Query with CLI
    --- End diff --
    
    Done.



[GitHub] incubator-rya issue #220: Rya 319

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/incubator-rya/pull/220
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/466/




[GitHub] incubator-rya pull request #220: Rya 319

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136602156
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.java ---
    @@ -0,0 +1,141 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Properties;
    +
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.api.client.CreatePeriodicPCJ;
    +import org.apache.rya.api.client.GetInstanceDetails;
    +import org.apache.rya.api.client.InstanceDoesNotExistException;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
    +import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery.PeriodicQueryCreationException;
    +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
    +import org.apache.rya.indexing.pcj.storage.PcjException;
    +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
    +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
    +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
    +import org.apache.rya.periodic.notification.notification.CommandNotification;
    +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
    +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
    +import org.openrdf.query.MalformedQueryException;
    +import org.openrdf.query.QueryEvaluationException;
    +import org.openrdf.repository.RepositoryException;
    +import org.openrdf.sail.SailException;
    +
    +import com.google.common.base.Optional;
    +
    --- End diff --
    
    This class needs class-level Javadoc.



[GitHub] incubator-rya pull request #220: Rya 319

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136430191
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.java ---
    @@ -0,0 +1,141 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Properties;
    +
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.api.client.CreatePeriodicPCJ;
    +import org.apache.rya.api.client.GetInstanceDetails;
    +import org.apache.rya.api.client.InstanceDoesNotExistException;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
    +import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery.PeriodicQueryCreationException;
    +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
    +import org.apache.rya.indexing.pcj.storage.PcjException;
    +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
    +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
    +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
    +import org.apache.rya.periodic.notification.notification.CommandNotification;
    +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
    +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
    +import org.openrdf.query.MalformedQueryException;
    +import org.openrdf.query.QueryEvaluationException;
    +import org.openrdf.repository.RepositoryException;
    +import org.openrdf.sail.SailException;
    +
    +import com.google.common.base.Optional;
    +
    +public class AccumuloCreatePeriodicPCJ extends AccumuloCommand implements CreatePeriodicPCJ {
    +
    +    private final GetInstanceDetails getInstanceDetails;
    +    
    +    /**
    +     * Constructs an instance of {@link AccumuloCreatePeriodicPCJ}.
    +     *
    +     * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null)
    +     * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
    +     */
    +    public AccumuloCreatePeriodicPCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
    +        super(connectionDetails, connector);
    +        getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector);
    +    }
    +    
    +    @Override
    +    public String createPeriodicPCJ(String instanceName, String sparql, String periodicTopic, String bootStrapServers) throws RyaClientException {
    +        requireNonNull(instanceName);
    +        requireNonNull(sparql);
    +
    +        final Optional<RyaDetails> ryaDetailsHolder = getInstanceDetails.getDetails(instanceName);
    +        final boolean ryaInstanceExists = ryaDetailsHolder.isPresent();
    +        if (!ryaInstanceExists) {
    +            throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName));
    +        }
    +
    +        final PCJIndexDetails pcjIndexDetails = ryaDetailsHolder.get().getPCJIndexDetails();
    +        final boolean pcjIndexingEnabled = pcjIndexDetails.isEnabled();
    +        if (!pcjIndexingEnabled) {
    +            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
    +        }
    +
    +        // If a Fluo application is being used, task it with updating the PCJ.
    +        final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
    +        if (fluoDetailsHolder.isPresent()) {
    +            final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
    +            try {
    +                return updateFluoAppAndRegisterWithKafka(instanceName, fluoAppName, sparql, periodicTopic, bootStrapServers);
    +            } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException
    +                    | RyaDAOException | PeriodicQueryCreationException e) {
    +                throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
    +            } catch (UnsupportedQueryException e) {
    +                throw new RyaClientException("The new PCJ could not be initialized because it either contains an unsupported query node "
    +                        + "or an invalid ExportStrategy for the given QueryType.  Projection queries can be exported to either Rya or Kafka, "
    +                        + "unless they contain an aggregation, in which case they can only be exported to Kafka.  Construct queries can be exported "
    +                        + "to Rya and Kafka, and Periodic queries can only be exported to Rya.", e);
    +            } 
    +        } else {
    +            throw new RyaClientException(String.format("The Fluo Details are missing for the Rya instance named '%s', so the Periodic PCJ could not be created.", instanceName));
    +        }
    +    }
    +
    +
    +    
    +    
    +    private String updateFluoAppAndRegisterWithKafka(final String ryaInstance, final String fluoAppName, String sparql, String periodicTopic, String bootStrapServers) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException, UnsupportedQueryException, PeriodicQueryCreationException {
    +        requireNonNull(sparql);
    +        requireNonNull(periodicTopic);
    +        requireNonNull(bootStrapServers);
    +
    +        final PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(getConnector(), ryaInstance);
    +        
    +        // Connect to the Fluo application that is updating this instance's PCJs.
    +        final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
    +        try(final FluoClient fluoClient = new FluoClientFactory().connect(
    +                cd.getUsername(),
    +                new String(cd.getPassword()),
    +                cd.getInstanceName(),
    +                cd.getZookeepers(),
    +                fluoAppName);) {
    +            // Initialize the PCJ within the Fluo application.
    +            final CreatePeriodicQuery periodicPcj = new CreatePeriodicQuery(fluoClient, periodicStorage);
    +            PeriodicNotificationClient periodicClient = new KafkaNotificationRegistrationClient(periodicTopic, getProducer(bootStrapServers));
    +            return periodicPcj.withRyaIntegration(sparql, periodicClient, getConnector(), ryaInstance).getQueryId();
    +        }
    +    }
    +    
    +    
    +    private KafkaProducer<String, CommandNotification> getProducer(String bootStrapServers) {
    --- End diff --
    
    Make this method static
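
A minimal sketch of why the helper can be static: it needs only its argument and no instance state. The body of getProducer is not shown in this thread, so the configuration below is an assumption inferred from the file's imports (ProducerConfig, StringSerializer, CommandNotificationSerializer). `ProducerConfigSketch` and `createProducerProperties` are hypothetical names, and the sketch stops at building the Properties so that it runs without Kafka on the classpath.

```java
import static java.util.Objects.requireNonNull;

import java.util.Properties;

// Hypothetical holder class, not part of the pull request.
final class ProducerConfigSketch {
    private ProducerConfigSketch() {}

    /**
     * Builds producer settings from the broker list alone.
     * The keys are the standard Kafka producer configuration names.
     */
    static Properties createProducerProperties(final String bootstrapServers) {
        requireNonNull(bootstrapServers);
        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Assumed value serializer, taken from the imports in the diff above.
        props.setProperty("value.serializer",
                "org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer");
        return props;
    }
}
```

Because nothing here touches `this`, marking the method static documents that it is independent of the command object's connection state.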



[GitHub] incubator-rya pull request #220: Rya 319

Posted by meiercaleb <gi...@git.apache.org>.
Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136615096
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.java ---
    @@ -0,0 +1,141 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Properties;
    +
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.api.client.CreatePeriodicPCJ;
    +import org.apache.rya.api.client.GetInstanceDetails;
    +import org.apache.rya.api.client.InstanceDoesNotExistException;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
    +import org.apache.rya.api.persist.RyaDAOException;
    +import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
    +import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery.PeriodicQueryCreationException;
    +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
    +import org.apache.rya.indexing.pcj.storage.PcjException;
    +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
    +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
    +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
    +import org.apache.rya.periodic.notification.notification.CommandNotification;
    +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
    +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
    +import org.openrdf.query.MalformedQueryException;
    +import org.openrdf.query.QueryEvaluationException;
    +import org.openrdf.repository.RepositoryException;
    +import org.openrdf.sail.SailException;
    +
    +import com.google.common.base.Optional;
    +
    +public class AccumuloCreatePeriodicPCJ extends AccumuloCommand implements CreatePeriodicPCJ {
    +
    +    private final GetInstanceDetails getInstanceDetails;
    +    
    +    /**
    +     * Constructs an instance of {@link AccumuloCreatePeriodicPCJ}.
    +     *
    +     * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null)
    +     * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
    +     */
    +    public AccumuloCreatePeriodicPCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
    +        super(connectionDetails, connector);
    +        getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector);
    +    }
    +    
    +    @Override
    +    public String createPeriodicPCJ(String instanceName, String sparql, String periodicTopic, String bootStrapServers) throws RyaClientException {
    +        requireNonNull(instanceName);
    +        requireNonNull(sparql);
    +
    +        final Optional<RyaDetails> ryaDetailsHolder = getInstanceDetails.getDetails(instanceName);
    +        final boolean ryaInstanceExists = ryaDetailsHolder.isPresent();
    +        if (!ryaInstanceExists) {
    +            throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName));
    +        }
    +
    +        final PCJIndexDetails pcjIndexDetails = ryaDetailsHolder.get().getPCJIndexDetails();
    +        final boolean pcjIndexingEnabled = pcjIndexDetails.isEnabled();
    +        if (!pcjIndexingEnabled) {
    +            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
    +        }
    +
    +        // If a Fluo application is being used, task it with updating the PCJ.
    +        final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
    +        if (fluoDetailsHolder.isPresent()) {
    +            final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName();
    +            try {
    +                return updateFluoAppAndRegisterWithKafka(instanceName, fluoAppName, sparql, periodicTopic, bootStrapServers);
    +            } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException
    +                    | RyaDAOException | PeriodicQueryCreationException e) {
    +                throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e);
    +            } catch (UnsupportedQueryException e) {
    +                throw new RyaClientException("The new PCJ could not be initialized because it either contains an unsupported query node "
    +                        + "or an invalid ExportStrategy for the given QueryType.  Projection queries can be exported to either Rya or Kafka, "
    +                        + "unless they contain an aggregation, in which case they can only be exported to Kafka.  Construct queries can be exported "
    +                        + "to Rya and Kafka, and Periodic queries can only be exported to Rya.", e);
    +            }
    +        } else {
    +            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
    +        }
    +    }
    +
    +
    +    
    +    
    +    private String updateFluoAppAndRegisterWithKafka(final String ryaInstance, final String fluoAppName, String sparql, String periodicTopic, String bootStrapServers) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException, UnsupportedQueryException, PeriodicQueryCreationException {
    +        requireNonNull(sparql);
    +        requireNonNull(periodicTopic);
    +        requireNonNull(bootStrapServers);
    +
    +        final PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(getConnector(), ryaInstance);
    +        
    +        // Connect to the Fluo application that is updating this instance's PCJs.
    +        final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
    +        try(final FluoClient fluoClient = new FluoClientFactory().connect(
    +                cd.getUsername(),
    +                new String(cd.getPassword()),
    +                cd.getInstanceName(),
    +                cd.getZookeepers(),
    +                fluoAppName);) {
    +            // Initialize the PCJ within the Fluo application.
    +            final CreatePeriodicQuery periodicPcj = new CreatePeriodicQuery(fluoClient, periodicStorage);
    +            PeriodicNotificationClient periodicClient = new KafkaNotificationRegistrationClient(periodicTopic, getProducer(bootStrapServers));
    +            return periodicPcj.withRyaIntegration(sparql, periodicClient, getConnector(), ryaInstance).getQueryId();
    +        }
    +    }
    +    
    +    
    +    private KafkaProducer<String, CommandNotification> getProducer(String bootStrapServers) {
    --- End diff --
    
    Done
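
    The diff above is cut off at getProducer, so its body is not shown in this
    message. As a minimal sketch only, assuming the helper relies on the
    standard Kafka producer settings (bootstrap.servers, key.serializer,
    value.serializer) and on the serializer classes imported in the diff, the
    configuration it assembles might look roughly like the following. The class
    and method names are hypothetical, and the sketch only builds the
    Properties; it does not construct a KafkaProducer.

```java
import java.util.Properties;

// Hypothetical helper for illustration; this is not the getProducer body from the pull request.
class ProducerPropertiesSketch {

    // Builds the settings a KafkaProducer<String, CommandNotification> would be created from.
    static Properties producerProperties(final String bootStrapServers) {
        final Properties props = new Properties();
        // "bootstrap.servers" is the key behind ProducerConfig.BOOTSTRAP_SERVERS_CONFIG.
        props.setProperty("bootstrap.servers", bootStrapServers);
        // String keys, matching the StringSerializer import in the diff.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // CommandNotification values, matching the CommandNotificationSerializer import (assumed usage).
        props.setProperty("value.serializer",
                "org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer");
        return props;
    }

    public static void main(final String[] args) {
        final Properties props = producerProperties("localhost:9092");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

    The bootStrapServers value is passed through unchanged, so a comma-separated
    list of host:port pairs works the same way as a single broker address.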


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #220: Rya 319

Posted by meiercaleb <gi...@git.apache.org>.
Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136632166
  
    --- Diff: common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java ---
    @@ -0,0 +1,39 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client;
    +
    +/**
    + * This class creates new PeriodicPCJ for a given Rya instance.  
    + */
    +public interface CreatePeriodicPCJ {
    +
    +    /**
    +     * Creates a new PeriodicPCJ for a given Rya instance. The provided periodicTopic and bootStrapServers are used for
    +     * registering new PeriodicNotifications with the underlying notification registration service. Typically, the
    +     * bootStrapServers are the IP for the KafkaBrokers.
    +     * 
    +     * @param instanceName - Rya instance to connect to
    +     * @param sparql - SPARQL query registered with the Periodic Service
    +     * @param periodicTopic - Kafka topic that new PeriodicNotifications are exported to for registration with the
    --- End diff --
    
    Done



[GitHub] incubator-rya pull request #220: Rya 319

Posted by meiercaleb <gi...@git.apache.org>.
Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136614310
  
    --- Diff: common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java ---
    @@ -0,0 +1,39 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client;
    +
    +/**
    + * This class creates new PeriodicPCJ for a given Rya instance.  
    + */
    +public interface CreatePeriodicPCJ {
    +
    +    /**
    +     * Creates a new PeriodicPCJ for a given Rya instance. The provided periodicTopic and bootStrapServers are used for
    +     * registering new PeriodicNotifications with the underlying notification registration service. Typically, the
    +     * bootStrapServers are the IP for the KafkaBrokers.
    +     * 
    +     * @param instanceName - Rya instance to connect to
    +     * @param sparql - SPARQL query registered with the Periodic Service
    +     * @param periodicTopic - Kafka topic that new PeriodicNotifications are exported to for registration with the
    +     *            PeriodicService
    +     * @param bootStrapServers - Connection string for Kafka brokers
    +     */
    +    public String createPeriodicPCJ(String instanceName, String sparql, String periodicTopic, String bootStrapServers) throws RyaClientException;
    --- End diff --
    
    Done



[GitHub] incubator-rya pull request #220: Rya 319

Posted by meiercaleb <gi...@git.apache.org>.
Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136614888
  
    --- Diff: common/rya.api/src/main/java/org/apache/rya/api/client/ListIncrementalQueries.java ---
    @@ -0,0 +1,37 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client;
    +
    +/**
    + * Verifies that a Rya instance has a Fluo application enabled and lists
    + * all SPARQL queries maintained by the application.
    + */
    +public interface ListIncrementalQueries {
    +
    +    /**
    +     * Lists all SPARQL queries maintained by the Fluo Application for a given Rya instance and associated information,
    +     * including the Fluo Query Id, the QueryType, the ExportStrategy, and the pretty-printed SPARQL query.
    +     * 
    +     * @param ryaInstance - Rya instance whose queries are incrementally maintained by Fluo
    +     * @return
    --- End diff --
    
    Done



[GitHub] incubator-rya pull request #220: Rya 319

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136431560
  
    --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java ---
    @@ -39,7 +39,11 @@
         public static final String CONSTRUCT_PREFIX = "CONSTRUCT";
         public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY";
         
    +    //binding name reserved for periodic bin id for periodic query results
         public static final String PERIODIC_BIN_ID = PeriodicQueryResultStorage.PeriodicBinId;
    +    
    +    //constant used for storing Kafka connection information in Fluo
    +    public static final String KAFKA = "KAFKA";
    --- End diff --
    
    Might as well turn those comments into javadocs.
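
    A minimal sketch of what that suggestion could look like, using a
    hypothetical stand-in class rather than the real
    IncrementalUpdateConstants. In the diff, PERIODIC_BIN_ID is assigned from
    PeriodicQueryResultStorage.PeriodicBinId; the string literal used here is
    only a placeholder for that value.

```java
// Hypothetical stand-in for IncrementalUpdateConstants; only the two constants from the diff are shown.
class IncrementalUpdateConstantsSketch {

    /**
     * Binding name reserved for the periodic bin id of periodic query results.
     * Placeholder literal; the diff takes this value from PeriodicQueryResultStorage.PeriodicBinId.
     */
    static final String PERIODIC_BIN_ID = "periodicBinId";

    /**
     * Constant used for storing Kafka connection information in Fluo.
     */
    static final String KAFKA = "KAFKA";

    public static void main(final String[] args) {
        System.out.println(KAFKA);
    }
}
```

    Unlike the line comments in the diff, these descriptions are picked up by
    the javadoc tool and by IDE hover help wherever the constants are used.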



[GitHub] incubator-rya issue #220: Rya 319

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/incubator-rya/pull/220
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/469/




[GitHub] incubator-rya pull request #220: Rya 319

Posted by meiercaleb <gi...@git.apache.org>.
Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136615252
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Properties;
    +
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.api.client.DeletePeriodicPCJ;
    +import org.apache.rya.api.client.GetInstanceDetails;
    +import org.apache.rya.api.client.InstanceDoesNotExistException;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
    +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery;
    +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery.QueryDeletionException;
    +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
    +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
    +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
    +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
    +import org.apache.rya.periodic.notification.notification.CommandNotification;
    +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
    +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
    +import org.openrdf.query.MalformedQueryException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Optional;
    +
    +public class AccumuloDeletePeriodicPCJ extends AccumuloCommand implements DeletePeriodicPCJ {
    +    private static final Logger log = LoggerFactory.getLogger(AccumuloDeletePeriodicPCJ.class);
    +
    +    private final GetInstanceDetails getInstanceDetails;
    +
    +    /**
    +     * Constructs an instance of {@link AccumuloDeletePeriodicPCJ}.
    +     *
    +     * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null)
    +     * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
    +     */
    +    public AccumuloDeletePeriodicPCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
    +        super(connectionDetails, connector);
    +        getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector);
    +    }
    +
    +    @Override
    +    public void deletePeriodicPCJ(final String instanceName, final String pcjId, String topic, String brokers) throws InstanceDoesNotExistException, RyaClientException {
    +        requireNonNull(instanceName);
    +        requireNonNull(pcjId);
    +
    +        final Optional<RyaDetails> originalDetails = getInstanceDetails.getDetails(instanceName);
    +        final boolean ryaInstanceExists = originalDetails.isPresent();
    +        if(!ryaInstanceExists) {
    +            throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName));
    +        }
    +
    +        final boolean pcjIndexingEnabled = originalDetails.get().getPCJIndexDetails().isEnabled();
    +        if(!pcjIndexingEnabled) {
    +            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
    +        }
    +
    +        // If the PCJ was being maintained by a Fluo application, then stop that process.
    +        final PCJIndexDetails pcjIndexDetails = originalDetails.get().getPCJIndexDetails();
    +        final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
    +
    +        if (fluoDetailsHolder.isPresent()) {
    +            final String fluoAppName = pcjIndexDetails.getFluoDetails().get().getUpdateAppName();
    +            try {
    +                stopUpdatingPCJ(instanceName, fluoAppName, pcjId, topic, brokers);
    +            } catch (MalformedQueryException | UnsupportedQueryException | QueryDeletionException e) {
    +                throw new RyaClientException(String.format("Unable to delete Periodic Query with id: %s", pcjId), e);
    +            }
    +        } else {
    +            log.error(String.format("Could not stop the Fluo application from updating the PCJ because the Fluo Details are "
    +                    + "missing for the Rya instance named '%s'.", instanceName));
    +        }
    +        
    +    }
    +
    +
    +    private void stopUpdatingPCJ(final String ryaInstance, final String fluoAppName, final String pcjId, final String topic, final String brokers) throws UnsupportedQueryException, MalformedQueryException, QueryDeletionException {
    +        requireNonNull(fluoAppName);
    +        requireNonNull(pcjId);
    +
    +        // Connect to the Fluo application that is updating this instance's PCJs.
    +        final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
    +        try (final FluoClient fluoClient = new FluoClientFactory().connect(cd.getUsername(), new String(cd.getPassword()),
    +                cd.getInstanceName(), cd.getZookeepers(), fluoAppName)) {
    +            // Delete the PCJ from the Fluo App.
    +            PeriodicQueryResultStorage periodic = new AccumuloPeriodicQueryResultStorage(getConnector(), ryaInstance);
    +            DeletePeriodicQuery deletePeriodic = new DeletePeriodicQuery(fluoClient, periodic);
    +            deletePeriodic.deletePeriodicQuery(pcjId, getPeriodicNotificationClient(topic, brokers));
    +        }
    +    }
    +    
    +    
    +    private PeriodicNotificationClient getPeriodicNotificationClient(String topic, String brokers) throws MalformedQueryException {
    --- End diff --
    
    Done



[GitHub] incubator-rya pull request #220: Rya 319

Posted by meiercaleb <gi...@git.apache.org>.
Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136624566
  
    --- Diff: common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java ---
    @@ -44,7 +44,7 @@
          * Application.
          *
          */
    -    public static enum ExportStrategy{RYA, KAFKA, NO_OP_EXPORT};
    +    public static enum ExportStrategy{RYA_EXPORT, KAFKA_EXPORT, NO_OP_EXPORT, PERIODIC_EXPORT};
    --- End diff --
    
    Done.  NO_OP_EXPORT deleted from existence.  I will sincerely miss all of the super important no-opting that it was doing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #220: Rya 319

Posted by isper3at <gi...@git.apache.org>.
Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136620175
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListIncrementalQueries.java ---
    @@ -0,0 +1,97 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.List;
    +
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.rya.api.client.GetInstanceDetails;
    +import org.apache.rya.api.client.InstanceDoesNotExistException;
    +import org.apache.rya.api.client.ListIncrementalQueries;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
    +import org.apache.rya.indexing.pcj.fluo.api.ListFluoQueries;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.base.Optional;
    +
    +public class AccumuloListIncrementalQueries extends AccumuloCommand implements ListIncrementalQueries {
    --- End diff --
    
    and more docs?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #220: Rya 319

Posted by ejwhite922 <gi...@git.apache.org>.
Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136429051
  
    --- Diff: common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java ---
    @@ -0,0 +1,39 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client;
    +
    +/**
    + * This class creates new PeriodicPCJ for a given Rya instance.  
    + */
    +public interface CreatePeriodicPCJ {
    +
    +    /**
    +     * Creates a new PeriodicPCJ for a given Rya instance. The provided periodicTopic and bootStrapServers are used for
    +     * registering new PeriodicNotifications with the underlying notification registration service. Typically, the
    +     * bootStrapServers are the IP for the KafkaBrokers.
    +     * 
    +     * @param instanceName - Rya instance to connect to
    +     * @param sparql - SPARQL query registered with the Periodic Service
    +     * @param periodicTopic - Kafka topic that new PeriodicNotifications are exported to for registration with the
    +     *            PeriodicService
    +     * @param bootStrapServers - Connection string for Kafka brokers
    +     */
    +    public String createPeriodicPCJ(String instanceName, String sparql, String periodicTopic, String bootStrapServers) throws RyaClientException;
    --- End diff --
    
    Add @return to the javadocs
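
    A minimal sketch of the requested javadoc, assuming the returned String is
    the id of the newly created periodic query (the Accumulo implementation in
    this pull request returns the result of getQueryId()). The wrapper class,
    the nested exception, and the toy lambda implementation are hypothetical
    stand-ins so that the example compiles on its own.

```java
// Hypothetical wrapper so the sketch is self-contained; not part of the pull request.
class CreatePeriodicPCJReturnSketch {

    // Stand-in for org.apache.rya.api.client.RyaClientException.
    static class RyaClientException extends Exception {
        RyaClientException(final String message) {
            super(message);
        }
    }

    interface CreatePeriodicPCJ {
        /**
         * Creates a new PeriodicPCJ for a given Rya instance.
         *
         * @param instanceName - Rya instance to connect to
         * @param sparql - SPARQL query registered with the Periodic Service
         * @param periodicTopic - Kafka topic used to register new PeriodicNotifications
         * @param bootStrapServers - Connection string for Kafka brokers
         * @return the id of the newly created periodic query (assumed meaning)
         * @throws RyaClientException if the query could not be created
         */
        String createPeriodicPCJ(String instanceName, String sparql, String periodicTopic, String bootStrapServers)
                throws RyaClientException;
    }

    public static void main(final String[] args) throws RyaClientException {
        // Toy implementation that returns a fixed id, only to show how the return value is used.
        final CreatePeriodicPCJ create = (instance, sparql, topic, servers) -> "example-query-id";
        final String queryId = create.createPeriodicPCJ("rya_", "SELECT ?s WHERE { ?s ?p ?o }", "notifications", "localhost:9092");
        System.out.println(queryId);
    }
}
```

    Documenting the return value matters here because callers such as the Rya
    Shell would need that id later, for example to delete the periodic query.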


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya issue #220: Rya 319

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit commented on the issue:

    https://github.com/apache/incubator-rya/pull/220
  
    
    Refer to this link for build results (access rights to CI server needed): 
    https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/468/



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-rya pull request #220: Rya 319

Posted by meiercaleb <gi...@git.apache.org>.
Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136615301
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Properties;
    +
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.api.client.DeletePeriodicPCJ;
    +import org.apache.rya.api.client.GetInstanceDetails;
    +import org.apache.rya.api.client.InstanceDoesNotExistException;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
    +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery;
    +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery.QueryDeletionException;
    +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
    +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
    +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
    +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
    +import org.apache.rya.periodic.notification.notification.CommandNotification;
    +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
    +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
    +import org.openrdf.query.MalformedQueryException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Optional;
    +
    +public class AccumuloDeletePeriodicPCJ extends AccumuloCommand implements DeletePeriodicPCJ {
    +    private static final Logger log = LoggerFactory.getLogger(AccumuloDeletePCJ.class);
    +
    +    private final GetInstanceDetails getInstanceDetails;
    +
    +    /**
    +     * Constructs an instance of {@link AccumuloDeletePeriodicPCJ}.
    +     *
    +     * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null)
    +     * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null)
    +     */
    +    public AccumuloDeletePeriodicPCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) {
    +        super(connectionDetails, connector);
    +        getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector);
    +    }
    +
    +    @Override
    +    public void deletePeriodicPCJ(final String instanceName, final String pcjId, String topic, String brokers) throws InstanceDoesNotExistException, RyaClientException {
    +        requireNonNull(instanceName);
    +        requireNonNull(pcjId);
    +
    +        final Optional<RyaDetails> originalDetails = getInstanceDetails.getDetails(instanceName);
    +        final boolean ryaInstanceExists = originalDetails.isPresent();
    +        if(!ryaInstanceExists) {
    +            throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName));
    +        }
    +
    +        final boolean pcjIndexingEnabled = originalDetails.get().getPCJIndexDetails().isEnabled();
    +        if(!pcjIndexingEnabled) {
    +            throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName));
    +        }
    +
    +        // If the PCJ was being maintained by a Fluo application, then stop that process.
    +        final PCJIndexDetails pcjIndexDetails = originalDetails.get().getPCJIndexDetails();
    +        final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails();
    +
    +        if (fluoDetailsHolder.isPresent()) {
    +            final String fluoAppName = pcjIndexDetails.getFluoDetails().get().getUpdateAppName();
    +            try {
    +                stopUpdatingPCJ(instanceName, fluoAppName, pcjId, topic, brokers);
    +            } catch (MalformedQueryException | UnsupportedQueryException | QueryDeletionException e) {
    +                throw new RyaClientException(String.format("Unable to delete Periodic Query with id: %s", pcjId), e);
    +            }
    +        } else {
    +            log.error(String.format("Could not stop the Fluo application from updating the PCJ because the Fluo Details are "
    +                    + "missing for the Rya instance named '%s'.", instanceName));
    +        }
    +        
    +    }
    +
    +
    +    private void stopUpdatingPCJ(final String ryaInstance, final String fluoAppName, final String pcjId, final String topic, final String brokers) throws UnsupportedQueryException, MalformedQueryException, QueryDeletionException {
    +        requireNonNull(fluoAppName);
    +        requireNonNull(pcjId);
    +
    +        // Connect to the Fluo application that is updating this instance's PCJs.
    +        final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails();
    +        try (final FluoClient fluoClient = new FluoClientFactory().connect(cd.getUsername(), new String(cd.getPassword()),
    +                cd.getInstanceName(), cd.getZookeepers(), fluoAppName)) {
    +            // Delete the PCJ from the Fluo App.
    +            PeriodicQueryResultStorage periodic = new AccumuloPeriodicQueryResultStorage(getConnector(), ryaInstance);
    +            DeletePeriodicQuery deletePeriodic = new DeletePeriodicQuery(fluoClient, periodic);
    +            deletePeriodic.deletePeriodicQuery(pcjId, getPeriodicNotificationClient(topic, brokers));
    +        }
    +    }
    +    
    +    
    +    private PeriodicNotificationClient getPeriodicNotificationClient(String topic, String brokers) throws MalformedQueryException {
    +        return new KafkaNotificationRegistrationClient(topic, getProducer(brokers));
    +    }
    +
    +    private KafkaProducer<String, CommandNotification> getProducer(String brokers) {
    --- End diff --
    
    Renamed.  Done.
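
For readers following the thread: the quoted diff stops at the `getProducer(String brokers)` signature, so the method body is not visible here. Judging only from the imports in the diff (`Properties`, `ProducerConfig`, `StringSerializer`, `CommandNotificationSerializer`), a producer for `CommandNotification` values would presumably need configuration along the lines below. This is a minimal sketch, not the PR's code: the class name `ProducerConfigSketch` and the helper `producerProperties` are hypothetical, and it uses the plain string keys of the standard Kafka producer settings so that it runs without Kafka on the classpath.

```java
import static java.util.Objects.requireNonNull;

import java.util.Properties;

public class ProducerConfigSketch {

    /**
     * Hypothetical helper (not from the PR): builds the properties a Kafka
     * producer with String keys and CommandNotification values would need.
     * The serializer class names are taken from the imports in the diff.
     */
    static Properties producerProperties(final String brokers) {
        requireNonNull(brokers);
        final Properties props = new Properties();
        // Comma-separated host:port list of the Kafka brokers.
        props.setProperty("bootstrap.servers", brokers);
        // Keys are plain strings.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Values are serialized by the notification serializer the diff imports.
        props.setProperty("value.serializer",
                "org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer");
        return props;
    }

    public static void main(final String[] args) {
        // Example broker address; an assumption for illustration only.
        final Properties props = producerProperties("localhost:9092");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

With Kafka available, properties like these would be handed to the `KafkaProducer` constructor; how the PR actually does this is not shown in the quoted diff.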



[GitHub] incubator-rya pull request #220: Rya 319

Posted by isper3at <gi...@git.apache.org>.
GitHub user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/220#discussion_r136619564
  
    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java ---
    @@ -0,0 +1,131 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client.accumulo;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Properties;
    +
    +import org.apache.accumulo.core.client.Connector;
    +import org.apache.fluo.api.client.FluoClient;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.api.client.DeletePeriodicPCJ;
    +import org.apache.rya.api.client.GetInstanceDetails;
    +import org.apache.rya.api.client.InstanceDoesNotExistException;
    +import org.apache.rya.api.client.RyaClientException;
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
    +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails;
    +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery;
    +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery.QueryDeletionException;
    +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
    +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
    +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
    +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
    +import org.apache.rya.periodic.notification.notification.CommandNotification;
    +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
    +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
    +import org.openrdf.query.MalformedQueryException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Optional;
    +
    --- End diff --
    
    More docs, please: the class declared right after these imports needs class-level Javadoc.

