Posted to issues@nifi.apache.org by zenfenan <gi...@git.apache.org> on 2018/05/19 16:41:04 UTC

[GitHub] nifi pull request #2724: NIFI-5133: Implemented Google Cloud PubSub Processo...

GitHub user zenfenan opened a pull request:

    https://github.com/apache/nifi/pull/2724

    NIFI-5133: Implemented Google Cloud PubSub Processors

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    ### For all changes:
    - [x] Is there a JIRA ticket associated with this PR? Is it referenced 
         in the commit message?
    
    - [x] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    - [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
    
    - [x] Is your initial contribution a single, squashed commit?
    
    ### For code changes:
    - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
    - [x] Have you written or updated unit tests to verify your changes?
    - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
    - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
    - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
    - [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
    
    ### For documentation related changes:
    - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
    
    ### Note:
    Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zenfenan/nifi NIFI-5133

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2724.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2724
    
----
commit 40d612e3ca01b7b7aa87ac0bbdfcce23552cefcb
Author: zenfenan <si...@...>
Date:   2018-05-12T17:21:23Z

    NIFI-5133: Implemented Google Cloud PubSub Processors

----


---

[GitHub] nifi pull request #2724: NIFI-5133: Implemented Google Cloud PubSub Processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2724#discussion_r191079620
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.gcp.pubsub;
    +
    +import com.google.api.gax.core.FixedCredentialsProvider;
    +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
    +import com.google.cloud.pubsub.v1.stub.SubscriberStub;
    +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
    +import com.google.common.collect.ImmutableList;
    +import com.google.pubsub.v1.AcknowledgeRequest;
    +import com.google.pubsub.v1.ProjectSubscriptionName;
    +import com.google.pubsub.v1.PullRequest;
    +import com.google.pubsub.v1.PullResponse;
    +import com.google.pubsub.v1.ReceivedMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
    +
    +@SeeAlso({PublishGCPubSub.class})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"google", "google-cloud", "pubsub", "consume"})
    +@CapabilityDescription("Consumes message from the configured Google Cloud PubSub subscription. If the 'Batch Size' is set, " +
    +        "the configured number of messages will be pulled in a single request, else only one message will be pulled.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = ACK_ID_DESCRIPTION),
    +        @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, description = SERIALIZED_SIZE_DESCRIPTION),
    +        @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
    +        @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION),
    +        @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
    +})
    +public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
    +
    +    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-subscription")
    +            .displayName("Subscription")
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .description("Subscription name of the Google Cloud Pub/Sub")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    private SubscriberStub subscriber = null;
    +    private PullRequest pullRequest;
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        final Integer batchSize = (context.getProperty(BATCH_SIZE).isSet()) ? context.getProperty(BATCH_SIZE).asInteger() : 1;
    +
    +        pullRequest = PullRequest.newBuilder()
    +                .setMaxMessages(batchSize)
    +                .setReturnImmediately(false)
    +                .setSubscription(getSubscriptionName(context))
    +                .build();
    +
    +        try {
    +            subscriber = getSubscriber(context);
    +        } catch (IOException e) {
    +            getLogger().error("Failed to create Google Cloud Subscriber due to ", e);
    +        }
    +    }
    +
    +    @OnStopped
    +    public void onStopped() {
    +        if (subscriber != null) {
    +            subscriber.shutdown();
    +        }
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.of(PROJECT_ID,
    +                GCP_CREDENTIALS_PROVIDER_SERVICE,
    +                SUBSCRIPTION,
    +                BATCH_SIZE);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        if (subscriber == null) {
    --- End diff --
    
    It's done.


---

[GitHub] nifi pull request #2724: NIFI-5133: Implemented Google Cloud PubSub Processo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/2724


---

[GitHub] nifi pull request #2724: NIFI-5133: Implemented Google Cloud PubSub Processo...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2724#discussion_r189970566
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.gcp.pubsub;
    +
    +import com.google.api.gax.core.FixedCredentialsProvider;
    +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
    +import com.google.cloud.pubsub.v1.stub.SubscriberStub;
    +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
    +import com.google.common.collect.ImmutableList;
    +import com.google.pubsub.v1.AcknowledgeRequest;
    +import com.google.pubsub.v1.ProjectSubscriptionName;
    +import com.google.pubsub.v1.PullRequest;
    +import com.google.pubsub.v1.PullResponse;
    +import com.google.pubsub.v1.ReceivedMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
    +
    +@SeeAlso({PublishGCPubSub.class})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"google", "google-cloud", "pubsub", "consume"})
    +@CapabilityDescription("Consumes message from the configured Google Cloud PubSub subscription. If the 'Batch Size' is set, " +
    +        "the configured number of messages will be pulled in a single request, else only one message will be pulled.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = ACK_ID_DESCRIPTION),
    +        @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, description = SERIALIZED_SIZE_DESCRIPTION),
    +        @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
    +        @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION),
    +        @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
    +})
    +public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
    +
    +    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-subscription")
    +            .displayName("Subscription")
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .description("Subscription name of the Google Cloud Pub/Sub")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    private SubscriberStub subscriber = null;
    +    private PullRequest pullRequest;
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        final Integer batchSize = (context.getProperty(BATCH_SIZE).isSet()) ? context.getProperty(BATCH_SIZE).asInteger() : 1;
    +
    +        pullRequest = PullRequest.newBuilder()
    +                .setMaxMessages(batchSize)
    +                .setReturnImmediately(false)
    +                .setSubscription(getSubscriptionName(context))
    +                .build();
    +
    +        try {
    +            subscriber = getSubscriber(context);
    +        } catch (IOException e) {
    +            getLogger().error("Failed to create Google Cloud Subscriber due to ", e);
    +        }
    +    }
    +
    +    @OnStopped
    +    public void onStopped() {
    +        if (subscriber != null) {
    +            subscriber.shutdown();
    +        }
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.of(PROJECT_ID,
    +                GCP_CREDENTIALS_PROVIDER_SERVICE,
    +                SUBSCRIPTION,
    +                BATCH_SIZE);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        if (subscriber == null) {
    --- End diff --
    
    Yep, I'm thinking about an edge case: say the user sees the bulletin generated by the OnScheduled call but moves on to something else for some reason. The bulletin won't show up anymore after 5 minutes, even though the processor still shows as running just fine. Right?
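
The edge case described here can be illustrated with a minimal sketch in plain Java. It assumes nothing about the real NiFi or Google Cloud classes: `SubscriberGuardSketch`, `SubscriberFactory`, and the `errors` list (standing in for bulletins) are hypothetical names used only for illustration. The idea is that a failure caught in `onScheduled` is reported once, so a null check in `onTrigger` reports it again on every run and the problem stays visible.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch; every name here is hypothetical, not NiFi or Google Cloud API.
final class SubscriberGuardSketch {

    /** Stand-in for the code that builds the real subscriber and may fail. */
    interface SubscriberFactory {
        Object create() throws IOException;
    }

    private final SubscriberFactory factory;
    private final List<String> errors = new ArrayList<>(); // stands in for bulletins
    private Object subscriber;
    private String creationError;

    SubscriberGuardSketch(SubscriberFactory factory) {
        this.factory = factory;
    }

    /** Runs once at start: a failure here is reported a single time. */
    void onScheduled() {
        try {
            subscriber = factory.create();
            creationError = null;
        } catch (IOException e) {
            subscriber = null;
            creationError = e.getMessage();
            errors.add("onScheduled: failed to create subscriber: " + creationError);
        }
    }

    /** Runs repeatedly: re-reports the stored failure so it does not go stale. */
    boolean onTrigger() {
        if (subscriber == null) {
            errors.add("onTrigger: subscriber is not available: " + creationError);
            return false; // nothing pulled; a real processor would likely yield here
        }
        return true; // a real processor would pull and acknowledge messages here
    }

    List<String> errors() {
        return errors;
    }

    public static void main(String[] args) {
        SubscriberGuardSketch broken = new SubscriberGuardSketch(() -> {
            throw new IOException("bad credentials");
        });
        broken.onScheduled();
        broken.onTrigger();
        broken.onTrigger();
        broken.errors().forEach(System.out::println);
    }
}
```

With the guard in place, each trigger adds a fresh error entry carrying the original cause, instead of relying on the single message logged at scheduling time.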


---

[GitHub] nifi pull request #2724: NIFI-5133: Implemented Google Cloud PubSub Processo...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2724#discussion_r189903461
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.gcp.pubsub;
    +
    +import com.google.api.gax.core.FixedCredentialsProvider;
    +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
    +import com.google.cloud.pubsub.v1.stub.SubscriberStub;
    +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
    +import com.google.common.collect.ImmutableList;
    +import com.google.pubsub.v1.AcknowledgeRequest;
    +import com.google.pubsub.v1.ProjectSubscriptionName;
    +import com.google.pubsub.v1.PullRequest;
    +import com.google.pubsub.v1.PullResponse;
    +import com.google.pubsub.v1.ReceivedMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
    +
    +@SeeAlso({PublishGCPubSub.class})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"google", "google-cloud", "pubsub", "consume"})
    +@CapabilityDescription("Consumes message from the configured Google Cloud PubSub subscription. If the 'Batch Size' is set, " +
    +        "the configured number of messages will be pulled in a single request, else only one message will be pulled.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = ACK_ID_DESCRIPTION),
    +        @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, description = SERIALIZED_SIZE_DESCRIPTION),
    +        @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
    +        @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION),
    +        @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
    +})
    +public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
    +
    +    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-subscription")
    +            .displayName("Subscription")
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .description("Subscription name of the Google Cloud Pub/Sub")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    private SubscriberStub subscriber = null;
    +    private PullRequest pullRequest;
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        final Integer batchSize = (context.getProperty(BATCH_SIZE).isSet()) ? context.getProperty(BATCH_SIZE).asInteger() : 1;
    --- End diff --
    
    To be changed if my previous comment makes sense.


---

[GitHub] nifi pull request #2724: NIFI-5133: Implemented Google Cloud PubSub Processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2724#discussion_r189963005
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.gcp.pubsub;
    +
    +import com.google.api.gax.core.FixedCredentialsProvider;
    +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
    +import com.google.cloud.pubsub.v1.stub.SubscriberStub;
    +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
    +import com.google.common.collect.ImmutableList;
    +import com.google.pubsub.v1.AcknowledgeRequest;
    +import com.google.pubsub.v1.ProjectSubscriptionName;
    +import com.google.pubsub.v1.PullRequest;
    +import com.google.pubsub.v1.PullResponse;
    +import com.google.pubsub.v1.ReceivedMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
    +
    +@SeeAlso({PublishGCPubSub.class})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"google", "google-cloud", "pubsub", "consume"})
    +@CapabilityDescription("Consumes message from the configured Google Cloud PubSub subscription. If the 'Batch Size' is set, " +
    +        "the configured number of messages will be pulled in a single request, else only one message will be pulled.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = ACK_ID_DESCRIPTION),
    +        @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, description = SERIALIZED_SIZE_DESCRIPTION),
    +        @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
    +        @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION),
    +        @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
    +})
    +public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
    +
    +    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-subscription")
    +            .displayName("Subscription")
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .description("Subscription name of the Google Cloud Pub/Sub")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    private SubscriberStub subscriber = null;
    +    private PullRequest pullRequest;
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        final Integer batchSize = (context.getProperty(BATCH_SIZE).isSet()) ? context.getProperty(BATCH_SIZE).asInteger() : 1;
    +
    +        pullRequest = PullRequest.newBuilder()
    +                .setMaxMessages(batchSize)
    +                .setReturnImmediately(false)
    +                .setSubscription(getSubscriptionName(context))
    +                .build();
    +
    +        try {
    +            subscriber = getSubscriber(context);
    +        } catch (IOException e) {
    +            getLogger().error("Failed to create Google Cloud Subscriber due to ", e);
    +        }
    +    }
    +
    +    @OnStopped
    +    public void onStopped() {
    +        if (subscriber != null) {
    +            subscriber.shutdown();
    +        }
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.of(PROJECT_ID,
    +                GCP_CREDENTIALS_PROVIDER_SERVICE,
    +                SUBSCRIPTION,
    +                BATCH_SIZE);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        if (subscriber == null) {
    --- End diff --
    
    Block L102:106 covers that, right? If `getSubscriber()` fails to create the subscriber, it throws an exception, which is logged. I'm fine with adding a log here, but I wonder whether that block alone is enough.


---

[GitHub] nifi issue #2724: NIFI-5133: Implemented Google Cloud PubSub Processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2724
  
    Yeah... I see similar throughput on my side. It sounds really low to me, but I have no comparable experience, so... If I have time, I'll run a test with Kinesis on the AWS side. Anyway, the code looks good to me; I played with the processors and tried multiple scenarios. LGTM, merging to master, thanks a lot @zenfenan


---

[GitHub] nifi issue #2724: NIFI-5133: Implemented Google Cloud PubSub Processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2724
  
    Thanks for the update @zenfenan - I've played a bit with it and have two questions:
    - Does it make sense to expose both fetch count and batch size for the publish processor? Do we expect situations where the numbers won't be the same?
    - I'm not a heavy user of the Google Cloud services, but performance seemed really slow to me. Is that expected because I'm just using a basic setup / free trial? Or is it just the Pub/Sub mechanism itself? Or is it something else on NiFi's side?


---

[GitHub] nifi issue #2724: NIFI-5133: Implemented Google Cloud PubSub Processors

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on the issue:

    https://github.com/apache/nifi/pull/2724
  
    Out of curiosity, what kind of performance do you get in your testing?


---

[GitHub] nifi pull request #2724: NIFI-5133: Implemented Google Cloud PubSub Processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2724#discussion_r189965330
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java ---
    @@ -0,0 +1,240 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.gcp.pubsub;
    +
    +import com.google.api.core.ApiFuture;
    +import com.google.api.gax.batching.BatchingSettings;
    +import com.google.api.gax.core.FixedCredentialsProvider;
    +import com.google.api.gax.core.InstantiatingExecutorProvider;
    +import com.google.cloud.pubsub.v1.Publisher;
    +import com.google.common.collect.ImmutableList;
    +import com.google.protobuf.ByteString;
    +import com.google.protobuf.Timestamp;
    +import com.google.pubsub.v1.ProjectTopicName;
    +import com.google.pubsub.v1.PubsubMessage;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
    +
    +@SeeAlso({ConsumeGCPubSub.class})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google-cloud", "pubsub", "publish"})
    +@CapabilityDescription("Publishes the content of the incoming flowfile to the configured Google Cloud PubSub topic. The processor supports dynamic properties " +
    +        "to be added. If any such dynamic properties are present, they will be sent along with the message in the form of 'attributes'.")
    +@DynamicProperty(name = "Attribute name", value = "Value to be set to the attribute",
    +        description = "Attributes to be set for the outgoing Google Cloud PubSub message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +@WritesAttributes({
    +        @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION),
    +        @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = TOPIC_NAME_DESCRIPTION)
    +})
    +public class PublishGCPubSub extends AbstractGCPubSubProcessor{
    +
    +    public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-topic")
    +            .displayName("Topic Name")
    +            .description("Name of the Google Cloud PubSub Topic")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final PropertyDescriptor EXECUTOR_COUNT = new PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-publish-executor-count")
    +            .displayName("Executor Count")
    +            .description("Indicates the number of executors the cloud service should use to publish the messages")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails but attempting the operation again may succeed.")
    +            .build();
    +
    +    private Publisher publisher = null;
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.of(PROJECT_ID,
    +                GCP_CREDENTIALS_PROVIDER_SERVICE,
    +                TOPIC_NAME,
    +                EXECUTOR_COUNT,
    +                BATCH_SIZE);
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .required(false)
    +                .name(propertyDescriptorName)
    +                .displayName(propertyDescriptorName)
    +                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
    +                .dynamic(true)
    +                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +                .build();
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(
    +                new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_RETRY))
    +        );
    +    }
    +
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +
    +        if (flowFile == null) {
    +            context.yield();
    +            return;
    +        }
    +
    +        final long startNanos = System.nanoTime();
    +
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        try {
    +            publisher = getPublisherBuilder(context, flowFile).build();
    +
    +            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +            session.exportTo(flowFile, baos);
    +            final ByteString flowFileContent = ByteString.copyFromUtf8(baos.toString());
    +
    +            PubsubMessage message = PubsubMessage.newBuilder().setData(flowFileContent)
    +                    .setPublishTime(Timestamp.newBuilder().build())
    +                    .putAllAttributes(getDynamicAttributesMap(context, flowFile))
    +                    .build();
    +
    +            ApiFuture<String> messageIdFuture = publisher.publish(message);
    +
    +            while (!messageIdFuture.isDone()) {
    +                Thread.sleep(1000L);
    +            }
    +
    +            final String messageId = messageIdFuture.get();
    +
    +            attributes.put(MESSAGE_ID_ATTRIBUTE, messageId);
    +            attributes.put(TOPIC_NAME_ATTRIBUTE, getTopicName(context, flowFile).toString());
    +
    +            flowFile = session.putAllAttributes(flowFile, attributes);
    +        } catch (IOException e) {
    +            getLogger().error("Routing to 'failure'. Failed to build the Google Cloud PubSub Publisher due to ", e);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        } catch (InterruptedException | ExecutionException e) {
    +            getLogger().error("Routing to 'retry'. Failed to publish the message to Google PubSub topic due to ", e);
    +            session.transfer(flowFile, REL_RETRY);
    +            return;
    +        } finally {
    +            shutdownPublisher();
    --- End diff --
    
    Actually, I previously had the ExpressionLanguageScope for `TOPIC` set to `VariableRegistry` and shut down the publisher in `unScheduled`, since that would be more efficient. However, the GCP SDK kept writing WARN and ERROR log messages saying "shutdown was not called and for proper resource utilization, call shutdown()". That's why I had to go with this approach. I'll try to check with the Google folks; there is also the `TransportChannel` API, and I need to check whether it offers anything useful in this regard.


---

[GitHub] nifi issue #2724: NIFI-5133: Implemented Google Cloud PubSub Processors

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on the issue:

    https://github.com/apache/nifi/pull/2724
  
    I get 60 messages published per minute. The size also plays a role. The sample content that I publish is ~5 KB
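
For scale, the figures above work out to roughly one message per second, or about 5 KB/s of payload. A minimal sketch of that arithmetic follows; `ThroughputSketch` and its methods are hypothetical names, and only the 60 messages per minute and ~5 KB figures come from the comment.

```java
// Hypothetical helper; only the input figures come from the thread.
class ThroughputSketch {

    /** Messages per second for a given messages-per-minute rate. */
    static double messagesPerSecond(double messagesPerMinute) {
        return messagesPerMinute / 60.0;
    }

    /** Payload rate in KB per second for a given rate and average message size in KB. */
    static double kilobytesPerSecond(double messagesPerMinute, double messageSizeKb) {
        return messagesPerSecond(messagesPerMinute) * messageSizeKb;
    }

    public static void main(String[] args) {
        System.out.println(messagesPerSecond(60));      // prints 1.0
        System.out.println(kilobytesPerSecond(60, 5));  // prints 5.0
    }
}
```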


---

[GitHub] nifi pull request #2724: NIFI-5133: Implemented Google Cloud PubSub Processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2724#discussion_r189967778
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.gcp.pubsub;
    +
    +import com.google.auth.oauth2.GoogleCredentials;
    +import com.google.cloud.ServiceOptions;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
    +
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.Set;
    +
    +public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor {
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-publish-batch-size")
    +            .displayName("Batch Size")
    +            .description("Indicates the number of messages the cloud service should bundle together in a batch")
    --- End diff --
    
    This is used by the cloud service. The processor handles only one record at a time. This property indicates whether the service on the cloud side should bundle messages into a batch when delivering them to subscribers or publishing them. And yes, it is better to add something like "if not set or left empty, only one message will be used in a batch".
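
The fallback described above mirrors the `isSet() ? asInteger() : 1` expression in the quoted `onScheduled` code. A minimal sketch, assuming the raw property value arrives as a possibly empty string; `BatchSizeSketch` and `effectiveBatchSize` are hypothetical names, not part of NiFi, and the strictly positive check reflects the reviewer's suggestion rather than anything in the quoted diff:

```java
// Hypothetical helper illustrating "unset means a batch of one".
class BatchSizeSketch {

    static final int DEFAULT_BATCH_SIZE = 1;

    /** Returns the configured batch size, or 1 when the value is unset or blank. */
    static int effectiveBatchSize(String rawValue) {
        if (rawValue == null || rawValue.trim().isEmpty()) {
            return DEFAULT_BATCH_SIZE;
        }
        // Integer.parseInt throws NumberFormatException for non-numeric input.
        final int parsed = Integer.parseInt(rawValue.trim());
        if (parsed < 1) {
            throw new IllegalArgumentException(
                    "Batch Size must be a strictly positive integer, got " + parsed);
        }
        return parsed;
    }
}
```

Stating the default in the property description, as suggested in the comment, keeps the documented behavior and this fallback in one place.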


---

[GitHub] nifi pull request #2724: NIFI-5133: Implemented Google Cloud PubSub Processo...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2724#discussion_r189903290
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.gcp.pubsub;
    +
    +import com.google.auth.oauth2.GoogleCredentials;
    +import com.google.cloud.ServiceOptions;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
    +
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashSet;
    +import java.util.Set;
    +
    +public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor {
    +
    +    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-publish-batch-size")
    +            .displayName("Batch Size")
    +            .description("Indicates the number of messages the cloud service should bundle together in a batch")
    --- End diff --
    
    Can we add the expected behavior if not set? Does it mean we are processing messages individually? If so, would it be better to make this property required and validate it as a strictly positive integer?


---

[GitHub] nifi pull request #2724: NIFI-5133: Implemented Google Cloud PubSub Processo...

Posted by zenfenan <gi...@git.apache.org>.
Github user zenfenan commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2724#discussion_r189971247
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.gcp.pubsub;
    +
    +import com.google.api.gax.core.FixedCredentialsProvider;
    +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
    +import com.google.cloud.pubsub.v1.stub.SubscriberStub;
    +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
    +import com.google.common.collect.ImmutableList;
    +import com.google.pubsub.v1.AcknowledgeRequest;
    +import com.google.pubsub.v1.ProjectSubscriptionName;
    +import com.google.pubsub.v1.PullRequest;
    +import com.google.pubsub.v1.PullResponse;
    +import com.google.pubsub.v1.ReceivedMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
    +
    +@SeeAlso({PublishGCPubSub.class})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"google", "google-cloud", "pubsub", "consume"})
    +@CapabilityDescription("Consumes message from the configured Google Cloud PubSub subscription. If the 'Batch Size' is set, " +
    +        "the configured number of messages will be pulled in a single request, else only one message will be pulled.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = ACK_ID_DESCRIPTION),
    +        @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, description = SERIALIZED_SIZE_DESCRIPTION),
    +        @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
    +        @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION),
    +        @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
    +})
    +public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
    +
    +    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-subscription")
    +            .displayName("Subscription")
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .description("Subscription name of the Google Cloud Pub/Sub")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    private SubscriberStub subscriber = null;
    +    private PullRequest pullRequest;
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        final Integer batchSize = (context.getProperty(BATCH_SIZE).isSet()) ? context.getProperty(BATCH_SIZE).asInteger() : 1;
    +
    +        pullRequest = PullRequest.newBuilder()
    +                .setMaxMessages(batchSize)
    +                .setReturnImmediately(false)
    +                .setSubscription(getSubscriptionName(context))
    +                .build();
    +
    +        try {
    +            subscriber = getSubscriber(context);
    +        } catch (IOException e) {
    +            getLogger().error("Failed to create Google Cloud Subscriber due to ", e);
    +        }
    +    }
    +
    +    @OnStopped
    +    public void onStopped() {
    +        if (subscriber != null) {
    +            subscriber.shutdown();
    +        }
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.of(PROJECT_ID,
    +                GCP_CREDENTIALS_PROVIDER_SERVICE,
    +                SUBSCRIPTION,
    +                BATCH_SIZE);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        if (subscriber == null) {
    --- End diff --
    
    Got your point. A log has to be put here. I'll do it.
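
One way to add that log, sketched with hypothetical stand-ins (`SketchLog`, `SketchContext`) in place of NiFi's logger and process context so that the snippet compiles on its own. It assumes the subscriber is left `null` when creation fails in `onScheduled`, as in the quoted diff; the message wording is only a suggestion.

```java
// Hypothetical stand-in for the processor's logger; not the NiFi API.
interface SketchLog {
    void error(String message);
}

// Hypothetical stand-in for the process context; not the NiFi API.
interface SketchContext {
    void yieldProcessor();
}

class SubscriberGuardSketch {

    private final Object subscriber; // null when creation failed at scheduling time
    private final SketchLog log;

    SubscriberGuardSketch(Object subscriber, SketchLog log) {
        this.subscriber = subscriber;
        this.log = log;
    }

    /** Returns true when a pull may proceed; otherwise logs, yields and returns false. */
    boolean readyToPull(SketchContext context) {
        if (subscriber == null) {
            log.error("Google Cloud Pub/Sub subscriber was not created; "
                    + "see the earlier error and restart the processor");
            context.yieldProcessor();
            return false;
        }
        return true;
    }
}
```

Logging on every trigger makes the failure visible while the processor is running, instead of only once at scheduling time.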


---

[GitHub] nifi pull request #2724: NIFI-5133: Implemented Google Cloud PubSub Processo...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2724#discussion_r189905474
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java ---
    @@ -0,0 +1,240 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.gcp.pubsub;
    +
    +import com.google.api.core.ApiFuture;
    +import com.google.api.gax.batching.BatchingSettings;
    +import com.google.api.gax.core.FixedCredentialsProvider;
    +import com.google.api.gax.core.InstantiatingExecutorProvider;
    +import com.google.cloud.pubsub.v1.Publisher;
    +import com.google.common.collect.ImmutableList;
    +import com.google.protobuf.ByteString;
    +import com.google.protobuf.Timestamp;
    +import com.google.pubsub.v1.ProjectTopicName;
    +import com.google.pubsub.v1.PubsubMessage;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
    +
    +@SeeAlso({ConsumeGCPubSub.class})
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"google", "google-cloud", "pubsub", "publish"})
    +@CapabilityDescription("Publishes the content of the incoming flowfile to the configured Google Cloud PubSub topic. The processor supports dynamic properties " +
    +        "to be added. If any such dynamic properties are present, they will be sent along with the message in the form of 'attributes'.")
    +@DynamicProperty(name = "Attribute name", value = "Value to be set to the attribute",
    +        description = "Attributes to be set for the outgoing Google Cloud PubSub message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +@WritesAttributes({
    +        @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION),
    +        @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = TOPIC_NAME_DESCRIPTION)
    +})
    +public class PublishGCPubSub extends AbstractGCPubSubProcessor{
    +
    +    public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-topic")
    +            .displayName("Topic Name")
    +            .description("Name of the Google Cloud PubSub Topic")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +            .build();
    +
    +    public static final PropertyDescriptor EXECUTOR_COUNT = new PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-publish-executor-count")
    +            .displayName("Executor Count")
    +            .description("Indicates the number of executors the cloud service should use to publish the messages")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final Relationship REL_RETRY = new Relationship.Builder()
    +            .name("retry")
    +            .description("FlowFiles are routed to this relationship if the Google Cloud Pub/Sub operation fails but attempting the operation again may succeed.")
    +            .build();
    +
    +    private Publisher publisher = null;
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.of(PROJECT_ID,
    +                GCP_CREDENTIALS_PROVIDER_SERVICE,
    +                TOPIC_NAME,
    +                EXECUTOR_COUNT,
    +                BATCH_SIZE);
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +                .required(false)
    +                .name(propertyDescriptorName)
    +                .displayName(propertyDescriptorName)
    +                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
    +                .dynamic(true)
    +                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    +                .build();
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.unmodifiableSet(
    +                new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_RETRY))
    +        );
    +    }
    +
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +
    +        if (flowFile == null) {
    +            context.yield();
    +            return;
    +        }
    +
    +        final long startNanos = System.nanoTime();
    +
    +        final Map<String, String> attributes = new HashMap<>();
    +
    +        try {
    +            publisher = getPublisherBuilder(context, flowFile).build();
    +
    +            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +            session.exportTo(flowFile, baos);
    +            final ByteString flowFileContent = ByteString.copyFromUtf8(baos.toString());
    +
    +            PubsubMessage message = PubsubMessage.newBuilder().setData(flowFileContent)
    +                    .setPublishTime(Timestamp.newBuilder().build())
    +                    .putAllAttributes(getDynamicAttributesMap(context, flowFile))
    +                    .build();
    +
    +            ApiFuture<String> messageIdFuture = publisher.publish(message);
    +
    +            while (messageIdFuture.isDone()) {
    +                Thread.sleep(1000L);
    +            }
    +
    +            final String messageId = messageIdFuture.get();
    +
    +            attributes.put(MESSAGE_ID_ATTRIBUTE, messageId);
    +            attributes.put(TOPIC_NAME_ATTRIBUTE, getTopicName(context, flowFile).toString());
    +
    +            flowFile = session.putAllAttributes(flowFile, attributes);
    +        } catch (IOException e) {
    +            getLogger().error("Routing to 'failure'. Failed to build the Google Cloud PubSub Publisher due to ", e);
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        } catch (InterruptedException | ExecutionException e) {
    +            getLogger().error("Routing to 'retry'. Failed to publish the message to Google PubSub topic due to ", e);
    +            session.transfer(flowFile, REL_RETRY);
    +            return;
    +        } finally {
    +            shutdownPublisher();
    --- End diff --
    
    What's the performance cost of creating and shutting down the publisher for each flow file? I understand it's done so that flow file attributes can be used, but I'm wondering whether it's worth the perf cost.
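
One way to keep attribute-driven topic names without paying the build/shutdown cost on every flow file might be to cache one publisher per resolved topic name. The sketch below is only an illustration under stated assumptions: `PublisherCache`, its `factory` and `closer` parameters, and the type parameter `P` are hypothetical names that do not appear in the PR, and `P` merely stands in for the real publisher type.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical per-topic cache; P stands in for the real publisher type. */
final class PublisherCache<P> {
    private final Map<String, P> byTopic = new ConcurrentHashMap<>();
    private final Function<String, P> factory; // builds a publisher for a topic name
    private final Consumer<P> closer;          // releases a publisher's resources

    PublisherCache(Function<String, P> factory, Consumer<P> closer) {
        this.factory = factory;
        this.closer = closer;
    }

    /** Returns the cached publisher for the topic, building it on first use only. */
    P get(String topic) {
        return byTopic.computeIfAbsent(topic, factory);
    }

    /** Closes and forgets every cached publisher, e.g. when the processor stops. */
    void closeAll() {
        byTopic.values().forEach(closer);
        byTopic.clear();
    }

    /** Number of publishers currently cached. */
    int size() {
        return byTopic.size();
    }
}
```

With something along these lines, onTrigger would resolve the topic name from the flow file, ask the cache for a publisher, and leave shutdown to a lifecycle method instead of the finally block. Whether the saving justifies the extra state would still need measuring.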


---

[GitHub] nifi pull request #2724: NIFI-5133: Implemented Google Cloud PubSub Processo...

Posted by pvillard31 <gi...@git.apache.org>.
Github user pvillard31 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2724#discussion_r189904067
  
    --- Diff: nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java ---
    @@ -0,0 +1,190 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.gcp.pubsub;
    +
    +import com.google.api.gax.core.FixedCredentialsProvider;
    +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
    +import com.google.cloud.pubsub.v1.stub.SubscriberStub;
    +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
    +import com.google.common.collect.ImmutableList;
    +import com.google.pubsub.v1.AcknowledgeRequest;
    +import com.google.pubsub.v1.ProjectSubscriptionName;
    +import com.google.pubsub.v1.PullRequest;
    +import com.google.pubsub.v1.PullResponse;
    +import com.google.pubsub.v1.ReceivedMessage;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.SeeAlso;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.expression.ExpressionLanguageScope;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ACK_ID_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_ATTRIBUTE;
    +import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_SIZE_DESCRIPTION;
    +
    +@SeeAlso({PublishGCPubSub.class})
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"google", "google-cloud", "pubsub", "consume"})
    +@CapabilityDescription("Consumes message from the configured Google Cloud PubSub subscription. If the 'Batch Size' is set, " +
    +        "the configured number of messages will be pulled in a single request, else only one message will be pulled.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute = ACK_ID_ATTRIBUTE, description = ACK_ID_DESCRIPTION),
    +        @WritesAttribute(attribute = SERIALIZED_SIZE_ATTRIBUTE, description = SERIALIZED_SIZE_DESCRIPTION),
    +        @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
    +        @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION),
    +        @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
    +})
    +public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
    +
    +    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
    +            .name("gcp-pubsub-subscription")
    +            .displayName("Subscription")
    +            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
    +            .description("Subscription name of the Google Cloud Pub/Sub")
    +            .required(true)
    +            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
    +            .build();
    +
    +    private SubscriberStub subscriber = null;
    +    private PullRequest pullRequest;
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) {
    +        final Integer batchSize = (context.getProperty(BATCH_SIZE).isSet()) ? context.getProperty(BATCH_SIZE).asInteger() : 1;
    +
    +        pullRequest = PullRequest.newBuilder()
    +                .setMaxMessages(batchSize)
    +                .setReturnImmediately(false)
    +                .setSubscription(getSubscriptionName(context))
    +                .build();
    +
    +        try {
    +            subscriber = getSubscriber(context);
    +        } catch (IOException e) {
    +            getLogger().error("Failed to create Google Cloud Subscriber due to ", e);
    +        }
    +    }
    +
    +    @OnStopped
    +    public void onStopped() {
    +        if (subscriber != null) {
    +            subscriber.shutdown();
    +        }
    +    }
    +
    +    @Override
    +    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return ImmutableList.of(PROJECT_ID,
    +                GCP_CREDENTIALS_PROVIDER_SERVICE,
    +                SUBSCRIPTION,
    +                BATCH_SIZE);
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return Collections.singleton(REL_SUCCESS);
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        if (subscriber == null) {
    --- End diff --
    
    If subscriber is null, it means that something went wrong in the @OnScheduled method, right? If so, it could be a good idea to log an error here (so that a bulletin is generated to warn users), no?
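
A minimal sketch of the suggested check, assuming a stand-in `Log` interface in place of NiFi's logger so that the example stays self-contained. `NullSubscriberGuard`, `Log`, and `ready` are hypothetical names and are not part of the PR.

```java
/** Hypothetical helper illustrating the null check with an error log. */
final class NullSubscriberGuard {

    /** Stand-in for the processor's logger; only the one method used here. */
    interface Log {
        void error(String message);
    }

    /**
     * Returns true when the subscriber exists and the caller may pull messages.
     * Otherwise logs an error (which in NiFi would surface as a bulletin) and returns false.
     */
    static boolean ready(Object subscriber, Log log) {
        if (subscriber == null) {
            log.error("Google Cloud Pub/Sub subscriber was not created; "
                    + "see the error logged when the processor was scheduled.");
            return false;
        }
        return true;
    }
}
```

In onTrigger, the processor would call such a check first and, when it returns false, yield and return without pulling.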


---