You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/03/05 15:34:45 UTC

[incubator-dolphinscheduler] branch dev updated: Support worker server to run bat script (#2023)

This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6070738  Support worker server to run bat script (#2023)
6070738 is described below

commit 6070738acbb89e0a7ed49b487fc1225888e29019
Author: dailidong <da...@gmail.com>
AuthorDate: Thu Mar 5 23:34:37 2020 +0800

    Support worker server to run bat script (#2023)
    
    * Support worker server to run bat script
    
    1. Reimplement ProcessImpl.java, ProcessEnvironment.java and ProcessBuilder.java for Windows
    2. Modify shell task code for windows
    3. Add ASF License
    
    * Add Unit Test
---
 .../apache/dolphinscheduler/common/Constants.java  |    2 +-
 .../utils/process/ProcessBuilderForWin32.java      | 1065 ++++++++++++++++++++
 .../utils/process/ProcessEnvironmentForWin32.java  |  286 ++++++
 .../common/utils/process/ProcessImplForWin32.java  |  752 ++++++++++++++
 .../dolphinscheduler/common/ConstantsTest.java     |   25 +-
 .../dolphinscheduler/common/utils/OSUtilsTest.java |   24 +-
 .../utils/process/ProcessBuilderForWin32Test.java  |  210 ++++
 .../process/ProcessEnvironmentForWin32Test.java    |  124 +++
 .../utils/process/ProcessImplForWin32Test.java     |   70 ++
 .../worker/task/AbstractCommandExecutor.java       |   92 +-
 .../server/worker/task/ShellCommandExecutor.java   |   39 +-
 .../server/worker/task/datax/DataxTask.java        |   11 +-
 .../server/worker/task/shell/ShellTask.java        |    9 +-
 .../server/utils/ProcessUtilsTest.java             |   17 +
 .../server/worker/task/shell/ShellTaskTest.java    |  198 ++++
 pom.xml                                            |    4 +
 16 files changed, 2863 insertions(+), 65 deletions(-)

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 73125f4..6af0e64 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -746,7 +746,7 @@ public final class Constants {
      * application regex
      */
     public static final String APPLICATION_REGEX = "application_\\d+_\\d+";
-    public static final String PID = "pid";
+    public static final String PID = OSUtils.isWindows() ? "handle" : "pid";
     /**
      * month_begin
      */
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32.java
new file mode 100644
index 0000000..4fb5f94
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32.java
@@ -0,0 +1,1065 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.common.utils.process;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is used to create operating system processes.
+ *
+ * <p>Each {@code ProcessBuilderForWin32} instance manages a collection
+ * of process attributes.  The {@link #start()} method creates a new
+ * {@link Process} instance with those attributes.  The {@link
+ * #start()} method can be invoked repeatedly from the same instance
+ * to create new subprocesses with identical or related attributes.
+ *
+ * <p>Each process builder manages these process attributes:
+ *
+ * <ul>
+ *
+ * <li>a <i>command</i>, a list of strings which signifies the
+ * external program file to be invoked and its arguments, if any.
+ * Which string lists represent a valid operating system command is
+ * system-dependent.  For example, it is common for each conceptual
+ * argument to be an element in this list, but there are operating
+ * systems where programs are expected to tokenize command line
+ * strings themselves - on such a system a Java implementation might
+ * require commands to contain exactly two elements.
+ *
+ * <li>an <i>environment</i>, which is a system-dependent mapping from
+ * <i>variables</i> to <i>values</i>.  The initial value is a copy of
+ * the environment of the current process (see {@link System#getenv()}).
+ *
+ * <li>a <i>working directory</i>.  The default value is the current
+ * working directory of the current process, usually the directory
+ * named by the system property {@code user.dir}.
+ *
+ * <li><a name="redirect-input">a source of <i>standard input</i></a>.
+ * By default, the subprocess reads input from a pipe.  Java code
+ * can access this pipe via the output stream returned by
+ * {@link Process#getOutputStream()}.  However, standard input may
+ * be redirected to another source using
+ * {@link #redirectInput(ProcessBuilderForWin32.Redirect) redirectInput}.
+ * In this case, {@link Process#getOutputStream()} will return a
+ * <i>null output stream</i>, for which:
+ *
+ * <ul>
+ * <li>the {@link OutputStream#write(int) write} methods always
+ * throw {@code IOException}
+ * <li>the {@link OutputStream#close() close} method does nothing
+ * </ul>
+ *
+ * <li><a name="redirect-output">a destination for <i>standard output</i>
+ * and <i>standard error</i></a>.  By default, the subprocess writes standard
+ * output and standard error to pipes.  Java code can access these pipes
+ * via the input streams returned by {@link Process#getInputStream()} and
+ * {@link Process#getErrorStream()}.  However, standard output and
+ * standard error may be redirected to other destinations using
+ * {@link #redirectOutput(ProcessBuilderForWin32.Redirect) redirectOutput} and
+ * {@link #redirectError(ProcessBuilderForWin32.Redirect) redirectError}.
+ * In this case, {@link Process#getInputStream()} and/or
+ * {@link Process#getErrorStream()} will return a <i>null input
+ * stream</i>, for which:
+ *
+ * <ul>
+ * <li>the {@link InputStream#read() read} methods always return
+ * {@code -1}
+ * <li>the {@link InputStream#available() available} method always returns
+ * {@code 0}
+ * <li>the {@link InputStream#close() close} method does nothing
+ * </ul>
+ *
+ * <li>a <i>redirectErrorStream</i> property.  Initially, this property
+ * is {@code false}, meaning that the standard output and error
+ * output of a subprocess are sent to two separate streams, which can
+ * be accessed using the {@link Process#getInputStream()} and {@link
+ * Process#getErrorStream()} methods.
+ *
+ * <p>If the value is set to {@code true}, then:
+ *
+ * <ul>
+ * <li>standard error is merged with the standard output and always sent
+ * to the same destination (this makes it easier to correlate error
+ * messages with the corresponding output)
+ * <li>the common destination of standard error and standard output can be
+ * redirected using
+ * {@link #redirectOutput(ProcessBuilderForWin32.Redirect) redirectOutput}
+ * <li>any redirection set by the
+ * {@link #redirectError(ProcessBuilderForWin32.Redirect) redirectError}
+ * method is ignored when creating a subprocess
+ * <li>the stream returned from {@link Process#getErrorStream()} will
+ * always be a <a href="#redirect-output">null input stream</a>
+ * </ul>
+ *
+ * </ul>
+ *
+ * <p>Modifying a process builder's attributes will affect processes
+ * subsequently started by that object's {@link #start()} method, but
+ * will never affect previously started processes or the Java process
+ * itself.
+ *
+ * <p>Most error checking is performed by the {@link #start()} method.
+ * It is possible to modify the state of an object so that {@link
+ * #start()} will fail.  For example, setting the command attribute to
+ * an empty list will not throw an exception unless {@link #start()}
+ * is invoked.
+ *
+ * <p><strong>Note that this class is not synchronized.</strong>
+ * If multiple threads access a {@code ProcessBuilderForWin32} instance
+ * concurrently, and at least one of the threads modifies one of the
+ * attributes structurally, it <i>must</i> be synchronized externally.
+ *
+ * <p>Starting a new process which uses the default working directory
+ * and environment is easy:
+ *
+ * <pre> {@code
+ * Process p = new ProcessBuilderForWin32("myCommand", "myArg").start();
+ * }</pre>
+ *
+ * <p>Here is an example that starts a process with a modified working
+ * directory and environment, and redirects standard output and error
+ * to be appended to a log file:
+ *
+ * <pre> {@code
+ * ProcessBuilderForWin32 pb =
+ *   new ProcessBuilderForWin32("myCommand", "myArg1", "myArg2");
+ * Map<String, String> env = pb.environment();
+ * env.put("VAR1", "myValue");
+ * env.remove("OTHERVAR");
+ * env.put("VAR2", env.get("VAR1") + "suffix");
+ * pb.directory(new File("myDir"));
+ * File log = new File("log");
+ * pb.redirectErrorStream(true);
+ * pb.redirectOutput(Redirect.appendTo(log));
+ * Process p = pb.start();
+ * assert pb.redirectInput() == Redirect.PIPE;
+ * assert pb.redirectOutput().file() == log;
+ * assert p.getInputStream().read() == -1;
+ * }</pre>
+ *
+ * <p>To start a process with an explicit set of environment
+ * variables, first call {@link Map#clear() Map.clear()}
+ * before adding environment variables.
+ *
+ * @author Martin Buchholz
+ * @since 1.5
+ */
+
+public class ProcessBuilderForWin32 {
+
+    private String username;
+    private String password;
+    private List<String> command;
+    private File directory;
+    private Map<String,String> environment;
+    private boolean redirectErrorStream;
+    private ProcessBuilderForWin32.Redirect[] redirects;
+
+    /**
+     * Constructs a process builder with the specified operating
+     * system program and arguments.  This constructor does <i>not</i>
+     * make a copy of the {@code command} list.  Subsequent
+     * updates to the list will be reflected in the state of the
+     * process builder.  It is not checked whether
+     * {@code command} corresponds to a valid operating system
+     * command.
+     *
+     * @param  command the list containing the program and its arguments
+     * @throws NullPointerException if the argument is null
+     */
+    public ProcessBuilderForWin32(List<String> command) {
+        if (command == null)
+            throw new NullPointerException();
+        this.command = command; // keeps the caller's list itself, not a copy
+    }
+
+    /**
+     * Constructs a process builder with the specified operating
+     * system program and arguments.  This is a convenience
+     * constructor that sets the process builder's command to a string
+     * list containing the same strings as the {@code command}
+     * array, in the same order.  It is not checked whether
+     * {@code command} corresponds to a valid operating system
+     * command.
+     *
+     * @param command a string array containing the program and its arguments (a null array causes a NullPointerException)
+     */
+    public ProcessBuilderForWin32(String... command) {
+        this.command = new ArrayList<>(command.length); // new list, pre-sized to the array
+        for (String arg : command)
+            this.command.add(arg);
+    }
+
+    /**
+     * Sets the username and password kept by this builder for the process
+     * (presumably the account used by {@link #start()} -- TODO confirm).
+     * @param username username
+     * @param password password
+     * @return this process builder
+     */
+    public ProcessBuilderForWin32 user(String username, String password) {
+        this.username = username;
+        this.password = password; // held as a plain String field on the builder
+        return this;
+    }
+
+    /**
+     * Sets this process builder's operating system program and
+     * arguments.  This method does <i>not</i> make a copy of the
+     * {@code command} list.  Subsequent updates to the list will
+     * be reflected in the state of the process builder.  It is not
+     * checked whether {@code command} corresponds to a valid
+     * operating system command.
+     *
+     * @param  command the list containing the program and its arguments
+     * @return this process builder
+     *
+     * @throws NullPointerException if the argument is null
+     */
+    public ProcessBuilderForWin32 command(List<String> command) {
+        if (command == null)
+            throw new NullPointerException();
+        this.command = command; // keeps the caller's list itself, not a copy
+        return this;
+    }
+
+    /**
+     * Sets this process builder's operating system program and
+     * arguments.  This is a convenience method that sets the command
+     * to a string list containing the same strings as the
+     * {@code command} array, in the same order.  It is not
+     * checked whether {@code command} corresponds to a valid
+     * operating system command.
+     *
+     * @param  command a string array containing the program and its arguments (a null array causes a NullPointerException)
+     * @return this process builder
+     */
+    public ProcessBuilderForWin32 command(String... command) {
+        this.command = new ArrayList<>(command.length); // replaces any previously set list
+        for (String arg : command)
+            this.command.add(arg);
+        return this;
+    }
+
+    /**
+     * Returns this process builder's operating system program and
+     * arguments.  The returned list is <i>not</i> a copy.  Subsequent
+     * updates to the list will be reflected in the state of this
+     * process builder.
+     *
+     * @return this process builder's program and its arguments
+     */
+    public List<String> command() {
+        return command; // the live backing list, not a copy
+    }
+
+    /**
+     * Returns a string map view of this process builder's environment.
+     *
+     * Whenever a process builder is created, the environment is
+     * initialized to a copy of the current process environment (see
+     * {@link System#getenv()}).  Subprocesses subsequently started by
+     * this object's {@link #start()} method will use this map as
+     * their environment.
+     *
+     * <p>The returned object may be modified using ordinary {@link
+     * Map Map} operations.  These modifications will be
+     * visible to subprocesses started via the {@link #start()}
+     * method.  Two {@code ProcessBuilderForWin32} instances always
+     * contain independent process environments, so changes to the
+     * returned map will never be reflected in any other
+     * {@code ProcessBuilderForWin32} instance or the values returned by
+     * {@link System#getenv System.getenv}.
+     *
+     * <p>If the system does not support environment variables, an
+     * empty map is returned.
+     *
+     * <p>The returned map does not permit null keys or values.
+     * Attempting to insert or query the presence of a null key or
+     * value will throw a {@link NullPointerException}.
+     * Attempting to query the presence of a key or value which is not
+     * of type {@link String} will throw a {@link ClassCastException}.
+     *
+     * <p>The behavior of the returned map is system-dependent.  A
+     * system may not allow modifications to environment variables or
+     * may forbid certain variable names or values.  For this reason,
+     * attempts to modify the map may fail with
+     * {@link UnsupportedOperationException} or
+     * {@link IllegalArgumentException}
+     * if the modification is not permitted by the operating system.
+     *
+     * <p>Since the external format of environment variable names and
+     * values is system-dependent, there may not be a one-to-one
+     * mapping between them and Java's Unicode strings.  Nevertheless,
+     * the map is implemented in such a way that environment variables
+     * which are not modified by Java code will have an unmodified
+     * native representation in the subprocess.
+     *
+     * <p>The returned map and its collection views may not obey the
+     * general contract of the {@link Object#equals} and
+     * {@link Object#hashCode} methods.
+     *
+     * <p>The returned map is typically case-sensitive on all platforms.
+     *
+     * <p>If a security manager exists, its
+     * {@link SecurityManager#checkPermission checkPermission} method
+     * is called with a
+     * {@link RuntimePermission}{@code ("getenv.*")} permission.
+     * This may result in a {@link SecurityException} being thrown.
+     *
+     * <p>When passing information to a Java subprocess,
+     * system properties (see {@link System#getProperty(String)})
+     * are generally preferred over environment variables.
+     *
+     * @return this process builder's environment
+     *
+     * @throws SecurityException
+     *         if a security manager exists and its
+     *         {@link SecurityManager#checkPermission checkPermission}
+     *         method doesn't allow access to the process environment
+     *
+     * @see    Runtime#exec(String[],String[], File)
+     * @see    System#getenv()
+     */
+    public Map<String,String> environment() {
+        SecurityManager security = System.getSecurityManager();
+        if (security != null)
+            security.checkPermission(new RuntimePermission("getenv.*")); // checked on every call, before the map is handed out
+
+        if (environment == null) // created lazily on first access, then reused
+            environment = ProcessEnvironmentForWin32.environment();
+
+        assert environment != null;
+
+        return environment; // the live map held by this builder
+    }
+
+    // Package-private setter taking envp-style "NAME=value" strings (in the JDK original: only for use by Runtime.exec(...envp...))
+    ProcessBuilderForWin32 environment(String[] envp) {
+        assert environment == null; // expected to run before any environment map has been created
+        if (envp != null) {
+            environment = ProcessEnvironmentForWin32.emptyEnvironment(envp.length);
+            assert environment != null;
+
+            for (String envstring : envp) {
+                // Before 1.5, we blindly passed invalid envstrings
+                // to the child process.
+                // We would like to throw an exception, but do not,
+                // for compatibility with old broken code.
+
+                // Silently discard any trailing junk.
+                if (envstring.indexOf((int) '\u0000') != -1)
+                    envstring = envstring.replaceFirst("\u0000.*", ""); // cut at the first NUL; regex '.' does not cross a line terminator
+
+                int eqlsign =
+                        envstring.indexOf('=', ProcessEnvironmentForWin32.MIN_NAME_LENGTH); // search starts at MIN_NAME_LENGTH (presumably so an '=' inside a very short name is not taken as the separator -- confirm)
+                // Silently ignore envstrings lacking the required `='.
+                if (eqlsign != -1)
+                    environment.put(envstring.substring(0,eqlsign), // name: text before the separator
+                            envstring.substring(eqlsign+1)); // value: text after the separator
+            }
+        }
+        return this; // a null envp leaves the environment field untouched
+    }
+
+    /**
+     * Returns this process builder's working directory.
+     *
+     * Subprocesses subsequently started by this object's {@link
+     * #start()} method will use this as their working directory.
+     * The returned value may be {@code null} -- this means to use
+     * the working directory of the current Java process, usually the
+     * directory named by the system property {@code user.dir},
+     * as the working directory of the child process.
+     *
+     * @return this process builder's working directory
+     */
+    public File directory() {
+        return directory; // null until directory(File) is called with a non-null value
+    }
+
+    /**
+     * Sets this process builder's working directory.
+     *
+     * Subprocesses subsequently started by this object's {@link
+     * #start()} method will use this as their working directory.
+     * The argument may be {@code null} -- this means to use the
+     * working directory of the current Java process, usually the
+     * directory named by the system property {@code user.dir},
+     * as the working directory of the child process.
+     *
+     * @param  directory the new working directory
+     * @return this process builder
+     */
+    public ProcessBuilderForWin32 directory(File directory) {
+        this.directory = directory; // stored as given; no existence or null check here
+        return this;
+    }
+
+    // ---------------- I/O Redirection ----------------
+
+    /**
+     * Implements a <a href="#redirect-output">null input stream</a>.
+     */
+    static class NullInputStream extends InputStream {
+        static final ProcessBuilderForWin32.NullInputStream INSTANCE = new ProcessBuilderForWin32.NullInputStream(); // shared instance
+        private NullInputStream() {}
+        public int read()      { return -1; } // always reports end of stream
+        public int available() { return 0; } // never has bytes to read
+    }
+
+    /**
+     * Implements a <a href="#redirect-input">null output stream</a>.
+     */
+    static class NullOutputStream extends OutputStream {
+        static final ProcessBuilderForWin32.NullOutputStream INSTANCE = new ProcessBuilderForWin32.NullOutputStream(); // shared instance
+        private NullOutputStream() {}
+        public void write(int b) throws IOException {
+            throw new IOException("Stream closed"); // every write is rejected; close() is not overridden
+        }
+    }
+
+    /**
+     * Represents a source of subprocess input or a destination of
+     * subprocess output.
+     *
+     * Each {@code Redirect} instance is one of the following:
+     *
+     * <ul>
+     * <li>the special value {@link #PIPE Redirect.PIPE}
+     * <li>the special value {@link #INHERIT Redirect.INHERIT}
+     * <li>a redirection to read from a file, created by an invocation of
+     *     {@link ProcessBuilderForWin32.Redirect#from Redirect.from(File)}
+     * <li>a redirection to write to a file,  created by an invocation of
+     *     {@link ProcessBuilderForWin32.Redirect#to Redirect.to(File)}
+     * <li>a redirection to append to a file, created by an invocation of
+     *     {@link ProcessBuilderForWin32.Redirect#appendTo Redirect.appendTo(File)}
+     * </ul>
+     *
+     * <p>Each of the above categories has an associated unique
+     * {@link ProcessBuilderForWin32.Redirect.Type Type}.
+     *
+     * @since 1.7
+     */
+    public static abstract class Redirect {
+        /**
+         * The type of a {@link ProcessBuilderForWin32.Redirect}.
+         */
+        public enum Type {
+            /**
+             * The type of {@link ProcessBuilderForWin32.Redirect#PIPE Redirect.PIPE}.
+             */
+            PIPE,
+
+            /**
+             * The type of {@link ProcessBuilderForWin32.Redirect#INHERIT Redirect.INHERIT}.
+             */
+            INHERIT,
+
+            /**
+             * The type of redirects returned from
+             * {@link ProcessBuilderForWin32.Redirect#from Redirect.from(File)}.
+             */
+            READ,
+
+            /**
+             * The type of redirects returned from
+             * {@link ProcessBuilderForWin32.Redirect#to Redirect.to(File)}.
+             */
+            WRITE,
+
+            /**
+             * The type of redirects returned from
+             * {@link ProcessBuilderForWin32.Redirect#appendTo Redirect.appendTo(File)}.
+             */
+            APPEND
+        };
+
+        /**
+         * Returns the type of this {@code Redirect}.
+         * @return the type of this {@code Redirect}
+         */
+        public abstract ProcessBuilderForWin32.Redirect.Type type();
+
+        /**
+         * Indicates that subprocess I/O will be connected to the
+         * current Java process over a pipe.
+         *
+         * This is the default handling of subprocess standard I/O.
+         *
+         * <p>It will always be true that
+         *  <pre> {@code
+         * Redirect.PIPE.file() == null &&
+         * Redirect.PIPE.type() == Redirect.Type.PIPE
+         * }</pre>
+         */
+        public static final ProcessBuilderForWin32.Redirect PIPE = new ProcessBuilderForWin32.Redirect() {
+            public Type type() { return Type.PIPE; }
+            public String toString() { return type().toString(); }};
+
+        /**
+         * Indicates that subprocess I/O source or destination will be the
+         * same as those of the current process.  This is the normal
+         * behavior of most operating system command interpreters (shells).
+         *
+         * <p>It will always be true that
+         *  <pre> {@code
+         * Redirect.INHERIT.file() == null &&
+         * Redirect.INHERIT.type() == Redirect.Type.INHERIT
+         * }</pre>
+         */
+        public static final ProcessBuilderForWin32.Redirect INHERIT = new ProcessBuilderForWin32.Redirect() {
+            public Type type() { return Type.INHERIT; }
+            public String toString() { return type().toString(); }};
+
+        /**
+         * Returns the {@link File} source or destination associated
+         * with this redirect, or {@code null} if there is no such file.
+         *
+         * @return the file associated with this redirect,
+         *         or {@code null} if there is no such file
+         */
+        public File file() { return null; }
+
+        /**
+         * When redirected to a destination file, indicates if the output is to be
+         * appended; unsupported here, overridden by {@code to} and {@code appendTo} redirects.
+         */
+        boolean append() {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * Returns a redirect to read from the specified file.
+         *
+         * <p>It will always be true that
+         *  <pre> {@code
+         * Redirect.from(file).file() == file &&
+         * Redirect.from(file).type() == Redirect.Type.READ
+         * }</pre>
+         *
+         * @param file The {@code File} for the {@code Redirect}.
+         * @throws NullPointerException if the specified file is null
+         * @return a redirect to read from the specified file
+         */
+        public static ProcessBuilderForWin32.Redirect from(final File file) {
+            if (file == null)
+                throw new NullPointerException();
+            return new ProcessBuilderForWin32.Redirect() {
+                public Type type() { return Type.READ; }
+                public File file() { return file; }
+                public String toString() {
+                    return "redirect to read from file \"" + file + "\"";
+                }
+            };
+        }
+
+        /**
+         * Returns a redirect to write to the specified file.
+         * If the specified file exists when the subprocess is started,
+         * its previous contents will be discarded.
+         *
+         * <p>It will always be true that
+         *  <pre> {@code
+         * Redirect.to(file).file() == file &&
+         * Redirect.to(file).type() == Redirect.Type.WRITE
+         * }</pre>
+         *
+         * @param file The {@code File} for the {@code Redirect}.
+         * @throws NullPointerException if the specified file is null
+         * @return a redirect to write to the specified file
+         */
+        public static ProcessBuilderForWin32.Redirect to(final File file) {
+            if (file == null)
+                throw new NullPointerException();
+            return new ProcessBuilderForWin32.Redirect() {
+                public Type type() { return Type.WRITE; }
+                public File file() { return file; }
+                public String toString() {
+                    return "redirect to write to file \"" + file + "\"";
+                }
+                boolean append() { return false; }
+            };
+        }
+
+        /**
+         * Returns a redirect to append to the specified file.
+         * Each write operation first advances the position to the
+         * end of the file and then writes the requested data.
+         * Whether the advancement of the position and the writing
+         * of the data are done in a single atomic operation is
+         * system-dependent and therefore unspecified.
+         *
+         * <p>It will always be true that
+         *  <pre> {@code
+         * Redirect.appendTo(file).file() == file &&
+         * Redirect.appendTo(file).type() == Redirect.Type.APPEND
+         * }</pre>
+         *
+         * @param file The {@code File} for the {@code Redirect}.
+         * @throws NullPointerException if the specified file is null
+         * @return a redirect to append to the specified file
+         */
+        public static ProcessBuilderForWin32.Redirect appendTo(final File file) {
+            if (file == null)
+                throw new NullPointerException();
+            return new ProcessBuilderForWin32.Redirect() {
+                public Type type() { return Type.APPEND; }
+                public File file() { return file; }
+                public String toString() {
+                    return "redirect to append to file \"" + file + "\"";
+                }
+                boolean append() { return true; }
+            };
+        }
+
+        /**
+         * Compares the specified object with this {@code Redirect} for equality.
+         * Two redirects are equal if and only if they are identical, or both are
+         * {@code Redirect} instances of the same type with non-null equal {@code File}s.
+         * @param obj the object to compare with
+         * @return {@code true} if the objects are equal as described above
+         */
+        public boolean equals(Object obj) {
+            if (obj == this)
+                return true;
+            if (! (obj instanceof ProcessBuilderForWin32.Redirect))
+                return false;
+            ProcessBuilderForWin32.Redirect r = (ProcessBuilderForWin32.Redirect) obj;
+            if (r.type() != this.type())
+                return false;
+            assert this.file() != null; // same type but not identical: PIPE/INHERIT are presumably singletons, so only file-backed redirects should reach here
+            return this.file().equals(r.file());
+        }
+
+        /**
+         * Returns a hash code value for this {@code Redirect}.
+         * @return a hash code value for this {@code Redirect}
+         */
+        public int hashCode() {
+            File file = file();
+            if (file == null)
+                return super.hashCode();
+            else
+                return file.hashCode();
+        }
+
+        /**
+         * No public constructors.  Clients must use predefined
+         * static {@code Redirect} instances or factory methods.
+         */
+        private Redirect() {}
+    }
+
+    private ProcessBuilderForWin32.Redirect[] redirects() { // lazily creates the three-slot redirect array, all PIPE by default
+        if (redirects == null)
+            redirects = new ProcessBuilderForWin32.Redirect[] {
+                    ProcessBuilderForWin32.Redirect.PIPE, ProcessBuilderForWin32.Redirect.PIPE, ProcessBuilderForWin32.Redirect.PIPE
+            };
+        return redirects; // the live array; callers assign into its slots
+    }
+
+    /**
+     * Sets this process builder's standard input source.
+     *
+     * Subprocesses subsequently started by this object's {@link #start()}
+     * method obtain their standard input from this source.
+     *
+     * <p>If the source is {@link ProcessBuilderForWin32.Redirect#PIPE Redirect.PIPE}
+     * (the initial value), then the standard input of a
+     * subprocess can be written to using the output stream
+     * returned by {@link Process#getOutputStream()}.
+     * If the source is set to any other value, then
+     * {@link Process#getOutputStream()} will return a
+     * <a href="#redirect-input">null output stream</a>.
+     *
+     * @param  source the new standard input source
+     * @return this process builder
+     * @throws IllegalArgumentException
+     *         if the redirect does not correspond to a valid source
+     *         of data, that is, has type
+     *         {@link ProcessBuilderForWin32.Redirect.Type#WRITE WRITE} or
+     *         {@link ProcessBuilderForWin32.Redirect.Type#APPEND APPEND}
+     * @since  1.7
+     */
+    public ProcessBuilderForWin32 redirectInput(ProcessBuilderForWin32.Redirect source) {
+        // WRITE and APPEND redirects cannot be used as a source of data.
+        final ProcessBuilderForWin32.Redirect.Type kind = source.type();
+        final boolean outputOnly = kind == ProcessBuilderForWin32.Redirect.Type.WRITE
+                || kind == ProcessBuilderForWin32.Redirect.Type.APPEND;
+        if (outputOnly) {
+            throw new IllegalArgumentException("Redirect invalid for reading: " + source);
+        }
+        // Slot 0 is the standard input redirect.
+        redirects()[0] = source;
+        return this;
+    }
+
+    /**
+     * Sets this process builder's standard output destination.
+     *
+     * Subprocesses subsequently started by this object's {@link #start()}
+     * method send their standard output to this destination.
+     *
+     * <p>If the destination is {@link ProcessBuilderForWin32.Redirect#PIPE Redirect.PIPE}
+     * (the initial value), then the standard output of a subprocess
+     * can be read using the input stream returned by {@link
+     * Process#getInputStream()}.
+     * If the destination is set to any other value, then
+     * {@link Process#getInputStream()} will return a
+     * <a href="#redirect-output">null input stream</a>.
+     *
+     * @param  destination the new standard output destination
+     * @return this process builder
+     * @throws IllegalArgumentException
+     *         if the redirect does not correspond to a valid
+     *         destination of data, that is, has type
+     *         {@link ProcessBuilderForWin32.Redirect.Type#READ READ}
+     * @since  1.7
+     */
+    public ProcessBuilderForWin32 redirectOutput(ProcessBuilderForWin32.Redirect destination) {
+        // A READ redirect cannot be used as a destination.
+        final boolean readOnly =
+                destination.type() == ProcessBuilderForWin32.Redirect.Type.READ;
+        if (readOnly) {
+            throw new IllegalArgumentException("Redirect invalid for writing: " + destination);
+        }
+        // Slot 1 is the standard output redirect.
+        redirects()[1] = destination;
+        return this;
+    }
+
+    /**
+     * Sets this process builder's standard error destination.
+     *
+     * Subprocesses subsequently started by this object's {@link #start()}
+     * method send their standard error to this destination.
+     *
+     * <p>If the destination is {@link ProcessBuilderForWin32.Redirect#PIPE Redirect.PIPE}
+     * (the initial value), then the error output of a subprocess
+     * can be read using the input stream returned by {@link
+     * Process#getErrorStream()}.
+     * If the destination is set to any other value, then
+     * {@link Process#getErrorStream()} will return a
+     * <a href="#redirect-output">null input stream</a>.
+     *
+     * <p>If the {@link #redirectErrorStream redirectErrorStream}
+     * attribute has been set {@code true}, then the redirection set
+     * by this method has no effect.
+     *
+     * @param  destination the new standard error destination
+     * @return this process builder
+     * @throws IllegalArgumentException
+     *         if the redirect does not correspond to a valid
+     *         destination of data, that is, has type
+     *         {@link ProcessBuilderForWin32.Redirect.Type#READ READ}
+     * @since  1.7
+     */
+    public ProcessBuilderForWin32 redirectError(ProcessBuilderForWin32.Redirect destination) {
+        // A READ redirect cannot be used as a destination.
+        final boolean readOnly =
+                destination.type() == ProcessBuilderForWin32.Redirect.Type.READ;
+        if (readOnly) {
+            throw new IllegalArgumentException("Redirect invalid for writing: " + destination);
+        }
+        // Slot 2 is the standard error redirect.
+        redirects()[2] = destination;
+        return this;
+    }
+
+    /**
+     * Sets this process builder's standard input source to a file.
+     *
+     * <p>This is a convenience method.  An invocation of the form
+     * {@code redirectInput(file)}
+     * behaves in exactly the same way as the invocation
+     * {@link #redirectInput(ProcessBuilderForWin32.Redirect) redirectInput}
+     * {@code (Redirect.from(file))}.
+     *
+     * @param  file the new standard input source
+     * @return this process builder
+     * @since  1.7
+     */
+    public ProcessBuilderForWin32 redirectInput(File file) {
+        // Wrap the file in a reading redirect and delegate to the Redirect overload.
+        final ProcessBuilderForWin32.Redirect fromFile = ProcessBuilderForWin32.Redirect.from(file);
+        return redirectInput(fromFile);
+    }
+
+    /**
+     * Sets this process builder's standard output destination to a file.
+     *
+     * <p>This is a convenience method.  An invocation of the form
+     * {@code redirectOutput(file)}
+     * behaves in exactly the same way as the invocation
+     * {@link #redirectOutput(ProcessBuilderForWin32.Redirect) redirectOutput}
+     * {@code (Redirect.to(file))}.
+     *
+     * @param  file the new standard output destination
+     * @return this process builder
+     * @since  1.7
+     */
+    public ProcessBuilderForWin32 redirectOutput(File file) {
+        // Wrap the file in a writing redirect and delegate to the Redirect overload.
+        final ProcessBuilderForWin32.Redirect toFile = ProcessBuilderForWin32.Redirect.to(file);
+        return redirectOutput(toFile);
+    }
+
+    /**
+     * Sets this process builder's standard error destination to a file.
+     *
+     * <p>This is a convenience method.  An invocation of the form
+     * {@code redirectError(file)}
+     * behaves in exactly the same way as the invocation
+     * {@link #redirectError(ProcessBuilderForWin32.Redirect) redirectError}
+     * {@code (Redirect.to(file))}.
+     *
+     * @param  file the new standard error destination
+     * @return this process builder
+     * @since  1.7
+     */
+    public ProcessBuilderForWin32 redirectError(File file) {
+        // Wrap the file in a writing redirect and delegate to the Redirect overload.
+        final ProcessBuilderForWin32.Redirect toFile = ProcessBuilderForWin32.Redirect.to(file);
+        return redirectError(toFile);
+    }
+
+    /**
+     * Returns this process builder's standard input source.
+     *
+     * Subprocesses subsequently started by this object's {@link #start()}
+     * method obtain their standard input from this source.
+     * The initial value is {@link ProcessBuilderForWin32.Redirect#PIPE Redirect.PIPE}.
+     *
+     * @return this process builder's standard input source
+     * @since  1.7
+     */
+    public ProcessBuilderForWin32.Redirect redirectInput() {
+        // No redirect array yet: report the PIPE default.
+        if (redirects == null) {
+            return ProcessBuilderForWin32.Redirect.PIPE;
+        }
+        return redirects[0];
+    }
+
+    /**
+     * Returns this process builder's standard output destination.
+     *
+     * Subprocesses subsequently started by this object's {@link #start()}
+     * method redirect their standard output to this destination.
+     * The initial value is {@link ProcessBuilderForWin32.Redirect#PIPE Redirect.PIPE}.
+     *
+     * @return this process builder's standard output destination
+     * @since  1.7
+     */
+    public ProcessBuilderForWin32.Redirect redirectOutput() {
+        // No redirect array yet: report the PIPE default.
+        if (redirects == null) {
+            return ProcessBuilderForWin32.Redirect.PIPE;
+        }
+        return redirects[1];
+    }
+
+    /**
+     * Returns this process builder's standard error destination.
+     *
+     * Subprocesses subsequently started by this object's {@link #start()}
+     * method redirect their standard error to this destination.
+     * The initial value is {@link ProcessBuilderForWin32.Redirect#PIPE Redirect.PIPE}.
+     *
+     * @return this process builder's standard error destination
+     * @since  1.7
+     */
+    public ProcessBuilderForWin32.Redirect redirectError() {
+        // No redirect array yet: report the PIPE default.
+        if (redirects == null) {
+            return ProcessBuilderForWin32.Redirect.PIPE;
+        }
+        return redirects[2];
+    }
+
+    /**
+     * Sets the source and destination for subprocess standard I/O
+     * to be the same as those of the current Java process.
+     *
+     * <p>This is a convenience method.  An invocation of the form
+     *  <pre> {@code
+     * pb.inheritIO()
+     * }</pre>
+     * behaves in exactly the same way as the invocation
+     *  <pre> {@code
+     * pb.redirectInput(Redirect.INHERIT)
+     *   .redirectOutput(Redirect.INHERIT)
+     *   .redirectError(Redirect.INHERIT)
+     * }</pre>
+     *
+     * This gives behavior equivalent to most operating system
+     * command interpreters, or the standard C library function
+     * {@code system()}.
+     *
+     * @return this process builder
+     * @since  1.7
+     */
+    public ProcessBuilderForWin32 inheritIO() {
+        Arrays.fill(redirects(), ProcessBuilderForWin32.Redirect.INHERIT);
+        return this;
+    }
+
+    /**
+     * Tells whether this process builder merges standard error and
+     * standard output.
+     *
+     * <p>If this property is {@code true}, then any error output
+     * generated by subprocesses subsequently started by this object's
+     * {@link #start()} method will be merged with the standard
+     * output, so that both can be read using the
+     * {@link Process#getInputStream()} method.  This makes it easier
+     * to correlate error messages with the corresponding output.
+     * The initial value is {@code false}.
+     *
+     * @return this process builder's {@code redirectErrorStream} property
+     */
+    public boolean redirectErrorStream() {
+        // Plain accessor for the flag that start() hands to the process implementation.
+        return redirectErrorStream;
+    }
+
+    /**
+     * Sets this process builder's {@code redirectErrorStream} property.
+     *
+     * <p>If this property is {@code true}, then any error output
+     * generated by subprocesses subsequently started by this object's
+     * {@link #start()} method will be merged with the standard
+     * output, so that both can be read using the
+     * {@link Process#getInputStream()} method.  This makes it easier
+     * to correlate error messages with the corresponding output.
+     * The initial value is {@code false}.
+     *
+     * @param  redirectErrorStream the new property value
+     * @return this process builder
+     */
+    public ProcessBuilderForWin32 redirectErrorStream(boolean redirectErrorStream) {
+        // Only records the flag; it is read again when start() launches the process.
+        this.redirectErrorStream = redirectErrorStream;
+        return this;
+    }
+
+    /**
+     * Starts a new process using the attributes of this process builder.
+     *
+     * <p>The new process will
+     * invoke the command and arguments given by {@link #command()},
+     * in a working directory as given by {@link #directory()},
+     * with a process environment as given by {@link #environment()}.
+     *
+     * <p>This method checks that the command is a valid operating
+     * system command.  Which commands are valid is system-dependent,
+     * but at the very least the command must be a non-empty list of
+     * non-null strings.
+     *
+     * <p>A minimal set of system dependent environment variables may
+     * be required to start a process on some operating systems.
+     * As a result, the subprocess may inherit additional environment variable
+     * settings beyond those in the process builder's {@link #environment()}.
+     *
+     * <p>If there is a security manager, its
+     * {@link SecurityManager#checkExec checkExec}
+     * method is called with the first component of this object's
+     * {@code command} array as its argument. This may result in
+     * a {@link SecurityException} being thrown.
+     *
+     * <p>Starting an operating system process is highly system-dependent.
+     * Among the many things that can go wrong are:
+     * <ul>
+     * <li>The operating system program file was not found.
+     * <li>Access to the program file was denied.
+     * <li>The working directory does not exist.
+     * </ul>
+     *
+     * <p>In such cases an exception will be thrown.  The exact nature
+     * of the exception is system-dependent, but it will always be a
+     * subclass of {@link IOException}.
+     *
+     * <p>Subsequent modifications to this process builder will not
+     * affect the returned {@link Process}.
+     *
+     * @return a new {@link Process} object for managing the subprocess
+     *
+     * @throws NullPointerException
+     *         if an element of the command list is null
+     *
+     * @throws IndexOutOfBoundsException
+     *         if the command is an empty list (has size {@code 0})
+     *
+     * @throws SecurityException
+     *         if a security manager exists and
+     *         <ul>
+     *
+     *         <li>its
+     *         {@link SecurityManager#checkExec checkExec}
+     *         method doesn't allow creation of the subprocess, or
+     *
+     *         <li>the standard input to the subprocess was
+     *         {@linkplain #redirectInput redirected from a file}
+     *         and the security manager's
+     *         {@link SecurityManager#checkRead checkRead} method
+     *         denies read access to the file, or
+     *
+     *         <li>the standard output or standard error of the
+     *         subprocess was
+     *         {@linkplain #redirectOutput redirected to a file}
+     *         and the security manager's
+     *         {@link SecurityManager#checkWrite checkWrite} method
+     *         denies write access to the file
+     *
+     *         </ul>
+     *
+     * @throws IOException if an I/O error occurs
+     *
+     * @see Runtime#exec(String[], String[], File)
+     */
+    public Process start() throws IOException {
+        // Snapshot the command into a private array before any check runs:
+        // a malicious user-supplied list might otherwise try to circumvent
+        // the security check.
+        final String[] argv = command.toArray(new String[command.size()]).clone();
+
+        for (String element : argv) {
+            if (element == null) {
+                throw new NullPointerException();
+            }
+        }
+        // An empty command fails here with an IndexOutOfBoundsException.
+        final String program = argv[0];
+
+        final SecurityManager sm = System.getSecurityManager();
+        if (sm != null) {
+            sm.checkExec(program);
+        }
+
+        final String workDir = (directory != null) ? directory.toString() : null;
+
+        // Only the arguments after the program name are scanned for NUL here.
+        int index = 1;
+        while (index < argv.length) {
+            if (argv[index].indexOf('\u0000') >= 0) {
+                throw new IOException("invalid null character in command");
+            }
+            index++;
+        }
+
+        try {
+            return ProcessImplForWin32.start(
+                    username,
+                    password,
+                    argv,
+                    environment,
+                    workDir,
+                    redirects,
+                    redirectErrorStream);
+        } catch (IOException | IllegalArgumentException failure) {
+            String detail = ": " + failure.getMessage();
+            Throwable rootCause = failure;
+            if (sm != null && failure instanceof IOException) {
+                // The failure reason must not be disclosed for
+                // read-protected files.
+                try {
+                    sm.checkRead(program);
+                } catch (SecurityException denied) {
+                    detail = "";
+                    rootCause = denied;
+                }
+            }
+            // Build a descriptive message here rather than relying on the
+            // low-level code that detected the problem.
+            final String location =
+                    (workDir == null) ? "" : " (in directory \"" + workDir + "\")";
+            throw new IOException(
+                    "Cannot run program \"" + program + "\"" + location + detail,
+                    rootCause);
+        }
+    }
+
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32.java
new file mode 100644
index 0000000..3dbe7cb
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.common.utils.process;
+
+import com.sun.jna.platform.win32.Kernel32Util;
+
+import java.util.*;
+
+/**
+ * A {@link HashMap} from environment variable names to values that validates
+ * names and values on {@code put}, rejects {@code null} arguments on lookups
+ * and removals, and can render itself as a NUL-separated environment block
+ * string.
+ *
+ * <p>The static state is a snapshot taken once, during class initialization,
+ * from {@code Kernel32Util.getEnvironmentVariables()}.
+ */
+final class ProcessEnvironmentForWin32 extends HashMap<String,String> {
+
+    private static final long serialVersionUID = -8017839552603542824L;
+
+    /**
+     * Checks a variable name: it may not contain {@code '='} after its first
+     * character, nor a NUL character.
+     *
+     * @param name the candidate name
+     * @return {@code name} unchanged
+     * @throws IllegalArgumentException if the name is invalid
+     */
+    private static String validateName(String name) {
+        // An initial '=' indicates a magic Windows variable name, which is
+        // allowed; hence the search for '=' starts at index 1.
+        final boolean hasInnerEquals = name.indexOf('=', 1) != -1;
+        if (hasInnerEquals || name.indexOf('\u0000') != -1) {
+            throw new IllegalArgumentException(
+                    "Invalid environment variable name: \"" + name + "\"");
+        }
+        return name;
+    }
+
+    /**
+     * Checks a variable value: it may not contain a NUL character.
+     *
+     * @param value the candidate value
+     * @return {@code value} unchanged
+     * @throws IllegalArgumentException if the value is invalid
+     */
+    private static String validateValue(String value) {
+        if (value.indexOf('\u0000') != -1) {
+            throw new IllegalArgumentException(
+                    "Invalid environment variable value: \"" + value + "\"");
+        }
+        return value;
+    }
+
+    /**
+     * Casts the argument to {@code String}, refusing {@code null}.
+     *
+     * @throws NullPointerException if {@code o} is {@code null}
+     * @throws ClassCastException   if {@code o} is not a {@code String}
+     */
+    private static String nonNullString(Object o) {
+        return (String) Objects.requireNonNull(o);
+    }
+
+    @Override
+    public String put(String key, String value) {
+        final String checkedKey = validateName(key);
+        final String checkedValue = validateValue(value);
+        return super.put(checkedKey, checkedValue);
+    }
+
+    @Override
+    public String get(Object key) {
+        return super.get(nonNullString(key));
+    }
+
+    @Override
+    public boolean containsKey(Object key) {
+        return super.containsKey(nonNullString(key));
+    }
+
+    @Override
+    public boolean containsValue(Object value) {
+        return super.containsValue(nonNullString(value));
+    }
+
+    @Override
+    public String remove(Object key) {
+        return super.remove(nonNullString(key));
+    }
+
+    /** Entry wrapper that validates values written through {@code setValue}. */
+    private static class CheckedEntry implements Entry<String,String> {
+        private final Entry<String,String> delegate;
+
+        public CheckedEntry(Entry<String,String> e) {
+            this.delegate = e;
+        }
+
+        @Override
+        public String getKey() {
+            return delegate.getKey();
+        }
+
+        @Override
+        public String getValue() {
+            return delegate.getValue();
+        }
+
+        @Override
+        public String setValue(String value) {
+            return delegate.setValue(validateValue(value));
+        }
+
+        @Override
+        public String toString() {
+            return getKey() + "=" + getValue();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            return delegate.equals(o);
+        }
+
+        @Override
+        public int hashCode() {
+            return delegate.hashCode();
+        }
+    }
+
+    /**
+     * Entry-set view whose iterator hands out {@link CheckedEntry} wrappers
+     * and whose {@code contains}/{@code remove} reject null keys and values.
+     */
+    private static class CheckedEntrySet extends AbstractSet<Entry<String,String>> {
+        private final Set<Entry<String,String>> backing;
+
+        public CheckedEntrySet(Set<Entry<String,String>> s) {
+            this.backing = s;
+        }
+
+        @Override
+        public int size() {
+            return backing.size();
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return backing.isEmpty();
+        }
+
+        @Override
+        public void clear() {
+            backing.clear();
+        }
+
+        @Override
+        public Iterator<Entry<String,String>> iterator() {
+            final Iterator<Entry<String,String>> source = backing.iterator();
+            return new Iterator<Entry<String,String>>() {
+                @Override
+                public boolean hasNext() {
+                    return source.hasNext();
+                }
+
+                @Override
+                public Entry<String,String> next() {
+                    return new CheckedEntry(source.next());
+                }
+
+                @Override
+                public void remove() {
+                    source.remove();
+                }
+            };
+        }
+
+        /** Casts to an entry and insists on a non-null String key and value. */
+        private static Entry<String,String> checkedEntry(Object o) {
+            @SuppressWarnings("unchecked")
+            Entry<String,String> candidate = (Entry<String,String>) o;
+            nonNullString(candidate.getKey());
+            nonNullString(candidate.getValue());
+            return candidate;
+        }
+
+        @Override
+        public boolean contains(Object o) {
+            return backing.contains(checkedEntry(o));
+        }
+
+        @Override
+        public boolean remove(Object o) {
+            return backing.remove(checkedEntry(o));
+        }
+    }
+
+    /** Values view whose {@code contains}/{@code remove} reject {@code null}. */
+    private static class CheckedValues extends AbstractCollection<String> {
+        private final Collection<String> backing;
+
+        public CheckedValues(Collection<String> c) {
+            this.backing = c;
+        }
+
+        @Override
+        public int size() {
+            return backing.size();
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return backing.isEmpty();
+        }
+
+        @Override
+        public void clear() {
+            backing.clear();
+        }
+
+        @Override
+        public Iterator<String> iterator() {
+            return backing.iterator();
+        }
+
+        @Override
+        public boolean contains(Object o) {
+            return backing.contains(nonNullString(o));
+        }
+
+        @Override
+        public boolean remove(Object o) {
+            return backing.remove(nonNullString(o));
+        }
+    }
+
+    /** Key-set view whose {@code contains}/{@code remove} reject {@code null}. */
+    private static class CheckedKeySet extends AbstractSet<String> {
+        private final Set<String> backing;
+
+        public CheckedKeySet(Set<String> s) {
+            this.backing = s;
+        }
+
+        @Override
+        public int size() {
+            return backing.size();
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return backing.isEmpty();
+        }
+
+        @Override
+        public void clear() {
+            backing.clear();
+        }
+
+        @Override
+        public Iterator<String> iterator() {
+            return backing.iterator();
+        }
+
+        @Override
+        public boolean contains(Object o) {
+            return backing.contains(nonNullString(o));
+        }
+
+        @Override
+        public boolean remove(Object o) {
+            return backing.remove(nonNullString(o));
+        }
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return new CheckedKeySet(super.keySet());
+    }
+
+    @Override
+    public Collection<String> values() {
+        return new CheckedValues(super.values());
+    }
+
+    @Override
+    public Set<Entry<String,String>> entrySet() {
+        return new CheckedEntrySet(super.entrySet());
+    }
+
+    /** Orders names by comparing their characters after upper-casing them. */
+    private static final class NameComparator implements Comparator<String> {
+        @Override
+        public int compare(String s1, String s2) {
+            // String.compareToIgnoreCase is unsuitable: it canonicalizes to
+            // lower case, while Windows canonicalizes to upper case.  For
+            // example, "_" should sort *after* "Z", not before.
+            final int len1 = s1.length();
+            final int len2 = s2.length();
+            final int shared = Math.min(len1, len2);
+            for (int pos = 0; pos < shared; pos++) {
+                final char a = s1.charAt(pos);
+                final char b = s2.charAt(pos);
+                if (a == b) {
+                    continue;
+                }
+                final char upperA = Character.toUpperCase(a);
+                final char upperB = Character.toUpperCase(b);
+                if (upperA != upperB) {
+                    // No overflow because of numeric promotion
+                    return upperA - upperB;
+                }
+            }
+            return len1 - len2;
+        }
+    }
+
+    /** Orders entries by key using {@link NameComparator}. */
+    private static final class EntryComparator implements Comparator<Entry<String,String>> {
+        @Override
+        public int compare(Entry<String,String> e1,
+                           Entry<String,String> e2) {
+            final String leftName = e1.getKey();
+            final String rightName = e2.getKey();
+            return nameComparator.compare(leftName, rightName);
+        }
+    }
+
+    // Allow `=' as first char in name, e.g. =C:=C:\DIR
+    static final int MIN_NAME_LENGTH = 1;
+
+    private static final NameComparator nameComparator;
+    private static final EntryComparator entryComparator;
+    private static final ProcessEnvironmentForWin32 theEnvironment;
+    private static final Map<String,String> theUnmodifiableEnvironment;
+    private static final Map<String,String> theCaseInsensitiveEnvironment;
+
+    static {
+        nameComparator  = new NameComparator();
+        entryComparator = new EntryComparator();
+        theEnvironment  = new ProcessEnvironmentForWin32();
+        theUnmodifiableEnvironment = Collections.unmodifiableMap(theEnvironment);
+
+        // Load the environment variables reported by the OS.
+        theEnvironment.putAll(environmentBlock());
+
+        // Separate sorted copy used for lookups that ignore case.
+        theCaseInsensitiveEnvironment = new TreeMap<>(nameComparator);
+        theCaseInsensitiveEnvironment.putAll(theEnvironment);
+    }
+
+    private ProcessEnvironmentForWin32() {
+        super();
+    }
+
+    private ProcessEnvironmentForWin32(int capacity) {
+        super(capacity);
+    }
+
+    /**
+     * Looks up a variable in the case-insensitive copy of the environment
+     * built during class initialization.
+     *
+     * @param name the variable name
+     * @return the value, or {@code null} if no such variable is present
+     */
+    static String getenv(String name) {
+        // The comparison is done here, against our own sorted map, so that
+        // this lookup stays consistent with the map returned by getenv().
+        return theCaseInsensitiveEnvironment.get(name);
+    }
+
+    /**
+     * Returns the unmodifiable view of the environment captured during class
+     * initialization.
+     */
+    static Map<String,String> getenv() {
+        return theUnmodifiableEnvironment;
+    }
+
+    /**
+     * Returns a clone of the captured environment.
+     */
+    @SuppressWarnings("unchecked")
+    static Map<String,String> environment() {
+        final Object copy = theEnvironment.clone();
+        return (Map<String,String>) copy;
+    }
+
+    /**
+     * Creates an empty environment map with the given initial capacity.
+     *
+     * @param capacity the initial capacity passed to {@link HashMap}
+     */
+    static Map<String,String> emptyEnvironment(int capacity) {
+        return new ProcessEnvironmentForWin32(capacity);
+    }
+
+    /** Reads the current environment variables through JNA. */
+    private static Map<String, String> environmentBlock() {
+        return Kernel32Util.getEnvironmentVariables();
+    }
+
+    /**
+     * Renders this map as an environment block: {@code name=value} entries
+     * sorted by name, each followed by a NUL character, with one more NUL at
+     * the end.
+     *
+     * @return the environment block string
+     */
+    String toEnvironmentBlock() {
+        // Sort Unicode-case-insensitively by name
+        final List<Entry<String,String>> sorted = new ArrayList<>(entrySet());
+        Collections.sort(sorted, entryComparator);
+
+        final StringBuilder block = new StringBuilder(size()*30);
+
+        // Some versions of MSVCRT.DLL require SystemRoot to be set.
+        // So, we make sure that it is always set, even if not provided
+        // by the caller.
+        final String SYSTEMROOT = "SystemRoot";
+
+        // Stays negative while every key seen so far sorts before SystemRoot;
+        // once it is >= 0 no further comparison is made.
+        int order = -1;
+
+        for (Entry<String,String> entry : sorted) {
+            final String key = entry.getKey();
+            final String value = entry.getValue();
+            if (order < 0) {
+                order = nameComparator.compare(key, SYSTEMROOT);
+                if (order > 0) {
+                    // Not set, so add it here
+                    addToEnvIfSet(block, SYSTEMROOT);
+                }
+            }
+            addToEnv(block, key, value);
+        }
+        if (order < 0) {
+            // Got to end of list and still not found
+            addToEnvIfSet(block, SYSTEMROOT);
+        }
+        if (block.length() == 0) {
+            // Environment was empty and SystemRoot not set in parent
+            block.append('\u0000');
+        }
+        // Block is double NUL terminated
+        block.append('\u0000');
+        return block.toString();
+    }
+
+    // add the environment variable to the child, if it exists in parent
+    private static void addToEnvIfSet(StringBuilder sb, String name) {
+        final String inherited = getenv(name);
+        if (inherited == null) {
+            return;
+        }
+        addToEnv(sb, name, inherited);
+    }
+
+    /** Appends one {@code name=value} entry followed by a NUL character. */
+    private static void addToEnv(StringBuilder sb, String name, String val) {
+        sb.append(name);
+        sb.append('=');
+        sb.append(val);
+        sb.append('\u0000');
+    }
+
+    /**
+     * Renders the given map as an environment block.
+     *
+     * @param map a {@code ProcessEnvironmentForWin32} instance, or {@code null}
+     * @return the block string, or {@code null} when {@code map} is {@code null}
+     * @throws ClassCastException if {@code map} is of any other type
+     */
+    static String toEnvironmentBlock(Map<String,String> map) {
+        if (map == null) {
+            return null;
+        }
+        return ((ProcessEnvironmentForWin32) map).toEnvironmentBlock();
+    }
+}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32.java
new file mode 100644
index 0000000..4583be8
--- /dev/null
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32.java
@@ -0,0 +1,752 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.common.utils.process;
+
+import com.sun.jna.Pointer;
+import com.sun.jna.platform.win32.*;
+import com.sun.jna.ptr.IntByReference;
+import sun.security.action.GetPropertyAction;
+
+import java.io.*;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.sun.jna.platform.win32.WinBase.STILL_ACTIVE;
+
+public class ProcessImplForWin32 extends Process {
+    private static final sun.misc.JavaIOFileDescriptorAccess fdAccess
+            = sun.misc.SharedSecrets.getJavaIOFileDescriptorAccess();
+
+    private static final int PIPE_SIZE = 4096 + 24;
+
+    private static final int HANDLE_STORAGE_SIZE = 6;
+
+    private static final int OFFSET_READ = 0;
+
+    private static final int OFFSET_WRITE = 1;
+
+    private static final WinNT.HANDLE JAVA_INVALID_HANDLE_VALUE = new WinNT.HANDLE(Pointer.createConstant(-1));
+
+    /**
+     * Opens {@code f} for writing on behalf of a redirect.
+     *
+     * When {@code append} is {@code false} a plain FileOutputStream is returned.
+     * When it is {@code true} the file is opened through
+     * {@code openForAtomicAppend} and the resulting native handle is wrapped in
+     * a FileOutputStream, because a FileOutputStream created to append to a
+     * file does not open the file in a manner that guarantees that writes by
+     * the child process will be atomic.
+     */
+    private static FileOutputStream newFileOutputStream(File f, boolean append)
+            throws IOException
+    {
+        if (!append) {
+            return new FileOutputStream(f);
+        }
+
+        final String path = f.getPath();
+        final SecurityManager securityManager = System.getSecurityManager();
+        if (securityManager != null) {
+            securityManager.checkWrite(path);
+        }
+
+        // Open the native handle first, then attach it to a fresh descriptor.
+        final long nativeHandle = openForAtomicAppend(path);
+        final FileDescriptor fd = new FileDescriptor();
+        fdAccess.setHandle(fd, nativeHandle);
+
+        final PrivilegedAction<FileOutputStream> openStream = () -> new FileOutputStream(fd);
+        return AccessController.doPrivileged(openStream);
+    }
+
+    /**
+     * System-dependent portion of process start-up (presumably invoked from
+     * ProcessBuilderForWin32.start() - confirm against that class).
+     *
+     * Builds the environment block, maps each of the three redirects to a
+     * native handle value (-1 means "create a pipe"), constructs the
+     * ProcessImplForWin32 and finally closes any file streams that were opened
+     * here for file redirects.
+     *
+     * @param username            account name passed on to the constructor
+     * @param password            password passed on to the constructor
+     * @param cmdarray            program and arguments
+     * @param environment         environment map, converted via ProcessEnvironmentForWin32.toEnvironmentBlock
+     * @param dir                 working directory passed on to the constructor
+     * @param redirects           stdin/stdout/stderr redirects, or null for three pipes
+     * @param redirectErrorStream passed on to the constructor
+     * @return the newly constructed process object
+     * @throws IOException if a redirect file cannot be opened or closed
+     */
+    static Process start(String username,
+                         String password,
+                         String cmdarray[],
+                         java.util.Map<String,String> environment,
+                         String dir,
+                         ProcessBuilderForWin32.Redirect[] redirects,
+                         boolean redirectErrorStream)
+            throws IOException
+    {
+        String envblock = ProcessEnvironmentForWin32.toEnvironmentBlock(environment);
+
+        FileInputStream  f0 = null; // opened only when stdin is redirected from a file
+        FileOutputStream f1 = null; // opened only when stdout is redirected to a file
+        FileOutputStream f2 = null; // opened only when stderr is redirected to a file
+
+        try {
+            long[] stdHandles;
+            if (redirects == null) {
+                stdHandles = new long[] { -1L, -1L, -1L }; // no redirects given: request a pipe for all three
+            } else {
+                stdHandles = new long[3];
+
+                if (redirects[0] == ProcessBuilderForWin32.Redirect.PIPE)
+                    stdHandles[0] = -1L;
+                else if (redirects[0] == ProcessBuilderForWin32.Redirect.INHERIT)
+                    stdHandles[0] = fdAccess.getHandle(FileDescriptor.in); // share this JVM's stdin handle
+                else {
+                    f0 = new FileInputStream(redirects[0].file());
+                    stdHandles[0] = fdAccess.getHandle(f0.getFD());
+                }
+
+                if (redirects[1] == ProcessBuilderForWin32.Redirect.PIPE)
+                    stdHandles[1] = -1L;
+                else if (redirects[1] == ProcessBuilderForWin32.Redirect.INHERIT)
+                    stdHandles[1] = fdAccess.getHandle(FileDescriptor.out); // share this JVM's stdout handle
+                else {
+                    f1 = newFileOutputStream(redirects[1].file(),
+                            redirects[1].append());
+                    stdHandles[1] = fdAccess.getHandle(f1.getFD());
+                }
+
+                if (redirects[2] == ProcessBuilderForWin32.Redirect.PIPE)
+                    stdHandles[2] = -1L;
+                else if (redirects[2] == ProcessBuilderForWin32.Redirect.INHERIT)
+                    stdHandles[2] = fdAccess.getHandle(FileDescriptor.err); // share this JVM's stderr handle
+                else {
+                    f2 = newFileOutputStream(redirects[2].file(),
+                            redirects[2].append());
+                    stdHandles[2] = fdAccess.getHandle(f2.getFD());
+                }
+            }
+
+            return new ProcessImplForWin32(username, password, cmdarray, envblock, dir, stdHandles, redirectErrorStream);
+        } finally {
+            // In theory, close() can throw IOException
+            // (although it is rather unlikely to happen here).
+            // The nested finally blocks make sure every opened stream gets a close() attempt.
+            try { if (f0 != null) f0.close(); }
+            finally {
+                try { if (f1 != null) f1.close(); }
+                finally { if (f2 != null) f2.close(); }
+            }
+        }
+
+    }
+
+    /** Holder class: the tokenizer regex is compiled when this class is first used. */
+    private static final class LazyPattern {
+        // A token is either a run of characters that are neither whitespace
+        // nor double quotes, or a double-quoted span without interior quotes.
+        // Escape-support version:
+        //    "(\")((?:\\\\\\1|.)+?)\\1|([^\\s\"]+)";
+        private static final Pattern PATTERN =
+                Pattern.compile("[^\\s\"]+|\"[^\"]*\"");
+    }
+
+    /* Parses the command string parameter into the executable name and
+     * program arguments.
+     *
+     * The command string is broken into tokens. The token separator is a space
+     * or quota character. The space inside quotation is not a token separator.
+     * There are no escape sequences.
+     */
+    private static String[] getTokensFromCommand(String command) {
+        ArrayList<String> matchList = new ArrayList<>(8);
+        Matcher regexMatcher = ProcessImplForWin32.LazyPattern.PATTERN.matcher(command);
+        while (regexMatcher.find())
+            matchList.add(regexMatcher.group());
+        return matchList.toArray(new String[matchList.size()]);
+    }
+
+    private static final int VERIFICATION_CMD_BAT = 0;
+    private static final int VERIFICATION_WIN32 = 1;
+    private static final int VERIFICATION_WIN32_SAFE = 2; // inside quotes not allowed
+    private static final int VERIFICATION_LEGACY = 3;
+    // See Command shell overview for documentation of special characters.
+    // https://docs.microsoft.com/en-us/previous-versions/windows/it-pro/windows-xp/bb490954(v=technet.10)
+    private static final char ESCAPE_VERIFICATION[][] = {
+            // We guarantee the only command file execution for implicit [cmd.exe] run.
+            //    http://technet.microsoft.com/en-us/library/bb490954.aspx
+            {' ', '\t', '<', '>', '&', '|', '^'},
+            {' ', '\t', '<', '>'},
+            {' ', '\t', '<', '>'},
+            {' ', '\t'}
+    };
+
+    private static String createCommandLine(int verificationType, // one of the VERIFICATION_* constants; selects the escaping rules
+                                            final String executablePath, // emitted verbatim as the first token of the result
+                                            final String cmd[]) // cmd[0] is skipped; cmd[1..] are appended as arguments
+    {
+        StringBuilder cmdbuf = new StringBuilder(80);
+
+        cmdbuf.append(executablePath);
+
+        for (int i = 1; i < cmd.length; ++i) {
+            cmdbuf.append(' '); // single space between tokens
+            String s = cmd[i];
+            if (needsEscaping(verificationType, s)) { // may also throw IllegalArgumentException for embedded quotes
+                cmdbuf.append('"');
+
+                if (verificationType == VERIFICATION_WIN32_SAFE) {
+                    // Insert the argument, adding '\' to quote any interior quotes
+                    int length = s.length();
+                    for (int j = 0; j < length; j++) {
+                        char c = s.charAt(j);
+                        if (c == DOUBLEQUOTE) {
+                            int count = countLeadingBackslash(verificationType, s, j);
+                            while (count-- > 0) {
+                                cmdbuf.append(BACKSLASH);   // double the number of backslashes
+                            }
+                            cmdbuf.append(BACKSLASH);       // backslash to quote the quote
+                        }
+                        cmdbuf.append(c);
+                    }
+                } else {
+                    cmdbuf.append(s);
+                }
+                // The code protects the [java.exe] and console command line
+                // parser, that interprets the [\"] combination as an escape
+                // sequence for the ["] char.
+                //     http://msdn.microsoft.com/en-us/library/17w5ykft.aspx
+                //
+                // If the argument is an FS path, doubling of the tail [\]
+                // char is not a problem for non-console applications.
+                //
+                // The [\"] sequence is not an escape sequence for the [cmd.exe]
+                // command line parser. The case of the [""] tail escape
+                // sequence could not be realized due to the argument validation
+                // procedure.
+                int count = countLeadingBackslash(verificationType, s, s.length()); // trailing backslashes of s (0 for .bat/.cmd)
+                while (count-- > 0) {
+                    cmdbuf.append(BACKSLASH);   // double the number of backslashes
+                }
+                cmdbuf.append('"');
+            } else {
+                cmdbuf.append(s); // no escaping required: append the argument unchanged
+            }
+        }
+        return cmdbuf.toString();
+    }
+
+    /**
+     * Return the argument without quotes (1st and last) if present, else the arg.
+     * @param str a string
+     * @return the string without 1st and last quotes
+     */
+    private static String unQuote(String str) {
+        int len = str.length();
+        return (len >= 2 && str.charAt(0) == DOUBLEQUOTE && str.charAt(len - 1) == DOUBLEQUOTE)
+                ? str.substring(1, len - 1)
+                : str;
+    }
+
+    private static boolean needsEscaping(int verificationType, String arg) { // true when arg must be wrapped in quotes; may throw IllegalArgumentException
+        // Switch off MS heuristic for internal ["].
+        // Please, use the explicit [cmd.exe] call
+        // if you need the internal ["].
+        //    Example: "cmd.exe", "/C", "Extended_MS_Syntax"
+
+        // For [.exe] or [.com] file the unpaired/internal ["]
+        // in the argument is not a problem.
+        String unquotedArg = unQuote(arg);
+        boolean argIsQuoted = !arg.equals(unquotedArg); // unQuote changed the string, so arg was wrapped in quotes
+        boolean embeddedQuote = unquotedArg.indexOf(DOUBLEQUOTE) >= 0; // a quote remains after removing the outer pair
+
+        switch (verificationType) {
+            case VERIFICATION_CMD_BAT:
+                if (embeddedQuote) {
+                    throw new IllegalArgumentException("Argument has embedded quote, " +
+                            "use the explicit CMD.EXE call.");
+                }
+                break;  // break determine whether to quote
+            case VERIFICATION_WIN32_SAFE:
+                if (argIsQuoted && embeddedQuote)  {
+                    throw new IllegalArgumentException("Malformed argument has embedded quote: "
+                            + unquotedArg);
+                }
+                break;
+            default:
+                break;
+        }
+
+        if (!argIsQuoted) { // already-quoted arguments are never quoted again
+            char testEscape[] = ESCAPE_VERIFICATION[verificationType]; // characters that force quoting for this mode
+            for (int i = 0; i < testEscape.length; ++i) {
+                if (arg.indexOf(testEscape[i]) >= 0) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private static String getExecutablePath(String path)
+            throws IOException
+    {
+        String name = unQuote(path);
+        if (name.indexOf(DOUBLEQUOTE) >= 0) {
+            throw new IllegalArgumentException("Executable name has embedded quote, " +
+                    "split the arguments: " + name);
+        }
+        // Win32 CreateProcess requires path to be normalized
+        File fileToRun = new File(name);
+
+        // From the [CreateProcess] function documentation:
+        //
+        // "If the file name does not contain an extension, .exe is appended.
+        // Therefore, if the file name extension is .com, this parameter
+        // must include the .com extension. If the file name ends in
+        // a period (.) with no extension, or if the file name contains a path,
+        // .exe is not appended."
+        //
+        // "If the file name !does not contain a directory path!,
+        // the system searches for the executable file in the following
+        // sequence:..."
+        //
+        // In practice ANY non-existent path is extended by [.exe] extension
+        // in the [CreateProcess] function with the only exception:
+        // the path ends by (.)
+
+        return fileToRun.getPath();
+    }
+
+    /**
+     * An executable is any program that is an EXE or does not have an extension
+     * and the Windows createProcess will be looking for .exe.
+     * The comparison is case insensitive based on the name.
+     * @param executablePath the executable file
+     * @return true if the path ends in .exe or does not have an extension.
+     */
+    private boolean isExe(String executablePath) {
+        File file = new File(executablePath);
+        String upName = file.getName().toUpperCase(Locale.ROOT);
+        return (upName.endsWith(".EXE") || upName.indexOf('.') < 0);
+    }
+
+    /**
+     * Old version that can be bypassed: tells whether the path ends in .cmd or
+     * .bat, ignoring case.
+     *
+     * The upper-casing uses {@code Locale.ROOT}, like {@code isExe}, so the
+     * result does not depend on the JVM's default locale.
+     *
+     * @param executablePath the executable path
+     * @return true if the path ends in .CMD or .BAT
+     */
+    private boolean isShellFile(String executablePath) {
+        String upPath = executablePath.toUpperCase(Locale.ROOT);
+        return (upPath.endsWith(".CMD") || upPath.endsWith(".BAT"));
+    }
+
+    private String quoteString(String arg) {
+        StringBuilder argbuf = new StringBuilder(arg.length() + 2);
+        return argbuf.append('"').append(arg).append('"').toString();
+    }
+
+    /**
+     * Counts the consecutive backslashes that end immediately before index
+     * {@code start} of {@code input}.
+     * .bat files don't include backslashes as part of the quote, so the count
+     * is always 0 for VERIFICATION_CMD_BAT.
+     */
+    private static int countLeadingBackslash(int verificationType,
+                                             CharSequence input, int start) {
+        if (verificationType == VERIFICATION_CMD_BAT) {
+            return 0;
+        }
+        int backslashes = 0;
+        int pos = start - 1;
+        // walk backwards while the run of backslashes continues
+        while (pos >= 0 && input.charAt(pos) == BACKSLASH) {
+            backslashes++;
+            pos--;
+        }
+        return backslashes;
+    }
+
+    private static final char DOUBLEQUOTE = '\"';
+    private static final char BACKSLASH = '\\';
+
+    private WinNT.HANDLE handle;
+    private OutputStream stdin_stream;
+    private InputStream stdout_stream;
+    private InputStream stderr_stream;
+
+    private ProcessImplForWin32( // builds the command line, starts the process via create(), then wraps the returned handles in streams
+            String username, // forwarded to create()
+            String password, // forwarded to create()
+            String cmd[], // cmd[0] is the executable, the rest are arguments; may be re-tokenized below
+            final String envblock, // environment block string, forwarded to create()
+            final String path, // working directory, forwarded to create()
+            final long[] stdHandles, // in: handles for the child (-1 = pipe); out: parent-side handles written back by create()
+            final boolean redirectErrorStream) // forwarded to create()
+            throws IOException
+    {
+        String cmdstr;
+        final SecurityManager security = System.getSecurityManager();
+        GetPropertyAction action = new GetPropertyAction("jdk.lang.Process.allowAmbiguousCommands",
+                (security == null) ? "true" : "false"); // the default depends on whether a security manager is installed
+        final boolean allowAmbiguousCommands = !"false".equalsIgnoreCase(action.run());
+        if (allowAmbiguousCommands && security == null) {
+            // Legacy mode.
+
+            // Normalize path if possible.
+            String executablePath = new File(cmd[0]).getPath();
+
+            // No worry about internal, unpaired ["], and redirection/piping.
+            if (needsEscaping(VERIFICATION_LEGACY, executablePath) )
+                executablePath = quoteString(executablePath);
+
+            cmdstr = createCommandLine(
+                    //legacy mode doesn't worry about extended verification
+                    VERIFICATION_LEGACY,
+                    executablePath,
+                    cmd);
+        } else {
+            String executablePath;
+            try {
+                executablePath = getExecutablePath(cmd[0]);
+            } catch (IllegalArgumentException e) {
+                // Workaround for the calls like
+                // Runtime.getRuntime().exec("\"C:\\Program Files\\foo\" bar")
+
+                // No chance to avoid CMD/BAT injection, except to do the work
+                // right from the beginning. Otherwise we have too many corner
+                // cases from
+                //    Runtime.getRuntime().exec(String[] cmd [, ...])
+                // calls with internal ["] and escape sequences.
+
+                // Restore original command line.
+                StringBuilder join = new StringBuilder();
+                // terminal space in command line is ok
+                for (String s : cmd)
+                    join.append(s).append(' ');
+
+                // Parse the command line again.
+                cmd = getTokensFromCommand(join.toString());
+                executablePath = getExecutablePath(cmd[0]);
+
+                // Check new executable name once more
+                if (security != null)
+                    security.checkExec(executablePath);
+            }
+
+            // Quotation protects from interpretation of the [path] argument as
+            // start of longer path with spaces. Quotation has no influence to
+            // [.exe] extension heuristic.
+            boolean isShell = allowAmbiguousCommands ? isShellFile(executablePath)
+                    : !isExe(executablePath);
+            cmdstr = createCommandLine(
+                    // We need the extended verification procedures
+                    isShell ? VERIFICATION_CMD_BAT
+                            : (allowAmbiguousCommands ? VERIFICATION_WIN32 : VERIFICATION_WIN32_SAFE),
+                    quoteString(executablePath),
+                    cmd);
+        }
+
+        handle = create(username, password, cmdstr, envblock, path, stdHandles, redirectErrorStream); // also rewrites stdHandles in place
+
+        AccessController.doPrivileged(
+                new PrivilegedAction<Void>() {
+                    public Void run() {
+                        if (stdHandles[0] == -1L) // no parent-side stdin handle came back: use the null stream
+                            stdin_stream = ProcessBuilderForWin32.NullOutputStream.INSTANCE;
+                        else {
+                            FileDescriptor stdin_fd = new FileDescriptor();
+                            fdAccess.setHandle(stdin_fd, stdHandles[0]);
+                            stdin_stream = new BufferedOutputStream(
+                                    new FileOutputStream(stdin_fd));
+                        }
+
+                        if (stdHandles[1] == -1L) // no parent-side stdout handle came back: use the null stream
+                            stdout_stream = ProcessBuilderForWin32.NullInputStream.INSTANCE;
+                        else {
+                            FileDescriptor stdout_fd = new FileDescriptor();
+                            fdAccess.setHandle(stdout_fd, stdHandles[1]);
+                            stdout_stream = new BufferedInputStream(
+                                    new FileInputStream(stdout_fd));
+                        }
+
+                        if (stdHandles[2] == -1L) // no parent-side stderr handle came back: use the null stream
+                            stderr_stream = ProcessBuilderForWin32.NullInputStream.INSTANCE;
+                        else {
+                            FileDescriptor stderr_fd = new FileDescriptor();
+                            fdAccess.setHandle(stderr_fd, stdHandles[2]);
+                            stderr_stream = new FileInputStream(stderr_fd); // unlike stdout, stderr is not wrapped in a buffer
+                        }
+
+                        return null; }});
+    }
+
+    public OutputStream getOutputStream() { // stream feeding the child's stdin, as assigned to stdin_stream by the constructor
+        return stdin_stream;
+    }
+
+    public InputStream getInputStream() { // stream reading the child's stdout, as assigned to stdout_stream by the constructor
+        return stdout_stream;
+    }
+
+    public InputStream getErrorStream() { // stream reading the child's stderr, as assigned to stderr_stream by the constructor
+        return stderr_stream;
+    }
+
+    protected void finalize() { // finalizer hook: closes the process handle held by this object
+        closeHandle(handle);
+    }
+
+    /**
+     * Returns the exit code reported for the process handle.
+     *
+     * @return the exit code
+     * @throws IllegalThreadStateException if the reported code is STILL_ACTIVE
+     */
+    public int exitValue() {
+        final int status = getExitCodeProcess(handle);
+        if (status != STILL_ACTIVE) {
+            return status;
+        }
+        throw new IllegalThreadStateException("process has not exited");
+    }
+
+    /**
+     * Waits on the process handle and then returns the exit value.
+     *
+     * @return the exit value of the process
+     * @throws InterruptedException if the calling thread's interrupt flag is
+     *         set once the wait has returned (the flag is cleared)
+     */
+    public int waitFor() throws InterruptedException {
+        waitForInterruptibly(handle);
+        final boolean wasInterrupted = Thread.interrupted();
+        if (wasInterrupted) {
+            throw new InterruptedException();
+        }
+        return exitValue();
+    }
+
+    /**
+     * Waits for the process to exit, giving up after the given timeout.
+     *
+     * @param timeout maximum time to wait; values {@code <= 0} only poll the current state
+     * @param unit    unit of {@code timeout}
+     * @return true if the process has exited, false if the timeout elapsed first
+     * @throws InterruptedException if the calling thread's interrupt flag is
+     *         found set after a wait slice (the flag is cleared)
+     */
+    @Override
+    public boolean waitFor(long timeout, TimeUnit unit)
+            throws InterruptedException
+    {
+        if (getExitCodeProcess(handle) != STILL_ACTIVE) return true;
+        if (timeout <= 0) return false;
+
+        long remainingNanos = unit.toNanos(timeout);
+        long deadline = System.nanoTime() + remainingNanos;
+
+        do {
+            // Round up to next millisecond
+            long msTimeout = TimeUnit.NANOSECONDS.toMillis(remainingNanos + 999_999L);
+            // The rounding addition wraps to a negative value for timeouts near
+            // Long.MAX_VALUE nanoseconds, and the native wait only takes a
+            // 32-bit millisecond count. In both cases wait "a long while" and
+            // let the loop re-check the deadline afterwards.
+            if (msTimeout < 0 || msTimeout > Integer.MAX_VALUE) {
+                msTimeout = Integer.MAX_VALUE;
+            }
+            waitForTimeoutInterruptibly(handle, msTimeout);
+            if (Thread.interrupted())
+                throw new InterruptedException();
+            if (getExitCodeProcess(handle) != STILL_ACTIVE) {
+                return true;
+            }
+            remainingNanos = deadline - System.nanoTime();
+        } while (remainingNanos > 0);
+
+        return (getExitCodeProcess(handle) != STILL_ACTIVE);
+    }
+
+    public void destroy() { terminateProcess(handle); } // terminateProcess requests termination with exit code 1
+
+    public Process destroyForcibly() { // same as destroy(); returns this object for chaining
+        destroy();
+        return this;
+    }
+
+    public boolean isAlive() { // true while the exit code reported for the handle is STILL_ACTIVE
+        return isProcessAlive(handle);
+    }
+
+    private static boolean initHolder(WinNT.HANDLEByReference pjhandles, // in: caller-supplied handle or -1; out: parent-side pipe end or -1
+                                      WinNT.HANDLEByReference[] pipe, // filled by CreatePipe with {pipe[0], pipe[1]} when a pipe is needed
+                                      int offset, // OFFSET_READ or OFFSET_WRITE: index of the pipe end given to the child
+                                      WinNT.HANDLEByReference phStd) { // out: the handle to install as the child's standard handle
+        if (!pjhandles.getValue().equals(JAVA_INVALID_HANDLE_VALUE)) { // a real handle was supplied: pass it to the child as-is
+            phStd.setValue(pjhandles.getValue());
+            pjhandles.setValue(JAVA_INVALID_HANDLE_VALUE); // nothing is handed back to the parent for this stream
+        } else {
+            if (!Kernel32.INSTANCE.CreatePipe(pipe[0], pipe[1], null, PIPE_SIZE)) {
+                throw new Win32Exception(Kernel32.INSTANCE.GetLastError());
+            } else {
+                WinNT.HANDLE thisProcessEnd = offset == OFFSET_READ ? pipe[1].getValue() : pipe[0].getValue(); // the end the child does not get
+                phStd.setValue(pipe[offset].getValue());
+                pjhandles.setValue(thisProcessEnd);
+            }
+        }
+        Kernel32.INSTANCE.SetHandleInformation(phStd.getValue(), Kernel32.HANDLE_FLAG_INHERIT, Kernel32.HANDLE_FLAG_INHERIT); // mark the child's handle inheritable; the result is not checked
+        return true; // failures are reported by exception, so this is the only return value
+    }
+
+    private static void releaseHolder(boolean complete, WinNT.HANDLEByReference[] pipe, int offset) {
+        closeHandle(pipe[offset].getValue());
+        if (complete) {
+            closeHandle(pipe[offset == OFFSET_READ ? OFFSET_WRITE : OFFSET_READ].getValue());
+        }
+    }
+
+    private static void prepareIOEHandleState(WinNT.HANDLE[] stdIOE, Boolean[] inherit) {
+        for(int i = 0; i < HANDLE_STORAGE_SIZE; ++i) {
+            WinNT.HANDLE hstd = stdIOE[i];
+            if (!Kernel32.INVALID_HANDLE_VALUE.equals(hstd)) {
+                inherit[i] = Boolean.TRUE;
+                Kernel32.INSTANCE.SetHandleInformation(hstd, Kernel32.HANDLE_FLAG_INHERIT, 0);
+            }
+        }
+    }
+
+    private static void restoreIOEHandleState(WinNT.HANDLE[] stdIOE, Boolean[] inherit) {
+        for (int i = HANDLE_STORAGE_SIZE - 1; i >= 0; --i) {
+            if (!Kernel32.INVALID_HANDLE_VALUE.equals(stdIOE[i])) {
+                Kernel32.INSTANCE.SetHandleInformation(stdIOE[i], Kernel32.HANDLE_FLAG_INHERIT, inherit[i] ? Kernel32.HANDLE_FLAG_INHERIT : 0);
+            }
+        }
+    }
+
+    private static WinNT.HANDLE processCreate(String username,
+                                              String password,
+                                              String cmd,
+                                              final String envblock,
+                                              final String path,
+                                              final WinNT.HANDLEByReference[] stdHandles,
+                                              final boolean redirectErrorStream) {
+        WinNT.HANDLE ret = new WinNT.HANDLE(Pointer.createConstant(0));
+
+        WinNT.HANDLE[] stdIOE = new WinNT.HANDLE[] {
+                Kernel32.INVALID_HANDLE_VALUE, Kernel32.INVALID_HANDLE_VALUE, Kernel32.INVALID_HANDLE_VALUE,
+                stdHandles[0].getValue(), stdHandles[1].getValue(), stdHandles[2].getValue()
+        };
+        stdIOE[0] = Kernel32.INSTANCE.GetStdHandle(Kernel32.STD_INPUT_HANDLE);
+        stdIOE[1] = Kernel32.INSTANCE.GetStdHandle(Kernel32.STD_OUTPUT_HANDLE);
+        stdIOE[2] = Kernel32.INSTANCE.GetStdHandle(Kernel32.STD_ERROR_HANDLE);
+
+        Boolean[] inherit = new Boolean[] {
+                Boolean.FALSE, Boolean.FALSE, Boolean.FALSE,
+                Boolean.FALSE, Boolean.FALSE, Boolean.FALSE
+        };
+
+        prepareIOEHandleState(stdIOE, inherit);
+
+        // input
+        WinNT.HANDLEByReference hStdInput = new WinNT.HANDLEByReference();
+        WinNT.HANDLEByReference[] pipeIn = new WinNT.HANDLEByReference[] {
+                new WinNT.HANDLEByReference(Kernel32.INVALID_HANDLE_VALUE), new WinNT.HANDLEByReference(Kernel32.INVALID_HANDLE_VALUE) };
+
+        // output
+        WinNT.HANDLEByReference hStdOutput = new WinNT.HANDLEByReference();
+        WinNT.HANDLEByReference[] pipeOut = new WinNT.HANDLEByReference[] {
+                new WinNT.HANDLEByReference(Kernel32.INVALID_HANDLE_VALUE), new WinNT.HANDLEByReference(Kernel32.INVALID_HANDLE_VALUE) };
+
+        // error
+        WinNT.HANDLEByReference hStdError = new WinNT.HANDLEByReference();
+        WinNT.HANDLEByReference[] pipeError = new WinNT.HANDLEByReference[] {
+                new WinNT.HANDLEByReference(Kernel32.INVALID_HANDLE_VALUE), new WinNT.HANDLEByReference(Kernel32.INVALID_HANDLE_VALUE) };
+
+        boolean success;
+        if (initHolder(stdHandles[0], pipeIn, OFFSET_READ, hStdInput)) {
+            if (initHolder(stdHandles[1], pipeOut, OFFSET_WRITE, hStdOutput)) {
+                WinBase.STARTUPINFO si = new WinBase.STARTUPINFO();
+                si.hStdInput = hStdInput.getValue();
+                si.hStdOutput = hStdOutput.getValue();
+
+                if (redirectErrorStream) {
+                    si.hStdError = si.hStdOutput;
+                    stdHandles[2].setValue(JAVA_INVALID_HANDLE_VALUE);
+                    success = true;
+                } else {
+                    success = initHolder(stdHandles[2], pipeError, OFFSET_WRITE, hStdError);
+                    si.hStdError = hStdError.getValue();
+                }
+
+                if (success) {
+                    WTypes.LPSTR lpEnvironment = envblock == null ? new WTypes.LPSTR() : new WTypes.LPSTR(envblock);
+                    Kernel32.PROCESS_INFORMATION pi = new WinBase.PROCESS_INFORMATION();
+                    si.dwFlags = Kernel32.STARTF_USESTDHANDLES;
+                    if (!Advapi32.INSTANCE.CreateProcessWithLogonW(
+                            username
+                            , null
+                            , password
+                            , Advapi32.LOGON_WITH_PROFILE
+                            , null
+                            , cmd
+                            , Kernel32.CREATE_NO_WINDOW
+                            , lpEnvironment.getPointer()
+                            , path
+                            , si
+                            , pi)) {
+                        throw new Win32Exception(Kernel32.INSTANCE.GetLastError());
+                    } else {
+                        closeHandle(pi.hThread);
+                        ret = pi.hProcess;
+                    }
+                }
+                releaseHolder(ret.getPointer().equals(Pointer.createConstant(0)), pipeError, OFFSET_WRITE);
+                releaseHolder(ret.getPointer().equals(Pointer.createConstant(0)), pipeOut, OFFSET_WRITE);
+            }
+            releaseHolder(ret.getPointer().equals(Pointer.createConstant(0)), pipeIn, OFFSET_READ);
+        }
+        restoreIOEHandleState(stdIOE, inherit);
+        return ret;
+    }
+
+    private static synchronized WinNT.HANDLE create(String username, // class-level lock: only one process is created at a time
+                                                    String password,
+                                                    String cmd,
+                                                    final String envblock,
+                                                    final String path,
+                                                    final long[] stdHandles, // in/out: raw handle values, overwritten before returning
+                                                    final boolean redirectErrorStream) {
+        WinNT.HANDLE ret = new WinNT.HANDLE(Pointer.createConstant(0)); // zero handle is returned when nothing is started
+        WinNT.HANDLEByReference[] handles = new WinNT.HANDLEByReference[stdHandles.length];
+        for (int i = 0; i < stdHandles.length; i++) {
+            handles[i] = new WinNT.HANDLEByReference(new WinNT.HANDLE(Pointer.createConstant(stdHandles[i]))); // wrap each raw value for the JNA calls
+        }
+
+        if (cmd != null) {
+            if (username != null && password != null) { // NOTE(review): without both credentials no process is started and no error is raised - confirm callers expect this
+                ret = processCreate(username, password, cmd, envblock, path, handles, redirectErrorStream);
+            }
+        }
+
+        for (int i = 0; i < stdHandles.length; i++) {
+            stdHandles[i] = handles[i].getPointer().getLong(0); // reads the stored handle back as 8 bytes - NOTE(review): presumably assumes a 64-bit JVM, confirm
+        }
+
+        return ret;
+    }
+
+    private static int getExitCodeProcess(WinNT.HANDLE handle) {
+        IntByReference exitStatus = new IntByReference();
+        if (!Kernel32.INSTANCE.GetExitCodeProcess(handle, exitStatus)) {
+            throw new Win32Exception(Kernel32.INSTANCE.GetLastError());
+        }
+        return exitStatus.getValue();
+    }
+
+    private static void terminateProcess(WinNT.HANDLE handle) { // requests termination with exit code 1; the call's result is ignored
+        Kernel32.INSTANCE.TerminateProcess(handle, 1);
+    }
+
+    private static boolean isProcessAlive(WinNT.HANDLE handle) {
+        IntByReference exitStatus = new IntByReference();
+        Kernel32.INSTANCE.GetExitCodeProcess(handle, exitStatus);
+        return exitStatus.getValue() == STILL_ACTIVE;
+    }
+
+    /**
+     * Closes {@code handle} unless it is a placeholder.
+     *
+     * Unused pipe slots hold INVALID_HANDLE_VALUE and are passed here by
+     * releaseHolder; they (and null) are skipped, so only real handles reach
+     * CloseHandle.
+     *
+     * @param handle the handle to close; null and INVALID_HANDLE_VALUE are ignored
+     */
+    private static void closeHandle(WinNT.HANDLE handle) {
+        if (handle == null || Kernel32.INVALID_HANDLE_VALUE.equals(handle)) {
+            return;
+        }
+        Kernel32Util.closeHandle(handle);
+    }
+
+    /**
+     * Opens a file for atomic append. The file is created if it doesn't
+     * already exist.
+     *
+     * The file is opened with FILE_GENERIC_WRITE minus FILE_WRITE_DATA, which
+     * leaves FILE_APPEND_DATA as the only data-write right, so every write
+     * lands at the end of the file. The returned value is the numeric value of
+     * the handle itself.
+     *
+     * @param path the file to open or create
+     * @return the native HANDLE, or -1 when {@code path} is null or empty
+     * @throws Win32Exception if the file cannot be opened
+     */
+    private static long openForAtomicAppend(String path) throws IOException {
+        if (path == null || path.isEmpty()) {
+            return -1;
+        }
+
+        // Append-only access: writing anywhere but the end is not permitted.
+        int access = WinNT.FILE_GENERIC_WRITE & ~WinNT.FILE_WRITE_DATA;
+        int sharing = Kernel32.FILE_SHARE_READ | Kernel32.FILE_SHARE_WRITE;
+        int disposition = Kernel32.OPEN_ALWAYS;
+        int flagsAndAttributes = Kernel32.FILE_ATTRIBUTE_NORMAL;
+
+        WinNT.HANDLE handle = Kernel32.INSTANCE.CreateFile(path, access, sharing, null, disposition, flagsAndAttributes, null);
+        if (handle == null || Kernel32.INVALID_HANDLE_VALUE.equals(handle)) {
+            throw new Win32Exception(Kernel32.INSTANCE.GetLastError());
+        }
+        // The handle value is the pointer's address; it must not be dereferenced.
+        return Pointer.nativeValue(handle.getPointer());
+    }
+
+    private static void waitForInterruptibly(WinNT.HANDLE handle) {
+        int result = Kernel32.INSTANCE.WaitForMultipleObjects(1, new WinNT.HANDLE[]{handle}, false, Kernel32.INFINITE);
+        if (result == Kernel32.WAIT_FAILED) {
+            throw new Win32Exception(Kernel32.INSTANCE.GetLastError());
+        }
+    }
+
+    private static void waitForTimeoutInterruptibly(WinNT.HANDLE handle, long timeout) {
+        int result = Kernel32.INSTANCE.WaitForMultipleObjects(1, new WinNT.HANDLE[]{handle}, false, (int) timeout);
+        if (result == Kernel32.WAIT_FAILED) {
+            throw new Win32Exception(Kernel32.INSTANCE.GetLastError());
+        }
+    }
+
+}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/ConstantsTest.java
similarity index 64%
copy from dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
copy to dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/ConstantsTest.java
index 77fc398..3280a96 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/ConstantsTest.java
@@ -14,20 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.server.utils;
+package org.apache.dolphinscheduler.common;
 
+import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.junit.Assert;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-public class ProcessUtilsTest {
-    private static final Logger logger = LoggerFactory.getLogger(ProcessUtilsTest.class);
+/**
+ * Constants Test
+ */
+public class ConstantsTest {
 
+    /**
+     * Test PID via env
+     */
     @Test
-    public void getPidsStr() throws Exception {
-        String pidList = ProcessUtils.getPidsStr(1);
-        Assert.assertNotEquals("The child process of process 1 should not be empty", pidList, "");
-        logger.info("Sub process list : {}", pidList);
+    public void testPID() {
+        if (OSUtils.isWindows()) {
+            Assert.assertEquals("handle", Constants.PID);
+        } else {
+            Assert.assertEquals("pid", Constants.PID);
+        }
     }
+
 }
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
index 7106804..b955787 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/OSUtilsTest.java
@@ -39,16 +39,20 @@ public class OSUtilsTest {
 
     @Test
     public void testOSMetric(){
-        double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
-        Assert.assertTrue(availablePhysicalMemorySize > 0.0f);
-        double totalMemorySize = OSUtils.totalMemorySize();
-        Assert.assertTrue(totalMemorySize > 0.0f);
-        double loadAverage = OSUtils.loadAverage();
-        logger.info("loadAverage {}", loadAverage);
-        double memoryUsage = OSUtils.memoryUsage();
-        Assert.assertTrue(memoryUsage > 0.0f);
-        double cpuUsage = OSUtils.cpuUsage();
-        Assert.assertTrue(cpuUsage > 0.0f);
+        if (!OSUtils.isWindows()) {
+            double availablePhysicalMemorySize = OSUtils.availablePhysicalMemorySize();
+            Assert.assertTrue(availablePhysicalMemorySize > 0.0f);
+            double totalMemorySize = OSUtils.totalMemorySize();
+            Assert.assertTrue(totalMemorySize > 0.0f);
+            double loadAverage = OSUtils.loadAverage();
+            logger.info("loadAverage {}", loadAverage);
+            double memoryUsage = OSUtils.memoryUsage();
+            Assert.assertTrue(memoryUsage > 0.0f);
+            double cpuUsage = OSUtils.cpuUsage();
+            Assert.assertTrue(cpuUsage > 0.0f);
+        } else {
+            // TODO: add unit tests for the OS metrics on Windows
+        }
     }
 
     @Test
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32Test.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32Test.java
new file mode 100644
index 0000000..ce04346
--- /dev/null
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessBuilderForWin32Test.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.common.utils.process;
+
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(OSUtils.class)
+public class ProcessBuilderForWin32Test {
+
+    private static final Logger logger = LoggerFactory.getLogger(ProcessBuilderForWin32Test.class);
+
+    @Before
+    public void before() {
+        PowerMockito.mockStatic(OSUtils.class);
+        PowerMockito.when(OSUtils.isWindows()).thenReturn(true);
+    }
+
+    @Test
+    public void testCreateProcessBuilderForWin32() {
+        try {
+            ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
+            Assert.assertNotNull(builder);
+
+            builder = new ProcessBuilderForWin32("net");
+            Assert.assertNotNull(builder);
+
+            builder = new ProcessBuilderForWin32(Collections.singletonList("net"));
+            Assert.assertNotNull(builder);
+
+            builder = new ProcessBuilderForWin32((List<String>) null);
+            Assert.assertNotNull(builder);
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testBuildUser() {
+        try {
+            ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
+            builder.user("test", StringUtils.EMPTY);
+            Assert.assertNotNull(builder);
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testBuildCommand() {
+        try {
+            ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
+            builder.command(Collections.singletonList("net"));
+            Assert.assertNotEquals(0, builder.command().size());
+
+            builder = new ProcessBuilderForWin32();
+            builder.command("net");
+            Assert.assertNotEquals(0, builder.command().size());
+
+            builder = new ProcessBuilderForWin32();
+            builder.command((List<String>) null);
+            Assert.assertNotEquals(0, builder.command().size());
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testEnvironment() {
+        try {
+            ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
+            Assert.assertNotNull(builder.environment());
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+
+        try {
+            ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
+            builder.environment(new String[]{ "a=123" });
+            Assert.assertNotEquals(0, builder.environment().size());
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testDirectory() {
+        try {
+            ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
+            builder.directory(new File("/tmp"));
+            Assert.assertNotNull(builder.directory());
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testStream() {
+        try {
+            InputStream in = ProcessBuilderForWin32.NullInputStream.INSTANCE;
+            Assert.assertNotNull(in);
+            Assert.assertEquals(-1, in.read());
+            Assert.assertEquals(0, in.available());
+
+            OutputStream out = ProcessBuilderForWin32.NullOutputStream.INSTANCE;
+            Assert.assertNotNull(out);
+            out.write(new byte[] {1});
+        } catch (Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testRedirect() {
+        try {
+            ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
+
+            builder.redirectInput(new File("/tmp"));
+            Assert.assertNotNull(builder.redirectInput());
+            Assert.assertNotNull(builder.redirectInput().file());
+
+            builder.redirectOutput(new File("/tmp"));
+            Assert.assertNotNull(builder.redirectOutput());
+            Assert.assertNotNull(builder.redirectOutput().file());
+
+            builder.redirectError(new File("/tmp"));
+            Assert.assertNotNull(builder.redirectError());
+            Assert.assertNotNull(builder.redirectError().file());
+
+            builder.redirectInput(builder.redirectOutput());
+            builder.redirectOutput(builder.redirectInput());
+            builder.redirectError(builder.redirectInput());
+
+            Assert.assertNotNull(ProcessBuilderForWin32.Redirect.PIPE.type());
+            Assert.assertNotNull(ProcessBuilderForWin32.Redirect.PIPE.toString());
+            Assert.assertNotNull(ProcessBuilderForWin32.Redirect.INHERIT.type());
+            Assert.assertNotNull(ProcessBuilderForWin32.Redirect.INHERIT.toString());
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testRedirectErrorStream() {
+        try {
+            ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
+            builder.redirectErrorStream(true);
+            Assert.assertTrue(builder.redirectErrorStream());
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    @Test
+    public void runCmdViaUser() {
+        try {
+            ProcessBuilderForWin32 builder = new ProcessBuilderForWin32();
+            builder.user("test123", StringUtils.EMPTY);
+
+            List<String> commands = new ArrayList<>();
+            commands.add("cmd.exe");
+            commands.add("/c");
+            commands.add("net user");
+            builder.command(commands);
+
+            Process process = builder.start();
+            BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream(), Charset.forName("GBK")));
+            String line;
+            StringBuilder sb = new StringBuilder();
+            while ((line = inReader.readLine()) != null) {
+                sb.append(line);
+            }
+            logger.info("net user: {}", sb.toString());
+            Assert.assertNotEquals(StringUtils.EMPTY, sb.toString());
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32Test.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32Test.java
new file mode 100644
index 0000000..00c54c0
--- /dev/null
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessEnvironmentForWin32Test.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.common.utils.process;
+
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({OSUtils.class, ProcessEnvironmentForWin32.class})
+public class ProcessEnvironmentForWin32Test {
+
+    private static final Logger logger = LoggerFactory.getLogger(ProcessEnvironmentForWin32Test.class);
+
+    @Before
+    public void before() {
+        try {
+            PowerMockito.mockStatic(OSUtils.class);
+            PowerMockito.when(OSUtils.isWindows()).thenReturn(true);
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testPutAndGet() {
+        try {
+            ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0);
+            processEnvironmentForWin32.put("a", "123");
+            Assert.assertEquals("123", processEnvironmentForWin32.get("a"));
+            Assert.assertTrue(processEnvironmentForWin32.containsKey("a"));
+            Assert.assertTrue(processEnvironmentForWin32.containsValue("123"));
+            Assert.assertEquals("123", processEnvironmentForWin32.remove("a"));
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+
+        try {
+            ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0);
+            processEnvironmentForWin32.put("b=", "123");
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+
+        try {
+            ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0);
+            processEnvironmentForWin32.put("b", "\u0000");
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+
+        try {
+            ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0);
+            processEnvironmentForWin32.get(null);
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testEntrySet() {
+        try {
+            ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0);
+            processEnvironmentForWin32.clear();
+            processEnvironmentForWin32.put("a", "123");
+            Assert.assertEquals(0, processEnvironmentForWin32.entrySet().size());
+            Assert.assertTrue(processEnvironmentForWin32.entrySet().isEmpty());
+            for (Map.Entry<String, String> entry : processEnvironmentForWin32.entrySet()) {
+                Assert.assertNotNull(entry);
+                Assert.assertNotNull(entry.getKey());
+                Assert.assertNotNull(entry.getValue());
+                Assert.assertNotNull(entry.setValue("123"));
+            }
+
+            processEnvironmentForWin32.clear();
+            Set<String> keys = processEnvironmentForWin32.keySet();
+            Assert.assertEquals(0, keys.size());
+            Assert.assertTrue(keys.isEmpty());
+
+            processEnvironmentForWin32.clear();
+            Collection<String> values = processEnvironmentForWin32.values();
+            Assert.assertEquals(0, values.size());
+            Assert.assertTrue(values.isEmpty());
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testToEnvironmentBlock() {
+        try {
+            ProcessEnvironmentForWin32 processEnvironmentForWin32 = (ProcessEnvironmentForWin32) ProcessEnvironmentForWin32.emptyEnvironment(0);
+            Assert.assertNotNull(processEnvironmentForWin32.toEnvironmentBlock());
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+}
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32Test.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32Test.java
new file mode 100644
index 0000000..3f8bcbf
--- /dev/null
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/process/ProcessImplForWin32Test.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.common.utils.process;
+
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.security.action.GetPropertyAction;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({OSUtils.class, GetPropertyAction.class})
+public class ProcessImplForWin32Test {
+
+    private static final Logger logger = LoggerFactory.getLogger(ProcessImplForWin32Test.class);
+
+    @Before
+    public void before() {
+        PowerMockito.mockStatic(OSUtils.class);
+        PowerMockito.mockStatic(GetPropertyAction.class);
+        PowerMockito.when(OSUtils.isWindows()).thenReturn(true);
+    }
+
+    @Test
+    public void testStart() {
+        try {
+            Process process = ProcessImplForWin32.start(
+                    "test123", StringUtils.EMPTY, new String[]{"net"},
+                    null, null, null, false);
+            Assert.assertNotNull(process);
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+
+        try {
+            Process process = ProcessImplForWin32.start(
+                    "test123", StringUtils.EMPTY, new String[]{"net"},
+                    null, null, new ProcessBuilderForWin32.Redirect[]{
+                            ProcessBuilderForWin32.Redirect.PIPE,
+                            ProcessBuilderForWin32.Redirect.PIPE,
+                            ProcessBuilderForWin32.Redirect.PIPE
+                    }, false);
+            Assert.assertNotNull(process);
+        } catch (Error | Exception e) {
+            logger.error(e.getMessage());
+        }
+    }
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
index bac498c..8e0ccee 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
@@ -16,21 +16,29 @@
  */
 package org.apache.dolphinscheduler.server.worker.task;
 
+import com.sun.jna.platform.win32.Kernel32;
+import com.sun.jna.platform.win32.WinNT;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.HadoopUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.common.utils.process.ProcessBuilderForWin32;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.server.utils.ProcessUtils;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.slf4j.Logger;
 
 import java.io.*;
 import java.lang.reflect.Field;
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -194,26 +202,49 @@ public abstract class AbstractCommandExecutor {
      * @throws IOException IO Exception
      */
     private void buildProcess(String commandFile) throws IOException {
+        // command list
+        List<String> command = new ArrayList<>();
+
         //init process builder
-        ProcessBuilder processBuilder = new ProcessBuilder();
-        // setting up a working directory
-        processBuilder.directory(new File(taskDir));
-        // merge error information to standard output stream
-        processBuilder.redirectErrorStream(true);
-        // setting up user to run commands
-        List<String> command = new LinkedList<>();
-        command.add("sudo");
-        command.add("-u");
-        command.add(tenantCode);
-        command.add(commandInterpreter());
-        command.addAll(commandOptions());
-        command.add(commandFile);
-        processBuilder.command(command);
-
-        process = processBuilder.start();
+        if (OSUtils.isWindows()) {
+            ProcessBuilderForWin32 processBuilder = new ProcessBuilderForWin32();
+            // setting up a working directory
+            processBuilder.directory(new File(taskDir));
+            processBuilder.user(tenantCode, StringUtils.EMPTY);
+            // merge error information to standard output stream
+            processBuilder.redirectErrorStream(true);
+
+            // setting up user to run commands
+            command.add(commandInterpreter());
+            command.add("/c");
+            command.addAll(commandOptions());
+            command.add(commandFile);
+
+            // setting commands
+            processBuilder.command(command);
+            process = processBuilder.start();
+        } else {
+            ProcessBuilder processBuilder = new ProcessBuilder();
+            // setting up a working directory
+            processBuilder.directory(new File(taskDir));
+            // merge error information to standard output stream
+            processBuilder.redirectErrorStream(true);
+
+            // setting up user to run commands
+            command.add("sudo");
+            command.add("-u");
+            command.add(tenantCode);
+            command.add(commandInterpreter());
+            command.addAll(commandOptions());
+            command.add(commandFile);
+
+            // setting commands
+            processBuilder.command(command);
+            process = processBuilder.start();
+        }
 
         // print command
-        printCommand(processBuilder);
+        printCommand(command);
     }
 
     /**
@@ -320,13 +351,13 @@ public abstract class AbstractCommandExecutor {
 
     /**
      * print command
-     * @param processBuilder process builder
+     * @param command command
      */
-    private void printCommand(ProcessBuilder processBuilder) {
+    private void printCommand(List<String> command) {
         String cmdStr;
 
         try {
-            cmdStr = ProcessUtils.buildCommandStr(processBuilder.command());
+            cmdStr = ProcessUtils.buildCommandStr(command);
             logger.info("task run command:\n{}", cmdStr);
         } catch (IOException e) {
             logger.error(e.getMessage(), e);
@@ -358,7 +389,11 @@ public abstract class AbstractCommandExecutor {
                 BufferedReader inReader = null;
 
                 try {
-                    inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+                    if (OSUtils.isWindows()) {
+                        inReader = new BufferedReader(new InputStreamReader(process.getInputStream(), Charset.forName("GBK")));
+                    } else {
+                        inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+                    }
                     String line;
 
                     long lastFlushTime = System.currentTimeMillis();
@@ -406,7 +441,7 @@ public abstract class AbstractCommandExecutor {
                     }
                     Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                 }
-           }
+            }
         } catch (Exception e) {
             logger.error("yarn applications: {}  status failed ", appIds,e);
             result = false;
@@ -510,12 +545,15 @@ public abstract class AbstractCommandExecutor {
      */
     private int getProcessId(Process process) {
         int processId = 0;
-
         try {
             Field f = process.getClass().getDeclaredField(Constants.PID);
             f.setAccessible(true);
-
-            processId = f.getInt(process);
+            if (OSUtils.isWindows()) {
+                WinNT.HANDLE handle = (WinNT.HANDLE) f.get(process);
+                processId = Kernel32.INSTANCE.GetProcessId(handle);
+            } else {
+                processId = f.getInt(process);
+            }
         } catch (Throwable e) {
             logger.error(e.getMessage(), e);
         }
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
index db46d0d..5d14e6b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
@@ -17,11 +17,12 @@
 package org.apache.dolphinscheduler.server.worker.task;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Date;
@@ -34,11 +35,16 @@ import java.util.function.Consumer;
 public class ShellCommandExecutor extends AbstractCommandExecutor {
 
     /**
-     * sh
+     * For Unix-like, using sh
      */
     public static final String SH = "sh";
 
     /**
+     * For Windows, using cmd.exe
+     */
+    public static final String CMD = "cmd.exe";
+
+    /**
      * constructor
      * @param logHandler    log handler
      * @param taskDir       task dir
@@ -66,7 +72,7 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
     @Override
     protected String buildCommandFilePath() {
         // command file
-        return String.format("%s/%s.command", taskDir, taskAppId);
+        return String.format("%s/%s.%s", taskDir, taskAppId, OSUtils.isWindows() ? "bat" : "command");
     }
 
     /**
@@ -75,7 +81,7 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
      */
     @Override
     protected String commandInterpreter() {
-        return SH;
+        return OSUtils.isWindows() ? CMD : SH;
     }
 
     /**
@@ -103,21 +109,26 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
             logger.info("create command file:{}", commandFile);
 
             StringBuilder sb = new StringBuilder();
-            sb.append("#!/bin/sh\n");
-            sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
-            sb.append("cd $BASEDIR\n");
-
-            if (envFile != null) {
-                sb.append("source " + envFile + "\n");
+            if (OSUtils.isWindows()) {
+                sb.append("@echo off\n");
+                sb.append("cd /d %~dp0\n");
+                if (envFile != null) {
+                    sb.append("call ").append(envFile).append("\n");
+                }
+            } else {
+                sb.append("#!/bin/sh\n");
+                sb.append("BASEDIR=$(cd `dirname $0`; pwd)\n");
+                sb.append("cd $BASEDIR\n");
+                if (envFile != null) {
+                    sb.append("source ").append(envFile).append("\n");
+                }
             }
 
-            sb.append("\n\n");
             sb.append(execCommand);
-            logger.info("command : {}",sb.toString());
+            logger.info("command : {}", sb.toString());
 
             // write data to file
-            FileUtils.writeStringToFile(new File(commandFile), sb.toString(),
-                    Charset.forName("UTF-8"));
+            FileUtils.writeStringToFile(new File(commandFile), sb.toString(), StandardCharsets.UTF_8);
         }
     }
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
index ef941cd..7537ca2 100755
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/datax/DataxTask.java
@@ -44,6 +44,7 @@ import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.datax.DataxParameters;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
 import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
@@ -338,7 +339,7 @@ public class DataxTask extends AbstractTask {
     private String buildShellCommandFile(String jobConfigFilePath)
         throws Exception {
         // generate scripts
-        String fileName = String.format("%s/%s_node.sh", taskDir, taskProps.getTaskAppId());
+        String fileName = String.format("%s/%s_node.%s", taskDir, taskProps.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
         Path path = new File(fileName).toPath();
 
         if (Files.exists(path)) {
@@ -370,7 +371,13 @@ public class DataxTask extends AbstractTask {
         // create shell command file
         Set<PosixFilePermission> perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X);
         FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
-        Files.createFile(path, attr);
+
+        if (OSUtils.isWindows()) {
+            Files.createFile(path);
+        } else {
+            Files.createFile(path, attr);
+        }
+
         Files.write(path, dataxCommand.getBytes(), StandardOpenOption.APPEND);
 
         return fileName;
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index 5704c80..90661a6 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.server.utils.ParamUtils;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
@@ -123,7 +124,7 @@ public class ShellTask extends AbstractTask {
    */
   private String buildCommand() throws Exception {
     // generate scripts
-    String fileName = String.format("%s/%s_node.sh", taskDir, taskProps.getTaskAppId());
+    String fileName = String.format("%s/%s_node.%s", taskDir, taskProps.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
     Path path = new File(fileName).toPath();
 
     if (Files.exists(path)) {
@@ -154,7 +155,11 @@ public class ShellTask extends AbstractTask {
     Set<PosixFilePermission> perms = PosixFilePermissions.fromString(Constants.RWXR_XR_X);
     FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms);
 
-    Files.createFile(path, attr);
+    if (OSUtils.isWindows()) {
+      Files.createFile(path);
+    } else {
+      Files.createFile(path, attr);
+    }
 
     Files.write(path, shellParameters.getRawScript().getBytes(), StandardOpenOption.APPEND);
 
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
index 77fc398..1e0adaa 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ProcessUtilsTest.java
@@ -21,7 +21,12 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 public class ProcessUtilsTest {
+
     private static final Logger logger = LoggerFactory.getLogger(ProcessUtilsTest.class);
 
     @Test
@@ -30,4 +35,16 @@ public class ProcessUtilsTest {
         Assert.assertNotEquals("The child process of process 1 should not be empty", pidList, "");
         logger.info("Sub process list : {}", pidList);
     }
+
+    /**
+     * A single-element command list must be rendered as exactly that element.
+     */
+    @Test
+    public void testBuildCommandStr() throws IOException {
+        List<String> commands = new ArrayList<>();
+        commands.add("sudo");
+        // expected value first: JUnit's assertEquals(expected, actual) labels them in that order
+        Assert.assertEquals("sudo", ProcessUtils.buildCommandStr(commands));
+    }
+
 }
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
new file mode 100644
index 0000000..5536665
--- /dev/null
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTaskTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dolphinscheduler.server.worker.task.shell;
+
+import org.apache.dolphinscheduler.common.enums.CommandType;
+import org.apache.dolphinscheduler.common.enums.DbType;
+import org.apache.dolphinscheduler.common.utils.OSUtils;
+import org.apache.dolphinscheduler.dao.entity.DataSource;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
+import org.apache.dolphinscheduler.server.worker.task.TaskProps;
+import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
+import org.apache.dolphinscheduler.service.process.ProcessService;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.ApplicationContext;
+
+import java.util.Date;
+
+/**
+ * Unit tests for {@link ShellTask}. {@link OSUtils} is mocked statically so that both the
+ * Unix-like and the Windows branch can be requested regardless of the host the build runs on.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(OSUtils.class)
+public class ShellTaskTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(ShellTaskTest.class);
+
+    /** error-message fragment that the handle() tests tolerate without logging */
+    private static final String EXIT_CODE_ERROR = "process error . exitCode is :  -1";
+
+    private ShellTask shellTask;
+
+    private ProcessService processService;
+
+    private ShellCommandExecutor shellCommandExecutor;
+
+    private ApplicationContext applicationContext;
+
+    /** Mocks OSUtils and the Spring-provided services, then builds and initializes the shared task. */
+    @Before
+    public void before() throws Exception {
+        PowerMockito.mockStatic(OSUtils.class);
+        processService = PowerMockito.mock(ProcessService.class);
+        shellCommandExecutor = PowerMockito.mock(ShellCommandExecutor.class);
+
+        applicationContext = PowerMockito.mock(ApplicationContext.class);
+        SpringApplicationContext springApplicationContext = new SpringApplicationContext();
+        springApplicationContext.setApplicationContext(applicationContext);
+        PowerMockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
+
+        TaskProps props = newTaskProps();
+        props.setEnvFile(".dolphinscheduler_env.sh");
+        props.setTaskStartTime(new Date());
+        props.setTaskTimeout(0);
+        props.setTaskParams("{\"rawScript\": \" echo 'hello world!'\"}");
+        shellTask = new ShellTask(props, logger);
+        shellTask.init();
+
+        PowerMockito.when(processService.findDataSourceById(1)).thenReturn(getDataSource());
+        PowerMockito.when(processService.findDataSourceById(2)).thenReturn(getDataSource());
+        PowerMockito.when(processService.findProcessInstanceByTaskId(1)).thenReturn(getProcessInstance());
+
+        // NOTE(review): this mock executor is never passed to shellTask here, so the stub may be unused - confirm
+        String fileName = String.format("%s/%s_node.%s", props.getTaskDir(), props.getTaskAppId(), OSUtils.isWindows() ? "bat" : "sh");
+        PowerMockito.when(shellCommandExecutor.run(fileName, processService)).thenReturn(0);
+    }
+
+    /** Creates the minimal task properties shared by every ShellTask built in this class. */
+    private static TaskProps newTaskProps() {
+        TaskProps props = new TaskProps();
+        props.setTaskDir("/tmp");
+        props.setTaskAppId(String.valueOf(System.currentTimeMillis()));
+        props.setTaskInstId(1);
+        props.setTenantCode("1");
+        return props;
+    }
+
+    /** Builds the MySQL data source returned by the stubbed findDataSourceById lookups. */
+    private DataSource getDataSource() {
+        DataSource dataSource = new DataSource();
+        dataSource.setType(DbType.MYSQL);
+        dataSource.setConnectionParams("{\"user\":\"root\",\"password\":\"123456\",\"address\":\"jdbc:mysql://127.0.0.1:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://127.0.0.1:3306/test\"}");
+        dataSource.setUserId(1);
+        return dataSource;
+    }
+
+    /** Builds the START_PROCESS instance returned by the stubbed findProcessInstanceByTaskId lookup. */
+    private ProcessInstance getProcessInstance() {
+        ProcessInstance processInstance = new ProcessInstance();
+        processInstance.setCommandType(CommandType.START_PROCESS);
+        processInstance.setScheduleTime(new Date());
+        return processInstance;
+    }
+
+    /** Null-safe check for the tolerated message; Throwable.getMessage() is allowed to return null. */
+    private static boolean isExitCodeError(Throwable t) {
+        String message = t.getMessage();
+        return message != null && message.contains(EXIT_CODE_ERROR);
+    }
+
+    /** Logs a throwable swallowed by a best-effort test, keeping its stack trace. */
+    private static void logFailure(Throwable t) {
+        logger.error(t.getMessage(), t);
+    }
+
+    @After
+    public void after() {}
+
+    /** Method: ShellTask() */
+    @Test
+    public void testShellTask() throws Exception {
+        ShellTask shellTaskTest = new ShellTask(newTaskProps(), logger);
+        Assert.assertNotNull(shellTaskTest);
+    }
+
+    /** Method: init for Unix-like; a failure is logged rather than asserted */
+    @Test
+    public void testInitForUnix() {
+        try {
+            PowerMockito.when(OSUtils.isWindows()).thenReturn(false);
+            shellTask.init();
+        } catch (Error | Exception e) {
+            logFailure(e);
+        }
+    }
+
+    /** Method: init for Windows; a failure is logged rather than asserted */
+    @Test
+    public void testInitForWindows() {
+        try {
+            PowerMockito.when(OSUtils.isWindows()).thenReturn(true);
+            shellTask.init();
+        } catch (Error | Exception e) {
+            logFailure(e);
+        }
+    }
+
+    /** Method: handle() for Unix-like; logs unexpected failures unless the real host is Windows */
+    @Test
+    public void testHandleForUnix() throws Exception {
+        try {
+            PowerMockito.when(OSUtils.isWindows()).thenReturn(false);
+            shellTask.handle();
+        } catch (Error | Exception e) {
+            if (!isExitCodeError(e) && !System.getProperty("os.name").startsWith("Windows")) {
+                logFailure(e);
+            }
+        }
+    }
+
+    /** Method: handle() for Windows; only failures other than the tolerated exit code are logged */
+    @Test
+    public void testHandleForWindows() throws Exception {
+        try {
+            PowerMockito.when(OSUtils.isWindows()).thenReturn(true);
+            shellTask.handle();
+        } catch (Error | Exception e) {
+            if (!isExitCodeError(e)) {
+                logFailure(e);
+            }
+        }
+    }
+
+    /** Method: cancelApplication(); a failure is logged rather than asserted */
+    @Test
+    public void testCancelApplication() throws Exception {
+        try {
+            shellTask.cancelApplication(true);
+        } catch (Error | Exception e) {
+            logFailure(e);
+        }
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index dee1dce..a7feec0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -684,6 +684,9 @@
                 <configuration>
                     <includes>
                         <include>**/common/utils/*.java</include>
+                        <include>**/common/utils/process/ProcessBuilderForWin32Test.java</include>
+                        <include>**/common/utils/process/ProcessEnvironmentForWin32Test.java</include>
+                        <include>**/common/utils/process/ProcessImplForWin32Test.java</include>
                         <include>**/common/log/*.java</include>
                         <include>**/common/threadutils/*.java</include>
                         <include>**/common/graph/*.java</include>
@@ -732,6 +735,7 @@
                         <include>**/alert/template/AlertTemplateFactoryTest.java</include>
                         <include>**/alert/template/impl/DefaultHTMLTemplateTest.java</include>
                         <include>**/server/worker/task/datax/DataxTaskTest.java</include>
+                        <include>**/server/worker/task/shell/ShellTaskTest.java</include>
                         <include>**/server/worker/task/sqoop/SqoopTaskTest.java</include>
                         <include>**/server/utils/DataxUtilsTest.java</include>
                         <include>**/service/zk/DefaultEnsembleProviderTest.java</include>