You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2021/11/08 14:44:06 UTC

[nifi] branch main updated: NIFI-9304 - Adding PublishGCPubSubLite and ConsumeGCPubSubLite processors

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 37c0527  NIFI-9304 - Adding PublishGCPubSubLite and ConsumeGCPubSubLite processors
37c0527 is described below

commit 37c0527a72971051fab48224c7e8800e41e0bb62
Author: Pierre Villard <pi...@gmail.com>
AuthorDate: Sat Oct 16 14:33:05 2021 +0200

    NIFI-9304 - Adding PublishGCPubSubLite and ConsumeGCPubSubLite processors
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    
    This closes #5460.
---
 .../nifi-gcp-bundle/nifi-gcp-processors/pom.xml    |  10 +-
 .../processors/gcp/pubsub/PubSubAttributes.java    |  92 +++---
 .../gcp/pubsub/lite/ConsumeGCPubSubLite.java       | 289 ++++++++++++++++++
 .../gcp/pubsub/lite/PublishGCPubSubLite.java       | 325 +++++++++++++++++++++
 .../services/org.apache.nifi.processor.Processor   |   2 +
 .../nifi-gcp-bundle/nifi-gcp-services-api/pom.xml  |   1 +
 nifi-nar-bundles/nifi-gcp-bundle/pom.xml           |   2 +-
 7 files changed, 675 insertions(+), 46 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
index e93e210..d1ea44d 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/pom.xml
@@ -64,12 +64,12 @@
         <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-mock</artifactId>
-            <version>1.16.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>com.google.cloud</groupId>
             <artifactId>google-cloud-core</artifactId>
+            <version>2.1.7</version>
             <exclusions>
                 <exclusion>
                     <groupId>com.google.code.findbugs</groupId>
@@ -90,6 +90,11 @@
             <artifactId>google-cloud-pubsub</artifactId>
         </dependency>
         <dependency>
+            <groupId>com.google.cloud</groupId>
+            <artifactId>google-cloud-pubsublite</artifactId>
+            <version>1.3.0</version>
+        </dependency>
+        <dependency>
             <groupId>com.tdunning</groupId>
             <artifactId>json</artifactId>
             <version>1.8</version>
