You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2021/07/01 19:22:27 UTC

[gobblin] branch master updated: [GOBBLIN-1487] Add gridfs datanode gaas (#3326)

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bbd722  [GOBBLIN-1487] Add gridfs datanode gaas (#3326)
7bbd722 is described below

commit 7bbd722abeda22f274b2bc5e47f3079cb168cd6f
Author: William Lo <lo...@gmail.com>
AuthorDate: Thu Jul 1 12:21:35 2021 -0700

    [GOBBLIN-1487] Add gridfs datanode gaas (#3326)
    
    * Adds gridfs node as a datanode type
    
    * Adds gridfs node for GaaS
---
 .../flowgraph/datanodes/fs/GridFSDataNode.java     | 58 ++++++++++++++++++++++
 .../modules/flow/MultiHopFlowCompilerTest.java     | 19 +++++++
 gobblin-service/src/test/resources/flow/flow6.conf | 18 +++++++
 .../datanodes/GridFsDataNode-1.properties          |  3 ++
 .../flowedges/hdfs-1-to-gridfs.properties          |  6 +++
 .../flowEdgeTemplates/hdfsToGridFs/flow.conf       | 10 ++++
 .../hdfsToGridFs/jobs/distcp-hdfs-to-gridfs.job    |  4 ++
 .../distcp-push-hdfs-to-gridfs.template            | 45 +++++++++++++++++
 8 files changed, 163 insertions(+)

diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/GridFSDataNode.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/GridFSDataNode.java
new file mode 100644
index 0000000..7fcf11d
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/datanodes/fs/GridFSDataNode.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flowgraph.datanodes.fs;
+
+import java.net.URI;
+
+import com.google.common.base.Strings;
+import com.typesafe.config.Config;
+
+
+/**
+ * An implementation of a GridFS {@link org.apache.gobblin.service.modules.flowgraph.DataNode}.
+ */
+public class GridFSDataNode extends FileSystemDataNode {
+  public static final String GRIDFS_SCHEME = "gridfs";
+
+  public GridFSDataNode(Config nodeProps) throws DataNodeCreationException {
+    super(nodeProps);
+  }
+
+  /**
+   * @param fsUri FileSystem URI
+   * @return true if the scheme is "gridfs" and authority is not empty; false otherwise (including a missing scheme).
+   */
+  @Override
+  public boolean isUriValid(URI fsUri) {
+    String scheme = fsUri.getScheme();
+    //Check that the scheme is "gridfs"; constant-first equals() also rejects a null scheme (relative URI) without an NPE
+    if (!GRIDFS_SCHEME.equals(scheme)) {
+      return false;
+    }
+    //Ensure that the authority is not empty
+    if (Strings.isNullOrEmpty(fsUri.getAuthority())) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public String getDefaultDatasetDescriptorPlatform() {
+    return GRIDFS_SCHEME;
+  }
+}
\ No newline at end of file
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
index 14f2078..f548f0a 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompilerTest.java
@@ -550,6 +550,25 @@ public class MultiHopFlowCompilerTest {
 
 
   @Test (dependsOnMethods = "testCompileFlowSingleHop")
+  public void testCompileFlowSingleHopGridFS() throws IOException, URISyntaxException {
+    FlowSpec spec = createFlowSpec("flow/flow6.conf", "HDFS-1", "GRIDFS-1", false, false);
+    Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
+    Assert.assertEquals(jobDag.getNodes().size(), 1);
+    Assert.assertEquals(jobDag.getStartNodes().size(), 1);
+    Assert.assertEquals(jobDag.getEndNodes().size(), 1);
+    Assert.assertEquals(jobDag.getStartNodes().get(0), jobDag.getEndNodes().get(0));
+
+    //Ensure hop is from HDFS-1 to GRIDFS-1, i.e. jobName starts with the flow group, flow name, "DistcpToGridFS", "HDFS-1", "GRIDFS-1" and "hdfsToGridFs" joined by JOB_NAME_COMPONENT_SEPARATION_CHAR.
+    DagNode<JobExecutionPlan> dagNode = jobDag.getStartNodes().get(0);
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+    String expectedJobName = Joiner.on(JobExecutionPlan.Factory.JOB_NAME_COMPONENT_SEPARATION_CHAR).
+        join("testFlowGroup", "testFlowName", "DistcpToGridFS", "HDFS-1", "GRIDFS-1", "hdfsToGridFs");
+    String jobName = jobConfig.getString(ConfigurationKeys.JOB_NAME_KEY);
+    Assert.assertTrue(jobName.startsWith(expectedJobName));
+  }
+
+
+  @Test (dependsOnMethods = "testCompileFlowSingleHopGridFS")
   public void testMulticastPath() throws IOException, URISyntaxException {
     FlowSpec spec = createFlowSpec("flow/flow2.conf", "LocalFS-1", "HDFS-3,HDFS-4", false, false);
     Dag<JobExecutionPlan> jobDag = this.specCompiler.compileFlow(spec);
diff --git a/gobblin-service/src/test/resources/flow/flow6.conf b/gobblin-service/src/test/resources/flow/flow6.conf
new file mode 100644
index 0000000..91bed7d
--- /dev/null
+++ b/gobblin-service/src/test/resources/flow/flow6.conf
@@ -0,0 +1,18 @@
+team.name=testTeam
+dataset.name=testDataset
+user.to.proxy=testUser
+#Input dataset - uncompressed and unencrypted
+gobblin.flow.input.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.input.dataset.descriptor.platform=hdfs
+gobblin.flow.input.dataset.descriptor.path=/data/out/${dataset.name}
+gobblin.flow.input.dataset.descriptor.format=avro
+gobblin.flow.input.dataset.descriptor.codec=NONE
+gobblin.flow.input.dataset.descriptor.encrypt.algorithm=NONE
+
+#Output dataset - uncompressed and unencrypted
+gobblin.flow.output.dataset.descriptor.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.output.dataset.descriptor.platform=gridfs
+gobblin.flow.output.dataset.descriptor.path=${gobblin.flow.input.dataset.descriptor.path}
+gobblin.flow.output.dataset.descriptor.format=avro
+gobblin.flow.output.dataset.descriptor.codec=${gobblin.flow.input.dataset.descriptor.codec}
+gobblin.flow.output.dataset.descriptor.encrypt.algorithm=${gobblin.flow.input.dataset.descriptor.encrypt.algorithm}
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/flowgraph/datanodes/GridFsDataNode-1.properties b/gobblin-service/src/test/resources/flowgraph/datanodes/GridFsDataNode-1.properties
new file mode 100644
index 0000000..bf54dfa
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/datanodes/GridFsDataNode-1.properties
@@ -0,0 +1,3 @@
+data.node.id=GRIDFS-1
+data.node.class=org.apache.gobblin.service.modules.flowgraph.datanodes.fs.GridFSDataNode
+data.node.fs.uri=gridfs://azurecluster.net
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-gridfs.properties b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-gridfs.properties
new file mode 100644
index 0000000..804607c
--- /dev/null
+++ b/gobblin-service/src/test/resources/flowgraph/flowedges/hdfs-1-to-gridfs.properties
@@ -0,0 +1,6 @@
+flow.edge.source=HDFS-1
+flow.edge.destination=GRIDFS-1
+flow.edge.id=HDFS-1_GRIDFS-1_hdfsToGridFs
+flow.edge.flowTemplateDirUri=FS:///multihop/flowEdgeTemplates/hdfsToGridFs
+flow.edge.specExecutors=azkaban01
+
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToGridFs/flow.conf b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToGridFs/flow.conf
new file mode 100644
index 0000000..0b1009f
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToGridFs/flow.conf
@@ -0,0 +1,10 @@
+gobblin.flow.edge.input.dataset.descriptor.0.class=org.apache.gobblin.service.modules.dataset.FSDatasetDescriptor
+gobblin.flow.edge.input.dataset.descriptor.0.platform=hdfs
+gobblin.flow.edge.input.dataset.descriptor.0.path=/data/out/${dataset.name}
+gobblin.flow.edge.input.dataset.descriptor.0.format=avro
+gobblin.flow.edge.input.dataset.descriptor.0.isRetentionApplied=${flow.applyRetention}
+
+gobblin.flow.edge.output.dataset.descriptor.0.class=${gobblin.flow.edge.input.dataset.descriptor.0.class}
+gobblin.flow.edge.output.dataset.descriptor.0.platform=gridfs
+gobblin.flow.edge.output.dataset.descriptor.0.path=${gobblin.flow.edge.input.dataset.descriptor.0.path}
+gobblin.flow.edge.output.dataset.descriptor.0.format=${gobblin.flow.edge.input.dataset.descriptor.0.format}
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToGridFs/jobs/distcp-hdfs-to-gridfs.job b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToGridFs/jobs/distcp-hdfs-to-gridfs.job
new file mode 100644
index 0000000..5026b94
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/flowEdgeTemplates/hdfsToGridFs/jobs/distcp-hdfs-to-gridfs.job
@@ -0,0 +1,4 @@
+gobblin.template.uri="FS:///multihop/jobTemplates/distcp-push-hdfs-to-gridfs.template"
+job.class=org.apache.gobblin.azkaban.AzkabanJobLauncher
+launcher.type=MAPREDUCE
+type=hadoopJava
\ No newline at end of file
diff --git a/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-gridfs.template b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-gridfs.template
new file mode 100644
index 0000000..4e783e8
--- /dev/null
+++ b/gobblin-service/src/test/resources/template_catalog/multihop/jobTemplates/distcp-push-hdfs-to-gridfs.template
@@ -0,0 +1,45 @@
+# ====================================================================
+# Job configurations
+# ====================================================================
+job.name=DistcpToGridFS
+
+#team.name and dataset.name to be supplied via flowConfig
+from=/data/out/${dataset.name}
+to=/data/out/${dataset.name}
+
+gobblin.copy.recursive.update=true
+gobblin.copy.recursive.delete=true
+gobblin.copy.recursive.deleteEmptyDirectories=true
+gobblin.trash.skip.trash=true
+
+#Will make the job fail if there's any failure
+gobblin.copy.abortOnSingleDatasetFailure=true
+
+#gobblin.copy.preserved.attributes=p
+
+#Job properties to be resolved from source and dest data node config.
+fs.uri=${source.data.node.fs.uri}
+source.filebased.fs.uri=${fs.uri}
+state.store.fs.uri=${fs.uri}
+target.filebased.fs.uri=${destination.data.node.fs.uri}
+writer.fs.uri=${target.filebased.fs.uri}
+
+
+# ====================================================================
+# Distcp configurations
+# ====================================================================
+extract.namespace="gobblin.copy"
+
+gobblin.dataset.profile.class="org.apache.gobblin.data.management.copy.CopyableGlobDatasetFinder"
+
+# target location for copy
+data.publisher.final.dir=${to}
+gobblin.dataset.pattern=${from}
+
+data.publisher.type="org.apache.gobblin.data.management.copy.publisher.CopyDataPublisher"
+source.class="org.apache.gobblin.data.management.copy.CopySource"
+writer.builder.class="org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriterBuilder"
+converter.classes="org.apache.gobblin.converter.IdentityConverter"
+
+job.jars="lib/*"
+job.lock.enabled=false
\ No newline at end of file