You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/05/02 16:41:45 UTC
[nifi] branch main updated: NIFI-9798 Added Proxy Support for ConsumeGCPubSub and PublishGCPubSub
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new b04632129f NIFI-9798 Added Proxy Support for ConsumeGCPubSub and PublishGCPubSub
b04632129f is described below
commit b04632129faa0dbaef3705bd2ea7a1d5fc3dac0e
Author: Juldrixx <ju...@gmail.com>
AuthorDate: Mon Mar 14 18:24:33 2022 -0500
NIFI-9798 Added Proxy Support for ConsumeGCPubSub and PublishGCPubSub
This closes #5868
Signed-off-by: David Handermann <ex...@apache.org>
---
.../pubsub/AbstractGCPubSubWithProxyProcessor.java | 85 ++++++++++++++++++++++
.../processors/gcp/pubsub/ConsumeGCPubSub.java | 14 ++--
.../processors/gcp/pubsub/PublishGCPubSub.java | 16 ++--
3 files changed, 101 insertions(+), 14 deletions(-)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubWithProxyProcessor.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubWithProxyProcessor.java
new file mode 100644
index 0000000000..c4d3e68b68
--- /dev/null
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubWithProxyProcessor.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.pubsub;
+
+import com.google.api.gax.rpc.TransportChannelProvider;
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
+import io.grpc.HttpConnectProxiedSocketAddress;
+import io.grpc.ProxiedSocketAddress;
+import io.grpc.ProxyDetector;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.gcp.ProxyAwareTransportFactory;
+import org.apache.nifi.proxy.ProxyConfiguration;
+
+import javax.annotation.Nullable;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.net.SocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public abstract class AbstractGCPubSubWithProxyProcessor extends AbstractGCPubSubProcessor { // Base class for Pub/Sub processors that adds a proxy configuration property and a proxy-aware gRPC transport.
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() { // Returns the base descriptor list; subclasses in this commit copy it and append their own properties.
+ return Collections.unmodifiableList(Arrays.asList(
+ PROJECT_ID,
+ ProxyConfiguration.createProxyConfigPropertyDescriptor(true, ProxyAwareTransportFactory.PROXY_SPECS), // a new descriptor is built on every call; meaning of the 'true' flag is not visible here
+ GCP_CREDENTIALS_PROVIDER_SERVICE)
+ );
+ }
+ protected TransportChannelProvider getTransportChannelProvider(ProcessContext context) { // Builds a gRPC TransportChannelProvider whose channels consult the resolved proxy configuration.
+ final ProxyConfiguration proxyConfiguration = ProxyConfiguration.getConfiguration(context, () -> { // supplier yields a component-level config; presumably used only as a fallback - confirm against ProxyConfiguration
+ final String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
+ final Integer proxyPort = context.getProperty(PROXY_PORT).evaluateAttributeExpressions().asInteger();
+ if (proxyHost != null && proxyPort != null && proxyPort > 0) { // component-level proxy applies only when a host and a positive port are both set
+ final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
+ final String proxyUser = context.getProperty(HTTP_PROXY_USERNAME).evaluateAttributeExpressions().getValue();
+ final String proxyPassword = context.getProperty(HTTP_PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
+ componentProxyConfig.setProxyType(Proxy.Type.HTTP); // component-level settings are always treated as an HTTP proxy
+ componentProxyConfig.setProxyServerHost(proxyHost);
+ componentProxyConfig.setProxyServerPort(proxyPort);
+ componentProxyConfig.setProxyUserName(proxyUser);
+ componentProxyConfig.setProxyUserPassword(proxyPassword);
+ return componentProxyConfig;
+ }
+ return ProxyConfiguration.DIRECT_CONFIGURATION; // no usable host/port: fall back to the direct configuration constant
+ });
+
+ return TopicAdminSettings.defaultGrpcTransportProviderBuilder() // start from the Pub/Sub default gRPC provider builder and customize its channels
+ .setChannelConfigurator(managedChannelBuilder -> managedChannelBuilder.proxyDetector( // install a custom ProxyDetector on each channel builder
+ new ProxyDetector() {
+ @Nullable
+ @Override
+ public ProxiedSocketAddress proxyFor(SocketAddress socketAddress) { // returns an HTTP CONNECT proxied address for the target, or null
+ if (Proxy.Type.HTTP.equals(proxyConfiguration.getProxyType())) { // only HTTP proxies are handled; every other type takes the null branch below
+ return HttpConnectProxiedSocketAddress.newBuilder()
+ .setUsername(proxyConfiguration.getProxyUserName()) // may be null when no proxy credentials were configured
+ .setPassword(proxyConfiguration.getProxyUserPassword())
+ .setProxyAddress(new InetSocketAddress(proxyConfiguration.getProxyServerHost(), // a new InetSocketAddress is constructed on each proxyFor call
+ proxyConfiguration.getProxyServerPort()))
+ .setTargetAddress((InetSocketAddress) socketAddress) // NOTE(review): unchecked cast - throws ClassCastException for any other SocketAddress type; confirm gRPC only passes InetSocketAddress here
+ .build();
+ } else {
+ return null; // per the gRPC ProxyDetector contract, null means "connect without a proxy"
+ }
+ }
+ }))
+ .build();
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index c2cce82a9e..3928a2ae63 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -50,7 +50,6 @@ import org.apache.nifi.processor.util.StandardValidators;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -83,7 +82,7 @@ import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_
@WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = MSG_PUBLISH_TIME_DESCRIPTION),
@WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description = DYNAMIC_ATTRIBUTES_DESCRIPTION)
})
-public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
+public class ConsumeGCPubSub extends AbstractGCPubSubWithProxyProcessor {
private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.subscriptions.consume");
@@ -99,7 +98,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
private SubscriberStub subscriber = null;
private PullRequest pullRequest;
- private AtomicReference<Exception> storedException = new AtomicReference<>();
+ private final AtomicReference<Exception> storedException = new AtomicReference<>();
@OnScheduled
public void onScheduled(ProcessContext context) {
@@ -191,10 +190,10 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return Collections.unmodifiableList(Arrays.asList(PROJECT_ID,
- GCP_CREDENTIALS_PROVIDER_SERVICE,
- SUBSCRIPTION,
- BATCH_SIZE));
+ final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
+ descriptors.add(SUBSCRIPTION);
+ descriptors.add(BATCH_SIZE);
+ return Collections.unmodifiableList(descriptors);
}
@Override
@@ -271,6 +270,7 @@ public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
private SubscriberStub getSubscriber(final ProcessContext context) throws IOException {
final SubscriberStubSettings subscriberStubSettings = SubscriberStubSettings.newBuilder()
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
+ .setTransportChannelProvider(getTransportChannelProvider(context))
.build();
return GrpcSubscriberStub.create(subscriberStubSettings);
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
index c0646a7028..86545ab5fd 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
@@ -86,7 +86,7 @@ import static org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_
})
@SystemResourceConsideration(resource = SystemResource.MEMORY, description = "The entirety of the FlowFile's content "
+ "will be read into memory to be sent as a PubSub message.")
-public class PublishGCPubSub extends AbstractGCPubSubProcessor{
+public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
private static final List<String> REQUIRED_PERMISSIONS = Collections.singletonList("pubsub.topics.publish");
public static final PropertyDescriptor TOPIC_NAME = new PropertyDescriptor.Builder()
@@ -104,14 +104,14 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{
.build();
private Publisher publisher = null;
- private AtomicReference<Exception> storedException = new AtomicReference<>();
+ private final AtomicReference<Exception> storedException = new AtomicReference<>();
@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return Collections.unmodifiableList(Arrays.asList(PROJECT_ID,
- GCP_CREDENTIALS_PROVIDER_SERVICE,
- TOPIC_NAME,
- BATCH_SIZE));
+ final List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
+ descriptors.add(TOPIC_NAME);
+ descriptors.add(BATCH_SIZE);
+ return Collections.unmodifiableList(descriptors);
}
@Override
@@ -168,6 +168,7 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{
try {
final PublisherStubSettings publisherStubSettings = PublisherStubSettings.newBuilder()
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
+ .setTransportChannelProvider(getTransportChannelProvider(context))
.build();
final GrpcPublisherStub publisherStub = GrpcPublisherStub.create(publisherStubSettings);
@@ -253,7 +254,7 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{
"so routing to retry", new Object[]{topicName, e.getLocalizedMessage()}, e);
session.transfer(flowFile, REL_RETRY);
} else {
- getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}' due to {}", new Object[]{topicName, e});
+ getLogger().error("Failed to publish the message to Google Cloud PubSub topic '{}'", topicName, e);
session.transfer(flowFile, REL_FAILURE);
}
context.yield();
@@ -313,6 +314,7 @@ public class PublishGCPubSub extends AbstractGCPubSubProcessor{
return Publisher.newBuilder(getTopicName(context))
.setCredentialsProvider(FixedCredentialsProvider.create(getGoogleCredentials(context)))
+ .setChannelProvider(getTransportChannelProvider(context))
.setBatchingSettings(BatchingSettings.newBuilder()
.setElementCountThreshold(batchSize)
.setIsEnabled(true)