@@ -109,7 +114,6 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
-            <version>${jackson.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
@@ -152,4 +156,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
index 6aaf04a..cfcdda5 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PubSubAttributes.java
@@ -1,42 +1,50 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.pubsub;
-
-public class PubSubAttributes {
-
-    public static final String MESSAGE_ID_ATTRIBUTE = "gcp.pubsub.messageId";
-    public static final String MESSAGE_ID_DESCRIPTION = "ID of the pubsub message published to the configured Google Cloud PubSub topic";
-
-    public static final String TOPIC_NAME_ATTRIBUTE = "gcp.pubsub.topic";
-    public static final String TOPIC_NAME_DESCRIPTION = "Name of the Google Cloud PubSub topic the message was published to";
-
-    public static final String ACK_ID_ATTRIBUTE = "gcp.pubsub.ackId";
-    public static final String ACK_ID_DESCRIPTION = "Acknowledgement Id of the consumed Google Cloud PubSub message";
-
-    public static final String SERIALIZED_SIZE_ATTRIBUTE = "gcp.pubsub.messageSize";
-    public static final String SERIALIZED_SIZE_DESCRIPTION = "Serialized size of the consumed Google Cloud PubSub message";
-
-    public static final String MSG_ATTRIBUTES_COUNT_ATTRIBUTE = "gcp.pubsub.attributesCount";
-    public static final String MSG_ATTRIBUTES_COUNT_DESCRIPTION = "Number of attributes the consumed PubSub message has, if any";
-
-    public static final String MSG_PUBLISH_TIME_ATTRIBUTE = "gcp.pubsub.publishTime";
-    public static final String MSG_PUBLISH_TIME_DESCRIPTION = "Timestamp value when the message was published";
-
-    public static final String DYNAMIC_ATTRIBUTES_ATTRIBUTE = "Dynamic Attributes";
-    public static final String DYNAMIC_ATTRIBUTES_DESCRIPTION = "Other than the listed attributes, this processor may write zero or more attributes, " +
-            "if the original Google Cloud Publisher client added any attributes to the message while sending";
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+public class PubSubAttributes {
+    // FlowFile attribute keys (*_ATTRIBUTE) paired with the text (*_DESCRIPTION) that documents them in @WritesAttribute.
+    public static final String MESSAGE_ID_ATTRIBUTE = "gcp.pubsub.messageId";
+    public static final String MESSAGE_ID_DESCRIPTION = "ID of the pubsub message published to the configured Google Cloud PubSub topic";
+
+    public static final String TOPIC_NAME_ATTRIBUTE = "gcp.pubsub.topic";
+    public static final String TOPIC_NAME_DESCRIPTION = "Name of the Google Cloud PubSub topic the message was published to";
+
+    public static final String ACK_ID_ATTRIBUTE = "gcp.pubsub.ackId";
+    public static final String ACK_ID_DESCRIPTION = "Acknowledgement Id of the consumed Google Cloud PubSub message";
+
+    public static final String SERIALIZED_SIZE_ATTRIBUTE = "gcp.pubsub.messageSize";
+    public static final String SERIALIZED_SIZE_DESCRIPTION = "Serialized size of the consumed Google Cloud PubSub message";
+
+    public static final String MSG_ATTRIBUTES_COUNT_ATTRIBUTE = "gcp.pubsub.attributesCount";
+    public static final String MSG_ATTRIBUTES_COUNT_DESCRIPTION = "Number of attributes the consumed PubSub message has, if any";
+
+    public static final String MSG_PUBLISH_TIME_ATTRIBUTE = "gcp.pubsub.publishTime";
+    public static final String MSG_PUBLISH_TIME_DESCRIPTION = "Timestamp value when the message was published";
+    // NOTE(review): "Dynamic Attributes" appears to be a documentation label for the message's own attributes, not a literal key.
+    public static final String DYNAMIC_ATTRIBUTES_ATTRIBUTE = "Dynamic Attributes";
+    public static final String DYNAMIC_ATTRIBUTES_DESCRIPTION = "Other than the listed attributes, this processor may write zero or more attributes, " +
+            "if the original Google Cloud Publisher client added any attributes to the message while sending";
+    // Ordering key of a message; ConsumeGCPubSubLite fills this attribute from PubsubMessage#getOrderingKey().
+    public static final String ORDERING_KEY_ATTRIBUTE = "gcp.pubsub.ordering.key";
+    public static final String ORDERING_KEY_DESCRIPTION = "If non-empty, identifies related messages for which publish order should be"
+            + " respected. If a 'Subscription' has 'enable_message_ordering' set to 'true',"
+            + " messages published with the same non-empty 'ordering_key' value will be"
+            + " delivered to subscribers in the order in which they are received by the"
+            + " Pub/Sub system. All 'PubsubMessage's published in a given 'PublishRequest'"
+            + " must specify the same 'ordering_key' value.";
+}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java
new file mode 100644
index 0000000..be94fe5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/ConsumeGCPubSubLite.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.lite;
+
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsub.v1.AckReplyConsumer;
+import com.google.cloud.pubsub.v1.MessageReceiver;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
+import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
+import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
+import com.google.common.collect.ImmutableList;
+import com.google.pubsub.v1.PubsubMessage;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_ATTRIBUTE;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.DYNAMIC_ATTRIBUTES_DESCRIPTION;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_ATTRIBUTE;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_ATTRIBUTES_COUNT_DESCRIPTION;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_ATTRIBUTE;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MSG_PUBLISH_TIME_DESCRIPTION;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_ATTRIBUTE;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.ORDERING_KEY_DESCRIPTION;
+
+/**
+ * Source processor that turns messages received from a Google Cloud Pub/Sub Lite subscription into FlowFiles.
+ *
+ * <p>A {@link Subscriber} started in {@link #onScheduled(ProcessContext)} pushes every received message into an in-memory
+ * queue; each {@code onTrigger} call takes one message from that queue, writes it to a FlowFile and acknowledges it.
+ */
+@SeeAlso({PublishGCPubSubLite.class})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "consume", "lite"})
+@CapabilityDescription("Consumes message from the configured Google Cloud PubSub Lite subscription. In its current state, this processor "
+        + "will only work if running on a Google Cloud Compute Engine instance and if using the GCP Credentials Controller Service with "
+        + "'Use Application Default Credentials' or 'Use Compute Engine Credentials'.")
+@WritesAttributes({
+        @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION),
+        @WritesAttribute(attribute = ORDERING_KEY_ATTRIBUTE, description = ORDERING_KEY_DESCRIPTION),
+        @WritesAttribute(attribute = MSG_ATTRIBUTES_COUNT_ATTRIBUTE, description = MSG_ATTRIBUTES_COUNT_DESCRIPTION),
+        @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION),
+        @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
+})
+public class ConsumeGCPubSubLite extends AbstractGCPubSubProcessor implements VerifiableProcessor {
+
+    public static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder()
+            .name("gcp-pubsub-subscription")
+            .displayName("Subscription")
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .description("Name of the Google Cloud Pub/Sub Subscription. Example: projects/8476107443/locations/europe-west1-d/subscriptions/my-lite-subscription")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor BYTES_OUTSTANDING = new PropertyDescriptor
+            .Builder().name("gcp-bytes-outstanding")
+            .displayName("Bytes Outstanding")
+            .description("The number of quota bytes that may be outstanding to the client.")
+            .required(true)
+            .defaultValue("10 MB")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor MESSAGES_OUTSTANDING = new PropertyDescriptor
+            .Builder().name("gcp-messages-outstanding")
+            .displayName("Messages Outstanding")
+            .description("The number of messages that may be outstanding to the client.")
+            .required(true)
+            .defaultValue("1000")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    private Subscriber subscriber = null;
+
+    // Deliberately an instance field: a static queue would be shared by every ConsumeGCPubSubLite processor in the JVM,
+    // letting one processor emit (and acknowledge) messages that were received for another processor's subscription.
+    private final BlockingQueue<Message> messages = new LinkedBlockingQueue<>();
+
+    /**
+     * Checks that the configured Subscription can be parsed as a Pub/Sub Lite subscription path.
+     *
+     * @param validationContext context giving access to the configured property values
+     * @return a single invalid result when the path cannot be parsed, otherwise an empty collection
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final Collection<ValidationResult> results = new ArrayList<>(1);
+        final String subscription = validationContext.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue();
+
+        try {
+            SubscriptionPath.parse(subscription);
+        } catch (final ApiException e) {
+            results.add(new ValidationResult.Builder()
+                    .subject(SUBSCRIPTION.getName())
+                    .input(subscription)
+                    .valid(false)
+                    .explanation("The Subscription does not have a valid format.")
+                    .build());
+        }
+
+        return results;
+    }
+
+    /**
+     * Creates the subscriber if needed and blocks until it is running.
+     *
+     * @param context the process context holding the configured properties
+     * @throws ProcessException if the subscriber cannot be created or fails to start
+     */
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        try {
+            if (subscriber == null) {
+                subscriber = getSubscriber(context);
+            }
+        } catch (final Exception e) {
+            getLogger().error("Failed to create Google Cloud PubSub Lite Subscriber", e);
+            throw new ProcessException(e);
+        }
+        try {
+            subscriber.startAsync().awaitRunning();
+        } catch (final Exception e) {
+            // Log the caught exception rather than subscriber.failureCause(): that accessor is only valid for a
+            // service in the FAILED state and would otherwise throw from inside this handler.
+            getLogger().error("Failed to start Google Cloud PubSub Lite Subscriber", e);
+            // A service that failed to start cannot be started again; drop it so the next attempt builds a new one.
+            subscriber = null;
+            throw new ProcessException(e);
+        }
+    }
+
+    /**
+     * Stops the subscriber, waits for it to terminate and releases the reference.
+     */
+    @OnStopped
+    public void onStopped() {
+        if (subscriber == null) {
+            return;
+        }
+        try {
+            subscriber.stopAsync().awaitTerminated();
+        } catch (final Exception e) {
+            getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Lite Subscriber", e);
+        } finally {
+            // Always forget the instance: awaitTerminated() throws for a failed service, and a reference that is kept
+            // would make onScheduled() call startAsync() on a service that can no longer be started.
+            subscriber = null;
+        }
+    }
+
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return ImmutableList.of(SUBSCRIPTION,
+                GCP_CREDENTIALS_PROVIDER_SERVICE,
+                BYTES_OUTSTANDING,
+                MESSAGES_OUTSTANDING);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Collections.singleton(REL_SUCCESS);
+    }
+
+    /**
+     * Emits at most one buffered message as a FlowFile routed to {@code success}, then acknowledges the message.
+     * Yields when the subscriber was never created or when no message is buffered.
+     *
+     * @param context the process context
+     * @param session the session used to create and transfer the FlowFile
+     * @throws ProcessException if the subscriber is no longer running
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        if (subscriber == null) {
+            getLogger().error("Google Cloud PubSub Lite Subscriber was not properly created. Yielding the processor...");
+            context.yield();
+            return;
+        }
+
+        if (!subscriber.isRunning()) {
+            getLogger().error("Google Cloud PubSub Lite Subscriber is not running. Yielding the processor...", subscriber.failureCause());
+            throw new ProcessException(subscriber.failureCause());
+        }
+
+        final Message message = messages.poll();
+        if (message == null) {
+            context.yield();
+            return;
+        }
+
+        final PubsubMessage pubsubMessage = message.getMessage();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put(MESSAGE_ID_ATTRIBUTE, pubsubMessage.getMessageId());
+        attributes.put(ORDERING_KEY_ATTRIBUTE, pubsubMessage.getOrderingKey());
+        attributes.put(MSG_ATTRIBUTES_COUNT_ATTRIBUTE, String.valueOf(pubsubMessage.getAttributesCount()));
+        attributes.put(MSG_PUBLISH_TIME_ATTRIBUTE, String.valueOf(pubsubMessage.getPublishTime().getSeconds()));
+        attributes.putAll(pubsubMessage.getAttributesMap());
+
+        FlowFile flowFile = session.create();
+        flowFile = session.putAllAttributes(flowFile, attributes);
+        // Write the payload bytes untouched. Decoding them to a String and re-encoding with the platform charset
+        // (the previous behaviour) corrupts binary payloads and any non-ASCII text on non-UTF-8 platforms.
+        flowFile = session.write(flowFile, out -> out.write(pubsubMessage.getData().toByteArray()));
+
+        session.transfer(flowFile, REL_SUCCESS);
+        session.getProvenanceReporter().receive(flowFile, context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue());
+
+        message.getConsumer().ack();
+    }
+
+    /**
+     * Builds, without starting it, a subscriber for the configured subscription path, flow-control limits and credentials.
+     * The receiver only enqueues incoming messages; acknowledging them is left to {@code onTrigger}.
+     *
+     * @param context the process context holding the configured properties
+     * @return a new subscriber that has not been started
+     * @throws IOException propagated from the calls used to assemble the subscriber settings
+     */
+    private Subscriber getSubscriber(final ProcessContext context) throws IOException {
+        final SubscriptionPath subscriptionPath = SubscriptionPath.parse(context.getProperty(SUBSCRIPTION).evaluateAttributeExpressions().getValue());
+
+        final FlowControlSettings flowControlSettings = FlowControlSettings.builder()
+                .setBytesOutstanding(context.getProperty(BYTES_OUTSTANDING).evaluateAttributeExpressions().asDataSize(DataUnit.B).longValue())
+                .setMessagesOutstanding(context.getProperty(MESSAGES_OUTSTANDING).evaluateAttributeExpressions().asLong())
+                .build();
+
+        final MessageReceiver receiver =
+                (PubsubMessage message, AckReplyConsumer consumer) -> {
+                    try {
+                        messages.put(new Message(message, consumer));
+                    } catch (final InterruptedException e) {
+                        // Restore the interrupt flag so the thread delivering the message can still observe it.
+                        Thread.currentThread().interrupt();
+                        getLogger().error("Could not save the message inside the internal queue of the processor", e);
+                    }
+                };
+
+        final SubscriberSettings subscriberSettings = SubscriberSettings.newBuilder()
+                .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
+                .setSubscriptionPath(subscriptionPath)
+                .setReceiver(receiver)
+                .setPerPartitionFlowControlSettings(flowControlSettings)
+                .build();
+
+        return Subscriber.create(subscriberSettings);
+    }
+
+    /** Pairs a received message with the consumer used to acknowledge it. */
+    private static final class Message {
+        private final PubsubMessage message;
+        private final AckReplyConsumer consumer;
+
+        public Message(final PubsubMessage message, final AckReplyConsumer consumer) {
+            this.message = message;
+            this.consumer = consumer;
+        }
+
+        public PubsubMessage getMessage() {
+            return message;
+        }
+
+        public AckReplyConsumer getConsumer() {
+            return consumer;
+        }
+    }
+
+    /**
+     * Verifies the configuration by attempting to build a subscriber; the subscriber is not started.
+     *
+     * @return a single result for the "Create the Subscriber" step, successful or failed
+     */
+    @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        final List<ConfigVerificationResult> verificationResults = new ArrayList<>();
+        try {
+            getSubscriber(context);
+            verificationResults.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Create the Subscriber")
+                    .outcome(Outcome.SUCCESSFUL)
+                    .explanation("Successfully created the Google Cloud PubSub Lite Subscriber")
+                    .build());
+        } catch (final Exception e) {
+            verificationLogger.error("Failed to create Google Cloud PubSub Lite Subscriber", e);
+
+            verificationResults.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("Create the Subscriber")
+                    .outcome(Outcome.FAILED)
+                    .explanation("Failed to create Google Cloud PubSub Lite Subscriber: " + e.getLocalizedMessage())
+                    .build());
+        }
+        return verificationResults;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java
new file mode 100644
index 0000000..7e97b3b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/lite/PublishGCPubSubLite.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub.lite;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.ApiException;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.cloudpubsub.Publisher;
+import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
+import com.google.pubsub.v1.PubsubMessage;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.gcp.pubsub.AbstractGCPubSubProcessor;
+import org.threeten.bp.Duration;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_ATTRIBUTE;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.MESSAGE_ID_DESCRIPTION;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_ATTRIBUTE;
+import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_DESCRIPTION;
+
+@SeeAlso({ConsumeGCPubSubLite.class})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"google", "google-cloud", "gcp", "message", "pubsub", "publish", "lite"})
+@CapabilityDescription("Publishes the content of the incoming flowfile to the configured Google Cloud PubSub Lite topic. The processor supports dynamic properties." +
+        " If any dynamic properties are present, they will be sent along with the message in the form of 'attributes'. In its current state, this processor will " +
+        "only work if running on a Google Cloud Compute Engine instance and if using the GCP Credentials Controller Service with 'Use Application Default " +
+        "Credentials' or 'Use Compute Engine Credentials'.")
+@DynamicProperty(name = "Attribute name", value = "Value to be set to the attribute",
+        description = "Attributes to be set for the outgoing Google Cloud PubSub Lite message", expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+@WritesAttributes({
+        @WritesAttribute(attribute = MESSAGE_ID_ATTRIBUTE, description = MESSAGE_ID_DESCRIPTION),
+        @WritesAttribute(attribute = TOPIC_NAME_ATTRIBUTE, description = TOPIC_NAME_DESCRIPTION)
+})
+@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content "
+        + "will be read into memory to be sent as a PubSub message.")
+public class PublishGCPubSubLite extends AbstractGCPubSubProcessor implements VerifiableProcessor {
+
+    public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder()
+            .name("gcp-pubsub-topic")
+            .displayName("Topic Name")
+            .description("Name of the Google Cloud PubSub Topic. Example: projects/8476107443/locations/europe-west1-d/topics/my-lite-topic")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    public static final PropertyDescriptor ORDERING_KEY = new PropertyDescriptor
+            .Builder().name("gcp-ordering-key")
+            .displayName("Ordering Key")
+            .description("Messages with the same ordering key will always get published to the same partition. When this property is not "
+                    + "set, messages can get published to different partitions if more than one partition exists for the topic.")
+            .required(false)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor BATCH_BYTES = new PropertyDescriptor
+            .Builder().name("gcp-batch-bytes")
+            .displayName("Batch Bytes Threshold")
+            .description("Publish request gets triggered based on this Batch Bytes Threshold property and"
+                    + " the " + BATCH_SIZE.getDisplayName() + " property, whichever condition is met first.")
+            .required(true)
+            .defaultValue("3 MB")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .build();
+
+    private Publisher publisher = null;
+
+    /**
+     * Lists the static properties of this processor in display order.
+     *
+     * @return an immutable list of the supported property descriptors
+     */
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = ImmutableList.of(
+                TOPIC_NAME,
+                GCP_CREDENTIALS_PROVIDER_SERVICE,
+                ORDERING_KEY,
+                BATCH_SIZE,
+                BATCH_BYTES);
+        return descriptors;
+    }
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .required(false)
+                .name(propertyDescriptorName)
+                .displayName(propertyDescriptorName)
+                .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
+                .dynamic(true)
+                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+                .build();
+    }
+
+    /**
+     * Returns the relationships FlowFiles can be routed to: success and failure.
+     *
+     * @return an unmodifiable set containing both relationships
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        return Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final Collection<ValidationResult> results = new ArrayList<ValidationResult>(1);
+        final String topic = validationContext.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
+
+        try {
+            TopicPath.parse(topic);
+        } catch (final ApiException e) {
+            results.add(new ValidationResult.Builder()
+                    .subject(TOPIC_NAME.getName())
+                    .input(topic)
+                    .valid(false)
+                    .explanation("The Topic does not have a valid format.")
+                    .build());
+        }
+
+        return results;
+    }
+
+    /**
+     * Creates the Pub/Sub Lite publisher when none is currently held and starts it,
+     * blocking until it reports that it is running.
+     *
+     * @param context the process context the publisher settings are read from
+     * @throws ProcessException if the publisher cannot be created or fails to start
+     */
+    @Override
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        try {
+            if (publisher == null) {
+                publisher = getPublisher(context);
+            }
+        } catch (final Exception e) {
+            getLogger().error("Failed to create Google Cloud PubSub Lite Publisher", e);
+            throw new ProcessException(e);
+        }
+
+        try {
+            publisher.startAsync().awaitRunning();
+        } catch (final Exception e) {
+            // Log the caught exception itself: failureCause() may only be called on a service in the
+            // FAILED state and would otherwise throw from inside this handler, hiding the real error.
+            getLogger().error("Failed to start Google Cloud PubSub Lite Publisher", e);
+            throw new ProcessException(e);
+        }
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        final int flowFileCount = context.getProperty(BATCH_SIZE).asInteger();
+        final List<FlowFile> flowFiles = session.get(flowFileCount);
+
+        if (flowFiles.isEmpty()) {
+            context.yield();
+            return;
+        }
+
+        if (publisher == null) {
+            getLogger().error("Google Cloud PubSub Lite Publisher was not properly created. Yielding the processor...");
+            context.yield();
+            return;
+        }
+
+        if(!publisher.isRunning()) {
+            getLogger().error("Google Cloud PubSub Lite Publisher is not running. Yielding the processor...", publisher.failureCause());
+            throw new ProcessException(publisher.failureCause());
+        }
+
+        final long startNanos = System.nanoTime();
+        final List<FlowFile> successfulFlowFiles = new ArrayList<>();
+        final String topicName = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
+        final List<ApiFuture<String>> futures = new ArrayList<>();
+
+        try {
+            for (FlowFile flowFile : flowFiles) {
+                final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                session.exportTo(flowFile, baos);
+                final ByteString flowFileContent = ByteString.copyFrom(baos.toByteArray());
+                final String orderingKey = context.getProperty(ORDERING_KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+                final PubsubMessage.Builder message = PubsubMessage.newBuilder().setData(flowFileContent)
+                        .setPublishTime(Timestamp.newBuilder().build())
+                        .putAllAttributes(getDynamicAttributesMap(context, flowFile));
+
+                if (orderingKey != null) {
+                    message.setOrderingKey(orderingKey);
+                }
+
+                final ApiFuture<String> messageIdFuture = publisher.publish(message.build());
+                futures.add(messageIdFuture);
+
+                flowFile = session.putAttribute(flowFile, TOPIC_NAME_ATTRIBUTE, topicName);
+            }
+
+            try {
+                ApiFutures.allAsList(futures).get();
+                successfulFlowFiles.addAll(flowFiles);
+            } catch (InterruptedException | ExecutionException e) {
+                getLogger().error("Failed to publish the messages to Google Cloud PubSub Lite topic '{}' due to {}, "
+                        + "routing all messages from the batch to failure", new Object[]{topicName, e.getLocalizedMessage()}, e);
+                session.transfer(flowFiles, REL_FAILURE);
+                context.yield();
+            }
+        } finally {
+            if (!successfulFlowFiles.isEmpty()) {
+                session.transfer(successfulFlowFiles, REL_SUCCESS);
+                for (FlowFile flowFile : successfulFlowFiles) {
+                    final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                    session.getProvenanceReporter().send(flowFile, topicName, transmissionMillis);
+                }
+            }
+        }
+    }
+
+    /**
+     * Stops the publisher, waiting for it to terminate, and releases the reference to it.
+     * Shutdown problems are logged as warnings and never propagated.
+     */
+    @OnStopped
+    public void onStopped() {
+        if (publisher == null) {
+            return;
+        }
+
+        try {
+            publisher.stopAsync().awaitTerminated();
+        } catch (final Exception e) {
+            getLogger().warn("Failed to gracefully shutdown the Google Cloud PubSub Lite Publisher", e);
+        } finally {
+            // Always drop the reference: awaitTerminated() throws when the service has failed, and a
+            // publisher left behind in that state could not be started again on the next schedule.
+            publisher = null;
+        }
+    }
+
+    /**
+     * Collects the dynamic properties of the processor as message attributes.
+     *
+     * @param context the process context the properties are read from
+     * @param flowFile the FlowFile the property values are evaluated against
+     * @return a map from dynamic property name to its evaluated value
+     */
+    private Map<String, String> getDynamicAttributesMap(final ProcessContext context, final FlowFile flowFile) {
+        final Map<String, String> dynamicAttributes = new HashMap<>();
+
+        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
+            if (!descriptor.isDynamic()) {
+                continue;
+            }
+            final String evaluated = context.getProperty(descriptor).evaluateAttributeExpressions(flowFile).getValue();
+            dynamicAttributes.put(descriptor.getName(), evaluated);
+        }
+
+        return dynamicAttributes;
+    }
+
+    /**
+     * Creates (without starting) a publisher for the configured topic, using the configured
+     * credentials and batching thresholds and a fixed batching delay of 100 milliseconds.
+     *
+     * @param context the process context the properties are read from
+     * @return a new publisher built from the current configuration
+     */
+    private Publisher getPublisher(final ProcessContext context) {
+        final String topicValue = context.getProperty(TOPIC_NAME).evaluateAttributeExpressions().getValue();
+        final TopicPath topicPath = TopicPath.parse(topicValue);
+
+        final PublisherSettings.Builder settings = PublisherSettings.newBuilder()
+                .setTopicPath(topicPath)
+                .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)));
+
+        // A batch is flushed when any of the count, byte or delay thresholds is reached.
+        final BatchingSettings batching = BatchingSettings.newBuilder()
+                .setElementCountThreshold(context.getProperty(BATCH_SIZE).asLong())
+                .setDelayThreshold(Duration.ofMillis(100))
+                .setRequestByteThreshold(context.getProperty(BATCH_BYTES).asDataSize(DataUnit.B).longValue())
+                .setIsEnabled(true)
+                .build();
+
+        return Publisher.create(settings.setBatchingSettings(batching).build());
+    }
+
+    /**
+     * Verifies the configuration by attempting to create a publisher from it.
+     *
+     * @param context the process context holding the configuration to verify
+     * @param verificationLogger the logger a creation failure is reported to
+     * @param attributes FlowFile attributes supplied for verification; not used by this check
+     * @return a list with a single result for the "Create the Publisher" step
+     */
+    @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        ConfigVerificationResult createPublisherResult;
+
+        try {
+            getPublisher(context);
+            createPublisherResult = new ConfigVerificationResult.Builder()
+                    .verificationStepName("Create the Publisher")
+                    .outcome(Outcome.SUCCESSFUL)
+                    .explanation("Successfully created the Google Cloud PubSub Lite Publisher")
+                    .build();
+        } catch (final Exception e) {
+            verificationLogger.error("Failed to create Google Cloud PubSub Lite Publisher", e);
+
+            createPublisherResult = new ConfigVerificationResult.Builder()
+                    .verificationStepName("Create the Publisher")
+                    .outcome(Outcome.FAILED)
+                    .explanation("Failed to create Google Cloud PubSub Lite Publisher: " + e.getLocalizedMessage())
+                    .build();
+        }
+
+        final List<ConfigVerificationResult> verificationResults = new ArrayList<>();
+        verificationResults.add(createPublisherResult);
+        return verificationResults;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 9d26958..dc4c440 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -18,5 +18,7 @@ org.apache.nifi.processors.gcp.storage.DeleteGCSObject
 org.apache.nifi.processors.gcp.storage.ListGCSBucket
 org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub
 org.apache.nifi.processors.gcp.pubsub.ConsumeGCPubSub
+org.apache.nifi.processors.gcp.pubsub.lite.PublishGCPubSubLite
+org.apache.nifi.processors.gcp.pubsub.lite.ConsumeGCPubSubLite
 org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch
 org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
index 66352f7..1f99faf 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml
@@ -33,6 +33,7 @@
         <dependency>
             <groupId>com.google.auth</groupId>
             <artifactId>google-auth-library-oauth2-http</artifactId>
+            <version>1.2.1</version>
             <exclusions>
                 <exclusion>
                     <groupId>com.google.code.findbugs</groupId>
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
index b6289b9..e7cd1c1 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-gcp-bundle/pom.xml
@@ -27,7 +27,7 @@
     <packaging>pom</packaging>
 
     <properties>
-        <google.cloud.sdk.version>0.125.0</google.cloud.sdk.version>
+        <google.cloud.sdk.version>0.162.0</google.cloud.sdk.version>
     </properties>
 
     <dependencyManagement>