You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/03 21:09:47 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1067] Add SFTP
DataNode type in Gobblin-as-a-Service (GaaS) FlowGraph[]
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8e3c9da [GOBBLIN-1067] Add SFTP DataNode type in Gobblin-as-a-Service (GaaS) FlowGraph[]
8e3c9da is described below
commit 8e3c9da597fc29a3301831607ebba20ed421ef67
Author: sv2000 <su...@gmail.com>
AuthorDate: Tue Mar 3 13:09:41 2020 -0800
[GOBBLIN-1067] Add SFTP DataNode type in Gobblin-as-a-Service (GaaS) FlowGraph[]
Closes #2906 from sv2000/sftpNode
---
.../flowgraph/datanodes/fs/SftpDataNode.java | 80 ++++++++++++++++++++++
.../flowgraph/datanodes/fs/SftpDataNodeTest.java | 64 +++++++++++++++++
2 files changed, 144 insertions(+)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/SftpDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/SftpDataNode.java
new file mode 100644
index 0000000..c093423
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/SftpDataNode.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import java.net.URI;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+import org.apache.gobblin.util.ConfigUtils;
+
+/**
+ * A {@link FileSystemDataNode} describing an SFTP endpoint in the flow graph.
+ *
+ * <p>The node requires {@link #SFTP_HOSTNAME} to be configured; {@link #SFTP_PORT} is optional and
+ * falls back to {@link ConfigurationKeys#SOURCE_CONN_DEFAULT_PORT} when absent.
+ */
+@EqualsAndHashCode(callSuper = true)
+public class SftpDataNode extends FileSystemDataNode {
+  /** URI scheme accepted by {@link #isUriValid(URI)}. */
+  public static final String SFTP_SCHEME = "sftp";
+  /** File system URI used when the node properties do not supply {@link FileSystemDataNode#FS_URI_KEY}. */
+  public static final String DEFAULT_SFTP_URI = "sftp:///";
+  /** Value returned by {@link #getDefaultDatasetDescriptorPlatform()}. */
+  public static final String PLATFORM = "sftpfs";
+  /** Config key holding the SFTP host name (required). */
+  public static final String SFTP_HOSTNAME = FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "sftp.hostname";
+  /** Config key holding the SFTP port (optional). */
+  public static final String SFTP_PORT = FlowGraphConfigurationKeys.DATA_NODE_PREFIX + "sftp.port";
+
+  @Getter
+  private String hostName;
+  @Getter
+  private Integer port;
+  // Node properties layered over the default port fallback and the parent's raw config.
+  @Getter
+  private Config rawConfig;
+
+  // Supplies the default port when the node properties do not define SFTP_PORT.
+  private static final Config DEFAULT_FALLBACK =
+      ConfigFactory.parseMap(ImmutableMap.<String, Object>builder()
+          .put(SFTP_PORT, ConfigurationKeys.SOURCE_CONN_DEFAULT_PORT)
+          .build());
+
+  /**
+   * Constructor. An SFTP DataNode must have {@link SftpDataNode#SFTP_HOSTNAME} configured.
+   *
+   * @param nodeProps node properties; {@link FileSystemDataNode#FS_URI_KEY} falls back to
+   *     {@link #DEFAULT_SFTP_URI} when not set
+   * @throws DataNodeCreationException if the host name is missing or empty, if the port is not
+   *     positive, or if any other exception is raised while reading the configuration
+   */
+  public SftpDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps.withFallback(ConfigFactory.empty().withValue(FileSystemDataNode.FS_URI_KEY, ConfigValueFactory.fromAnyRef(DEFAULT_SFTP_URI))));
+    try {
+      // Precedence: explicit node properties, then the default port, then the parent's raw config.
+      this.rawConfig = nodeProps.withFallback(DEFAULT_FALLBACK).withFallback(super.getRawConfig());
+      this.hostName = ConfigUtils.getString(this.rawConfig, SFTP_HOSTNAME, "");
+      Preconditions.checkArgument(!Strings.isNullOrEmpty(hostName), SFTP_HOSTNAME + " cannot be null or empty.");
+      this.port = ConfigUtils.getInt(this.rawConfig, SFTP_PORT, -1);
+      Preconditions.checkArgument(this.port > 0, "Invalid value for " + SFTP_PORT + ": " + this.port);
+    } catch (Exception e) {
+      throw new DataNodeCreationException(e);
+    }
+  }
+
+  /**
+   * Checks whether the given URI uses the {@link #SFTP_SCHEME} scheme.
+   *
+   * @param fsUri the file system URI to check
+   * @return {@code true} if the URI's scheme equals {@link #SFTP_SCHEME}; {@code false} otherwise,
+   *     including when the URI is {@code null} or has no scheme
+   */
+  @Override
+  public boolean isUriValid(URI fsUri) {
+    // URI#getScheme() returns null for scheme-less (relative) URIs; compare from the constant
+    // side so such URIs are reported as invalid instead of raising a NullPointerException.
+    return fsUri != null && SFTP_SCHEME.equals(fsUri.getScheme());
+  }
+
+  /**
+   * @return the dataset descriptor platform name for SFTP nodes, {@link #PLATFORM}
+   */
+  @Override
+  public String getDefaultDatasetDescriptorPlatform() {
+    return PLATFORM;
+  }
+}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/SftpDataNodeTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/SftpDataNodeTest.java
new file mode 100644
index 0000000..80d9acf
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/SftpDataNodeTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor;
+import org.apache.gobblin.service.modules.flowgraph.DataNode;
+import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
+
+
+/**
+ * Unit tests for {@link SftpDataNode} construction.
+ */
+public class SftpDataNodeTest {
+
+  /**
+   * Verifies node creation with the default port, with an explicit port, and the failure raised
+   * when the required host name is missing.
+   */
+  @Test
+  public void testCreate() throws DataNode.DataNodeCreationException {
+    // Create an SFTP DataNode with the default SFTP port.
+    Config config = ConfigFactory.empty().withValue(SftpDataNode.SFTP_HOSTNAME,
+        ConfigValueFactory.fromAnyRef("testHost"))
+        .withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef("testId"));
+    SftpDataNode dataNode = new SftpDataNode(config);
+    Assert.assertEquals(dataNode.getId(), "testId");
+    Assert.assertEquals(dataNode.getHostName(), "testHost");
+    Assert.assertEquals(dataNode.getPort().intValue(), ConfigurationKeys.SOURCE_CONN_DEFAULT_PORT);
+    Assert.assertEquals(dataNode.getDefaultDatasetDescriptorPlatform(), SftpDataNode.PLATFORM);
+    Assert.assertEquals(dataNode.getDefaultDatasetDescriptorClass(), FSDatasetDescriptor.class.getCanonicalName());
+
+    // Create an SFTP DataNode with an explicit port; every assertion targets the new node.
+    config = config.withValue(SftpDataNode.SFTP_PORT, ConfigValueFactory.fromAnyRef(143));
+    SftpDataNode dataNodeWithPort = new SftpDataNode(config);
+    Assert.assertEquals(dataNodeWithPort.getId(), "testId");
+    Assert.assertEquals(dataNodeWithPort.getHostName(), "testHost");
+    Assert.assertEquals(dataNodeWithPort.getPort().intValue(), 143);
+    Assert.assertEquals(dataNodeWithPort.getDefaultDatasetDescriptorPlatform(), SftpDataNode.PLATFORM);
+    Assert.assertEquals(dataNodeWithPort.getDefaultDatasetDescriptorClass(),
+        FSDatasetDescriptor.class.getCanonicalName());
+
+    // A node without the required host name must be rejected.
+    Config configMissingProps = ConfigFactory.empty().withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY,
+        ConfigValueFactory.fromAnyRef("testId"));
+    try {
+      new SftpDataNode(configMissingProps);
+      Assert.fail("Unexpected success in creating Sftp node.");
+    } catch (DataNode.DataNodeCreationException e) {
+      //Expected exception.
+    }
+  }
+}
\ No newline at end of file