You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/05/02 16:41:45 UTC

[nifi] branch main updated: NIFI-9798 Added Proxy Support for ConsumeGCPubSub and PublishGCPubSub

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new b04632129f NIFI-9798 Added Proxy Support for ConsumeGCPubSub and PublishGCPubSub
b04632129f is described below

commit b04632129faa0dbaef3705bd2ea7a1d5fc3dac0e
Author: Juldrixx <ju...@gmail.com>
AuthorDate: Mon Mar 14 18:24:33 2022 -0500

    NIFI-9798 Added Proxy Support for ConsumeGCPubSub and PublishGCPubSub
    
    This closes #5868
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../pubsub/AbstractGCPubSubWithProxyProcessor.java | 85 ++++++++++++++++++++++
 .../processors/gcp/pubsub/ConsumeGCPubSub.java     | 14 ++--
 .../processors/gcp/pubsub/PublishGCPubSub.java     | 16 ++--
 3 files changed, 101 insertions(+), 14 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubWithProxyProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubWithProxyProcessor.java
new file mode 100644
index 0000000000..c4d3e68b68
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubWithProxyProcessor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import io.grpc.HttpConnectProxiedSocketAddress;
+import io.grpc.ProxiedSocketAddress;
+import io.grpc.ProxyDetector;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.proxy.ProxyConfiguration;
+
+import javax.annotation.Nullable;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public abstract class AbstractGCPubSubWithProxyProcessor extends AbstractGCPubSubProcessor {
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Collections.unmodifiableList(Arrays.asList(
+                PROJECT_ID,
+                ProxyConfiguration.createProxyConfigPropertyDescriptor(true, ProxyAwareTransportFactory.PROXY_SPECS),
+                GCP_CREDENTIALS_PROVIDER_SERVICE)
+        );
+    }
+    protected TransportChannelProvider getTransportChannelProvider(ProcessContext context) {
+        final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context, () -> {
+            final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
+            final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
+            if (proxyHost != null && proxyPort != null && proxyPort > 0) {
+                final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
+                final String proxyUser = context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue();
+                final String proxyPassword = context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
+                componentProxyConfig.setProxyType(Proxy.Type.HTTP);
+                componentProxyConfig.setProxyServerHost(proxyHost);
+                componentProxyConfig.setProxyServerPort(proxyPort);
+                componentProxyConfig.setProxyUserName(proxyUser);
+                componentProxyConfig.setProxyUserPassword(proxyPassword);
+                return componentProxyConfig;
+            }
+            return ProxyConfiguration.DIRECT_CONFIGURATION;
+        });
+
+        return TopicAdminSettings.defaultGrpcTransportProviderBuilder()
+                .setChannelConfigurator(managedChannelBuilder -> managedChannelBuilder.proxyDetector(
+                        new ProxyDetector() {
+                            @Nullable
+                            @Override
+                            public ProxiedSocketAddress proxyFor(SocketAddress socketAddress) {
+                                if (Proxy.Type.HTTP.equals(proxyConfiguration.getProxyType())) {
+                                    return HttpConnectProxiedSocketAddress.newBuilder()
+                                            .setUsername(proxyConfiguration.getProxyUserName())
+                                            .setPassword(proxyConfiguration.getProxyUserPassword())
+                                            .setProxyAddress(new InetSocketAddress(proxyConfiguration.getProxyServerHost(),
+                                                    proxyConfiguration.getProxyServerPort()))
+                                            .setTargetAddress((InetSocketAddress) socketAddress)
+                                            .build();
+                                } else {
+                                    return null;
+                                }
+                            }
+                        }))
+                .build();
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index c2cce82a9e..3928a2ae63 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -50,7 +50,6 @@ import org.apache.nifi.processor.util.StandardValidators;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -83,7 +82,7 @@ import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_
         @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION),
         @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
 })
-public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
+public class ConsumeGCPubSub extends AbstractGCPubSubWithProxyProcessor {
 
     private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.subscriptions.consume");
 
@@ -99,7 +98,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
     private SubscriberStub subscriber = null;
     private PullRequest pullRequest;
 
-    private AtomicReference<Exception> storedException = new AtomicReference<>();
+    private final AtomicReference<Exception> storedException = new AtomicReference<>();
 
     @OnScheduled
     public void onScheduled(ProcessContext context) {
@@ -191,10 +190,10 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return Collections.unmodifiableList(Arrays.asList(PROJECT_ID,
-                GCP_CREDENTIALS_PROVIDER_SERVICE,
-                SUBSCRIPTION,
-                BATCH_SIZE));
+        final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        descriptors.add(SUBSCRIPTION);
+        descriptors.add(BATCH_SIZE);
+        return Collections.unmodifiableList(descriptors);
     }
 
     @Override
@@ -271,6 +270,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
     private SubscriberStub getSubscriber(final ProcessContext context) throws IOException {
         final SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder()
                 .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
+                .setTransportChannelProvider(getTransportChannelProvider(context))
                 .build();
 
         return GrpcSubscriberStub.create(subscriberStubSettings);
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
index c0646a7028..86545ab5fd 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
@@ -86,7 +86,7 @@ import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_
 })
 @SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content "
         + "will be read into memory to be sent as a PubSub message.")
-public class PublishGCPubSub extends AbstractGCPubSubProcessor{
+public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
     private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.topics.publish");
 
     public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder()
@@ -104,14 +104,14 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{
             .build();
 
     private Publisher publisher = null;
-    private AtomicReference<Exception> storedException = new AtomicReference<>();
+    private final AtomicReference<Exception> storedException = new AtomicReference<>();
 
     @Override
     public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
-        return Collections.unmodifiableList(Arrays.asList(PROJECT_ID,
-                GCP_CREDENTIALS_PROVIDER_SERVICE,
-                TOPIC_NAME,
-                BATCH_SIZE));
+        final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        descriptors.add(TOPIC_NAME);
+        descriptors.add(BATCH_SIZE);
+        return Collections.unmodifiableList(descriptors);
     }
 
     @Override
@@ -168,6 +168,7 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{
             try {
                 final PublisherStubSettings publisherStubSettings = PublisherStubSettings.newBuilder()
                         .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
+                        .setTransportChannelProvider(getTransportChannelProvider(context))
                         .build();
 
                 final GrpcPublisherStub publisherStub = GrpcPublisherStub.create(publisherStubSettings);
@@ -253,7 +254,7 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{
                                         "so routing to retry", new Object[]{topicName, e.getLocalizedMessage()}, e);
                         session.transfer(flowFile, REL_RETRY);
                     } else {
-                        getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {}", new Object[]{topicName, e});
+                        getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}'", topicName, e);
                         session.transfer(flowFile, REL_FAILURE);
                     }
                     context.yield();
@@ -313,6 +314,7 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{
 
         return Publisher.newBuilder(getTopicName(context))
                 .setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
+                .setChannelProvider(getTransportChannelProvider(context))
                 .setBatchingSettings(BatchingSettings.newBuilder()
                 .setElementCountThreshold(batchSize)
                 .setIsEnabled(true)