You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/03 21:09:47 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1067] Add SFTP DataNode type in Gobblin-as-a-Service (GaaS) FlowGraph[]

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e3c9da  [GOBBLIN-1067] Add SFTP DataNode type in Gobblin-as-a-Service (GaaS) FlowGraph[]
8e3c9da is described below

commit 8e3c9da597fc29a3301831607ebba20ed421ef67
Author: sv2000 <su...@gmail.com>
AuthorDate: Tue Mar 3 13:09:41 2020 -0800

    [GOBBLIN-1067] Add SFTP DataNode type in Gobblin-as-a-Service (GaaS) FlowGraph[]
    
    Closes #2906 from sv2000/sftpNode
---
 .../flowgraph/datanodes/fs/SftpDataNode.java       | 80 ++++++++++++++++++++++
 .../flowgraph/datanodes/fs/SftpDataNodeTest.java   | 64 +++++++++++++++++
 2 files changed, 144 insertions(+)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/SftpDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/SftpDataNode.java
new file mode 100644
index 0000000..c093423
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/SftpDataNode.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import java.net.URI;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+/**
+ * A {@link FileSystemDataNode} that describes an SFTP endpoint.
+ *
+ * <p>The node is configured through {@link #SFTP_HOSTNAME} (required) and {@link #SFTP_PORT}
+ * (optional; {@link ConfigurationKeys#SOURCE_CONN_DEFAULT_PORT} is used when absent). When the
+ * supplied config carries no {@link FileSystemDataNode#FS_URI_KEY}, {@link #DEFAULT_SFTP_URI} is
+ * passed to the parent constructor as a fallback.
+ */
+@EqualsAndHashCode(callSuper = true)
+public class SftpDataNode extends FileSystemDataNode {
+  public static final String SFTP_SCHEME = "sftp";
+  public static final String DEFAULT_SFTP_URI = "sftp:///";
+  public static final String PLATFORM = "sftpfs";
+  public static final String SFTP_HOSTNAME = FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "sftp.hostname";
+  public static final String SFTP_PORT = FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "sftp.port";
+
+  /** Largest valid TCP port number (ports are 16-bit unsigned values). */
+  private static final int MAX_PORT = 65535;
+
+  /** Host name read from {@link #SFTP_HOSTNAME}; never null or empty after construction. */
+  @Getter
+  private String hostName;
+  /** Port read from {@link #SFTP_PORT}; always within {@code [1, 65535]} after construction. */
+  @Getter
+  private Integer port;
+  /** The supplied node properties layered over {@link #DEFAULT_FALLBACK} and the parent's raw config. */
+  @Getter
+  private Config rawConfig;
+
+  /** Defaults applied when the node properties do not set {@link #SFTP_PORT}. */
+  private static final Config DEFAULT_FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(SFTP_PORT, ConfigurationKeys.SOURCE_CONN_DEFAULT_PORT)
+          .build());
+
+  /**
+   * Constructor. An SFTP DataNode must have {@link SftpDataNode#SFTP_HOSTNAME} configured.
+   *
+   * @param nodeProps the properties of this node
+   * @throws DataNodeCreationException if the host name is missing or empty, if the port is not a valid
+   *         TCP port, or if reading the configuration fails for any other reason
+   */
+  public SftpDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps.withFallback(ConfigFactory.empty().withValue(FileSystemDataNode.FS_URI_KEY, ConfigValueFactory.fromAnyRef(DEFAULT_SFTP_URI))));
+    try {
+      // Explicit node properties win, then the SFTP defaults, then whatever the parent resolved.
+      this.rawConfig = nodeProps.withFallback(DEFAULT_FALLBACK).withFallback(super.getRawConfig());
+      this.hostName = ConfigUtils.getString(this.rawConfig, SFTP_HOSTNAME, "");
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(hostName), SFTP_HOSTNAME + " cannot be null or empty.");
+      this.port = ConfigUtils.getInt(this.rawConfig, SFTP_PORT, -1);
+      // Reject non-positive values as well as values that cannot be a TCP port.
+      Preconditions.checkArgument(this.port > 0 && this.port <= MAX_PORT,
+          "Invalid value for " + SFTP_PORT + ": " + this.port);
+    } catch (Exception e) {
+      // Surface every validation/configuration failure as a single checked exception type.
+      throw new DataNodeCreationException(e);
+    }
+  }
+
+  /**
+   * Checks whether the given URI uses the {@value #SFTP_SCHEME} scheme.
+   *
+   * @param fsUri the file system URI to validate
+   * @return {@code true} if the URI's scheme equals {@link #SFTP_SCHEME}; {@code false} otherwise,
+   *         including when the URI is {@code null} or has no scheme
+   */
+  @Override
+  public boolean isUriValid(URI fsUri) {
+    // URI#getScheme() returns null for scheme-less URIs, so compare from the constant side.
+    return fsUri != null && SFTP_SCHEME.equals(fsUri.getScheme());
+  }
+
+  /**
+   * @return the dataset descriptor platform for this node type, {@link #PLATFORM}
+   */
+  @Override
+  public String getDefaultDatasetDescriptorPlatform() {
+    return PLATFORM;
+  }
+}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/SftpDataNodeTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/SftpDataNodeTest.java
new file mode 100644
index 0000000..80d9acf
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/SftpDataNodeTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+
+
+/**
+ * Unit tests for {@link SftpDataNode}.
+ */
+public class SftpDataNodeTest {
+
+  /**
+   * Verifies node creation with the default port, with an explicit port, and that creation fails
+   * when the required host name is missing.
+   */
+  @Test
+  public void testCreate() throws DataNode.DataNodeCreationException {
+    //Create a SFTP DataNode with default SFTP port
+    Config config = ConfigFactory.empty().withValue(SftpDataNode.SFTP_HOSTNAME,
+        ConfigValueFactory.fromAnyRef("testHost"))
+        .withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef("testId"));
+    SftpDataNode dataNode = new SftpDataNode(config);
+    Assert.assertEquals(dataNode.getId(), "testId");
+    Assert.assertEquals(dataNode.getHostName(), "testHost");
+    Assert.assertEquals(dataNode.getPort().intValue(), ConfigurationKeys.SOURCE_CONN_DEFAULT_PORT);
+    Assert.assertEquals(dataNode.getDefaultDatasetDescriptorPlatform(), SftpDataNode.PLATFORM);
+    Assert.assertEquals(dataNode.getDefaultDatasetDescriptorClass(), FSDatasetDescriptor.class.getCanonicalName());
+
+    //Create a SFTP DataNode with an explicitly configured port; every assertion targets the new node.
+    config = config.withValue(SftpDataNode.SFTP_PORT, ConfigValueFactory.fromAnyRef(143));
+    SftpDataNode dataNodeWithPort = new SftpDataNode(config);
+    Assert.assertEquals(dataNodeWithPort.getId(), "testId");
+    Assert.assertEquals(dataNodeWithPort.getHostName(), "testHost");
+    Assert.assertEquals(dataNodeWithPort.getPort().intValue(), 143);
+    Assert.assertEquals(dataNodeWithPort.getDefaultDatasetDescriptorPlatform(), SftpDataNode.PLATFORM);
+    Assert.assertEquals(dataNodeWithPort.getDefaultDatasetDescriptorClass(),
+        FSDatasetDescriptor.class.getCanonicalName());
+
+    //A config without the SFTP host name must be rejected.
+    Config configMissingProps = ConfigFactory.empty().withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY,
+        ConfigValueFactory.fromAnyRef("testId"));
+    try {
+      new SftpDataNode(configMissingProps);
+      Assert.fail("Unexpected success in creating Sftp node.");
+    } catch (DataNode.DataNodeCreationException e) {
+      //Expected exception.
+    }
+  }
+}
\ No newline at end of file