You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/22 17:18:58 UTC

[02/28] incubator-nifi git commit: NIFI-332 Proxy support in GetFTP Processor

NIFI-332 Proxy support in GetFTP Processor

Change-Id: I72fee6c5f2ef576a6c7d736199aab510bee744a7


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8a1a4b80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8a1a4b80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8a1a4b80

Branch: refs/heads/nifi-site-to-site-client
Commit: 8a1a4b807e682fc6e69bcc4656682b187f9847fa
Parents: b64fe47
Author: Adam Sotona <ad...@merck.com>
Authored: Tue Feb 10 10:07:03 2015 +0100
Committer: Adam Sotona <ad...@merck.com>
Committed: Tue Feb 10 10:07:03 2015 +0100

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/GetFTP.java |  5 ++
 .../processors/standard/util/FTPTransfer.java   | 48 +++++++++++++-
 .../standard/util/SocksProxySocketFactory.java  | 69 ++++++++++++++++++++
 3 files changed, 121 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8a1a4b80/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
index 2dabbc6..18bdc93 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GetFTP.java
@@ -57,6 +57,11 @@ public class GetFTP extends GetFileTransfer {
         properties.add(FTPTransfer.MAX_SELECTS);
         properties.add(FTPTransfer.REMOTE_POLL_BATCH_SIZE);
         properties.add(FTPTransfer.USE_NATURAL_ORDERING);
+        properties.add(FTPTransfer.PROXY_TYPE);
+        properties.add(FTPTransfer.PROXY_HOST);
+        properties.add(FTPTransfer.PROXY_PORT);
+        properties.add(FTPTransfer.HTTP_PROXY_USERNAME);
+        properties.add(FTPTransfer.HTTP_PROXY_PASSWORD);
         this.properties = Collections.unmodifiableList(properties);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8a1a4b80/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
index 04a9a0f..c3d7bbf 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/FTPTransfer.java
@@ -20,6 +20,8 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
 import java.net.UnknownHostException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -40,6 +42,7 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.commons.net.ftp.FTPClient;
 import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.FTPHTTPClient;
 import org.apache.commons.net.ftp.FTPReply;
 
 public class FTPTransfer implements FileTransfer {
@@ -49,6 +52,9 @@ public class FTPTransfer implements FileTransfer {
     public static final String TRANSFER_MODE_ASCII = "ASCII";
     public static final String TRANSFER_MODE_BINARY = "Binary";
     public static final String FTP_TIMEVAL_FORMAT = "yyyyMMddHHmmss";
+    public static final String PROXY_TYPE_DIRECT = Proxy.Type.DIRECT.name();
+    public static final String PROXY_TYPE_HTTP = Proxy.Type.HTTP.name();
+    public static final String PROXY_TYPE_SOCKS = Proxy.Type.SOCKS.name();
 
     public static final PropertyDescriptor CONNECTION_MODE = new PropertyDescriptor.Builder()
             .name("Connection Mode")
@@ -69,6 +75,35 @@ public class FTPTransfer implements FileTransfer {
             .required(true)
             .defaultValue("21")
             .build();
+    public static final PropertyDescriptor PROXY_TYPE = new PropertyDescriptor.Builder()
+            .name("Proxy Type")
+            .description("Proxy type used for file transfers")
+            .allowableValues(PROXY_TYPE_DIRECT, PROXY_TYPE_HTTP, PROXY_TYPE_SOCKS)
+            .defaultValue(PROXY_TYPE_DIRECT)
+            .build();
+    public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
+            .name("Proxy Host")
+            .description("The fully qualified hostname or IP address of the proxy server")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor PROXY_PORT = new PropertyDescriptor.Builder()
+            .name("Proxy Port")
+            .description("The port of the proxy server")
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor HTTP_PROXY_USERNAME = new PropertyDescriptor.Builder()
+            .name("Http Proxy Username")
+            .description("Http Proxy Username")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .build();
+    public static final PropertyDescriptor HTTP_PROXY_PASSWORD = new PropertyDescriptor.Builder()
+            .name("Http Proxy Password")
+            .description("Http Proxy Password")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(false)
+            .sensitive(true)
+            .build();
 
     private final ProcessorLog logger;
 
@@ -450,7 +485,18 @@ public class FTPTransfer implements FileTransfer {
             }
         }
 
-        final FTPClient client = new FTPClient();
+        final Proxy.Type proxyType = Proxy.Type.valueOf(ctx.getProperty(PROXY_TYPE).getValue());
+        final String proxyHost = ctx.getProperty(PROXY_HOST).getValue();
+        final Integer proxyPort = ctx.getProperty(PROXY_PORT).asInteger();
+        FTPClient client;
+        if (proxyType == Proxy.Type.HTTP) {
+            client = new FTPHTTPClient(proxyHost, proxyPort, ctx.getProperty(HTTP_PROXY_USERNAME).getValue(), ctx.getProperty(HTTP_PROXY_PASSWORD).getValue());
+        } else {
+            client = new FTPClient();
+            if (proxyType == Proxy.Type.SOCKS) {
+                client.setSocketFactory(new SocksProxySocketFactory(new Proxy(proxyType, new InetSocketAddress(proxyHost, proxyPort))));
+            }
+        }
         this.client = client;
         client.setDataTimeout(ctx.getProperty(DATA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());
         client.setDefaultTimeout(ctx.getProperty(CONNECTION_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8a1a4b80/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SocksProxySocketFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SocksProxySocketFactory.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SocksProxySocketFactory.java
new file mode 100644
index 0000000..55ae468
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/util/SocksProxySocketFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.util;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import javax.net.SocketFactory;
+
+/**
+ * {@link SocketFactory} that creates every socket with {@link Socket#Socket(Proxy)}, so that all
+ * connections opened through this factory use the proxy supplied at construction time.
+ * <p>
+ * The proxy type is not verified by this class; callers are expected to pass a SOCKS proxy.
+ */
+public final class SocksProxySocketFactory extends SocketFactory {
+
+    private final Proxy proxy;
+
+    /**
+     * @param proxy the proxy that every socket produced by this factory will use
+     */
+    public SocksProxySocketFactory(Proxy proxy) {
+        this.proxy = proxy;
+    }
+
+    /**
+     * Creates an unconnected socket configured to use the proxy of this factory.
+     *
+     * @return a new, unconnected socket
+     * @throws IOException declared by the {@link SocketFactory} contract
+     */
+    @Override
+    public Socket createSocket() throws IOException {
+        return new Socket(proxy);
+    }
+
+    /**
+     * Creates a socket and connects it to the given remote address and port.
+     *
+     * @throws IOException if the connection cannot be established; the socket is closed first
+     */
+    @Override
+    public Socket createSocket(InetAddress addr, int port) throws IOException {
+        return open(null, new InetSocketAddress(addr, port));
+    }
+
+    /**
+     * Creates a socket, binds it to the given local address and port, and connects it to the
+     * given remote address and port.
+     *
+     * @throws IOException if binding or connecting fails; the socket is closed first
+     */
+    @Override
+    public Socket createSocket(InetAddress addr, int port, InetAddress localHostAddr, int localPort) throws IOException {
+        return open(new InetSocketAddress(localHostAddr, localPort), new InetSocketAddress(addr, port));
+    }
+
+    /**
+     * Creates a socket and connects it to the given remote host name and port.
+     *
+     * @throws IOException if the connection cannot be established; the socket is closed first
+     */
+    @Override
+    public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
+        return open(null, new InetSocketAddress(host, port));
+    }
+
+    /**
+     * Creates a socket, binds it to the given local address and port, and connects it to the
+     * given remote host name and port.
+     *
+     * @throws IOException if binding or connecting fails; the socket is closed first
+     */
+    @Override
+    public Socket createSocket(String host, int port, InetAddress localHostAddr, int localPort) throws IOException, UnknownHostException {
+        return open(new InetSocketAddress(localHostAddr, localPort), new InetSocketAddress(host, port));
+    }
+
+    /**
+     * Creates a proxied socket, optionally binds it, and connects it to {@code remote}.
+     * If binding or connecting fails the socket is closed before the failure is rethrown, so a
+     * failed attempt does not leave an open socket behind.
+     *
+     * @param local local endpoint to bind to, or {@code null} to skip the explicit bind
+     * @param remote remote endpoint to connect to
+     * @return the connected socket
+     * @throws IOException if binding or connecting fails
+     */
+    private Socket open(InetSocketAddress local, InetSocketAddress remote) throws IOException {
+        final Socket socket = createSocket();
+        try {
+            if (local != null) {
+                socket.bind(local);
+            }
+            socket.connect(remote);
+            return socket;
+        } catch (IOException | RuntimeException e) {
+            // Release the socket ourselves: the caller never receives a reference to close.
+            try {
+                socket.close();
+            } catch (IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+}