You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by lg...@apache.org on 2020/12/08 07:09:14 UTC

[incubator-dolphinscheduler] branch 1.3.4-prepare updated: chrry-pick sqoop task in dev to 1.3.4 (#4168)

This is an automated email from the ASF dual-hosted git repository.

lgcareer pushed a commit to branch 1.3.4-prepare
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/1.3.4-prepare by this push:
     new 7720a77  chrry-pick sqoop task in dev to 1.3.4 (#4168)
7720a77 is described below

commit 7720a773cfae7d7a4034036e901bbead4f6a8704
Author: Yelli <ye...@gmail.com>
AuthorDate: Tue Dec 8 15:09:02 2020 +0800

    chrry-pick sqoop task in dev to 1.3.4 (#4168)
    
    * sqoop task optimization
    
    * sqoop front-end optimization
    
    * modify sqoop task UT
    
    * add sqoop task UT to pom
    
    * sqoop task source type or target type is null throw exception
    
    * fix testSqoopTaskTest bug (#3024)
    
    * [FIX-4034][server] fix sqoop import fail (#4036)
    
    * fix #4043, sqoop import query fail
    
    * fix #4043, sqoop task hard code & code style
    
    * add license for SqoopConstants
    
    * add private constructor for SqoopConstants
    
    * fixed sqoop mysql pwd have special character
    
    * fix checkstyle
    
    * fix sqoop task log
    
    * remove unused constants
    
    * [FIX-4034][server] fix sqoop import fail (#4036)
    
    * fix #4043, sqoop import query fail
    
    * fix #4043, sqoop task hard code & code style
    
    * add license for SqoopConstants
    
    * add private constructor for SqoopConstants
    
    * fixed sqoop mysql pwd have special character
    
    * fix checkstyle
    
    * fix sqoop task log
    
    * remove unused constants
    
    * fix sqoop task jdbc string contains special char (#4105)
    
    * split sqoop import hive database and table (#4141)
    
    * modify JSONUtils
    
    Co-authored-by: CalvinKirs <ac...@163.com>
---
 .../apache/dolphinscheduler/api/enums/Status.java  |   2 +-
 .../apache/dolphinscheduler/common/Constants.java  |  14 +
 .../enums/{QueryType.java => SqoopJobType.java}    |  30 +-
 .../enums/{QueryType.java => SqoopQueryType.java}  |  30 +-
 .../common/task/sqoop/SqoopParameters.java         |  99 ++-
 .../task/sqoop/targets/TargetMysqlParameter.java   |   2 +-
 .../master/consumer/TaskPriorityQueueConsumer.java |  33 +-
 .../server/worker/task/sqoop/SqoopConstants.java   |  75 ++
 .../server/worker/task/sqoop/SqoopTask.java        |  50 +-
 .../task/sqoop/generator/CommonGenerator.java      |  62 +-
 .../task/sqoop/generator/ISourceGenerator.java     |   6 +-
 .../task/sqoop/generator/ITargetGenerator.java     |   6 +-
 .../task/sqoop/generator/SqoopJobGenerator.java    |  54 +-
 .../generator/sources/HdfsSourceGenerator.java     |  36 +-
 .../generator/sources/HiveSourceGenerator.java     |  47 +-
 .../generator/sources/MysqlSourceGenerator.java    | 112 +--
 .../generator/targets/HdfsTargetGenerator.java     |  56 +-
 .../generator/targets/HiveTargetGenerator.java     |  69 +-
 .../generator/targets/MysqlTargetGenerator.java    |  88 +-
 .../consumer/TaskPriorityQueueConsumerTest.java    |   4 +-
 .../server/worker/task/sqoop/SqoopTaskTest.java    | 180 ++--
 .../_source/formModel/tasks/_source/datasource.vue |   2 +-
 .../pages/dag/_source/formModel/tasks/sqoop.vue    | 962 +++++++++++++--------
 .../src/js/module/i18n/locale/en_US.js             |   7 +
 .../src/js/module/i18n/locale/zh_CN.js             |   7 +
 pom.xml                                            |   1 +
 26 files changed, 1334 insertions(+), 700 deletions(-)

diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index f5390e9..fd6c7af 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -217,7 +217,7 @@ public enum Status {
     DATA_IS_NOT_VALID(50017,"data {0} not valid", "数据[{0}]无效"),
     DATA_IS_NULL(50018,"data {0} is null", "数据[{0}]不能为空"),
     PROCESS_NODE_HAS_CYCLE(50019,"process node has cycle", "流程节点间存在循环依赖"),
-    PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node %s parameter invalid", "流程节点[%s]参数无效"),
+    PROCESS_NODE_S_PARAMETER_INVALID(50020,"process node {0} parameter invalid", "流程节点[{0}]参数无效"),
     PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"),
     DELETE_PROCESS_DEFINE_BY_ID_ERROR(50022,"delete process definition by id error", "删除工作流定义错误"),
     SCHEDULE_CRON_STATE_ONLINE(50023,"the status of schedule {0} is already on line", "调度配置[{0}]已上线"),
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index a35bc18..8af62a4 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -216,6 +216,11 @@ public final class Constants {
     public static final String COLON = ":";
 
     /**
+     * SPACE " "
+     */
+    public static final String SPACE = " ";
+
+    /**
      * SINGLE_SLASH /
      */
     public static final String SINGLE_SLASH = "/";
@@ -226,6 +231,15 @@ public final class Constants {
     public static final String DOUBLE_SLASH = "//";
 
     /**
+     * SINGLE_QUOTES "'"
+     */
+    public static final String SINGLE_QUOTES = "'";
+    /**
+     * DOUBLE_QUOTES "\""
+     */
+    public static final String DOUBLE_QUOTES = "\"";
+
+    /**
      * SEMICOLON ;
      */
     public static final String SEMICOLON = ";";
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java
similarity index 67%
copy from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java
copy to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java
index 13820b4..f1fde27 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopJobType.java
@@ -16,18 +16,26 @@
  */
 package org.apache.dolphinscheduler.common.enums;
 
-public enum QueryType {
+import com.baomidou.mybatisplus.annotation.EnumValue;
 
-    FORM,
-    SQL;
+public enum  SqoopJobType {
+    CUSTOM(0, "CUSTOM"),
+    TEMPLATE(1, "TEMPLATE");
 
-    public static QueryType getEnum(int value){
-        for (QueryType e:QueryType.values()) {
-            if(e.ordinal() == value) {
-                return e;
-            }
-        }
-        //For values out of enum scope
-        return null;
+    SqoopJobType(int code, String descp){
+        this.code = code;
+        this.descp = descp;
+    }
+
+    @EnumValue
+    private final int code;
+    private final String descp;
+
+    public int getCode() {
+        return code;
+    }
+
+    public String getDescp() {
+        return descp;
     }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopQueryType.java
similarity index 69%
rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java
rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopQueryType.java
index 13820b4..4d279f5 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopQueryType.java
@@ -14,20 +14,28 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.common.enums;
 
-public enum QueryType {
+public enum SqoopQueryType {
+
+    FORM(0, "SQOOP_QUERY_FORM"),
+    SQL(1, "SQOOP_QUERY_SQL");
+
+    private final Integer code;
+
+    private final String desc;
 
-    FORM,
-    SQL;
+    SqoopQueryType(Integer code, String desc) {
+        this.code = code;
+        this.desc = desc;
+    }
+
+    public Integer getCode() {
+        return code;
+    }
 
-    public static QueryType getEnum(int value){
-        for (QueryType e:QueryType.values()) {
-            if(e.ordinal() == value) {
-                return e;
-            }
-        }
-        //For values out of enum scope
-        return null;
+    public String getDesc() {
+        return desc;
     }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java
index 7f02f42..8b566a8 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/SqoopParameters.java
@@ -16,6 +16,8 @@
  */
 package org.apache.dolphinscheduler.common.task.sqoop;
 
+import org.apache.dolphinscheduler.common.enums.SqoopJobType;
+import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.process.ResourceInfo;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
@@ -29,6 +31,23 @@ import java.util.List;
 public class SqoopParameters  extends AbstractParameters {
 
     /**
+     * sqoop job type:
+     * CUSTOM - custom sqoop job
+     * TEMPLATE - sqoop template job
+     */
+    private String jobType;
+
+    /**
+     * customJob eq 1, use customShell
+     */
+    private String customShell;
+
+    /**
+     * sqoop job name - map-reduce job name
+     */
+    private String jobName;
+
+    /**
      * model type
      */
     private String modelType;
@@ -53,6 +72,16 @@ public class SqoopParameters  extends AbstractParameters {
      */
     private String targetParams;
 
+    /**
+     * hadoop custom param for sqoop job
+     */
+    private List<Property> hadoopCustomParams;
+
+    /**
+     * sqoop advanced param
+     */
+    private List<Property> sqoopAdvancedParams;
+
     public String getModelType() {
         return modelType;
     }
@@ -101,18 +130,74 @@ public class SqoopParameters  extends AbstractParameters {
         this.targetParams = targetParams;
     }
 
+    public String getJobType() {
+        return jobType;
+    }
+
+    public void setJobType(String jobType) {
+        this.jobType = jobType;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public String getCustomShell() {
+        return customShell;
+    }
+
+    public void setCustomShell(String customShell) {
+        this.customShell = customShell;
+    }
+
+    public List<Property> getHadoopCustomParams() {
+        return hadoopCustomParams;
+    }
+
+    public void setHadoopCustomParams(List<Property> hadoopCustomParams) {
+        this.hadoopCustomParams = hadoopCustomParams;
+    }
+
+    public List<Property> getSqoopAdvancedParams() {
+        return sqoopAdvancedParams;
+    }
+
+    public void setSqoopAdvancedParams(List<Property> sqoopAdvancedParams) {
+        this.sqoopAdvancedParams = sqoopAdvancedParams;
+    }
+
     @Override
     public boolean checkParameters() {
-        return StringUtils.isNotEmpty(modelType)&&
-                concurrency != 0 &&
-                StringUtils.isNotEmpty(sourceType)&&
-                StringUtils.isNotEmpty(targetType)&&
-                StringUtils.isNotEmpty(sourceParams)&&
-                StringUtils.isNotEmpty(targetParams);
+
+        boolean sqoopParamsCheck = false;
+
+        if (StringUtils.isEmpty(jobType)) {
+            return sqoopParamsCheck;
+        }
+
+        if (SqoopJobType.TEMPLATE.getDescp().equals(jobType)) {
+            sqoopParamsCheck = StringUtils.isEmpty(customShell) &&
+                    StringUtils.isNotEmpty(modelType) &&
+                    StringUtils.isNotEmpty(jobName) &&
+                    concurrency != 0 &&
+                    StringUtils.isNotEmpty(sourceType) &&
+                    StringUtils.isNotEmpty(targetType) &&
+                    StringUtils.isNotEmpty(sourceParams) &&
+                    StringUtils.isNotEmpty(targetParams);
+        } else if (SqoopJobType.CUSTOM.getDescp().equals(jobType)) {
+            sqoopParamsCheck = StringUtils.isNotEmpty(customShell) &&
+                    StringUtils.isEmpty(jobName);
+        }
+
+        return sqoopParamsCheck;
     }
 
     @Override
     public List<ResourceInfo> getResourceFilesList() {
-       return new ArrayList<>();
+        return new ArrayList<>();
     }
 }
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java
index 47126ae..86931cb 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/sqoop/targets/TargetMysqlParameter.java
@@ -106,7 +106,7 @@ public class TargetMysqlParameter {
         this.preQuery = preQuery;
     }
 
-    public boolean isUpdate() {
+    public boolean getIsUpdate() {
         return isUpdate;
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
index 1a3bf6d..eef59e6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumer.java
@@ -21,6 +21,7 @@ import com.alibaba.fastjson.JSONObject;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.ResourceType;
+import org.apache.dolphinscheduler.common.enums.SqoopJobType;
 import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.enums.UdfType;
 import org.apache.dolphinscheduler.common.model.TaskNode;
@@ -33,6 +34,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
 import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
 import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
 import org.apache.dolphinscheduler.common.thread.Stopper;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.*;
 import org.apache.dolphinscheduler.dao.entity.*;
 import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
@@ -275,29 +277,32 @@ public class TaskPriorityQueueConsumer extends Thread{
 
 
     /**
-     * set datax task relation
+     * set sqoop task relation
      * @param sqoopTaskExecutionContext sqoopTaskExecutionContext
      * @param taskNode taskNode
      */
     private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskNode taskNode) {
         SqoopParameters sqoopParameters = JSONObject.parseObject(taskNode.getParams(), SqoopParameters.class);
 
-        SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
-        TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);
+        // sqoop job type is template set task relation
+        if (sqoopParameters.getJobType().equals(SqoopJobType.TEMPLATE.getDescp())) {
+            SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
+            TargetMysqlParameter targetMysqlParameter = JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);
 
-        DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
-        DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
+            DataSource dataSource = processService.findDataSourceById(sourceMysqlParameter.getSrcDatasource());
+            DataSource dataTarget = processService.findDataSourceById(targetMysqlParameter.getTargetDatasource());
 
-        if (dataSource != null){
-            sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
-            sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
-            sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
-        }
+            if (dataSource != null){
+                sqoopTaskExecutionContext.setDataSourceId(dataSource.getId());
+                sqoopTaskExecutionContext.setSourcetype(dataSource.getType().getCode());
+                sqoopTaskExecutionContext.setSourceConnectionParams(dataSource.getConnectionParams());
+            }
 
-        if (dataTarget != null){
-            sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
-            sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
-            sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+            if (dataTarget != null){
+                sqoopTaskExecutionContext.setDataTargetId(dataTarget.getId());
+                sqoopTaskExecutionContext.setTargetType(dataTarget.getType().getCode());
+                sqoopTaskExecutionContext.setTargetConnectionParams(dataTarget.getConnectionParams());
+            }
         }
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopConstants.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopConstants.java
new file mode 100644
index 0000000..772807b
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopConstants.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.task.sqoop;
+
+public final class SqoopConstants {
+
+    private SqoopConstants() {
+    }
+
+    //sqoop general param
+    public static final String SQOOP = "sqoop";
+    public static final String SQOOP_MR_JOB_NAME = "mapred.job.name";
+    public static final String SQOOP_PARALLELISM = "-m";
+    public static final String FIELDS_TERMINATED_BY = "--fields-terminated-by";
+    public static final String LINES_TERMINATED_BY = "--lines-terminated-by";
+    public static final String FIELD_NULL_PLACEHOLDER = "--null-non-string 'NULL' --null-string 'NULL'";
+
+    //sqoop db
+    public static final String DB_CONNECT = "--connect";
+    public static final String DB_USERNAME = "--username";
+    public static final String DB_PWD = "--password";
+    public static final String TABLE = "--table";
+    public static final String COLUMNS = "--columns";
+    public static final String QUERY_WHERE = "where";
+    public static final String QUERY = "--query";
+    public static final String QUERY_CONDITION = "AND \\$CONDITIONS";
+    public static final String QUERY_WITHOUT_CONDITION = "WHERE \\$CONDITIONS";
+    public static final String MAP_COLUMN_HIVE = "--map-column-hive";
+    public static final String MAP_COLUMN_JAVA = "--map-column-java";
+
+
+    //sqoop hive source
+    public static final String HCATALOG_DATABASE = "--hcatalog-database";
+    public static final String HCATALOG_TABLE = "--hcatalog-table";
+    public static final String HCATALOG_PARTITION_KEYS = "--hcatalog-partition-keys";
+    public static final String HCATALOG_PARTITION_VALUES = "--hcatalog-partition-values";
+
+    //sqoop hdfs
+    public static final String HDFS_EXPORT_DIR = "--export-dir";
+    public static final String TARGET_DIR = "--target-dir";
+    public static final String COMPRESSION_CODEC = "--compression-codec";
+
+    //sqoop hive
+    public static final String HIVE_IMPORT = "--hive-import";
+    public static final String HIVE_DATABASE = "--hive-database";
+    public static final String HIVE_TABLE = "--hive-table";
+    public static final String CREATE_HIVE_TABLE = "--create-hive-table";
+    public static final String HIVE_DROP_IMPORT_DELIMS = "--hive-drop-import-delims";
+    public static final String HIVE_OVERWRITE = "--hive-overwrite";
+    public static final String DELETE_TARGET_DIR = "--delete-target-dir";
+    public static final String HIVE_DELIMS_REPLACEMENT = "--hive-delims-replacement";
+    public static final String HIVE_PARTITION_KEY = "--hive-partition-key";
+    public static final String HIVE_PARTITION_VALUE = "--hive-partition-value";
+
+    //sqoop update model
+    public static final String UPDATE_KEY = "--update-key";
+    public static final String UPDATE_MODE = "--update-mode";
+
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
index 9f54d08..00d94f0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
@@ -14,63 +14,73 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.sqoop;
 
-import com.alibaba.fastjson.JSON;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
-import org.slf4j.Logger;
+
 import java.util.Map;
 
+import org.slf4j.Logger;
+
 /**
  * sqoop task extends the shell task
  */
 public class SqoopTask extends AbstractYarnTask {
 
+    /**
+     * sqoop task params
+     */
     private SqoopParameters sqoopParameters;
 
     /**
      * taskExecutionContext
      */
-    private TaskExecutionContext taskExecutionContext;
+    private final TaskExecutionContext sqoopTaskExecutionContext;
 
-    public SqoopTask(TaskExecutionContext taskExecutionContext, Logger logger){
-        super(taskExecutionContext,logger);
-        this.taskExecutionContext = taskExecutionContext;
+    public SqoopTask(TaskExecutionContext taskExecutionContext, Logger logger) {
+        super(taskExecutionContext, logger);
+        this.sqoopTaskExecutionContext = taskExecutionContext;
     }
 
     @Override
-    public void init() throws Exception {
-        logger.info("sqoop task params {}", taskExecutionContext.getTaskParams());
+    public void init() {
+        logger.info("sqoop task params {}", sqoopTaskExecutionContext.getTaskParams());
         sqoopParameters =
-                JSON.parseObject(taskExecutionContext.getTaskParams(),SqoopParameters.class);
-        if (!sqoopParameters.checkParameters()) {
-            throw new RuntimeException("sqoop task params is not valid");
+            JSONUtils.parseObject(sqoopTaskExecutionContext.getTaskParams(), SqoopParameters.class);
+        //check sqoop task params
+        if (null == sqoopParameters) {
+            throw new IllegalArgumentException("Sqoop Task params is null");
         }
 
+        if (!sqoopParameters.checkParameters()) {
+            throw new IllegalArgumentException("Sqoop Task params check fail");
+        }
     }
 
     @Override
-    protected String buildCommand() throws Exception {
+    protected String buildCommand() {
         //get sqoop scripts
         SqoopJobGenerator generator = new SqoopJobGenerator();
-        String script = generator.generateSqoopJob(sqoopParameters,taskExecutionContext);
+        String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext);
 
-        Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
-                taskExecutionContext.getDefinedParams(),
-                sqoopParameters.getLocalParametersMap(),
-                CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
-                taskExecutionContext.getScheduleTime());
+        Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sqoopTaskExecutionContext.getDefinedParams()),
+            sqoopTaskExecutionContext.getDefinedParams(),
+            sqoopParameters.getLocalParametersMap(),
+            CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()),
+            sqoopTaskExecutionContext.getScheduleTime());
 
-        if(paramsMap != null){
-            String resultScripts = ParameterUtils.convertParameterPlaceholders(script,  ParamUtils.convert(paramsMap));
+        if (paramsMap != null) {
+            String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
             logger.info("sqoop script: {}", resultScripts);
             return resultScripts;
         }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java
index 4944bac..e3e7c9a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java
@@ -14,9 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
 
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
+import org.apache.dolphinscheduler.common.utils.CollectionUtils;
+import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
+
+import java.util.List;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,21 +33,53 @@ import org.slf4j.LoggerFactory;
  */
 public class CommonGenerator {
 
-    private Logger logger = LoggerFactory.getLogger(getClass());
+    private static final Logger logger = LoggerFactory.getLogger(CommonGenerator.class);
 
     public String generate(SqoopParameters sqoopParameters) {
-        StringBuilder result = new StringBuilder();
-        try{
-            result.append("sqoop ")
-                    .append(sqoopParameters.getModelType());
-            if(sqoopParameters.getConcurrency() >0){
-                result.append(" -m ")
-                        .append(sqoopParameters.getConcurrency());
+
+        StringBuilder commonSb = new StringBuilder();
+
+        try {
+            //sqoop task model
+            commonSb.append(SqoopConstants.SQOOP)
+                .append(Constants.SPACE)
+                .append(sqoopParameters.getModelType());
+
+            //sqoop map-reduce job name
+            commonSb.append(Constants.SPACE).append(Constants.D).append(Constants.SPACE)
+                .append(String.format("%s%s%s", SqoopConstants.SQOOP_MR_JOB_NAME,
+                    Constants.EQUAL_SIGN, sqoopParameters.getJobName()));
+
+            //hadoop custom param
+            List<Property> hadoopCustomParams = sqoopParameters.getHadoopCustomParams();
+            if (CollectionUtils.isNotEmpty(hadoopCustomParams)) {
+                for (Property hadoopCustomParam : hadoopCustomParams) {
+                    String hadoopCustomParamStr = String.format("%s%s%s", hadoopCustomParam.getProp(),
+                        Constants.EQUAL_SIGN, hadoopCustomParam.getValue());
+
+                    commonSb.append(Constants.SPACE).append(Constants.D)
+                        .append(Constants.SPACE).append(hadoopCustomParamStr);
+                }
+            }
+
+            //sqoop custom params
+            List<Property> sqoopAdvancedParams = sqoopParameters.getSqoopAdvancedParams();
+            if (CollectionUtils.isNotEmpty(sqoopAdvancedParams)) {
+                for (Property sqoopAdvancedParam : sqoopAdvancedParams) {
+                    commonSb.append(Constants.SPACE).append(sqoopAdvancedParam.getProp())
+                        .append(Constants.SPACE).append(sqoopAdvancedParam.getValue());
+                }
+            }
+
+            //sqoop parallelism
+            if (sqoopParameters.getConcurrency() > 0) {
+                commonSb.append(Constants.SPACE).append(SqoopConstants.SQOOP_PARALLELISM)
+                    .append(Constants.SPACE).append(sqoopParameters.getConcurrency());
             }
-        }catch (Exception e){
-            logger.error(e.getMessage());
+        } catch (Exception e) {
+            logger.error(String.format("Sqoop task general param build failed: [%s]", e.getMessage()));
         }
 
-        return result.toString();
+        return commonSb.toString();
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java
index 841654b..e6a9576 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
 
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
@@ -26,9 +27,10 @@ public interface ISourceGenerator {
 
     /**
      * generate the source script
-     * @param sqoopParameters sqoopParameters
+     *
+     * @param sqoopParameters      sqoopParameters
      * @param taskExecutionContext taskExecutionContext
      * @return source script
      */
-    String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext);
+    String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext);
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java
index 7bdaf49..eb355a7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
 
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
@@ -26,9 +27,10 @@ public interface ITargetGenerator {
 
     /**
      * generate the target script
-     * @param sqoopParameters sqoopParameters
+     *
+     * @param sqoopParameters      sqoopParameters
      * @param taskExecutionContext taskExecutionContext
      * @return target script
      */
-    String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext);
+    String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext);
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java
index 4e9cb84..9feaffa 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java
@@ -14,8 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
 
+import org.apache.dolphinscheduler.common.enums.SqoopJobType;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources.HdfsSourceGenerator;
@@ -45,40 +47,51 @@ public class SqoopJobGenerator {
     /**
      * common script generator
      */
-    private CommonGenerator commonGenerator;
+    private final CommonGenerator commonGenerator;
 
-    public SqoopJobGenerator(){
+    public SqoopJobGenerator() {
         commonGenerator = new CommonGenerator();
     }
 
-    private void createSqoopJobGenerator(String sourceType,String targetType){
+    private void createSqoopJobGenerator(String sourceType, String targetType) {
         sourceGenerator = createSourceGenerator(sourceType);
         targetGenerator = createTargetGenerator(targetType);
     }
 
     /**
      * get the final sqoop scripts
-     * @param sqoopParameters
-     * @return
+     *
+     * @param sqoopParameters sqoop params
+     * @return sqoop scripts
      */
-    public String generateSqoopJob(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext){
-        createSqoopJobGenerator(sqoopParameters.getSourceType(),sqoopParameters.getTargetType());
-        if(sourceGenerator == null || targetGenerator == null){
-            return null;
+    public String generateSqoopJob(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
+
+        String sqoopScripts = "";
+
+        if (SqoopJobType.TEMPLATE.getDescp().equals(sqoopParameters.getJobType())) {
+            createSqoopJobGenerator(sqoopParameters.getSourceType(), sqoopParameters.getTargetType());
+            if (sourceGenerator == null || targetGenerator == null) {
+                throw new RuntimeException("sqoop task source type or target type is null");
+            }
+
+            sqoopScripts = String.format("%s%s%s", commonGenerator.generate(sqoopParameters),
+                sourceGenerator.generate(sqoopParameters, taskExecutionContext),
+                targetGenerator.generate(sqoopParameters, taskExecutionContext));
+        } else if (SqoopJobType.CUSTOM.getDescp().equals(sqoopParameters.getJobType())) {
+            sqoopScripts = sqoopParameters.getCustomShell().replaceAll("\\r\\n", "\n");
         }
 
-        return commonGenerator.generate(sqoopParameters)
-                + sourceGenerator.generate(sqoopParameters,taskExecutionContext)
-                + targetGenerator.generate(sqoopParameters,taskExecutionContext);
+        return sqoopScripts;
     }
 
     /**
      * get the source generator
-     * @param sourceType
-     * @return
+     *
+     * @param sourceType sqoop source type
+     * @return sqoop source generator
      */
-    private ISourceGenerator createSourceGenerator(String sourceType){
-        switch (sourceType){
+    private ISourceGenerator createSourceGenerator(String sourceType) {
+        switch (sourceType) {
             case MYSQL:
                 return new MysqlSourceGenerator();
             case HIVE:
@@ -92,11 +105,12 @@ public class SqoopJobGenerator {
 
     /**
      * get the target generator
-     * @param targetType
-     * @return
+     *
+     * @param targetType sqoop target type
+     * @return sqoop target generator
      */
-    private ITargetGenerator createTargetGenerator(String targetType){
-        switch (targetType){
+    private ITargetGenerator createTargetGenerator(String targetType) {
+        switch (targetType) {
             case MYSQL:
                 return new MysqlTargetGenerator();
             case HIVE:
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java
index 41e5668..549d5db 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java
@@ -14,14 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources;
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
 import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHdfsParameter;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,28 +34,30 @@ import org.slf4j.LoggerFactory;
  */
 public class HdfsSourceGenerator implements ISourceGenerator {
 
-    private Logger logger = LoggerFactory.getLogger(getClass());
+    private static final Logger logger = LoggerFactory.getLogger(HdfsSourceGenerator.class);
 
     @Override
-    public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
-        StringBuilder result = new StringBuilder();
-        try{
+    public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
+
+        StringBuilder hdfsSourceSb = new StringBuilder();
+
+        try {
             SourceHdfsParameter sourceHdfsParameter
-                    = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceHdfsParameter.class);
-
-            if(sourceHdfsParameter != null){
-                if(StringUtils.isNotEmpty(sourceHdfsParameter.getExportDir())){
-                    result.append(" --export-dir ")
-                            .append(sourceHdfsParameter.getExportDir());
-                }else{
-                    throw new Exception("--export-dir is null");
+                = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceHdfsParameter.class);
+
+            if (null != sourceHdfsParameter) {
+                if (StringUtils.isNotEmpty(sourceHdfsParameter.getExportDir())) {
+                    hdfsSourceSb.append(Constants.SPACE).append(SqoopConstants.HDFS_EXPORT_DIR)
+                        .append(Constants.SPACE).append(sourceHdfsParameter.getExportDir());
+                } else {
+                    throw new IllegalArgumentException("Sqoop hdfs export dir is null");
                 }
 
             }
-        }catch (Exception e){
-            logger.error("get hdfs source failed",e);
+        } catch (Exception e) {
+            logger.error(String.format("Sqoop hdfs source parmas build failed: [%s]", e.getMessage()));
         }
 
-        return result.toString();
+        return hdfsSourceSb.toString();
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java
index ea12616..3229dca 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java
@@ -14,14 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
 import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHiveParameter;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,33 +34,40 @@ import org.slf4j.LoggerFactory;
  */
 public class HiveSourceGenerator implements ISourceGenerator {
 
-    private Logger logger = LoggerFactory.getLogger(getClass());
+    private static final Logger logger = LoggerFactory.getLogger(HiveSourceGenerator.class);
 
     @Override
-    public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
-        StringBuilder sb = new StringBuilder();
-        try{
+    public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
+
+        StringBuilder hiveSourceSb = new StringBuilder();
+
+        try {
             SourceHiveParameter sourceHiveParameter
-                    = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceHiveParameter.class);
-            if(sourceHiveParameter != null){
-                if(StringUtils.isNotEmpty(sourceHiveParameter.getHiveDatabase())){
-                    sb.append(" --hcatalog-database ").append(sourceHiveParameter.getHiveDatabase());
+                = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceHiveParameter.class);
+
+            if (null != sourceHiveParameter) {
+                if (StringUtils.isNotEmpty(sourceHiveParameter.getHiveDatabase())) {
+                    hiveSourceSb.append(Constants.SPACE).append(SqoopConstants.HCATALOG_DATABASE)
+                        .append(Constants.SPACE).append(sourceHiveParameter.getHiveDatabase());
                 }
 
-                if(StringUtils.isNotEmpty(sourceHiveParameter.getHiveTable())){
-                    sb.append(" --hcatalog-table ").append(sourceHiveParameter.getHiveTable());
+                if (StringUtils.isNotEmpty(sourceHiveParameter.getHiveTable())) {
+                    hiveSourceSb.append(Constants.SPACE).append(SqoopConstants.HCATALOG_TABLE)
+                        .append(Constants.SPACE).append(sourceHiveParameter.getHiveTable());
                 }
 
-                if(StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionKey())&&
-                        StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionValue())){
-                    sb.append(" --hcatalog-partition-keys ").append(sourceHiveParameter.getHivePartitionKey())
-                            .append(" --hcatalog-partition-values ").append(sourceHiveParameter.getHivePartitionValue());
+                if (StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionKey())
+                    && StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionValue())) {
+                    hiveSourceSb.append(Constants.SPACE).append(SqoopConstants.HCATALOG_PARTITION_KEYS)
+                        .append(Constants.SPACE).append(sourceHiveParameter.getHivePartitionKey())
+                        .append(Constants.SPACE).append(SqoopConstants.HCATALOG_PARTITION_VALUES)
+                        .append(Constants.SPACE).append(sourceHiveParameter.getHivePartitionValue());
                 }
             }
-        }catch (Exception e){
-            logger.error(e.getMessage());
+        } catch (Exception e) {
+            logger.error(String.format("Sqoop hive source params build failed: [%s]", e.getMessage()));
         }
 
-        return sb.toString();
+        return hiveSourceSb.toString();
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java
index 8cd5357..58e508a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java
@@ -14,106 +14,118 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.DbType;
-import org.apache.dolphinscheduler.common.enums.QueryType;
+import org.apache.dolphinscheduler.common.enums.SqoopQueryType;
 import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
 import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
 import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
 import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * mysql source generator
  */
 public class MysqlSourceGenerator implements ISourceGenerator {
 
-    private Logger logger = LoggerFactory.getLogger(getClass());
+    private static final Logger logger = LoggerFactory.getLogger(MysqlSourceGenerator.class);
 
     @Override
-    public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
-        StringBuilder result = new StringBuilder();
-        try {
-            SourceMysqlParameter sourceMysqlParameter
-                    = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceMysqlParameter.class);
+    public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
+
+        StringBuilder mysqlSourceSb = new StringBuilder();
 
+        try {
+            SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
             SqoopTaskExecutionContext sqoopTaskExecutionContext = taskExecutionContext.getSqoopTaskExecutionContext();
 
-            if(sourceMysqlParameter != null){
+            if (null != sourceMysqlParameter) {
                 BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.of(sqoopTaskExecutionContext.getSourcetype()),
-                        sqoopTaskExecutionContext.getSourceConnectionParams());
-                if(baseDataSource != null){
-                    result.append(" --connect ")
-                            .append(baseDataSource.getJdbcUrl())
-                            .append(" --username ")
-                            .append(baseDataSource.getUser())
-                            .append(" --password ")
-                            .append(baseDataSource.getPassword());
-
-                    if(sourceMysqlParameter.getSrcQueryType() == QueryType.FORM.ordinal()){
-                        if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcTable())){
-                            result.append(" --table ").append(sourceMysqlParameter.getSrcTable());
+                    sqoopTaskExecutionContext.getSourceConnectionParams());
+
+                if (null != baseDataSource) {
+
+                    mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.DB_CONNECT)
+                        .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(baseDataSource.getJdbcUrl()).append(Constants.DOUBLE_QUOTES)
+                        .append(Constants.SPACE).append(SqoopConstants.DB_USERNAME)
+                        .append(Constants.SPACE).append(baseDataSource.getUser())
+                        .append(Constants.SPACE).append(SqoopConstants.DB_PWD)
+                        .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(baseDataSource.getPassword()).append(Constants.DOUBLE_QUOTES);
+
+                    //sqoop table & sql query
+                    if (sourceMysqlParameter.getSrcQueryType() == SqoopQueryType.FORM.getCode()) {
+                        if (StringUtils.isNotEmpty(sourceMysqlParameter.getSrcTable())) {
+                            mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.TABLE)
+                                .append(Constants.SPACE).append(sourceMysqlParameter.getSrcTable());
                         }
 
-                        if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcColumns())){
-                            result.append(" --columns ").append(sourceMysqlParameter.getSrcColumns());
+                        if (StringUtils.isNotEmpty(sourceMysqlParameter.getSrcColumns())) {
+                            mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.COLUMNS)
+                                .append(Constants.SPACE).append(sourceMysqlParameter.getSrcColumns());
                         }
+                    } else if (sourceMysqlParameter.getSrcQueryType() == SqoopQueryType.SQL.getCode()
+                        && StringUtils.isNotEmpty(sourceMysqlParameter.getSrcQuerySql())) {
 
-                    }else if(sourceMysqlParameter.getSrcQueryType() == QueryType.SQL.ordinal()
-                            && StringUtils.isNotEmpty(sourceMysqlParameter.getSrcQuerySql())){
                         String srcQuery = sourceMysqlParameter.getSrcQuerySql();
-                        if(srcQuery.toLowerCase().contains("where")){
-                            srcQuery += " AND "+"$CONDITIONS";
-                        }else{
-                            srcQuery += " WHERE $CONDITIONS";
-                        }
-                        result.append(" --query \'").append(srcQuery).append("\'");
+                        mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.QUERY)
+                            .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(srcQuery);
 
+                        if (srcQuery.toLowerCase().contains(SqoopConstants.QUERY_WHERE)) {
+                            mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.QUERY_CONDITION).append(Constants.DOUBLE_QUOTES);
+                        } else {
+                            mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.QUERY_WITHOUT_CONDITION).append(Constants.DOUBLE_QUOTES);
+                        }
                     }
 
-                    List<Property>  mapColumnHive = sourceMysqlParameter.getMapColumnHive();
+                    //sqoop hive map column
+                    List<Property> mapColumnHive = sourceMysqlParameter.getMapColumnHive();
 
-                    if(mapColumnHive != null && !mapColumnHive.isEmpty()){
+                    if (null != mapColumnHive && !mapColumnHive.isEmpty()) {
                         StringBuilder columnMap = new StringBuilder();
-                        for(Property item:mapColumnHive){
-                            columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
+                        for (Property item : mapColumnHive) {
+                            columnMap.append(item.getProp()).append(Constants.EQUAL_SIGN).append(item.getValue()).append(Constants.COMMA);
                         }
 
-                        if(StringUtils.isNotEmpty(columnMap.toString())){
-                            result.append(" --map-column-hive ")
-                                    .append(columnMap.substring(0,columnMap.length() - 1));
+                        if (StringUtils.isNotEmpty(columnMap.toString())) {
+                            mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.MAP_COLUMN_HIVE)
+                                .append(Constants.SPACE).append(columnMap.substring(0, columnMap.length() - 1));
                         }
                     }
 
-                    List<Property>  mapColumnJava = sourceMysqlParameter.getMapColumnJava();
+                    //sqoop map column java
+                    List<Property> mapColumnJava = sourceMysqlParameter.getMapColumnJava();
 
-                    if(mapColumnJava != null && !mapColumnJava.isEmpty()){
+                    if (null != mapColumnJava && !mapColumnJava.isEmpty()) {
                         StringBuilder columnMap = new StringBuilder();
-                        for(Property item:mapColumnJava){
-                            columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
+                        for (Property item : mapColumnJava) {
+                            columnMap.append(item.getProp()).append(Constants.EQUAL_SIGN).append(item.getValue()).append(Constants.COMMA);
                         }
 
-                        if(StringUtils.isNotEmpty(columnMap.toString())){
-                            result.append(" --map-column-java ")
-                                    .append(columnMap.substring(0,columnMap.length() - 1));
+                        if (StringUtils.isNotEmpty(columnMap.toString())) {
+                            mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.MAP_COLUMN_JAVA)
+                                .append(Constants.SPACE).append(columnMap.substring(0, columnMap.length() - 1));
                         }
                     }
                 }
             }
-        }catch (Exception e){
-            logger.error(e.getMessage());
+        } catch (Exception e) {
+            logger.error(String.format("Sqoop task mysql source params build failed: [%s]", e.getMessage()));
         }
 
-        return result.toString();
+        return mysqlSourceSb.toString();
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java
index 64ea75e..3ea3254 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java
@@ -14,14 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
 import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHdfsParameter;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,47 +34,53 @@ import org.slf4j.LoggerFactory;
  */
 public class HdfsTargetGenerator implements ITargetGenerator {
 
-    private Logger logger = LoggerFactory.getLogger(getClass());
+    private static final Logger logger = LoggerFactory.getLogger(HdfsTargetGenerator.class);
 
     @Override
-    public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
-        StringBuilder result = new StringBuilder();
-        try{
+    public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
+
+        StringBuilder hdfsTargetSb = new StringBuilder();
+
+        try {
             TargetHdfsParameter targetHdfsParameter =
-                    JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetHdfsParameter.class);
+                JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetHdfsParameter.class);
 
-            if(targetHdfsParameter != null){
+            if (null != targetHdfsParameter) {
 
-                if(StringUtils.isNotEmpty(targetHdfsParameter.getTargetPath())){
-                    result.append(" --target-dir ").append(targetHdfsParameter.getTargetPath());
+                if (StringUtils.isNotEmpty(targetHdfsParameter.getTargetPath())) {
+                    hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.TARGET_DIR)
+                        .append(Constants.SPACE).append(targetHdfsParameter.getTargetPath());
                 }
 
-                if(StringUtils.isNotEmpty(targetHdfsParameter.getCompressionCodec())){
-                    result.append(" --compression-codec ").append(targetHdfsParameter.getCompressionCodec());
+                if (StringUtils.isNotEmpty(targetHdfsParameter.getCompressionCodec())) {
+                    hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.COMPRESSION_CODEC)
+                        .append(Constants.SPACE).append(targetHdfsParameter.getCompressionCodec());
                 }
 
-                if(StringUtils.isNotEmpty(targetHdfsParameter.getFileType())){
-                    result.append(" ").append(targetHdfsParameter.getFileType());
+                if (StringUtils.isNotEmpty(targetHdfsParameter.getFileType())) {
+                    hdfsTargetSb.append(Constants.SPACE).append(targetHdfsParameter.getFileType());
                 }
 
-                if(targetHdfsParameter.isDeleteTargetDir()){
-                    result.append(" --delete-target-dir");
+                if (targetHdfsParameter.isDeleteTargetDir()) {
+                    hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.DELETE_TARGET_DIR);
                 }
 
-                if(StringUtils.isNotEmpty(targetHdfsParameter.getFieldsTerminated())){
-                    result.append(" --fields-terminated-by '").append(targetHdfsParameter.getFieldsTerminated()).append("'");
+                if (StringUtils.isNotEmpty(targetHdfsParameter.getFieldsTerminated())) {
+                    hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.FIELDS_TERMINATED_BY)
+                        .append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetHdfsParameter.getFieldsTerminated()).append(Constants.SINGLE_QUOTES);
                 }
 
-                if(StringUtils.isNotEmpty(targetHdfsParameter.getLinesTerminated())){
-                    result.append(" --lines-terminated-by '").append(targetHdfsParameter.getLinesTerminated()).append("'");
+                if (StringUtils.isNotEmpty(targetHdfsParameter.getLinesTerminated())) {
+                    hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.LINES_TERMINATED_BY)
+                        .append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetHdfsParameter.getLinesTerminated()).append(Constants.SINGLE_QUOTES);
                 }
 
-                result.append(" --null-non-string 'NULL' --null-string 'NULL'");
+                hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.FIELD_NULL_PLACEHOLDER);
             }
-        }catch(Exception e){
-            logger.error(e.getMessage());
+        } catch (Exception e) {
+            logger.error(String.format("Sqoop hdfs target params build failed: [%s]", e.getMessage()));
         }
 
-        return result.toString();
+        return hdfsTargetSb.toString();
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java
index dc5440b..769fc62 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java
@@ -14,14 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
 import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHiveParameter;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,57 +34,58 @@ import org.slf4j.LoggerFactory;
  */
 public class HiveTargetGenerator implements ITargetGenerator {
 
-    private Logger logger = LoggerFactory.getLogger(getClass());
+    private static final Logger logger = LoggerFactory.getLogger(HiveTargetGenerator.class);
 
     @Override
-    public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
+    public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
 
-        StringBuilder result = new StringBuilder();
+        StringBuilder hiveTargetSb = new StringBuilder();
 
-        try{
+        try {
             TargetHiveParameter targetHiveParameter =
-                    JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetHiveParameter.class);
-            if(targetHiveParameter != null){
-
-                result.append(" --hive-import ");
+                JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetHiveParameter.class);
+            if (null != targetHiveParameter) {
+                hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_IMPORT);
 
-                if(StringUtils.isNotEmpty(targetHiveParameter.getHiveDatabase())&&
-                        StringUtils.isNotEmpty(targetHiveParameter.getHiveTable())){
-                    result.append(" --hive-table ")
-                            .append(targetHiveParameter.getHiveDatabase())
-                            .append(".")
-                            .append(targetHiveParameter.getHiveTable());
+                if (StringUtils.isNotEmpty(targetHiveParameter.getHiveDatabase())
+                    && StringUtils.isNotEmpty(targetHiveParameter.getHiveTable())) {
+                    hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_DATABASE)
+                        .append(Constants.SPACE).append(targetHiveParameter.getHiveDatabase())
+                        .append(Constants.SPACE).append(SqoopConstants.HIVE_TABLE)
+                        .append(Constants.SPACE).append(targetHiveParameter.getHiveTable());
                 }
 
-                if(targetHiveParameter.isCreateHiveTable()){
-                    result.append(" --create-hive-table");
+                if (targetHiveParameter.isCreateHiveTable()) {
+                    hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.CREATE_HIVE_TABLE);
                 }
 
-                if(targetHiveParameter.isDropDelimiter()){
-                    result.append(" --hive-drop-import-delims");
+                if (targetHiveParameter.isDropDelimiter()) {
+                    hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_DROP_IMPORT_DELIMS);
                 }
 
-                if(targetHiveParameter.isHiveOverWrite()){
-                    result.append(" --hive-overwrite -delete-target-dir");
+                if (targetHiveParameter.isHiveOverWrite()) {
+                    hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_OVERWRITE)
+                        .append(Constants.SPACE).append(SqoopConstants.DELETE_TARGET_DIR);
                 }
 
-                if(StringUtils.isNotEmpty(targetHiveParameter.getReplaceDelimiter())){
-                    result.append(" --hive-delims-replacement ").append(targetHiveParameter.getReplaceDelimiter());
+                if (StringUtils.isNotEmpty(targetHiveParameter.getReplaceDelimiter())) {
+                    hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_DELIMS_REPLACEMENT)
+                        .append(Constants.SPACE).append(targetHiveParameter.getReplaceDelimiter());
                 }
 
-                if(StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionKey())&&
-                        StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionValue())){
-                    result.append(" --hive-partition-key ")
-                            .append(targetHiveParameter.getHivePartitionKey())
-                            .append(" --hive-partition-value ")
-                            .append(targetHiveParameter.getHivePartitionValue());
+                if (StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionKey())
+                    && StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionValue())) {
+                    hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_PARTITION_KEY)
+                        .append(Constants.SPACE).append(targetHiveParameter.getHivePartitionKey())
+                        .append(Constants.SPACE).append(SqoopConstants.HIVE_PARTITION_VALUE)
+                        .append(Constants.SPACE).append(targetHiveParameter.getHivePartitionValue());
                 }
 
             }
-        }catch(Exception e){
-            logger.error(e.getMessage());
+        } catch (Exception e) {
+            logger.error(String.format("Sqoop hive target params build failed: [%s]", e.getMessage()));
         }
 
-        return result.toString();
+        return hiveTargetSb.toString();
     }
 }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java
index aed8b9e..4384c96 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java
@@ -14,21 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.DbType;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
 import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
 import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
-import org.apache.dolphinscheduler.dao.entity.DataSource;
 import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,59 +38,74 @@ import org.slf4j.LoggerFactory;
  */
 public class MysqlTargetGenerator implements ITargetGenerator {
 
-    private Logger logger = LoggerFactory.getLogger(getClass());
+    private static final Logger logger = LoggerFactory.getLogger(MysqlTargetGenerator.class);
 
     @Override
-    public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
+    public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
 
-        StringBuilder result = new StringBuilder();
-        try{
+        StringBuilder mysqlTargetSb = new StringBuilder();
 
+        try {
             TargetMysqlParameter targetMysqlParameter =
-                    JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetMysqlParameter.class);
+                JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);
 
             SqoopTaskExecutionContext sqoopTaskExecutionContext = taskExecutionContext.getSqoopTaskExecutionContext();
 
-            if(targetMysqlParameter != null && targetMysqlParameter.getTargetDatasource() != 0){
+            if (null != targetMysqlParameter && targetMysqlParameter.getTargetDatasource() != 0) {
 
                 // get datasource
                 BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.of(sqoopTaskExecutionContext.getTargetType()),
-                        sqoopTaskExecutionContext.getTargetConnectionParams());
-
-                if(baseDataSource != null){
-                    result.append(" --connect ")
-                            .append(baseDataSource.getJdbcUrl())
-                            .append(" --username ")
-                            .append(baseDataSource.getUser())
-                            .append(" --password ")
-                            .append(baseDataSource.getPassword())
-                            .append(" --table ")
-                            .append(targetMysqlParameter.getTargetTable());
-
-                    if(StringUtils.isNotEmpty(targetMysqlParameter.getTargetColumns())){
-                        result.append(" --columns ").append(targetMysqlParameter.getTargetColumns());
+                    sqoopTaskExecutionContext.getTargetConnectionParams());
+
+                if (null != baseDataSource) {
+
+                    mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.DB_CONNECT)
+                        .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(baseDataSource.getJdbcUrl()).append(Constants.DOUBLE_QUOTES)
+                        .append(Constants.SPACE).append(SqoopConstants.DB_USERNAME)
+                        .append(Constants.SPACE).append(baseDataSource.getUser())
+                        .append(Constants.SPACE).append(SqoopConstants.DB_PWD)
+                        .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(baseDataSource.getPassword()).append(Constants.DOUBLE_QUOTES)
+                        .append(Constants.SPACE).append(SqoopConstants.TABLE)
+                        .append(Constants.SPACE).append(targetMysqlParameter.getTargetTable());
+
+                    if (StringUtils.isNotEmpty(targetMysqlParameter.getTargetColumns())) {
+                        mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.COLUMNS)
+                            .append(Constants.SPACE).append(targetMysqlParameter.getTargetColumns());
                     }
 
-                    if(StringUtils.isNotEmpty(targetMysqlParameter.getFieldsTerminated())){
-                        result.append(" --fields-terminated-by '").append(targetMysqlParameter.getFieldsTerminated()).append("'");
+                    if (StringUtils.isNotEmpty(targetMysqlParameter.getFieldsTerminated())) {
+                        mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.FIELDS_TERMINATED_BY);
+                        if (targetMysqlParameter.getFieldsTerminated().contains("'")) {
+                            mysqlTargetSb.append(Constants.SPACE).append(targetMysqlParameter.getFieldsTerminated());
+
+                        } else {
+                            mysqlTargetSb.append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetMysqlParameter.getFieldsTerminated()).append(Constants.SINGLE_QUOTES);
+                        }
                     }
 
-                    if(StringUtils.isNotEmpty(targetMysqlParameter.getLinesTerminated())){
-                        result.append(" --lines-terminated-by '").append(targetMysqlParameter.getLinesTerminated()).append("'");
+                    if (StringUtils.isNotEmpty(targetMysqlParameter.getLinesTerminated())) {
+                        mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.LINES_TERMINATED_BY);
+                        if (targetMysqlParameter.getLinesTerminated().contains(Constants.SINGLE_QUOTES)) {
+                            mysqlTargetSb.append(Constants.SPACE).append(targetMysqlParameter.getLinesTerminated());
+                        } else {
+                            mysqlTargetSb.append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetMysqlParameter.getLinesTerminated()).append(Constants.SINGLE_QUOTES);
+                        }
                     }
 
-                    if(targetMysqlParameter.isUpdate()
-                            && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateKey())
-                            && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateMode())){
-                        result.append(" --update-key ").append(targetMysqlParameter.getTargetUpdateKey())
-                              .append(" --update-mode ").append(targetMysqlParameter.getTargetUpdateMode());
+                    if (targetMysqlParameter.getIsUpdate()
+                        && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateKey())
+                        && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateMode())) {
+                        mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.UPDATE_KEY)
+                            .append(Constants.SPACE).append(targetMysqlParameter.getTargetUpdateKey())
+                            .append(Constants.SPACE).append(SqoopConstants.UPDATE_MODE)
+                            .append(Constants.SPACE).append(targetMysqlParameter.getTargetUpdateMode());
                     }
                 }
             }
-        }catch (Exception e){
-            logger.error(e.getMessage());
+        } catch (Exception e) {
+            logger.error(String.format("Sqoop mysql target params build failed: [%s]", e.getMessage()));
         }
 
-        return result.toString();
+        return mysqlTargetSb.toString();
     }
 }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
index b247dc4..a95acf8 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/consumer/TaskPriorityQueueConsumerTest.java
@@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.server.zk.SpringZKServer;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.queue.TaskPriorityQueue;
-import org.apache.dolphinscheduler.service.queue.TaskPriorityQueueImpl;
 import org.apache.dolphinscheduler.service.zk.ZookeeperCachedOperator;
 import org.apache.dolphinscheduler.service.zk.ZookeeperConfig;
 import org.junit.Before;
@@ -233,8 +232,7 @@ public class TaskPriorityQueueConsumerTest {
         dataSource.setConnectionParams("{\"address\":\"jdbc:mysql://192.168.221.185:3306\",\"database\":\"dolphinscheduler_qiaozhanwei\",\"jdbcUrl\":\"jdbc:mysql://192.168.221.185:3306/dolphinscheduler_qiaozhanwei\",\"user\":\"root\",\"password\":\"root@123\"}");
         dataSource.setCreateTime(new Date());
         dataSource.setUpdateTime(new Date());
-
-        Mockito.when(processService.findDataSourceById(1)).thenReturn(dataSource);
+        Mockito.doReturn(dataSource).when(processService).findDataSourceById(1);
         Thread.sleep(10000);
     }
 
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
index bfc8205..01dee2b 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
@@ -14,17 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task.sqoop;
 
-import com.alibaba.fastjson.JSON;
-import org.apache.dolphinscheduler.common.enums.DbType;
 import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
-import org.apache.dolphinscheduler.dao.entity.DataSource;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
-import org.apache.dolphinscheduler.server.worker.task.TaskProps;
 import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Date;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -35,8 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationContext;
 
-import java.util.*;
-
 /**
  * sqoop task test
  */
@@ -45,71 +45,137 @@ public class SqoopTaskTest {
 
     private static final Logger logger = LoggerFactory.getLogger(SqoopTaskTest.class);
 
-    private ProcessService processService;
-    private ApplicationContext applicationContext;
     private SqoopTask sqoopTask;
 
     @Before
-    public void before() throws Exception{
-        processService = Mockito.mock(ProcessService.class);
-        Mockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource());
-        applicationContext = Mockito.mock(ApplicationContext.class);
+    public void before() {
+        ProcessService processService = Mockito.mock(ProcessService.class);
+        ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
         SpringApplicationContext springApplicationContext = new SpringApplicationContext();
         springApplicationContext.setApplicationContext(applicationContext);
         Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
 
-        TaskProps props = new TaskProps();
-        props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
-        props.setTenantCode("1");
-        props.setEnvFile(".dolphinscheduler_env.sh");
-        props.setTaskStartTime(new Date());
-        props.setTaskTimeout(0);
-        props.setTaskParams("{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHA [...]
-
-        sqoopTask = new SqoopTask(new TaskExecutionContext(),logger);
+        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+        taskExecutionContext.setTaskAppId(String.valueOf(System.currentTimeMillis()));
+        taskExecutionContext.setTenantCode("1");
+        taskExecutionContext.setEnvFile(".dolphinscheduler_env.sh");
+        taskExecutionContext.setStartTime(new Date());
+        taskExecutionContext.setTaskTimeout(0);
+        taskExecutionContext.setTaskParams("{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,"
+            + "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\","
+            + "\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\","
+            + "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],"
+            + "\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\""
+            + ",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,"
+            + "\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\","
+            + "\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}");
+
+        sqoopTask = new SqoopTask(taskExecutionContext, logger);
+        //test sqoop task init method
         sqoopTask.init();
     }
 
+    /**
+     * test SqoopJobGenerator
+     */
     @Test
-    public void testGenerator(){
-        String data1 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\",\\\"deleteTargetDir\\\":tru [...]
-        SqoopParameters sqoopParameters1 = JSON.parseObject(data1,SqoopParameters.class);
-
+    public void testGenerator() {
+        TaskExecutionContext mysqlTaskExecutionContext = getMysqlTaskExecutionContext();
+
+        //sqoop TEMPLATE job
+        //import mysql to HDFS with hadoop
+        String mysqlToHdfs =
+            "{\"jobName\":\"sqoop_import\",\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"4096\"}],"
+                + "\"sqoopAdvancedParams\":[{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],\"jobType\":\"TEMPLATE\",\"concurrency\":1,"
+                + "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\","
+                + "\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\","
+                + "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\","
+                + "\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\","
+                + "\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
+        SqoopParameters mysqlToHdfsParams = JSONUtils.parseObject(mysqlToHdfs, SqoopParameters.class);
         SqoopJobGenerator generator = new SqoopJobGenerator();
-        String script = generator.generateSqoopJob(sqoopParameters1,new TaskExecutionContext());
-        String expected = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'";
-        Assert.assertEquals(expected, script);
-
-        String data2 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\" [...]
-        SqoopParameters sqoopParameters2 = JSON.parseObject(data2,SqoopParameters.class);
-
-        String script2 = generator.generateSqoopJob(sqoopParameters2,new TaskExecutionContext());
-        String expected2 = "sqoop export -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert";
-        Assert.assertEquals(expected2, script2);
-
-        String data3 = "{\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\ [...]
-        SqoopParameters sqoopParameters3 = JSON.parseObject(data3,SqoopParameters.class);
-
-        String script3 = generator.generateSqoopJob(sqoopParameters3,new TaskExecutionContext());
-        String expected3 = "sqoop export -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'";
-        Assert.assertEquals(expected3, script3);
-
-        String data4 = "{\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\" [...]
-        SqoopParameters sqoopParameters4 = JSON.parseObject(data4,SqoopParameters.class);
-
-        String script4 = generator.generateSqoopJob(sqoopParameters4,new TaskExecutionContext());
-        String expected4 = "sqoop import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import  --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
-        Assert.assertEquals(expected4, script4);
+        String mysqlToHdfsScript = generator.generateSqoopJob(mysqlToHdfsParams, mysqlTaskExecutionContext);
+        String mysqlToHdfsExpected =
+            "sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=4096 --direct  -m 1 --connect \"jdbc:mysql://192.168.0.111:3306/test\" "
+                + "--username kylo --password \"123456\" --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile "
+                + "--delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'";
+        Assert.assertEquals(mysqlToHdfsExpected, mysqlToHdfsScript);
+
+        //export hdfs to mysql using update mode
+        String hdfsToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\","
+            + "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\","
+            + "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\","
+            + "\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\","
+            + "\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
+        SqoopParameters hdfsToMysqlParams = JSONUtils.parseObject(hdfsToMysql, SqoopParameters.class);
+        String hdfsToMysqlScript = generator.generateSqoopJob(hdfsToMysqlParams, mysqlTaskExecutionContext);
+        String hdfsToMysqlScriptExpected =
+            "sqoop export -D mapred.job.name=sqoop_import -m 1 --export-dir /ods/tmp/test/person7 --connect \"jdbc:mysql://192.168.0.111:3306/test\" "
+                + "--username kylo --password \"123456\" --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' "
+                + "--lines-terminated-by '\\n' --update-key id --update-mode allowinsert";
+        Assert.assertEquals(hdfsToMysqlScriptExpected, hdfsToMysqlScript);
+
+        //export hive to mysql
+        String hiveToMysql =
+            "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\","
+                + "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\","
+                + "\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\","
+                + "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\","
+                + "\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\","
+                + "\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
+        SqoopParameters hiveToMysqlParams = JSONUtils.parseObject(hiveToMysql, SqoopParameters.class);
+        String hiveToMysqlScript = generator.generateSqoopJob(hiveToMysqlParams, mysqlTaskExecutionContext);
+        String hiveToMysqlExpected =
+            "sqoop export -D mapred.job.name=sqoop_import -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date "
+                + "--hcatalog-partition-values 2020-02-17 --connect \"jdbc:mysql://192.168.0.111:3306/test\" --username kylo --password \"123456\" --table person_3 "
+                + "--fields-terminated-by '@' --lines-terminated-by '\\n'";
+        Assert.assertEquals(hiveToMysqlExpected, hiveToMysqlScript);
+
+        //import mysql to hive
+        String mysqlToHive =
+            "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\","
+                + "\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\","
+                + "\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],"
+                + "\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\","
+                + "\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,"
+                + "\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
+        SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive, SqoopParameters.class);
+        String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams, mysqlTaskExecutionContext);
+        String mysqlToHiveExpected =
+            "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect \"jdbc:mysql://192.168.0.111:3306/test\" --username kylo --password \"123456\" "
+                + "--query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" --map-column-java id=Integer --hive-import --hive-database stg --hive-table person_internal_2 "
+                + "--create-hive-table --hive-overwrite --delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
+        Assert.assertEquals(mysqlToHiveExpected, mysqlToHiveScript);
+
+        //sqoop CUSTOM job
+        String sqoopCustomString = "{\"jobType\":\"CUSTOM\",\"localParams\":[],\"customShell\":\"sqoop import\"}";
+        SqoopParameters sqoopCustomParams = JSONUtils.parseObject(sqoopCustomString, SqoopParameters.class);
+        String sqoopCustomScript = generator.generateSqoopJob(sqoopCustomParams, new TaskExecutionContext());
+        String sqoopCustomExpected = "sqoop import";
+        Assert.assertEquals(sqoopCustomExpected, sqoopCustomScript);
 
     }
 
-    private DataSource getDataSource() {
-        DataSource dataSource = new DataSource();
-        dataSource.setType(DbType.MYSQL);
-        dataSource.setConnectionParams(
-                "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}");
-        dataSource.setUserId(1);
-        return dataSource;
+    /**
+     * get taskExecutionContext include mysql
+     *
+     * @return TaskExecutionContext
+     */
+    private TaskExecutionContext getMysqlTaskExecutionContext() {
+        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+        SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext();
+        String mysqlSourceConnectionParams =
+            "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
+        String mysqlTargetConnectionParams =
+            "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
+        sqoopTaskExecutionContext.setDataSourceId(2);
+        sqoopTaskExecutionContext.setDataTargetId(2);
+        sqoopTaskExecutionContext.setSourcetype(0);
+        sqoopTaskExecutionContext.setTargetConnectionParams(mysqlTargetConnectionParams);
+        sqoopTaskExecutionContext.setSourceConnectionParams(mysqlSourceConnectionParams);
+        sqoopTaskExecutionContext.setTargetType(0);
+        taskExecutionContext.setSqoopTaskExecutionContext(sqoopTaskExecutionContext);
+        return taskExecutionContext;
     }
 
     @Test
@@ -121,7 +187,7 @@ public class SqoopTaskTest {
      * Method: init
      */
     @Test
-    public void testInit(){
+    public void testInit() {
         try {
             sqoopTask.init();
         } catch (Exception e) {
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue
index a173139..05e248f 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/_source/datasource.vue
@@ -131,7 +131,7 @@
     },
     created () {
       let supportType = this.supportType || []
-      this.typeList = _.cloneDeep(this.store.state.dag.dsTypeListS)
+      this.typeList = this.data.typeList || _.cloneDeep(this.store.state.dag.dsTypeListS)
       // Have a specified data source
       if (supportType.length) {
         let is = (type) => {
diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue
index 2ff3dbf..ce1a770 100644
--- a/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue
+++ b/dolphinscheduler-ui/src/js/conf/home/pages/dag/_source/formModel/tasks/sqoop.vue
@@ -1,134 +1,194 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 <template>
   <div class="sql-model">
 
     <m-list-box>
-      <div slot="text">{{$t('Direct')}}</div>
+      <div slot="text">{{$t('Custom Job')}}</div>
       <div slot="content">
-        <x-select
-                style="width: 130px;"
-                v-model="modelType"
-                :disabled="isDetails">
-          <x-option
-                  v-for="city in modelTypeList"
-                  :key="city.code"
-                  :value="city.code"
-                  :label="city.code">
-          </x-option>
-        </x-select>
+        <x-switch
+          v-model="isCustomTask"
+          @on-click="_onSwitch"
+          :disabled="isDetails"
+        >
+        </x-switch>
       </div>
     </m-list-box>
-
-    <template>
+    <m-list-box v-show="isCustomTask">
+      <div slot="text">{{$t('Custom Script')}}</div>
+      <div slot="content">
+        <div class="from-mirror">
+          <textarea
+            id="code-shell-mirror"
+            name="code-shell-mirror"
+            style="opacity: 0;">
+          </textarea>
+        </div>
+      </div>
+    </m-list-box>
+    <template v-if="!isCustomTask">
       <m-list-box>
-        <div slot="text" style="font-weight:bold">{{$t('Data Source')}}</div>
+        <div slot="text">{{$t('Sqoop Job Name')}}</div>
+        <div slot="content">
+          <x-input
+            :disabled="isDetails"
+            type="text"
+            v-model="jobName"
+            :placeholder="$t('Please enter Job Name(required)')">
+          </x-input>
+        </div>
       </m-list-box>
-      <hr style="margin-left: 60px;">
       <m-list-box>
-        <div slot="text">{{$t('Type')}}</div>
+        <div slot="text">{{$t('Direct')}}</div>
         <div slot="content">
           <x-select
-                  style="width: 130px;"
-                  v-model="sourceType"
-                  :disabled="isDetails"
-                  @on-change="_handleSourceTypeChange">
+            style="width: 130px;"
+            v-model="modelType"
+            :disabled="isDetails"
+            @on-change="_handleModelTypeChange">
             <x-option
-                    v-for="city in sourceTypeList"
-                    :key="city.code"
-                    :value="city.code"
-                    :label="city.code">
+              v-for="city in modelTypeList"
+              :key="city.code"
+              :value="city.code"
+              :label="city.code">
             </x-option>
           </x-select>
         </div>
       </m-list-box>
+      <m-list-box>
+        <div slot="text" style="width: 110px;">{{$t('Hadoop Custom Params')}}</div>
+        <div slot="content">
+          <m-local-params
+            ref="refMapColumnHadoopParams"
+            @on-local-params="_onHadoopCustomParams"
+            :udp-list="hadoopCustomParams"
+            :hide="false">
+          </m-local-params>
+        </div>
+      </m-list-box>
+      <m-list-box>
+        <div slot="text" style="width: 100px;">{{$t('Sqoop Advanced Parameters')}}</div>
+        <div slot="content">
+          <m-local-params
+            ref="refMapColumnAdvancedParams"
+            @on-local-params="_onSqoopAdvancedParams"
+            :udp-list="sqoopAdvancedParams"
+            :hide="false">
+          </m-local-params>
+        </div>
+      </m-list-box>
 
-      <template v-if="sourceType ==='MYSQL'">
-
+      <template>
         <m-list-box>
-          <div slot="text">{{$t('Datasource')}}</div>
-          <div slot="content">
-            <m-datasource
-                    ref="refSourceDs"
-                    @on-dsData="_onSourceDsData"
-                    :data="{ type:sourceMysqlParams.srcType,datasource:sourceMysqlParams.srcDatasource }"
-                    >
-            </m-datasource>
-          </div>
+          <div slot="text" style="font-weight:bold">{{$t('Data Source')}}</div>
         </m-list-box>
-
+        <hr style="margin-left: 60px;">
         <m-list-box>
-          <div slot="text">{{$t('ModelType')}}</div>
+          <div slot="text">{{$t('Type')}}</div>
           <div slot="content">
-            <x-radio-group v-model="srcQueryType" @on-change="_handleQueryType">
-              <x-radio label="0">{{$t('Form')}}</x-radio>
-              <x-radio label="1">SQL</x-radio>
-            </x-radio-group>
+            <x-select
+              style="width: 130px;"
+              v-model="sourceType"
+              :disabled="isDetails"
+              @on-change="_handleSourceTypeChange">
+              <x-option
+                v-for="city in sourceTypeList"
+                :key="city.code"
+                :value="city.code"
+                :label="city.code">
+              </x-option>
+            </x-select>
           </div>
         </m-list-box>
 
-        <template v-if="sourceMysqlParams.srcQueryType=='0'">
+        <template v-if="sourceType ==='MYSQL'">
 
           <m-list-box>
-            <div slot="text">{{$t('Table')}}</div>
+            <div slot="text">{{$t('Datasource')}}</div>
             <div slot="content">
-              <x-input
-                      :disabled="isDetails"
-                      type="text"
-                      v-model="sourceMysqlParams.srcTable"
-                      :placeholder="$t('Please enter Mysql Table(required)')">
-              </x-input>
+              <m-datasource
+                ref="refSourceDs"
+                @on-dsData="_onSourceDsData"
+                :data="{type:sourceMysqlParams.srcType,
+                      typeList: [{id: 0, code: 'MYSQL', disabled: false}],
+                      datasource:sourceMysqlParams.srcDatasource }"
+              >
+              </m-datasource>
             </div>
           </m-list-box>
 
           <m-list-box>
-            <div slot="text">{{$t('ColumnType')}}</div>
+            <div slot="text">{{$t('ModelType')}}</div>
             <div slot="content">
-              <x-radio-group v-model="sourceMysqlParams.srcColumnType">
-                <x-radio label="0">{{$t('All Columns')}}</x-radio>
-                <x-radio label="1">{{$t('Some Columns')}}</x-radio>
+              <x-radio-group v-model="srcQueryType" @on-change="_handleQueryType">
+                <x-radio label="0">{{$t('Form')}}</x-radio>
+                <x-radio label="1">SQL</x-radio>
               </x-radio-group>
             </div>
           </m-list-box>
 
-          <m-list-box v-if="sourceMysqlParams.srcColumnType=='1'">
-            <div slot="text">{{$t('Column')}}</div>
-            <div slot="content">
-              <x-input
-                      :disabled="isDetails"
-                      type="text"
-                      v-model="sourceMysqlParams.srcColumns"
-                      :placeholder="$t('Please enter Columns (Comma separated)')">
-              </x-input>
-            </div>
-          </m-list-box>
+          <template v-if="sourceMysqlParams.srcQueryType=='0'">
+
+            <m-list-box>
+              <div slot="text">{{$t('Table')}}</div>
+              <div slot="content">
+                <x-input
+                  :disabled="isDetails"
+                  type="text"
+                  v-model="sourceMysqlParams.srcTable"
+                  :placeholder="$t('Please enter Mysql Table(required)')">
+                </x-input>
+              </div>
+            </m-list-box>
+
+            <m-list-box>
+              <div slot="text">{{$t('ColumnType')}}</div>
+              <div slot="content">
+                <x-radio-group v-model="sourceMysqlParams.srcColumnType">
+                  <x-radio label="0">{{$t('All Columns')}}</x-radio>
+                  <x-radio label="1">{{$t('Some Columns')}}</x-radio>
+                </x-radio-group>
+              </div>
+            </m-list-box>
+
+            <m-list-box v-if="sourceMysqlParams.srcColumnType=='1'">
+              <div slot="text">{{$t('Column')}}</div>
+              <div slot="content">
+                <x-input
+                  :disabled="isDetails"
+                  type="text"
+                  v-model="sourceMysqlParams.srcColumns"
+                  :placeholder="$t('Please enter Columns (Comma separated)')">
+                </x-input>
+              </div>
+            </m-list-box>
+          </template>
         </template>
       </template>
-    </template>
 
-    <template v-if="sourceType=='HIVE'">
-      <m-list-box>
+      <template v-if="sourceType=='HIVE'">
+        <m-list-box>
           <div slot="text">{{$t('Database')}}</div>
           <div slot="content">
             <x-input
-                    :disabled="isDetails"
-                    type="text"
-                    v-model="sourceHiveParams.hiveDatabase"
-                    :placeholder="$t('Please enter Hive Database(required)')">
+              :disabled="isDetails"
+              type="text"
+              v-model="sourceHiveParams.hiveDatabase"
+              :placeholder="$t('Please enter Hive Database(required)')">
             </x-input>
           </div>
         </m-list-box>
@@ -136,10 +196,10 @@
           <div slot="text">{{$t('Table')}}</div>
           <div slot="content">
             <x-input
-                    :disabled="isDetails"
-                    type="text"
-                    v-model="sourceHiveParams.hiveTable"
-                    :placeholder="$t('Please enter Hive Table(required)')">
+              :disabled="isDetails"
+              type="text"
+              v-model="sourceHiveParams.hiveTable"
+              :placeholder="$t('Please enter Hive Table(required)')">
             </x-input>
           </div>
         </m-list-box>
@@ -147,10 +207,10 @@
           <div slot="text">{{$t('Hive partition Keys')}}</div>
           <div slot="content">
             <x-input
-                    :disabled="isDetails"
-                    type="text"
-                    v-model="sourceHiveParams.hivePartitionKey"
-                    :placeholder="$t('Please enter Hive Partition Keys')">
+              :disabled="isDetails"
+              type="text"
+              v-model="sourceHiveParams.hivePartitionKey"
+              :placeholder="$t('Please enter Hive Partition Keys')">
             </x-input>
           </div>
         </m-list-box>
@@ -158,97 +218,97 @@
           <div slot="text">{{$t('Hive partition Values')}}</div>
           <div slot="content">
             <x-input
-                    :disabled="isDetails"
-                    type="text"
-                    v-model="sourceHiveParams.hivePartitionValue"
-                    :placeholder="$t('Please enter Hive Partition Values')">
+              :disabled="isDetails"
+              type="text"
+              v-model="sourceHiveParams.hivePartitionValue"
+              :placeholder="$t('Please enter Hive Partition Values')">
             </x-input>
           </div>
         </m-list-box>
-    </template>
+      </template>
 
-    <template v-if="sourceType=='HDFS'">
-      <m-list-box>
-        <div slot="text">{{$t('Export Dir')}}</div>
+      <template v-if="sourceType=='HDFS'">
+        <m-list-box>
+          <div slot="text">{{$t('Export Dir')}}</div>
+          <div slot="content">
+            <x-input
+              :disabled="isDetails"
+              type="text"
+              v-model="sourceHdfsParams.exportDir"
+              :placeholder="$t('Please enter Export Dir(required)')">
+            </x-input>
+          </div>
+        </m-list-box>
+      </template>
+
+      <m-list-box v-show="srcQueryType === '1' && sourceType ==='MYSQL'">
+        <div slot="text">{{$t('SQL Statement')}}</div>
         <div slot="content">
-          <x-input
-                  :disabled="isDetails"
-                  type="text"
-                  v-model="sourceHdfsParams.exportDir"
-                  :placeholder="$t('Please enter Export Dir(required)')">
-          </x-input>
+          <div class="from-mirror">
+            <textarea
+              id="code-sqoop-mirror"
+              name="code-sqoop-mirror"
+              style="opacity: 0;">
+            </textarea>
+          </div>
         </div>
       </m-list-box>
-    </template>
 
-    <m-list-box v-show="srcQueryType === '1' && sourceType ==='MYSQL'">
-      <div slot="text">{{$t('SQL Statement')}}</div>
-      <div slot="content">
-        <div class="from-mirror">
-          <textarea
-                  id="code-sqoop-mirror"
-                  name="code-sqoop-mirror"
-                  style="opacity: 0;">
-          </textarea>
-        </div>
-      </div>
-    </m-list-box>
+      <template>
+        <m-list-box v-show="sourceType === 'MYSQL'">
+          <div slot="text">{{$t('Map Column Hive')}}</div>
+          <div slot="content">
+            <m-local-params
+              ref="refMapColumnHiveParams"
+              @on-local-params="_onMapColumnHive"
+              :udp-list="sourceMysqlParams.mapColumnHive"
+              :hide="false">
+            </m-local-params>
+          </div>
+        </m-list-box>
+        <m-list-box v-show="sourceType === 'MYSQL'">
+          <div slot="text">{{$t('Map Column Java')}}</div>
+          <div slot="content">
+            <m-local-params
+              ref="refMapColumnJavaParams"
+              @on-local-params="_onMapColumnJava"
+              :udp-list="sourceMysqlParams.mapColumnJava"
+              :hide="false">
+            </m-local-params>
+          </div>
+        </m-list-box>
+      </template>
 
-    <template>
-      <m-list-box v-show="sourceType === 'MYSQL'">
-        <div slot="text">{{$t('Map Column Hive')}}</div>
-        <div slot="content">
-          <m-local-params
-                  ref="refMapColumnHiveParams"
-                  @on-local-params="_onMapColumnHive"
-                  :udp-list="sourceMysqlParams.mapColumnHive"
-                  :hide="false">
-          </m-local-params>
-        </div>
+      <m-list-box>
+        <div slot="text" style="font-weight:bold">{{$t('Data Target')}}</div>
       </m-list-box>
-      <m-list-box v-show="sourceType === 'MYSQL'">
-        <div slot="text">{{$t('Map Column Java')}}</div>
+      <hr style="margin-left: 60px;">
+
+      <m-list-box>
+        <div slot="text">{{$t('Type')}}</div>
         <div slot="content">
-          <m-local-params
-                  ref="refMapColumnJavaParams"
-                  @on-local-params="_onMapColumnJava"
-                  :udp-list="sourceMysqlParams.mapColumnJava"
-                  :hide="false">
-          </m-local-params>
+          <x-select
+            style="width: 130px;"
+            v-model="targetType"
+            :disabled="isDetails">
+            <x-option
+              v-for="city in targetTypeList"
+              :key="city.code"
+              :value="city.code"
+              :label="city.code">
+            </x-option>
+          </x-select>
         </div>
       </m-list-box>
-    </template>
-
-    <m-list-box>
-      <div slot="text" style="font-weight:bold">{{$t('Data Target')}}</div>
-    </m-list-box>
-    <hr style="margin-left: 60px;">
-
-    <m-list-box>
-      <div slot="text">{{$t('Type')}}</div>
-      <div slot="content">
-        <x-select
-                style="width: 130px;"
-                v-model="targetType"
-                :disabled="isDetails">
-          <x-option
-                  v-for="city in targetTypeList"
-                  :key="city.code"
-                  :value="city.code"
-                  :label="city.code">
-          </x-option>
-        </x-select>
-      </div>
-    </m-list-box>
-    <div v-show="targetType==='HIVE'">
-      <m-list-box>
+      <div v-show="targetType==='HIVE'">
+        <m-list-box>
           <div slot="text">{{$t('Database')}}</div>
           <div slot="content">
             <x-input
-                    :disabled="isDetails"
-                    type="text"
-                    v-model="targetHiveParams.hiveDatabase"
-                    :placeholder="$t('Please enter Hive Database(required)')">
+              :disabled="isDetails"
+              type="text"
+              v-model="targetHiveParams.hiveDatabase"
+              :placeholder="$t('Please enter Hive Database(required)')">
             </x-input>
           </div>
         </m-list-box>
@@ -256,10 +316,10 @@
           <div slot="text">{{$t('Table')}}</div>
           <div slot="content">
             <x-input
-                    :disabled="isDetails"
-                    type="text"
-                    v-model="targetHiveParams.hiveTable"
-                    :placeholder="$t('Please enter Hive Table(required)')">
+              :disabled="isDetails"
+              type="text"
+              v-model="targetHiveParams.hiveTable"
+              :placeholder="$t('Please enter Hive Table(required)')">
             </x-input>
           </div>
         </m-list-box>
@@ -285,10 +345,10 @@
           <div slot="text">{{$t('ReplaceDelimiter')}}</div>
           <div slot="content">
             <x-input
-                    :disabled="isDetails"
-                    type="text"
-                    v-model="targetHiveParams.replaceDelimiter"
-                    :placeholder="$t('Please enter Replace Delimiter')">
+              :disabled="isDetails"
+              type="text"
+              v-model="targetHiveParams.replaceDelimiter"
+              :placeholder="$t('Please enter Replace Delimiter')">
             </x-input>
           </div>
         </m-list-box>
@@ -296,10 +356,10 @@
           <div slot="text">{{$t('Hive partition Keys')}}</div>
           <div slot="content">
             <x-input
-                    :disabled="isDetails"
-                    type="text"
-                    v-model="targetHiveParams.hivePartitionKey"
-                    :placeholder="$t('Please enter Hive Partition Keys')">
+              :disabled="isDetails"
+              type="text"
+              v-model="targetHiveParams.hivePartitionKey"
+              :placeholder="$t('Please enter Hive Partition Keys')">
             </x-input>
           </div>
         </m-list-box>
@@ -307,62 +367,62 @@
           <div slot="text">{{$t('Hive partition Values')}}</div>
           <div slot="content">
             <x-input
-                    :disabled="isDetails"
-                    type="text"
-                    v-model="targetHiveParams.hivePartitionValue"
-                    :placeholder="$t('Please enter Hive Partition Values')">
+              :disabled="isDetails"
+              type="text"
+              v-model="targetHiveParams.hivePartitionValue"
+              :placeholder="$t('Please enter Hive Partition Values')">
             </x-input>
           </div>
         </m-list-box>
-    </div>
-    <div v-show="targetType==='HDFS'">
-      <m-list-box>
-        <div slot="text">{{$t('Target Dir')}}</div>
-        <div slot="content">
-          <x-input
-                  :disabled="isDetails"
-                  type="text"
-                  v-model="targetHdfsParams.targetPath"
-                  :placeholder="$t('Please enter Target Dir(required)')">
-          </x-input>
-        </div>
-      </m-list-box>
-      <m-list-box>
+      </div>
+      <div v-show="targetType==='HDFS'">
+        <m-list-box>
+          <div slot="text">{{$t('Target Dir')}}</div>
+          <div slot="content">
+            <x-input
+              :disabled="isDetails"
+              type="text"
+              v-model="targetHdfsParams.targetPath"
+              :placeholder="$t('Please enter Target Dir(required)')">
+            </x-input>
+          </div>
+        </m-list-box>
+        <m-list-box>
           <div slot="text">{{$t('DeleteTargetDir')}}</div>
           <div slot="content">
             <x-switch v-model="targetHdfsParams.deleteTargetDir"></x-switch>
           </div>
         </m-list-box>
-      <m-list-box>
-        <div slot="text">{{$t('CompressionCodec')}}</div>
-        <div slot="content">
-          <x-radio-group v-model="targetHdfsParams.compressionCodec">
-            <x-radio label="snappy">snappy</x-radio>
-            <x-radio label="lzo">lzo</x-radio>
-            <x-radio label="gzip">gzip</x-radio>
-            <x-radio label="">no</x-radio>
-          </x-radio-group>
-        </div>
-      </m-list-box>
-      <m-list-box>
-        <div slot="text">{{$t('FileType')}}</div>
-        <div slot="content">
-          <x-radio-group v-model="targetHdfsParams.fileType">
-            <x-radio label="--as-avrodatafile">avro</x-radio>
-            <x-radio label="--as-sequencefile">sequence</x-radio>
-            <x-radio label="--as-textfile">text</x-radio>
-            <x-radio label="--as-parquetfile">parquet</x-radio>
-          </x-radio-group>
-        </div>
-      </m-list-box>
-      <m-list-box>
+        <m-list-box>
+          <div slot="text">{{$t('CompressionCodec')}}</div>
+          <div slot="content">
+            <x-radio-group v-model="targetHdfsParams.compressionCodec">
+              <x-radio label="snappy">snappy</x-radio>
+              <x-radio label="lzo">lzo</x-radio>
+              <x-radio label="gzip">gzip</x-radio>
+              <x-radio label="">no</x-radio>
+            </x-radio-group>
+          </div>
+        </m-list-box>
+        <m-list-box>
+          <div slot="text">{{$t('FileType')}}</div>
+          <div slot="content">
+            <x-radio-group v-model="targetHdfsParams.fileType">
+              <x-radio label="--as-avrodatafile">avro</x-radio>
+              <x-radio label="--as-sequencefile">sequence</x-radio>
+              <x-radio label="--as-textfile">text</x-radio>
+              <x-radio label="--as-parquetfile">parquet</x-radio>
+            </x-radio-group>
+          </div>
+        </m-list-box>
+        <m-list-box>
           <div slot="text">{{$t('FieldsTerminated')}}</div>
           <div slot="content">
             <x-input
-                    :disabled="isDetails"
-                    type="text"
-                    v-model="targetHdfsParams.fieldsTerminated"
-                    :placeholder="$t('Please enter Fields Terminated')">
+              :disabled="isDetails"
+              type="text"
+              v-model="targetHdfsParams.fieldsTerminated"
+              :placeholder="$t('Please enter Fields Terminated')">
             </x-input>
           </div>
         </m-list-box>
@@ -370,45 +430,47 @@
           <div slot="text">{{$t('LinesTerminated')}}</div>
           <div slot="content">
             <x-input
-                    :disabled="isDetails"
-                    type="text"
-                    v-model="targetHdfsParams.linesTerminated"
-                    :placeholder="$t('Please enter Lines Terminated')">
+              :disabled="isDetails"
+              type="text"
+              v-model="targetHdfsParams.linesTerminated"
+              :placeholder="$t('Please enter Lines Terminated')">
             </x-input>
           </div>
         </m-list-box>
-    </div>
-    <div v-show="targetType==='MYSQL'">
-      <m-list-box>
-        <div slot="text">{{$t('Datasource')}}</div>
-        <div slot="content">
-          <m-datasource
+      </div>
+      <div v-show="targetType==='MYSQL'">
+        <m-list-box>
+          <div slot="text">{{$t('Datasource')}}</div>
+          <div slot="content">
+            <m-datasource
               ref="refTargetDs"
               @on-dsData="_onTargetDsData"
-              :data="{ type:targetMysqlParams.targetType,datasource:targetMysqlParams.targetDatasource }"
-              >
-          </m-datasource>
-        </div>
-      </m-list-box>
+              :data="{ type:targetMysqlParams.targetType,
+                      typeList: [{id: 0, code: 'MYSQL', disabled: false}],
+                      datasource:targetMysqlParams.targetDatasource }"
+            >
+            </m-datasource>
+          </div>
+        </m-list-box>
         <m-list-box>
           <div slot="text">{{$t('Table')}}</div>
           <div slot="content">
             <x-input
-                    :disabled="isDetails"
-                    type="text"
-                    v-model="targetMysqlParams.targetTable"
-                    :placeholder="$t('Please enter Mysql Table(required)')">
+              :disabled="isDetails"
+              type="text"
+              v-model="targetMysqlParams.targetTable"
+              :placeholder="$t('Please enter Mysql Table(required)')">
             </x-input>
           </div>
         </m-list-box>
-          <m-list-box>
+        <m-list-box>
           <div slot="text">{{$t('Column')}}</div>
           <div slot="content">
             <x-input
-                    :disabled="isDetails"
-                    type="text"
-                    v-model="targetMysqlParams.targetColumns"
-                    :placeholder="$t('Please enter Columns (Comma separated)')">
+              :disabled="isDetails"
+              type="text"
+              v-model="targetMysqlParams.targetColumns"
+              :placeholder="$t('Please enter Columns (Comma separated)')">
             </x-input>
           </div>
         </m-list-box>
@@ -416,10 +478,10 @@
           <div slot="text">{{$t('FieldsTerminated')}}</div>
           <div slot="content">
             <x-input
-                    :disabled="isDetails"
-                    type="text"
-                    v-model="targetMysqlParams.fieldsTerminated"
-                    :placeholder="$t('Please enter Fields Terminated')">
+              :disabled="isDetails"
+              type="text"
+              v-model="targetMysqlParams.fieldsTerminated"
+              :placeholder="$t('Please enter Fields Terminated')">
             </x-input>
           </div>
         </m-list-box>
@@ -427,10 +489,10 @@
           <div slot="text">{{$t('LinesTerminated')}}</div>
           <div slot="content">
             <x-input
-                    :disabled="isDetails"
-                    type="text"
-                    v-model="targetMysqlParams.linesTerminated"
-                    :placeholder="$t('Please enter Lines Terminated')">
+              :disabled="isDetails"
+              type="text"
+              v-model="targetMysqlParams.linesTerminated"
+              :placeholder="$t('Please enter Lines Terminated')">
             </x-input>
           </div>
         </m-list-box>
@@ -444,37 +506,37 @@
           <div slot="text">{{$t('UpdateKey')}}</div>
           <div slot="content">
             <x-input
-                    :disabled="isDetails"
-                    type="text"
-                    v-model="targetMysqlParams.targetUpdateKey"
-                    :placeholder="$t('Please enter Update Key')">
+              :disabled="isDetails"
+              type="text"
+              v-model="targetMysqlParams.targetUpdateKey"
+              :placeholder="$t('Please enter Update Key')">
             </x-input>
           </div>
         </m-list-box>
         <m-list-box v-show="targetMysqlParams.isUpdate">
           <div slot="text">{{$t('UpdateMode')}}</div>
           <div slot="content">
-              <x-radio-group v-model="targetMysqlParams.targetUpdateMode">
-                <x-radio label="updateonly">{{$t('OnlyUpdate')}}</x-radio>
-                <x-radio label="allowinsert">{{$t('AllowInsert')}}</x-radio>
-              </x-radio-group>
-            </div>
+            <x-radio-group v-model="targetMysqlParams.targetUpdateMode">
+              <x-radio label="updateonly">{{$t('OnlyUpdate')}}</x-radio>
+              <x-radio label="allowinsert">{{$t('AllowInsert')}}</x-radio>
+            </x-radio-group>
+          </div>
         </m-list-box>
 
-    </div>
-    <m-list-box>
-      <div slot="text">{{$t('Concurrency')}}</div>
-      <div slot="content">
-        <x-input
-                :disabled="isDetails"
-                type="text"
-                v-model="concurrency"
-                :placeholder="$t('Please enter Concurrency')">
-
-        </x-input>
       </div>
-    </m-list-box>
+      <m-list-box>
+        <div slot="text">{{$t('Concurrency')}}</div>
+        <div slot="content">
+          <x-input
+            :disabled="isDetails"
+            type="text"
+            v-model="concurrency"
+            :placeholder="$t('Please enter Concurrency')">
 
+          </x-input>
+        </div>
+      </m-list-box>
+    </template>
     <m-list-box>
       <div slot="text">{{$t('Custom Parameters')}}</div>
       <div slot="content">
@@ -498,16 +560,38 @@
   import codemirror from '@/conf/home/pages/resource/pages/file/pages/_source/codemirror'
 
   let editor
+  let shellEditor
 
   export default {
     name: 'sql',
     data () {
       return {
-
+        /**
+         * Is Custom Task
+         */
+        isCustomTask: false,
         /**
          * Customer Params
          */
         localParams: [],
+
+        /**
+         * Hadoop Custom Params
+         */
+        hadoopCustomParams: [],
+
+        /**
+         * Sqoop Advanced Params
+         */
+        sqoopAdvancedParams: [],
+        /**
+         * script
+         */
+        customShell: '',
+        /**
+         * task name
+         */
+        jobName: '',
         /**
          * mysql query type
          */
@@ -525,23 +609,21 @@
          */
         concurrency:1,
         /**
+         * default job type
+         */
+        jobType:'TEMPLATE',
+        /**
          * direct model type
          */
         modelType:'import',
 
         modelTypeList: [{ code: 'import' }, { code: 'export' }],
 
-        sourceTypeList:[
+        sourceTypeList: [
           {
-            code:"MYSQL"
+            code: "MYSQL"
           },
-          {
-            code:"HDFS"
-          },
-          {
-            code:"HIVE"
-          }
-          ],
+        ],
 
         targetTypeList:[
           {
@@ -620,19 +702,63 @@
     },
     methods: {
 
+      _onSwitch(is){
+        if(is) {
+          this.jobType = 'CUSTOM'
+          this.isCustomTask = true
+          setTimeout(() => {
+            this._handlerShellEditor()
+          }, 200)
+        } else {
+          this.jobType = 'TEMPLATE'
+          this.isCustomTask = false
+          setTimeout(() => {
+            this._handlerEditor()
+          }, 200)
+        }
+      },
+
       _handleQueryType(o){
         this.sourceMysqlParams.srcQueryType = this.srcQueryType
+        this._getTargetTypeList(this.sourceType)
+        this.targetType = this.targetTypeList[0].code
+      },
+
+      _handleModelTypeChange(a){
+        this._getSourceTypeList(a.label)
+        this.sourceType = this.sourceTypeList[0].code
+        this._handleSourceTypeChange({label: this.sourceType, value: this.sourceType})
       },
 
       _handleSourceTypeChange(a){
-         this._getTargetTypeList(a.label)
-         this.targetType = this.targetTypeList[0].code
+        this._getTargetTypeList(a.label)
+        this.targetType = this.targetTypeList[0].code
       },
 
-      _getTargetTypeList(data){
+      _getSourceTypeList(data){
         switch(data){
-          case 'MYSQL':
-            this.targetTypeList = [
+          case 'import':
+            this.sourceTypeList = [
+              {
+                code:"MYSQL"
+              },
+            ]
+            break;
+          case 'export':
+            this.sourceTypeList = [
+              {
+                code: "HDFS"
+              },
+              {
+                code: "HIVE"
+              }
+            ]
+            break;
+          default:
+            this.sourceTypeList = [
+              {
+                code:"MYSQL"
+              },
               {
                 code:"HIVE"
               },
@@ -641,6 +767,28 @@
               }
             ]
             break;
+        }
+      },
+
+      _getTargetTypeList(data){
+        switch(data){
+          case 'MYSQL':
+            if (this.srcQueryType === "1") {
+              this.targetTypeList = [
+                {
+                  code: "HDFS"
+                }]
+            } else {
+              this.targetTypeList = [
+                {
+                  code: "HIVE"
+                },
+                {
+                  code: "HDFS"
+                }
+              ]
+            }
+            break;
           case 'HDFS':
             this.targetTypeList = [
               {
@@ -670,12 +818,10 @@
 
       _onMapColumnHive (a) {
         this.sourceMysqlParams.mapColumnHive = a
-        console.log(this.sourceMysqlParams.mapColumnHive)
       },
 
       _onMapColumnJava (a) {
         this.sourceMysqlParams.mapColumnJava = a
-        console.log(this.sourceMysqlParams.mapColumnJava)
       },
 
       /**
@@ -695,13 +841,14 @@
       },
 
       /**
-      * stringify the source params
-      */
+       * stringify the source params
+       */
       _handleSourceParams() {
         var params = null
         switch(this.sourceType){
           case "MYSQL":
-            this.sourceMysqlParams.srcQuerySql = editor ? editor.getValue() : this.sourceMysqlParams.srcQuerySql
+            this.sourceMysqlParams.srcQuerySql = this.sourceMysqlParams.srcQueryType === "1" && editor ?
+              editor.getValue() : this.sourceMysqlParams.srcQuerySql
             params = JSON.stringify(this.sourceMysqlParams)
             break;
           case "ORACLE":
@@ -715,14 +862,14 @@
             break;
           default:
             params = "";
-              break;
+            break;
         }
         return params
       },
 
-     /**
-      * stringify the target params
-      */
+      /**
+       * stringify the target params
+       */
       _handleTargetParams() {
         var params = null
         switch(this.targetType){
@@ -791,91 +938,114 @@
        * verification
        */
       _verification () {
+        let sqoopParams = {
+          jobType: this.jobType,
+          localParams: this.localParams
+        }
+        if (this.jobType === 'CUSTOM') {
+          if (!shellEditor.getValue()) {
+            this.$message.warning(`${i18n.$t('Please enter Custom Shell(required)')}`)
+            return false
+          }
+          sqoopParams['customShell'] = shellEditor.getValue()
+        } else {
+          if (!this.jobName) {
+            this.$message.warning(`${i18n.$t('Please enter Job Name(required)')}`)
+            return false
+          }
 
-        switch(this.sourceType){
-          case "MYSQL":
-            if (!this.$refs.refSourceDs._verifDatasource()) {
-              return false
-            }
-            if(this.srcQueryType === '1'){
-              if (!editor.getValue()) {
-                this.$message.warning(`${i18n.$t('Please enter a SQL Statement(required)')}`)
-                return false
-              }
-            }else{
-              if (this.sourceMysqlParams.srcTable === "") {
-                this.$message.warning(`${i18n.$t('Please enter Mysql Table(required)')}`)
+          switch (this.sourceType) {
+            case "MYSQL":
+              if (!this.$refs.refSourceDs._verifDatasource()) {
                 return false
               }
-
-              if(this.sourceMysqlParams.srcColumnType === "1" && this.sourceMysqlParams.srcColumns === ""){
-                this.$message.warning(`${i18n.$t('Please enter Columns (Comma separated)')}`)
-                return false
+              if (this.srcQueryType === '1') {
+                if (!editor.getValue()) {
+                  this.$message.warning(`${i18n.$t('Please enter a SQL Statement(required)')}`)
+                  return false
+                }
+                this.sourceMysqlParams.srcTable = ""
+                this.sourceMysqlParams.srcColumnType = "0"
+                this.sourceMysqlParams.srcColumns = ""
+              } else {
+                if (this.sourceMysqlParams.srcTable === "") {
+                  this.$message.warning(`${i18n.$t('Please enter Mysql Table(required)')}`)
+                  return false
+                }
+                this.sourceMysqlParams.srcQuerySql = ""
+                if (this.sourceMysqlParams.srcColumnType === "1" && this.sourceMysqlParams.srcColumns === "") {
+                  this.$message.warning(`${i18n.$t('Please enter Columns (Comma separated)')}`)
+                  return false
+                }
+                if (this.sourceMysqlParams.srcColumnType === "0") {
+                  this.sourceMysqlParams.srcColumns = ""
+                }
               }
-            }
 
-            break;
-          case "HDFS":
-              if(this.sourceHdfsParams.exportDir === ""){
+              break;
+            case "HDFS":
+              if (this.sourceHdfsParams.exportDir === "") {
                 this.$message.warning(`${i18n.$t('Please enter Export Dir(required)')}`)
                 return false
               }
-            break;
-          case "HIVE":
-              if(this.sourceHiveParams.hiveDatabase === ""){
+              break;
+            case "HIVE":
+              if (this.sourceHiveParams.hiveDatabase === "") {
                 this.$message.warning(`${i18n.$t('Please enter Hive Database(required)')}`)
                 return false
               }
-              if(this.sourceHiveParams.hiveTable === ""){
+              if (this.sourceHiveParams.hiveTable === "") {
                 this.$message.warning(`${i18n.$t('Please enter Hive Table(required)')}`)
                 return false
               }
-            break;
-          default:
-            break;
-        }
+              break;
+            default:
+              break;
+          }
 
-        switch(this.targetType){
-          case "HIVE":
-            if(this.targetHiveParams.hiveDatabase === ""){
+          switch (this.targetType) {
+            case "HIVE":
+              if (this.targetHiveParams.hiveDatabase === "") {
                 this.$message.warning(`${i18n.$t('Please enter Hive Database(required)')}`)
                 return false
               }
-              if(this.targetHiveParams.hiveTable === ""){
+              if (this.targetHiveParams.hiveTable === "") {
                 this.$message.warning(`${i18n.$t('Please enter Hive Table(required)')}`)
                 return false
               }
-            break;
-          case "HDFS":
-             if(this.targetHdfsParams.targetPath === ""){
+              break;
+            case "HDFS":
+              if (this.targetHdfsParams.targetPath === "") {
                 this.$message.warning(`${i18n.$t('Please enter Target Dir(required)')}`)
                 return false
               }
-            break;
-          case "MYSQL":
+              break;
+            case "MYSQL":
               if (!this.$refs.refTargetDs._verifDatasource()) {
                 return false
               }
 
-              if(this.targetMysqlParams.targetTable === ""){
+              if (this.targetMysqlParams.targetTable === "") {
                 this.$message.warning(`${i18n.$t('Please enter Mysql Table(required)')}`)
                 return false
               }
-            break;
-          default:
-            break;
+              break;
+            default:
+              break;
+          }
+          sqoopParams['jobName'] = this.jobName
+          sqoopParams['hadoopCustomParams'] = this.hadoopCustomParams
+          sqoopParams['sqoopAdvancedParams'] = this.sqoopAdvancedParams
+          sqoopParams['concurrency'] = this.concurrency
+          sqoopParams['modelType'] = this.modelType
+          sqoopParams['sourceType'] = this.sourceType
+          sqoopParams['targetType'] = this.targetType
+          sqoopParams['targetParams'] = this._handleTargetParams()
+          sqoopParams['sourceParams'] = this._handleSourceParams()
         }
 
         // storage
-        this.$emit('on-params', {
-          concurrency:this.concurrency,
-          modelType:this.modelType,
-          sourceType:this.sourceType,
-          targetType:this.targetType,
-          sourceParams:this._handleSourceParams(),
-          targetParams:this._handleTargetParams(),
-          localParams:this.localParams
-        })
+        this.$emit('on-params', sqoopParams)
         return true
       },
 
@@ -912,13 +1082,54 @@
         return editor
       },
 
-     /**
+      /**
+       * Processing code highlighting
+       */
+      _handlerShellEditor () {
+        this._destroyShellEditor()
+
+        // shellEditor
+        shellEditor = codemirror('code-shell-mirror', {
+          mode: 'shell',
+          readOnly: this.isDetails
+        })
+
+        this.keypress = () => {
+          if (!shellEditor.getOption('readOnly')) {
+            shellEditor.showHint({
+              completeSingle: false
+            })
+          }
+        }
+
+        // Monitor keyboard
+        shellEditor.on('keypress', this.keypress)
+        shellEditor.setValue(this.customShell)
+
+        return shellEditor
+      },
+
+      /**
        * return localParams
        */
       _onLocalParams (a) {
         this.localParams = a
       },
 
+      /**
+       * return hadoopParams
+       */
+      _onHadoopCustomParams (a) {
+        this.hadoopCustomParams = a
+      },
+
+      /**
+       * return sqoopAdvancedParams
+       */
+      _onSqoopAdvancedParams (a) {
+        this.sqoopAdvancedParams = a
+      },
+
       _cacheParams () {
         this.$emit('on-cache-params', {
           concurrency:this.concurrency,
@@ -932,13 +1143,20 @@
       },
 
       _destroyEditor () {
-         if (editor) {
+        if (editor) {
           editor.toTextArea() // Uninstall
           editor.off($('.code-sqoop-mirror'), 'keypress', this.keypress)
           editor.off($('.code-sqoop-mirror'), 'changes', this.changes)
           editor = null
         }
       },
+      _destroyShellEditor () {
+        if (shellEditor) {
+          shellEditor.toTextArea() // Uninstall
+          shellEditor.off($('.code-shell-mirror'), 'keypress', this.keypress)
+          shellEditor.off($('.code-shell-mirror'), 'changes', this.changes)
+        }
+      }
     },
     watch: {
       // Listening to sqlType
@@ -970,14 +1188,24 @@
 
       // Non-null objects represent backfill
       if (!_.isEmpty(o)) {
-        this.concurrency = o.params.concurrency || 1,
-        this.modelType = o.params.modelType,
-        this.sourceType = o.params.sourceType,
-        this._getTargetTypeList(this.sourceType)
-        this.targetType = o.params.targetType,
-        this._getSourceParams(o.params.sourceParams),
-        this._getTargetParams(o.params.targetParams),
-        this.localParams = o.params.localParams
+        this.jobType = o.params.jobType
+        this.isCustomTask = false
+        if (this.jobType === 'CUSTOM') {
+          this.customShell = o.params.customShell
+          this.isCustomTask = true
+        } else {
+          this.jobName = o.params.jobName
+          this.hadoopCustomParams = o.params.hadoopCustomParams
+          this.sqoopAdvancedParams = o.params.sqoopAdvancedParams
+          this.concurrency = o.params.concurrency || 1
+          this.modelType = o.params.modelType
+          this.sourceType = o.params.sourceType
+          this._getTargetTypeList(this.sourceType)
+          this.targetType = o.params.targetType
+          this._getSourceParams(o.params.sourceParams)
+          this._getTargetParams(o.params.targetParams)
+          this.localParams = o.params.localParams
+        }
       }
     },
 
@@ -990,6 +1218,10 @@
       })
 
       setTimeout(() => {
+        this._handlerShellEditor()
+      }, 200)
+
+      setTimeout(() => {
         this.srcQueryType = this.sourceMysqlParams.srcQueryType
       }, 500)
     },
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
index 79eab5e..cd82922 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/en_US.js
@@ -547,6 +547,9 @@ export default {
   'Whether directory': 'Whether directory',
   Yes: 'Yes',
   No: 'No',
+  'Hadoop Custom Params': 'Hadoop Params',
+  'Sqoop Advanced Parameters': 'Sqoop Params',
+  'Sqoop Job Name': 'Job Name',
   'Please enter Mysql Database(required)': 'Please enter Mysql Database(required)',
   'Please enter Mysql Table(required)': 'Please enter Mysql Table(required)',
   'Please enter Columns (Comma separated)': 'Please enter Columns (Comma separated)',
@@ -561,6 +564,8 @@ export default {
   'Please enter Lines Terminated': 'Please enter Lines Terminated',
   'Please enter Concurrency': 'Please enter Concurrency',
   'Please enter Update Key': 'Please enter Update Key',
+  'Please enter Job Name(required)': 'Please enter Job Name(required)',
+  'Please enter Custom Shell(required)': 'Please enter Custom Shell(required)',
   Direct: 'Direct',
   Type: 'Type',
   ModelType: 'ModelType',
@@ -594,6 +599,8 @@ export default {
   'All Columns': 'All Columns',
   'Some Columns': 'Some Columns',
   'Branch flow': 'Branch flow',
+  'Custom Job': 'Custom Job',
+  'Custom Script': 'Custom Script',
   'Cannot select the same node for successful branch flow and failed branch flow': 'Cannot select the same node for successful branch flow and failed branch flow',
   'Successful branch flow and failed branch flow are required': 'conditions node Successful and failed branch flow are required',
   'No resources exist': 'No resources exist',
diff --git a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
index eebc686..e0419df 100755
--- a/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
+++ b/dolphinscheduler-ui/src/js/module/i18n/locale/zh_CN.js
@@ -545,6 +545,9 @@ export default {
   'Whether directory': '是否文件夹',
   Yes: '是',
   No: '否',
+  'Hadoop Custom Params': 'Hadoop参数',
+  'Sqoop Advanced Parameters': 'Sqoop参数',
+  'Sqoop Job Name': '任务名称',
   'Please enter Mysql Database(required)': '请输入Mysql数据库(必填)',
   'Please enter Mysql Table(required)': '请输入Mysql表名(必填)',
   'Please enter Columns (Comma separated)': '请输入列名,用 , 隔开',
@@ -559,6 +562,8 @@ export default {
   'Please enter Lines Terminated': '请输入行分隔符',
   'Please enter Concurrency': '请输入并发度',
   'Please enter Update Key': '请输入更新列',
+  'Please enter Job Name(required)': '请输入任务名称(必填)',
+  'Please enter Custom Shell(required)': '请输入自定义脚本',
   Direct: '流向',
   Type: '类型',
   ModelType: '模式',
@@ -592,6 +597,8 @@ export default {
   'All Columns': '全表导入',
   'Some Columns': '选择列',
   'Branch flow': '分支流转',
+  'Custom Job': '自定义任务',
+  'Custom Script': '自定义脚本',
   'Cannot select the same node for successful branch flow and failed branch flow': '成功分支流转和失败分支流转不能选择同一个节点',
   'Successful branch flow and failed branch flow are required': 'conditions节点成功和失败分支流转必填',
   'No resources exist': '不存在资源',
diff --git a/pom.xml b/pom.xml
index 57a3c95..d7d0847 100644
--- a/pom.xml
+++ b/pom.xml
@@ -819,6 +819,7 @@
                         <include>**/server/worker/task/spark/SparkTaskTest.java</include>
                         <include>**/server/worker/task/EnvFileTest.java</include>
                         <include>**/server/worker/task/spark/SparkTaskTest.java</include>
+                        <include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
                         <include>**/server/worker/EnvFileTest.java</include>
                         <include>**/service/process/ProcessServiceTest.java</include>
                         <include>**/service/quartz/cron/CronUtilsTest.java</include>