Posted to commits@doris.apache.org by mo...@apache.org on 2020/06/19 09:44:57 UTC

[incubator-doris] branch master updated: [Spark Load]Fe submit spark etl job (#3716)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 532d15d  [Spark Load]Fe submit spark etl job (#3716)
532d15d is described below

commit 532d15d3810fbfc4feb1663945a805979ae805eb
Author: wyb <wy...@gmail.com>
AuthorDate: Fri Jun 19 17:44:47 2020 +0800

    [Spark Load]Fe submit spark etl job (#3716)
    
    After a user creates a Spark load job whose status is PENDING, the FE schedules and submits the Spark ETL job:
    1. Begin a transaction
    2. Create a SparkLoadPendingTask for submitting the ETL job
    2.1 Create the ETL job configuration according to https://github.com/apache/incubator-doris/issues/3010#issuecomment-635174675
    2.2 Upload the configuration file and job jar to HDFS with the broker
    2.3 Submit the ETL job to the Spark cluster
    2.4 Wait for the ETL job submission result (a sketch of steps 2.3-2.4 follows below)
    3. Update the job state to ETL and log the job update info if the ETL job is submitted successfully
    
    #3433
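
    A hedged sketch of steps 2.3 and 2.4, condensed from the SparkEtlJobHandler added in this commit;
    the master, paths, and label below are placeholder assumptions, not values taken from any real job:

    import org.apache.spark.launcher.SparkAppHandle;
    import org.apache.spark.launcher.SparkLauncher;

    public class SparkEtlSubmitSketch {
        public static void main(String[] args) throws Exception {
            // Step 2.3: submit the ETL job to the Spark cluster. All paths are hypothetical.
            SparkAppHandle handle = new SparkLauncher()
                    .setMaster("yarn")            // a "spark://host:port" master would use client deploy mode
                    .setDeployMode("cluster")
                    .setAppResource("hdfs://nn:9000/outputPath/configs/palo-fe.jar")
                    .setMainClass("org.apache.doris.load.loadv2.etl.SparkEtlJob")
                    .setAppName("doris__example_label")
                    .addAppArgs("hdfs://nn:9000/outputPath/configs/jobconfig.json")
                    .startApplication();

            // Step 2.4: wait for the submission result by polling for an application id,
            // giving up when the app reaches a terminal failed state or the retry budget runs out.
            String appId = null;
            for (int retry = 0; retry < 300; retry++) {
                appId = handle.getAppId();
                if (appId != null) {
                    break;
                }
                SparkAppHandle.State state = handle.getState();
                if (state == SparkAppHandle.State.FAILED || state == SparkAppHandle.State.KILLED
                        || state == SparkAppHandle.State.LOST) {
                    throw new Exception("start spark app failed, state: " + state);
                }
                Thread.sleep(1000);
            }
            if (appId == null) {
                throw new Exception("waited too long for the spark app id");
            }
            System.out.println("etl job submitted, appId=" + appId);
        }
    }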
---
 .../java/org/apache/doris/catalog/OlapTable.java   |   4 +
 fe/src/main/java/org/apache/doris/common/Pair.java |   6 +
 .../org/apache/doris/common/util/BrokerUtil.java   | 412 +++++++++++++--
 .../org/apache/doris/journal/JournalEntity.java    |   6 +
 fe/src/main/java/org/apache/doris/load/Load.java   | 128 +++--
 .../doris/load/loadv2/BrokerLoadPendingTask.java   |   2 +-
 .../java/org/apache/doris/load/loadv2/LoadJob.java |  90 +++-
 .../apache/doris/load/loadv2/LoadJobScheduler.java |   9 +
 .../org/apache/doris/load/loadv2/LoadManager.java  |  11 +
 .../org/apache/doris/load/loadv2/LoadTask.java     |  10 +-
 .../doris/load/loadv2/SparkEtlJobHandler.java      | 173 +++++++
 .../org/apache/doris/load/loadv2/SparkLoadJob.java | 178 ++++++-
 .../doris/load/loadv2/SparkLoadPendingTask.java    | 550 +++++++++++++++++++++
 .../load/loadv2/SparkPendingTaskAttachment.java    |  62 +++
 .../java/org/apache/doris/persist/EditLog.java     |  10 +
 .../org/apache/doris/persist/OperationType.java    |   2 +-
 .../org/apache/doris/persist/gson/GsonUtils.java   |  11 +-
 .../org/apache/doris/planner/BrokerScanNode.java   |   2 +-
 .../apache/doris/common/util/BrokerUtilTest.java   | 204 +++++++-
 .../load/loadv2/BrokerLoadPendingTaskTest.java     |   2 +-
 .../org/apache/doris/load/loadv2/LoadJobTest.java  |   7 +-
 .../load/loadv2/SparkLoadPendingTaskTest.java      | 326 ++++++++++++
 22 files changed, 2107 insertions(+), 98 deletions(-)

diff --git a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
index b4d60f6..c0ac047 100644
--- a/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -537,6 +537,10 @@ public class OlapTable extends Table {
         return keysType;
     }
 
+    public KeysType getKeysTypeByIndexId(long indexId) {
+        return indexIdToMeta.get(indexId).getKeysType();
+    }
+
     public PartitionInfo getPartitionInfo() {
         return partitionInfo;
     }
diff --git a/fe/src/main/java/org/apache/doris/common/Pair.java b/fe/src/main/java/org/apache/doris/common/Pair.java
index 14daca0..1a520e0 100644
--- a/fe/src/main/java/org/apache/doris/common/Pair.java
+++ b/fe/src/main/java/org/apache/doris/common/Pair.java
@@ -17,15 +17,21 @@
 
 package org.apache.doris.common;
 
+import com.google.gson.annotations.SerializedName;
+
 import java.util.Comparator;
 
 /**
  * The equivalent of C++'s std::pair<>.
+ *
+ * Notice: When using Pair for persistence, users need to guarantee that F and S can be serialized through Gson
  */
 public class Pair<F, S> {
     public static PairComparator<Pair<?, Comparable>> PAIR_VALUE_COMPARATOR = new PairComparator<>();
 
+    @SerializedName(value = "first")
     public F first;
+    @SerializedName(value = "second")
     public S second;
 
     public Pair(F first, S second) {
diff --git a/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index 622b871..ebbf82b 100644
--- a/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -17,52 +17,65 @@
 
 package org.apache.doris.common.util;
 
-import com.google.common.collect.Lists;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.UserException;
 import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.thrift.TBrokerCloseReaderRequest;
+import org.apache.doris.thrift.TBrokerCloseWriterRequest;
+import org.apache.doris.thrift.TBrokerDeletePathRequest;
+import org.apache.doris.thrift.TBrokerFD;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TBrokerListPathRequest;
 import org.apache.doris.thrift.TBrokerListResponse;
+import org.apache.doris.thrift.TBrokerOpenMode;
+import org.apache.doris.thrift.TBrokerOpenReaderRequest;
+import org.apache.doris.thrift.TBrokerOpenReaderResponse;
+import org.apache.doris.thrift.TBrokerOpenWriterRequest;
+import org.apache.doris.thrift.TBrokerOpenWriterResponse;
+import org.apache.doris.thrift.TBrokerOperationStatus;
 import org.apache.doris.thrift.TBrokerOperationStatusCode;
+import org.apache.doris.thrift.TBrokerPReadRequest;
+import org.apache.doris.thrift.TBrokerPWriteRequest;
+import org.apache.doris.thrift.TBrokerReadResponse;
 import org.apache.doris.thrift.TBrokerVersion;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPaloBrokerService;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.Collections;
 import java.util.List;
 
 public class BrokerUtil {
     private static final Logger LOG = LogManager.getLogger(BrokerUtil.class);
 
-    public static void parseBrokerFile(String path, BrokerDesc brokerDesc, List<TBrokerFileStatus> fileStatuses)
+    private static int READ_BUFFER_SIZE_B = 1024 * 1024;
+
+    /**
+     * Parse file statuses under the path with broker, excluding directories
+     * @param path
+     * @param brokerDesc
+     * @param fileStatuses: file path, size, isDir, isSplitable
+     * @throws UserException if broker op failed
+     */
+    public static void parseFile(String path, BrokerDesc brokerDesc, List<TBrokerFileStatus> fileStatuses)
             throws UserException {
-        FsBroker broker = null;
-        try {
-            String localIP = FrontendOptions.getLocalHostAddress();
-            broker = Catalog.getCurrentCatalog().getBrokerMgr().getBroker(brokerDesc.getName(), localIP);
-        } catch (AnalysisException e) {
-            throw new UserException(e.getMessage());
-        }
-        TNetworkAddress address = new TNetworkAddress(broker.ip, broker.port);
-        TPaloBrokerService.Client client = null;
-        try {
-            client = ClientPool.brokerPool.borrowObject(address);
-        } catch (Exception e) {
-            try {
-                client = ClientPool.brokerPool.borrowObject(address);
-            } catch (Exception e1) {
-                throw new UserException("Create connection to broker(" + address + ") failed.");
-            }
-        }
+        TNetworkAddress address = getAddress(brokerDesc);
+        TPaloBrokerService.Client client = borrowClient(address);
         boolean failed = true;
         try {
             TBrokerListPathRequest request = new TBrokerListPathRequest(
@@ -71,11 +84,11 @@ public class BrokerUtil {
             try {
                 tBrokerListResponse = client.listPath(request);
             } catch (TException e) {
-                ClientPool.brokerPool.reopen(client);
+                reopenClient(client);
                 tBrokerListResponse = client.listPath(request);
             }
             if (tBrokerListResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
-                throw new UserException("Broker list path failed.path=" + path
+                throw new UserException("Broker list path failed. path=" + path
                         + ",broker=" + address + ",msg=" + tBrokerListResponse.getOpStatus().getMessage());
             }
             failed = false;
@@ -87,13 +100,9 @@ public class BrokerUtil {
             }
         } catch (TException e) {
             LOG.warn("Broker list path exception, path={}, address={}, exception={}", path, address, e);
-            throw new UserException("Broker list path exception.path=" + path + ",broker=" + address);
+            throw new UserException("Broker list path exception. path=" + path + ", broker=" + address);
         } finally {
-            if (failed) {
-                ClientPool.brokerPool.invalidateObject(address, client);
-            } else {
-                ClientPool.brokerPool.returnObject(address, client);
-            }
+            returnClient(client, address, failed);
         }
     }
 
@@ -139,4 +148,351 @@ public class BrokerUtil {
         return Lists.newArrayList(columns);
     }
 
+    /**
+     * Read binary data from path with broker
+     * @param path
+     * @param brokerDesc
+     * @return byte[]
+     * @throws UserException if the broker op failed or the path does not match exactly one file
+     */
+    public static byte[] readFile(String path, BrokerDesc brokerDesc) throws UserException {
+        TNetworkAddress address = getAddress(brokerDesc);
+        TPaloBrokerService.Client client = borrowClient(address);
+        boolean failed = true;
+        TBrokerFD fd = null;
+        try {
+            // get file size
+            TBrokerListPathRequest request = new TBrokerListPathRequest(
+                    TBrokerVersion.VERSION_ONE, path, false, brokerDesc.getProperties());
+            TBrokerListResponse tBrokerListResponse = null;
+            try {
+                tBrokerListResponse = client.listPath(request);
+            } catch (TException e) {
+                reopenClient(client);
+                tBrokerListResponse = client.listPath(request);
+            }
+            if (tBrokerListResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker list path failed. path=" + path + ", broker=" + address
+                                                + ",msg=" + tBrokerListResponse.getOpStatus().getMessage());
+            }
+            List<TBrokerFileStatus> fileStatuses = tBrokerListResponse.getFiles();
+            if (fileStatuses.size() != 1) {
+                throw new UserException("Broker files num error. path=" + path + ", broker=" + address
+                                                + ", files num: " + fileStatuses.size());
+            }
+
+            Preconditions.checkState(!fileStatuses.get(0).isIsDir());
+            long fileSize = fileStatuses.get(0).getSize();
+
+            // open reader
+            String clientId = FrontendOptions.getLocalHostAddress() + ":" + Config.rpc_port;
+            TBrokerOpenReaderRequest tOpenReaderRequest = new TBrokerOpenReaderRequest(
+                    TBrokerVersion.VERSION_ONE, path, 0, clientId, brokerDesc.getProperties());
+            TBrokerOpenReaderResponse tOpenReaderResponse = null;
+            try {
+                tOpenReaderResponse = client.openReader(tOpenReaderRequest);
+            } catch (TException e) {
+                reopenClient(client);
+                tOpenReaderResponse = client.openReader(tOpenReaderRequest);
+            }
+            if (tOpenReaderResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker open reader failed. path=" + path + ", broker=" + address
+                                                + ", msg=" + tOpenReaderResponse.getOpStatus().getMessage());
+            }
+            fd = tOpenReaderResponse.getFd();
+
+            // read
+            TBrokerPReadRequest tPReadRequest = new TBrokerPReadRequest(
+                    TBrokerVersion.VERSION_ONE, fd, 0, fileSize);
+            TBrokerReadResponse tReadResponse = null;
+            try {
+                tReadResponse = client.pread(tPReadRequest);
+            } catch (TException e) {
+                reopenClient(client);
+                tReadResponse = client.pread(tPReadRequest);
+            }
+            if (tReadResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker read failed. path=" + path + ", broker=" + address
+                                                + ", msg=" + tReadResponse.getOpStatus().getMessage());
+            }
+            failed = false;
+            return tReadResponse.getData();
+        } catch (TException e) {
+            String failMsg = "Broker read file exception. path=" + path + ", broker=" + address;
+            LOG.warn(failMsg, e);
+            throw new UserException(failMsg);
+        } finally {
+            // close reader
+            if (fd != null) {
+                failed = true;
+                TBrokerCloseReaderRequest tCloseReaderRequest = new TBrokerCloseReaderRequest(
+                        TBrokerVersion.VERSION_ONE, fd);
+                TBrokerOperationStatus tOperationStatus = null;
+                try {
+                    tOperationStatus = client.closeReader(tCloseReaderRequest);
+                } catch (TException e) {
+                    reopenClient(client);
+                    try {
+                        tOperationStatus = client.closeReader(tCloseReaderRequest);
+                    } catch (TException ex) {
+                        LOG.warn("Broker close reader failed. path={}, address={}", path, address, ex);
+                    }
+                }
+                if (tOperationStatus == null || tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
+                    LOG.warn("Broker close reader failed. path={}, address={}, error={}", path, address,
+                             tOperationStatus == null ? "null" : tOperationStatus.getMessage());
+                } else {
+                    failed = false;
+                }
+            }
+
+            // return client
+            returnClient(client, address, failed);
+        }
+    }
+
+    /**
+     * Write binary data to destFilePath with broker
+     * @param data
+     * @param destFilePath
+     * @param brokerDesc
+     * @throws UserException if broker op failed
+     */
+    public static void writeFile(byte[] data, String destFilePath, BrokerDesc brokerDesc) throws UserException {
+        BrokerWriter writer = new BrokerWriter(destFilePath, brokerDesc);
+        try {
+            writer.open();
+            ByteBuffer byteBuffer = ByteBuffer.wrap(data);
+            writer.write(byteBuffer, data.length);
+        } finally {
+            writer.close();
+        }
+    }
+
+    /**
+     * Write srcFilePath file to destFilePath with broker
+     * @param srcFilePath
+     * @param destFilePath
+     * @param brokerDesc
+     * @throws UserException if broker op failed
+     */
+    public static void writeFile(String srcFilePath, String destFilePath,
+                                 BrokerDesc brokerDesc) throws UserException {
+        FileInputStream fis = null;
+        FileChannel channel = null;
+        BrokerWriter writer = new BrokerWriter(destFilePath, brokerDesc);
+        ByteBuffer byteBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE_B);
+        try {
+            writer.open();
+            fis = new FileInputStream(srcFilePath);
+            channel = fis.getChannel();
+            while (true) {
+                int readSize = channel.read(byteBuffer);
+                if (readSize == -1) {
+                    break;
+                }
+
+                byteBuffer.flip();
+                writer.write(byteBuffer, readSize);
+                byteBuffer.clear();
+            }
+        } catch (IOException e) {
+            String failMsg = "Read file exception. filePath=" + srcFilePath;
+            LOG.warn(failMsg, e);
+            throw new UserException(failMsg);
+        } finally {
+            // close broker file writer and local file input stream
+            writer.close();
+            try {
+                if (channel != null) {
+                    channel.close();
+                }
+                if (fis != null) {
+                    fis.close();
+                }
+            } catch (IOException e) {
+                LOG.warn("Close local file failed. srcPath={}", srcFilePath, e);
+            }
+        }
+    }
+
+    /**
+     * Delete path with broker
+     * @param path
+     * @param brokerDesc
+     * @throws UserException if broker op failed
+     */
+    public static void deletePath(String path, BrokerDesc brokerDesc) throws UserException {
+        TNetworkAddress address = getAddress(brokerDesc);
+        TPaloBrokerService.Client client = borrowClient(address);
+        boolean failed = true;
+        try {
+            TBrokerDeletePathRequest tDeletePathRequest = new TBrokerDeletePathRequest(
+                    TBrokerVersion.VERSION_ONE, path, brokerDesc.getProperties());
+            TBrokerOperationStatus tOperationStatus = null;
+            try {
+                tOperationStatus = client.deletePath(tDeletePathRequest);
+            } catch (TException e) {
+                reopenClient(client);
+                tOperationStatus = client.deletePath(tDeletePathRequest);
+            }
+            if (tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker delete path failed. path=" + path + ", broker=" + address
+                                                + ", msg=" + tOperationStatus.getMessage());
+            }
+            failed = false;
+        } catch (TException e) {
+            LOG.warn("Broker delete path exception, path={}, address={}, exception={}", path, address, e);
+            throw new UserException("Broker delete path exception. path=" + path + ", broker=" + address);
+        } finally {
+            returnClient(client, address, failed);
+        }
+    }
+
+    private static TNetworkAddress getAddress(BrokerDesc brokerDesc) throws UserException {
+        FsBroker broker = null;
+        try {
+            String localIP = FrontendOptions.getLocalHostAddress();
+            broker = Catalog.getCurrentCatalog().getBrokerMgr().getBroker(brokerDesc.getName(), localIP);
+        } catch (AnalysisException e) {
+            throw new UserException(e.getMessage());
+        }
+        return new TNetworkAddress(broker.ip, broker.port);
+    }
+
+    private static TPaloBrokerService.Client borrowClient(TNetworkAddress address) throws UserException {
+        TPaloBrokerService.Client client = null;
+        try {
+            client = ClientPool.brokerPool.borrowObject(address);
+        } catch (Exception e) {
+            try {
+                client = ClientPool.brokerPool.borrowObject(address);
+            } catch (Exception e1) {
+                throw new UserException("Create connection to broker(" + address + ") failed.");
+            }
+        }
+        return client;
+    }
+
+    private static void returnClient(TPaloBrokerService.Client client, TNetworkAddress address, boolean failed) {
+        if (failed) {
+            ClientPool.brokerPool.invalidateObject(address, client);
+        } else {
+            ClientPool.brokerPool.returnObject(address, client);
+        }
+    }
+
+    private static void reopenClient(TPaloBrokerService.Client client) {
+        ClientPool.brokerPool.reopen(client);
+    }
+
+    private static class BrokerWriter {
+        private String brokerFilePath;
+        private BrokerDesc brokerDesc;
+        private TPaloBrokerService.Client client;
+        private TNetworkAddress address;
+        private TBrokerFD fd;
+        private long currentOffset;
+        private boolean isReady;
+        private boolean failed;
+
+        public BrokerWriter(String brokerFilePath, BrokerDesc brokerDesc) {
+            this.brokerFilePath = brokerFilePath;
+            this.brokerDesc = brokerDesc;
+            this.isReady = false;
+            this.failed = true;
+        }
+
+        public void open() throws UserException {
+            failed = true;
+            address = BrokerUtil.getAddress(brokerDesc);
+            client = BrokerUtil.borrowClient(address);
+            try {
+                String clientId = FrontendOptions.getLocalHostAddress() + ":" + Config.rpc_port;
+                TBrokerOpenWriterRequest tOpenWriterRequest = new TBrokerOpenWriterRequest(
+                        TBrokerVersion.VERSION_ONE, brokerFilePath, TBrokerOpenMode.APPEND,
+                        clientId, brokerDesc.getProperties());
+                TBrokerOpenWriterResponse tOpenWriterResponse = null;
+                try {
+                    tOpenWriterResponse = client.openWriter(tOpenWriterRequest);
+                } catch (TException e) {
+                    reopenClient(client);
+                    tOpenWriterResponse = client.openWriter(tOpenWriterRequest);
+                }
+                if (tOpenWriterResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
+                    throw new UserException("Broker open writer failed. destPath=" + brokerFilePath
+                                                    + ", broker=" + address
+                                                    + ", msg=" + tOpenWriterResponse.getOpStatus().getMessage());
+                }
+                failed = false;
+                fd = tOpenWriterResponse.getFd();
+                currentOffset = 0L;
+                isReady = true;
+            } catch (TException e) {
+                String failMsg = "Broker open writer exception. filePath=" + brokerFilePath + ", broker=" + address;
+                LOG.warn(failMsg, e);
+                throw new UserException(failMsg);
+            }
+        }
+
+        public void write(ByteBuffer byteBuffer, long bufferSize) throws UserException {
+            if (!isReady) {
+                throw new UserException("Broker writer is not ready. filePath=" + brokerFilePath + ", broker=" + address);
+            }
+
+            failed = true;
+            TBrokerOperationStatus tOperationStatus = null;
+            TBrokerPWriteRequest tPWriteRequest = new TBrokerPWriteRequest(
+                    TBrokerVersion.VERSION_ONE, fd, currentOffset, byteBuffer);
+            try {
+                try {
+                    tOperationStatus = client.pwrite(tPWriteRequest);
+                } catch (TException e) {
+                    reopenClient(client);
+                    tOperationStatus = client.pwrite(tPWriteRequest);
+                }
+                if (tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
+                    throw new UserException("Broker write failed. filePath=" + brokerFilePath + ", broker=" + address
+                                                    + ", msg=" + tOperationStatus.getMessage());
+                }
+                failed = false;
+                currentOffset += bufferSize;
+            } catch (TException e) {
+                String failMsg = "Broker write exception. filePath=" + brokerFilePath + ", broker=" + address;
+                LOG.warn(failMsg, e);
+                throw new UserException(failMsg);
+            }
+        }
+
+        public void close() {
+            // close broker writer
+            failed = true;
+            TBrokerOperationStatus tOperationStatus = null;
+            if (fd != null) {
+                TBrokerCloseWriterRequest tCloseWriterRequest = new TBrokerCloseWriterRequest(
+                        TBrokerVersion.VERSION_ONE, fd);
+                try {
+                    tOperationStatus = client.closeWriter(tCloseWriterRequest);
+                } catch (TException e) {
+                    reopenClient(client);
+                    try {
+                        tOperationStatus = client.closeWriter(tCloseWriterRequest);
+                    } catch (TException ex) {
+                        LOG.warn("Broker close writer failed. filePath={}, address={}", brokerFilePath, address, ex);
+                    }
+                }
+                if (tOperationStatus == null || tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
+                    LOG.warn("Broker close writer failed. filePath={}, address={}, error={}", brokerFilePath,
+                             address, tOperationStatus == null ? "null" : tOperationStatus.getMessage());
+                } else {
+                    failed = false;
+                }
+            }
+
+            // return client
+            returnClient(client, address, failed);
+            isReady = false;
+        }
+
+    }
 }
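
A hedged usage sketch of the broker helpers added above (writeFile/readFile/deletePath). The broker
name, properties, and HDFS path are placeholder assumptions, and the calls only work inside a running
FE process, since BrokerUtil resolves the broker through the current catalog:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;

public class BrokerUtilUsageSketch {
    public static void roundTrip() throws UserException {
        // Hypothetical broker name, credentials and destination path.
        Map<String, String> props = new HashMap<>();
        props.put("username", "hdfs_user");
        props.put("password", "hdfs_passwd");
        BrokerDesc brokerDesc = new BrokerDesc("my_broker", props);
        String dest = "hdfs://nn:9000/tmp/doris_load/jobconfig.json";

        // Upload an in-memory config, read it back (the path must match exactly one file), then clean up.
        byte[] config = "{\"version\":\"v1\"}".getBytes(StandardCharsets.UTF_8);
        BrokerUtil.writeFile(config, dest, brokerDesc);
        byte[] readBack = BrokerUtil.readFile(dest, brokerDesc);
        System.out.println("read back " + readBack.length + " bytes");
        BrokerUtil.deletePath(dest, brokerDesc);
    }
}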
diff --git a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
index 69971c1..b707a15 100644
--- a/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -42,6 +42,7 @@ import org.apache.doris.load.ExportJob;
 import org.apache.doris.load.LoadErrorHub;
 import org.apache.doris.load.LoadJob;
 import org.apache.doris.load.loadv2.LoadJobFinalOperation;
+import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.master.Checkpoint;
 import org.apache.doris.mysql.privilege.UserPropertyInfo;
@@ -499,6 +500,11 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_UPDATE_LOAD_JOB: {
+                data = LoadJobStateUpdateInfo.read(in);
+                isRead = true;
+                break;
+            }
             case OperationType.OP_CREATE_RESOURCE: {
                 data = Resource.read(in);
                 isRead = true;
diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java
index 197b809..38fa0d7 100644
--- a/fe/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/src/main/java/org/apache/doris/load/Load.java
@@ -864,6 +864,78 @@ public class Load {
         }
     }
 
+    /**
+     * When doing a schema change, there may be some 'shadow' columns whose names carry the
+     * '__doris_shadow_' prefix. These columns are invisible to the user, but we need to generate
+     * data for them, so we add column mappings for these columns.
+     * e.g.:
+     * the base schema is (A, B, C) and B is under schema change, so there is a shadow column '__doris_shadow_B'.
+     * The final column mapping should look like: (A, B, C, __doris_shadow_B = substitute(B));
+     */
+    public static List<ImportColumnDesc> getSchemaChangeShadowColumnDesc(Table tbl, Map<String, Expr> columnExprMap) {
+        List<ImportColumnDesc> shadowColumnDescs = Lists.newArrayList();
+        for (Column column : tbl.getFullSchema()) {
+            if (!column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX)) {
+                continue;
+            }
+
+            String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX);
+            if (columnExprMap.containsKey(originCol)) {
+                Expr mappingExpr = columnExprMap.get(originCol);
+                if (mappingExpr != null) {
+                    /*
+                     * eg:
+                     * (A, C) SET (B = func(xx))
+                     * ->
+                     * (A, C) SET (B = func(xx), __doris_shadow_B = func(xx))
+                     */
+                    ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), mappingExpr);
+                    shadowColumnDescs.add(importColumnDesc);
+                } else {
+                    /*
+                     * eg:
+                     * (A, B, C)
+                     * ->
+                     * (A, B, C) SET (__doris_shadow_B = B)
+                     */
+                    ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(),
+                                                                             new SlotRef(null, originCol));
+                    shadowColumnDescs.add(importColumnDesc);
+                }
+            } else {
+                /*
+                 * There is a case that if user does not specify the related origin column, eg:
+                 * COLUMNS (A, C), and B is not specified, but B is being modified so there is a shadow column '__doris_shadow_B'.
+                 * We can not just add a mapping function "__doris_shadow_B = substitute(B)", because Doris can not find column B.
+                 * In this case, __doris_shadow_B can use its default value, so no need to add it to column mapping
+                 */
+                // do nothing
+            }
+        }
+        return shadowColumnDescs;
+    }
+
+    /*
+     * used for spark load job
+     * not init slot desc and analyze exprs
+     */
+    public static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
+                                   Map<String, Pair<String, List<String>>> columnToHadoopFunction) throws UserException {
+        initColumns(tbl, columnExprs, columnToHadoopFunction, null, null, null, null, null, false);
+    }
+
+    /*
+     * This function should be used for broker load v2 and stream load.
+     * And it must be called in same db lock when planing.
+     */
+    public static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
+                                   Map<String, Pair<String, List<String>>> columnToHadoopFunction,
+                                   Map<String, Expr> exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc,
+                                   Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams params) throws UserException {
+        initColumns(tbl, columnExprs, columnToHadoopFunction, exprsByName, analyzer,
+                    srcTupleDesc, slotDescByName, params, true);
+    }
+
     /*
      * This function will do followings:
      * 1. fill the column exprs if user does not specify any column or column mapping.
@@ -871,14 +943,12 @@ public class Load {
      * 3. Add any shadow columns if have.
      * 4. validate hadoop functions
      * 5. init slot descs and expr map for load plan
-     * 
-     * This function should be used for broker load v2 and stream load.
-     * And it must be called in same db lock when planing.
      */
     public static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
             Map<String, Pair<String, List<String>>> columnToHadoopFunction,
             Map<String, Expr> exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc,
-            Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams params) throws UserException {
+            Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams params,
+            boolean needInitSlotAndAnalyzeExprs) throws UserException {
         // check mapping column exist in schema
         // !! all column mappings are in columnExprs !!
         for (ImportColumnDesc importColumnDesc : columnExprs) {
@@ -925,50 +995,8 @@ public class Load {
             throw new DdlException("Column has no default value. column: " + columnName);
         }
 
-        // When doing schema change, there may have some 'shadow' columns, with prefix '__doris_shadow_' in
-        // their names. These columns are invisible to user, but we need to generate data for these columns.
-        // So we add column mappings for these column.
-        // eg1:
-        // base schema is (A, B, C), and B is under schema change, so there will be a shadow column: '__doris_shadow_B'
-        // So the final column mapping should looks like: (A, B, C, __doris_shadow_B = substitute(B));
-        for (Column column : tbl.getFullSchema()) {
-            if (!column.isNameWithPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX)) {
-                continue;
-            }
-
-            String originCol = column.getNameWithoutPrefix(SchemaChangeHandler.SHADOW_NAME_PRFIX);
-            if (columnExprMap.containsKey(originCol)) {
-                Expr mappingExpr = columnExprMap.get(originCol);
-                if (mappingExpr != null) {
-                    /*
-                     * eg:
-                     * (A, C) SET (B = func(xx)) 
-                     * ->
-                     * (A, C) SET (B = func(xx), __doris_shadow_B = func(xx))
-                     */
-                    ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), mappingExpr);
-                    copiedColumnExprs.add(importColumnDesc);
-                } else {
-                    /*
-                     * eg:
-                     * (A, B, C)
-                     * ->
-                     * (A, B, C) SET (__doris_shadow_B = B)
-                     */
-                    ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(),
-                            new SlotRef(null, originCol));
-                    copiedColumnExprs.add(importColumnDesc);
-                }
-            } else {
-                /*
-                 * There is a case that if user does not specify the related origin column, eg:
-                 * COLUMNS (A, C), and B is not specified, but B is being modified so there is a shadow column '__doris_shadow_B'.
-                 * We can not just add a mapping function "__doris_shadow_B = substitute(B)", because Doris can not find column B.
-                 * In this case, __doris_shadow_B can use its default value, so no need to add it to column mapping
-                 */
-                // do nothing
-            }
-        }
+        // get shadow column desc when table schema change
+        copiedColumnExprs.addAll(getSchemaChangeShadowColumnDesc(tbl, columnExprMap));
 
         // validate hadoop functions
         if (columnToHadoopFunction != null) {
@@ -991,6 +1019,10 @@ public class Load {
             }
         }
 
+        if (!needInitSlotAndAnalyzeExprs) {
+            return;
+        }
+
         // init slot desc add expr map, also transform hadoop functions
         for (ImportColumnDesc importColumnDesc : copiedColumnExprs) {
             // make column name case match with real column name
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
index 099caea..3d39460 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java
@@ -75,7 +75,7 @@ public class BrokerLoadPendingTask extends LoadTask {
                 long groupFileSize = 0;
                 List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
                 for (String path : fileGroup.getFilePaths()) {
-                    BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses);
+                    BrokerUtil.parseFile(path, brokerDesc, fileStatuses);
                 }
                 fileStatusList.add(fileStatuses);
                 for (TBrokerFileStatus fstatus : fileStatuses) {
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
index 3b0bb52..28b556c 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.load.loadv2;
 
+import com.google.gson.annotations.SerializedName;
 import org.apache.doris.analysis.LoadStmt;
 import org.apache.doris.catalog.AuthorizationInfo;
 import org.apache.doris.catalog.Catalog;
@@ -30,6 +31,7 @@ import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.Text;
@@ -46,6 +48,7 @@ import org.apache.doris.load.Load;
 import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.mysql.privilege.PaloPrivilege;
 import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.Coordinator;
 import org.apache.doris.qe.QeProcessorImpl;
@@ -392,7 +395,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
      * @throws AnalysisException there are error params in job
      * @throws DuplicatedRequestException 
      */
-    public void execute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException {
+    public void execute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException,
+            DuplicatedRequestException, LoadException {
         writeLock();
         try {
             unprotectedExecute();
@@ -401,8 +405,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         }
     }
 
-    public void unprotectedExecute()
-            throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException {
+    public void unprotectedExecute() throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException,
+            DuplicatedRequestException, LoadException {
         // check if job state is pending
         if (state != JobState.PENDING) {
             return;
@@ -410,7 +414,10 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         // the limit of job will be restrict when begin txn
         beginTxn();
         unprotectedExecuteJob();
-        unprotectedUpdateState(JobState.LOADING);
+        // update spark load job state from PENDING to ETL when pending task is finished
+        if (jobType != EtlJobType.SPARK) {
+            unprotectedUpdateState(JobState.LOADING);
+        }
     }
 
     public void processTimeout() {
@@ -433,7 +440,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         }
     }
 
-    protected void unprotectedExecuteJob() {
+    protected void unprotectedExecuteJob() throws LoadException {
     }
 
     /**
@@ -706,6 +713,9 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
                 case CANCELLED:
                     jobInfo.add("ETL:N/A; LOAD:N/A");
                     break;
+                case ETL:
+                    jobInfo.add("ETL:" + progress + "%; LOAD:0%");
+                    break;
                 default:
                     jobInfo.add("ETL:100%; LOAD:" + progress + "%");
                     break;
@@ -722,7 +732,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             }
 
             // task info
-            jobInfo.add("cluster:N/A" + "; timeout(s):" + timeoutSecond
+            jobInfo.add("cluster:" + getResourceName() + "; timeout(s):" + timeoutSecond
                                 + "; max_filter_ratio:" + maxFilterRatio);
 
             // error msg
@@ -735,7 +745,7 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             // create time
             jobInfo.add(TimeUtils.longToTimeString(createTimestamp));
             // etl start time
-            jobInfo.add(TimeUtils.longToTimeString(loadStartTimestamp));
+            jobInfo.add(TimeUtils.longToTimeString(getEtlStartTimestamp()));
             // etl end time
             jobInfo.add(TimeUtils.longToTimeString(loadStartTimestamp));
             // load start time
@@ -751,6 +761,14 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         }
     }
 
+    protected String getResourceName() {
+        return "N/A";
+    }
+
+    protected long getEtlStartTimestamp() {
+        return loadStartTimestamp;
+    }
+
     public void getJobInfo(Load.JobInfo jobInfo) throws DdlException {
         checkAuth("SHOW LOAD");
         jobInfo.tblNames.addAll(getTableNamesForShow());
@@ -768,6 +786,8 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
         EtlJobType type = EtlJobType.valueOf(Text.readString(in));
         if (type == EtlJobType.BROKER) {
             job = new BrokerLoadJob();
+        } else if (type == EtlJobType.SPARK) {
+            job = new SparkLoadJob();
         } else if (type == EtlJobType.INSERT) {
             job = new InsertLoadJob();
         } else if (type == EtlJobType.MINI) {
@@ -1016,4 +1036,60 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
             timezone = Text.readString(in);
         }
     }
+
+    public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) {
+        state = info.getState();
+        transactionId = info.getTransactionId();
+        loadStartTimestamp = info.getLoadStartTimestamp();
+    }
+
+    public static class LoadJobStateUpdateInfo implements Writable {
+        @SerializedName(value = "jobId")
+        private long jobId;
+        @SerializedName(value = "state")
+        private JobState state;
+        @SerializedName(value = "transactionId")
+        private long transactionId;
+        @SerializedName(value = "loadStartTimestamp")
+        private long loadStartTimestamp;
+
+        public LoadJobStateUpdateInfo(long jobId, JobState state, long transactionId, long loadStartTimestamp) {
+            this.jobId = jobId;
+            this.state = state;
+            this.transactionId = transactionId;
+            this.loadStartTimestamp = loadStartTimestamp;
+        }
+
+        public long getJobId() {
+            return jobId;
+        }
+
+        public JobState getState() {
+            return state;
+        }
+
+        public long getTransactionId() {
+            return transactionId;
+        }
+
+        public long getLoadStartTimestamp() {
+            return loadStartTimestamp;
+        }
+
+        @Override
+        public String toString() {
+            return GsonUtils.GSON.toJson(this);
+        }
+
+        @Override
+        public void write(DataOutput out) throws IOException {
+            String json = GsonUtils.GSON.toJson(this);
+            Text.writeString(out, json);
+        }
+
+        public static LoadJobStateUpdateInfo read(DataInput in) throws IOException {
+            String json = Text.readString(in);
+            return GsonUtils.GSON.fromJson(json, LoadJobStateUpdateInfo.class);
+        }
+    }
 }
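
A hedged round-trip sketch of the new LoadJobStateUpdateInfo persistence: write() stores the info as a
Gson JSON string, which is what the OP_UPDATE_LOAD_JOB edit-log entry replays through
LoadManager.replayUpdateLoadJobStateInfo(). The ids are placeholders and the JobState import path is an
assumption based on how the enum is referenced in this package:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.doris.load.loadv2.JobState;                       // assumed location of the state enum
import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;

public class UpdateInfoRoundTripSketch {
    public static void main(String[] args) throws IOException {
        // Hypothetical job id, transaction id and timestamp.
        LoadJobStateUpdateInfo info =
                new LoadJobStateUpdateInfo(10001L, JobState.ETL, 20001L, System.currentTimeMillis());

        // Serialize as the edit log would: a JSON string produced by GsonUtils.GSON.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        info.write(new DataOutputStream(bos));

        // Replay side: JournalEntity calls LoadJobStateUpdateInfo.read(in), and LoadManager then
        // looks up the job by id and applies state, transactionId and loadStartTimestamp.
        LoadJobStateUpdateInfo replayed =
                LoadJobStateUpdateInfo.read(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
        System.out.println(replayed);   // toString() prints the Gson JSON
    }
}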
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
index f01e99b..9a2b691 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJobScheduler.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
 import org.apache.doris.common.util.MasterDaemon;
@@ -74,8 +75,16 @@ public class LoadJobScheduler extends MasterDaemon {
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
                                  .add("error_msg", "There are error properties in job. Job will be cancelled")
                                  .build(), e);
+                // the transaction has not begun yet, so there is no need to abort
                 loadJob.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()),
                         false, true);
+            } catch (LoadException e) {
+                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
+                                 .add("error_msg", "Failed to submit etl job. Job will be cancelled")
+                                 .build(), e);
+                // the transaction has already begun, so it must be aborted
+                loadJob.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()),
+                                              true, true);
             } catch (DuplicatedRequestException e) {
                 // should not happen in load job scheduler, there is no request id.
                 LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index 1a5658a..5dd2d05 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -333,6 +333,17 @@ public class LoadManager implements Writable{
                          .build());
     }
 
+    public void replayUpdateLoadJobStateInfo(LoadJob.LoadJobStateUpdateInfo info) {
+        long jobId = info.getJobId();
+        LoadJob job = idToLoadJob.get(jobId);
+        if (job == null) {
+            LOG.warn("replay update load job state failed. error: job not found, id: {}", jobId);
+            return;
+        }
+
+        job.replayUpdateStateInfo(info);
+    }
+
     public int getLoadJobNum(JobState jobState, long dbId) {
         readLock();
         try {
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
index e9b286f..0a52369 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadTask.java
@@ -18,6 +18,7 @@
 package org.apache.doris.load.loadv2;
 
 import org.apache.doris.catalog.Catalog;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.LogBuilder;
 import org.apache.doris.common.util.LogKey;
@@ -53,7 +54,7 @@ public abstract class LoadTask extends MasterTask {
         } catch (UserException e) {
             failMsg.setMsg(e.getMessage() == null ? "" : e.getMessage());
             LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
-                    .add("error_msg", "Failed to execute load task").build());
+                    .add("error_msg", "Failed to execute load task").build(), e);
         } catch (Exception e) {
             failMsg.setMsg(e.getMessage() == null ? "" : e.getMessage());
             LOG.warn(new LogBuilder(LogKey.LOAD_JOB, callback.getCallbackId())
@@ -67,6 +68,13 @@ public abstract class LoadTask extends MasterTask {
     }
 
     /**
+     * init load task
+     * @throws LoadException
+     */
+    public void init() throws LoadException {
+    }
+
+    /**
      * execute load task
      *
      * @throws UserException task is failed
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
new file mode 100644
index 0000000..d135b23
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.PaloFe;
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.catalog.SparkResource;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.doris.thrift.TEtlState;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.spark.launcher.SparkAppHandle;
+import org.apache.spark.launcher.SparkAppHandle.Listener;
+import org.apache.spark.launcher.SparkAppHandle.State;
+import org.apache.spark.launcher.SparkLauncher;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+/**
+ * SparkEtlJobHandler is responsible for
+ * 1. submit spark etl job
+ * 2. get spark etl job status
+ * 3. kill spark etl job
+ * 4. get spark etl file paths
+ * 5. delete etl output path
+ */
+public class SparkEtlJobHandler {
+    private static final Logger LOG = LogManager.getLogger(SparkEtlJobHandler.class);
+
+    private static final String APP_RESOURCE_NAME = "palo-fe.jar";
+    private static final String CONFIG_FILE_NAME = "jobconfig.json";
+    private static final String APP_RESOURCE_LOCAL_PATH = PaloFe.DORIS_HOME_DIR + "/lib/" + APP_RESOURCE_NAME;
+    private static final String JOB_CONFIG_DIR = "configs";
+    private static final String MAIN_CLASS = "org.apache.doris.load.loadv2.etl.SparkEtlJob";
+    private static final String ETL_JOB_NAME = "doris__%s";
+    // 5min
+    private static final int GET_APPID_MAX_RETRY_TIMES = 300;
+    private static final int GET_APPID_SLEEP_MS = 1000;
+
+    class SparkAppListener implements Listener {
+        @Override
+        public void stateChanged(SparkAppHandle sparkAppHandle) {}
+
+        @Override
+        public void infoChanged(SparkAppHandle sparkAppHandle) {}
+    }
+
+    public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobConfig, SparkResource resource,
+                             BrokerDesc brokerDesc, SparkPendingTaskAttachment attachment) throws LoadException {
+        // delete outputPath
+        deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc);
+
+        // upload app resource and jobconfig to hdfs
+        String configsHdfsDir = etlJobConfig.outputPath + "/" + JOB_CONFIG_DIR + "/";
+        String appResourceHdfsPath = configsHdfsDir + APP_RESOURCE_NAME;
+        String jobConfigHdfsPath = configsHdfsDir + CONFIG_FILE_NAME;
+        try {
+            BrokerUtil.writeFile(APP_RESOURCE_LOCAL_PATH, appResourceHdfsPath, brokerDesc);
+            byte[] configData = etlJobConfig.configToJson().getBytes("UTF-8");
+            BrokerUtil.writeFile(configData, jobConfigHdfsPath, brokerDesc);
+        } catch (UserException | UnsupportedEncodingException e) {
+            throw new LoadException(e.getMessage());
+        }
+
+        SparkLauncher launcher = new SparkLauncher();
+        // master      |  deployMode
+        // ------------|-------------
+        // yarn        |  cluster
+        // spark://xx  |  client
+        launcher.setMaster(resource.getMaster())
+                .setDeployMode(resource.getDeployMode().name().toLowerCase())
+                .setAppResource(appResourceHdfsPath)
+                // TODO(wyb): spark-load
+                // replace with getCanonicalName later
+                //.setMainClass(SparkEtlJob.class.getCanonicalName())
+                .setMainClass(MAIN_CLASS)
+                .setAppName(String.format(ETL_JOB_NAME, loadLabel))
+                .addAppArgs(jobConfigHdfsPath);
+        // spark configs
+        for (Map.Entry<String, String> entry : resource.getSparkConfigs().entrySet()) {
+            launcher.setConf(entry.getKey(), entry.getValue());
+        }
+
+        // start app
+        SparkAppHandle handle = null;
+        State state = null;
+        String appId = null;
+        int retry = 0;
+        String errMsg = "start spark app failed. error: ";
+        try {
+            handle = launcher.startApplication(new SparkAppListener());
+        } catch (IOException e) {
+            LOG.warn(errMsg, e);
+            throw new LoadException(errMsg + e.getMessage());
+        }
+
+        while (retry++ < GET_APPID_MAX_RETRY_TIMES) {
+            appId = handle.getAppId();
+            if (appId != null) {
+                break;
+            }
+
+            // check state and retry
+            state = handle.getState();
+            if (fromSparkState(state) == TEtlState.CANCELLED) {
+                throw new LoadException(errMsg + "spark app state: " + state.toString());
+            }
+            if (retry >= GET_APPID_MAX_RETRY_TIMES) {
+                throw new LoadException(errMsg + "wait too much time for getting appid. spark app state: "
+                                                + state.toString());
+            }
+
+            // log
+            if (retry % 10 == 0) {
+                LOG.info("spark app id is not yet available from handle. load job id: {}, state: {}, retry times: {}",
+                         loadJobId, state.toString(), retry);
+            }
+            try {
+                Thread.sleep(GET_APPID_SLEEP_MS);
+            } catch (InterruptedException e) {
+                LOG.warn(e.getMessage());
+            }
+        }
+
+        // success
+        attachment.setAppId(appId);
+        attachment.setHandle(handle);
+    }
+
+    public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) {
+        try {
+            BrokerUtil.deletePath(outputPath, brokerDesc);
+            LOG.info("delete path success. path: {}", outputPath);
+        } catch (UserException e) {
+            LOG.warn("delete path failed. path: {}", outputPath, e);
+        }
+    }
+
+    private TEtlState fromSparkState(State state) {
+        switch (state) {
+            case FINISHED:
+                return TEtlState.FINISHED;
+            case FAILED:
+            case KILLED:
+            case LOST:
+                return TEtlState.CANCELLED;
+            default:
+                // UNKNOWN CONNECTED SUBMITTED RUNNING
+                return TEtlState.RUNNING;
+        }
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 8bb68f7..8f74235 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -17,30 +17,43 @@
 
 package org.apache.doris.load.loadv2;
 
-import com.google.common.base.Strings;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.ResourceDesc;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.SparkResource;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.DuplicatedRequestException;
+import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
 import org.apache.doris.load.EtlJobType;
 import org.apache.doris.load.FailMsg;
 import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.PushTask;
+import org.apache.doris.transaction.BeginTransactionException;
+import org.apache.doris.transaction.TransactionState.LoadJobSourceType;
+import org.apache.doris.transaction.TransactionState.TxnCoordinator;
+import org.apache.doris.transaction.TransactionState.TxnSourceType;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.spark.launcher.SparkAppHandle;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.gson.annotations.SerializedName;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -79,7 +92,7 @@ public class SparkLoadJob extends BulkLoadJob {
     private long quorumFinishTimestamp = -1;
     // below for push task
     private Map<Long, Set<Long>> tableToLoadPartitions = Maps.newHashMap();
-    //private Map<Long, PushBrokerScannerParams> indexToPushBrokerReaderParams = Maps.newHashMap();
+    //private Map<Long, PushBrokerReaderParams> indexToPushBrokerReaderParams = Maps.newHashMap();
     private Map<Long, Integer> indexToSchemaHash = Maps.newHashMap();
     private Map<Long, Map<Long, PushTask>> tabletToSentReplicaPushTask = Maps.newHashMap();
     private Set<Long> finishedReplicas = Sets.newHashSet();
@@ -127,6 +140,77 @@ public class SparkLoadJob extends BulkLoadJob {
         brokerDesc = new BrokerDesc(sparkResource.getBroker(), brokerProperties);
     }
 
+    @Override
+    public void beginTxn()
+            throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException {
+       transactionId = Catalog.getCurrentGlobalTransactionMgr()
+                .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
+                                  new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
+                                  LoadJobSourceType.FRONTEND, id, timeoutSecond);
+    }
+
+    @Override
+    protected void unprotectedExecuteJob() throws LoadException {
+        // create pending task
+        LoadTask task = new SparkLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(),
+                                                 sparkResource, brokerDesc);
+        task.init();
+        idToTasks.put(task.getSignature(), task);
+        Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task);
+    }
+
+    @Override
+    public void onTaskFinished(TaskAttachment attachment) {
+        if (attachment instanceof SparkPendingTaskAttachment) {
+            onPendingTaskFinished((SparkPendingTaskAttachment) attachment);
+        }
+    }
+
+    private void onPendingTaskFinished(SparkPendingTaskAttachment attachment) {
+        writeLock();
+        try {
+            // check if job has been cancelled
+            if (isTxnDone()) {
+                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+                                 .add("state", state)
+                                 .add("error_msg", "this task will be ignored when job is: " + state)
+                                 .build());
+                return;
+            }
+
+            if (finishedTaskIds.contains(attachment.getTaskId())) {
+                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+                                 .add("task_id", attachment.getTaskId())
+                                 .add("error_msg", "this is a duplicated callback of pending task "
+                                         + "when broker already has loading task")
+                                 .build());
+                return;
+            }
+
+            // add task id into finishedTaskIds
+            finishedTaskIds.add(attachment.getTaskId());
+
+            sparkAppHandle = attachment.getHandle();
+            appId = attachment.getAppId();
+            etlOutputPath = attachment.getOutputPath();
+
+            executeEtl();
+            // log etl state
+            unprotectedLogUpdateStateInfo();
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    /**
+     * update etl start time and state in spark load job
+     */
+    private void executeEtl() {
+        etlStartTimestamp = System.currentTimeMillis();
+        state = JobState.ETL;
+        LOG.info("update to {} state success. job id: {}", state, id);
+    }
+
     /**
      * load job already cancelled or finished, clear job below:
      * 1. kill etl job and delete etl files
@@ -136,8 +220,7 @@ public class SparkLoadJob extends BulkLoadJob {
         Preconditions.checkState(state == JobState.FINISHED || state == JobState.CANCELLED);
 
         LOG.debug("kill etl job and delete etl files. id: {}, state: {}", id, state);
-        // TODO(wyb): spark-load
-        //SparkEtlJobHandler handler = new SparkEtlJobHandler();
+        SparkEtlJobHandler handler = new SparkEtlJobHandler();
         if (state == JobState.CANCELLED) {
             if ((!Strings.isNullOrEmpty(appId) && sparkResource.isYarnMaster()) || sparkAppHandle != null) {
                 try {
@@ -152,8 +235,7 @@ public class SparkLoadJob extends BulkLoadJob {
             try {
                 // delete label dir, remove the last taskId dir
                 String outputPath = etlOutputPath.substring(0, etlOutputPath.lastIndexOf("/"));
-                // TODO(wyb): spark-load
-                //handler.deleteEtlOutputPath(outputPath, brokerDesc);
+                handler.deleteEtlOutputPath(outputPath, brokerDesc);
             } catch (Exception e) {
                 LOG.warn("delete etl files failed. id: {}, state: {}", id, state, e);
             }
@@ -199,6 +281,16 @@ public class SparkLoadJob extends BulkLoadJob {
     }
 
     @Override
+    protected String getResourceName() {
+        return sparkResource.getName();
+    }
+
+    @Override
+    protected long getEtlStartTimestamp() {
+        return etlStartTimestamp;
+    }
+
+    @Override
     public void write(DataOutput out) throws IOException {
         super.write(out);
         sparkResource.write(out);
@@ -226,4 +318,78 @@ public class SparkLoadJob extends BulkLoadJob {
             tabletMetaToFileInfo.put(tabletMetaStr, fileInfo);
         }
     }
+
+    /**
+     * log load job update info when job state changed to etl or loading
+     */
+    private void unprotectedLogUpdateStateInfo() {
+        SparkLoadJobStateUpdateInfo info = new SparkLoadJobStateUpdateInfo(
+                id, state, transactionId, etlStartTimestamp, appId, etlOutputPath,
+                loadStartTimestamp, tabletMetaToFileInfo);
+        Catalog.getCurrentCatalog().getEditLog().logUpdateLoadJob(info);
+    }
+
+    @Override
+    public void replayUpdateStateInfo(LoadJobStateUpdateInfo info) {
+        super.replayUpdateStateInfo(info);
+        SparkLoadJobStateUpdateInfo sparkJobStateInfo = (SparkLoadJobStateUpdateInfo) info;
+        etlStartTimestamp = sparkJobStateInfo.getEtlStartTimestamp();
+        appId = sparkJobStateInfo.getAppId();
+        etlOutputPath = sparkJobStateInfo.getEtlOutputPath();
+        tabletMetaToFileInfo = sparkJobStateInfo.getTabletMetaToFileInfo();
+
+        switch (state) {
+            case ETL:
+                // nothing to do
+                break;
+            case LOADING:
+                // TODO(wyb): spark-load
+                //unprotectedPrepareLoadingInfos();
+                break;
+            default:
+                LOG.warn("replay update load job state info failed. error: wrong state. job id: {}, state: {}",
+                         id, state);
+                break;
+        }
+    }
+
+    /**
+     * Used for spark load job journal log when job state changed to ETL or LOADING
+     */
+    public static class SparkLoadJobStateUpdateInfo extends LoadJobStateUpdateInfo {
+        @SerializedName(value = "etlStartTimestamp")
+        private long etlStartTimestamp;
+        @SerializedName(value = "appId")
+        private String appId;
+        @SerializedName(value = "etlOutputPath")
+        private String etlOutputPath;
+        @SerializedName(value = "tabletMetaToFileInfo")
+        private Map<String, Pair<String, Long>> tabletMetaToFileInfo;
+
+        public SparkLoadJobStateUpdateInfo(long jobId, JobState state, long transactionId, long etlStartTimestamp,
+                                           String appId, String etlOutputPath, long loadStartTimestamp,
+                                           Map<String, Pair<String, Long>> tabletMetaToFileInfo) {
+            super(jobId, state, transactionId, loadStartTimestamp);
+            this.etlStartTimestamp = etlStartTimestamp;
+            this.appId = appId;
+            this.etlOutputPath = etlOutputPath;
+            this.tabletMetaToFileInfo = tabletMetaToFileInfo;
+        }
+
+        public long getEtlStartTimestamp() {
+            return etlStartTimestamp;
+        }
+
+        public String getAppId() {
+            return appId;
+        }
+
+        public String getEtlOutputPath() {
+            return etlOutputPath;
+        }
+
+        public Map<String, Pair<String, Long>> getTabletMetaToFileInfo() {
+            return tabletMetaToFileInfo;
+        }
+    }
 }
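
A note on the cleanup path above that kills the etl job and deletes etl files: when the app was started through spark-launcher, the retained SparkAppHandle is itself enough to stop it. A hedged sketch of that cancel step follows; the handle parameter stands in for the sparkAppHandle field, and how yarn-cluster apps are actually killed is not shown in this hunk.

    import org.apache.spark.launcher.SparkAppHandle;

    public final class CancelSparkAppSketch {
        // Stop a launcher-managed app if it has not reached a final state yet.
        public static void cancel(SparkAppHandle handle) {
            if (handle != null && !handle.getState().isFinal()) {
                handle.stop();    // ask the driver to exit cleanly
                // handle.kill(); // last resort: terminate the spawned process
            }
        }
    }
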
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
new file mode 100644
index 0000000..185e84a
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java
@@ -0,0 +1,550 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.FunctionCallExpr;
+import org.apache.doris.analysis.ImportColumnDesc;
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.catalog.AggregateType;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
+import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.HiveTable;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionKey;
+import org.apache.doris.catalog.PartitionType;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.RangePartitionInfo;
+import org.apache.doris.catalog.SparkResource;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.load.Load;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumn;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlColumnMapping;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlFileGroup;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlIndex;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlJobProperty;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartition;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartitionInfo;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.FilePatternVersion;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.SourceType;
+import org.apache.doris.transaction.TransactionState;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+// 1. create etl job config and write it into jobconfig.json file
+// 2. submit spark etl job
+public class SparkLoadPendingTask extends LoadTask {
+    private static final Logger LOG = LogManager.getLogger(SparkLoadPendingTask.class);
+
+    private final Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToBrokerFileGroups;
+    private final SparkResource resource;
+    private final BrokerDesc brokerDesc;
+    private final long dbId;
+    private final String loadLabel;
+    private final long loadJobId;
+    private final long transactionId;
+    private EtlJobConfig etlJobConfig;
+
+    public SparkLoadPendingTask(SparkLoadJob loadTaskCallback,
+                                Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToBrokerFileGroups,
+                                SparkResource resource, BrokerDesc brokerDesc) {
+        super(loadTaskCallback);
+        this.retryTime = 3;
+        this.attachment = new SparkPendingTaskAttachment(signature);
+        this.aggKeyToBrokerFileGroups = aggKeyToBrokerFileGroups;
+        this.resource = resource;
+        this.brokerDesc = brokerDesc;
+        this.dbId = loadTaskCallback.getDbId();
+        this.loadJobId = loadTaskCallback.getId();
+        this.loadLabel = loadTaskCallback.getLabel();
+        this.transactionId = loadTaskCallback.getTransactionId();
+        this.failMsg = new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL);
+    }
+
+    @Override
+    void executeTask() throws LoadException {
+        LOG.info("begin to execute spark pending task. load job id: {}", loadJobId);
+        submitEtlJob();
+    }
+
+    private void submitEtlJob() throws LoadException {
+        SparkPendingTaskAttachment sparkAttachment = (SparkPendingTaskAttachment) attachment;
+        // retry different output path
+        etlJobConfig.outputPath = EtlJobConfig.getOutputPath(resource.getWorkingDir(), dbId, loadLabel, signature);
+        sparkAttachment.setOutputPath(etlJobConfig.outputPath);
+
+        // handler submit etl job
+        SparkEtlJobHandler handler = new SparkEtlJobHandler();
+        handler.submitEtlJob(loadJobId, loadLabel, etlJobConfig, resource, brokerDesc, sparkAttachment);
+        LOG.info("submit spark etl job success. load job id: {}, attachment: {}", loadJobId, sparkAttachment);
+    }
+
+    @Override
+    public void init() throws LoadException {
+        createEtlJobConf();
+    }
+
+    private void createEtlJobConf() throws LoadException {
+        Database db = Catalog.getCurrentCatalog().getDb(dbId);
+        if (db == null) {
+            throw new LoadException("db does not exist. id: " + dbId);
+        }
+
+        Map<Long, EtlTable> tables = Maps.newHashMap();
+        db.readLock();
+        try {
+            Map<Long, Set<Long>> tableIdToPartitionIds = Maps.newHashMap();
+            Set<Long> allPartitionsTableIds = Sets.newHashSet();
+            prepareTablePartitionInfos(db, tableIdToPartitionIds, allPartitionsTableIds);
+
+
+            for (Map.Entry<FileGroupAggKey, List<BrokerFileGroup>> entry : aggKeyToBrokerFileGroups.entrySet()) {
+                FileGroupAggKey aggKey = entry.getKey();
+                long tableId = aggKey.getTableId();
+
+                OlapTable table = (OlapTable) db.getTable(tableId);
+                if (table == null) {
+                    throw new LoadException("table does not exist. id: " + tableId);
+                }
+
+                EtlTable etlTable = null;
+                if (tables.containsKey(tableId)) {
+                    etlTable = tables.get(tableId);
+                } else {
+                    // indexes
+                    List<EtlIndex> etlIndexes = createEtlIndexes(table);
+                    // partition info
+                    EtlPartitionInfo etlPartitionInfo = createEtlPartitionInfo(table,
+                                                                               tableIdToPartitionIds.get(tableId));
+                    etlTable = new EtlTable(etlIndexes, etlPartitionInfo);
+                    tables.put(tableId, etlTable);
+
+                    // add table indexes to transaction state
+                    TransactionState txnState = Catalog.getCurrentGlobalTransactionMgr()
+                            .getTransactionState(dbId, transactionId);
+                    if (txnState == null) {
+                        throw new LoadException("txn does not exist. id: " + transactionId);
+                    }
+                    txnState.addTableIndexes(table);
+                }
+
+                // file group
+                for (BrokerFileGroup fileGroup : entry.getValue()) {
+                    etlTable.addFileGroup(createEtlFileGroup(fileGroup, tableIdToPartitionIds.get(tableId), db, table));
+                }
+            }
+        } finally {
+            db.readUnlock();
+        }
+
+        String outputFilePattern = EtlJobConfig.getOutputFilePattern(loadLabel, FilePatternVersion.V1);
+        // strictMode timezone properties
+        EtlJobProperty properties = new EtlJobProperty();
+        properties.strictMode = ((LoadJob) callback).strictMode;
+        properties.timezone = ((LoadJob) callback).timezone;
+        etlJobConfig = new EtlJobConfig(tables, outputFilePattern, loadLabel, properties);
+    }
+
+    private void prepareTablePartitionInfos(Database db, Map<Long, Set<Long>> tableIdToPartitionIds,
+                                            Set<Long> allPartitionsTableIds) throws LoadException {
+        for (FileGroupAggKey aggKey : aggKeyToBrokerFileGroups.keySet()) {
+            long tableId = aggKey.getTableId();
+            if (allPartitionsTableIds.contains(tableId)) {
+                continue;
+            }
+
+            OlapTable table = (OlapTable) db.getTable(tableId);
+            if (table == null) {
+                throw new LoadException("table does not exist. id: " + tableId);
+            }
+
+            Set<Long> partitionIds = null;
+            if (tableIdToPartitionIds.containsKey(tableId)) {
+                partitionIds = tableIdToPartitionIds.get(tableId);
+            } else {
+                partitionIds = Sets.newHashSet();
+                tableIdToPartitionIds.put(tableId, partitionIds);
+            }
+
+            Set<Long> groupPartitionIds = aggKey.getPartitionIds();
+            // if not assign partition, use all partitions
+            if (groupPartitionIds == null || groupPartitionIds.isEmpty()) {
+                for (Partition partition : table.getPartitions()) {
+                    partitionIds.add(partition.getId());
+                }
+
+                allPartitionsTableIds.add(tableId);
+            } else {
+                partitionIds.addAll(groupPartitionIds);
+            }
+        }
+    }
+
+    private List<EtlIndex> createEtlIndexes(OlapTable table) throws LoadException {
+        List<EtlIndex> etlIndexes = Lists.newArrayList();
+
+        for (Map.Entry<Long, List<Column>> entry : table.getIndexIdToSchema().entrySet()) {
+            long indexId = entry.getKey();
+            int schemaHash = table.getSchemaHashByIndexId(indexId);
+
+            // columns
+            List<EtlColumn> etlColumns = Lists.newArrayList();
+            for (Column column : entry.getValue()) {
+                etlColumns.add(createEtlColumn(column));
+            }
+
+            // check distribution type
+            DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
+            if (distributionInfo.getType() != DistributionInfoType.HASH) {
+                // RANDOM not supported
+                String errMsg = "Unsupported distribution type. type: " + distributionInfo.getType().name();
+                LOG.warn(errMsg);
+                throw new LoadException(errMsg);
+            }
+
+            // index type
+            String indexType = null;
+            KeysType keysType = table.getKeysTypeByIndexId(indexId);
+            switch (keysType) {
+                case DUP_KEYS:
+                    indexType = "DUPLICATE";
+                    break;
+                case AGG_KEYS:
+                    indexType = "AGGREGATE";
+                    break;
+                case UNIQUE_KEYS:
+                    indexType = "UNIQUE";
+                    break;
+                default:
+                    String errMsg = "unknown keys type. type: " + keysType.name();
+                    LOG.warn(errMsg);
+                    throw new LoadException(errMsg);
+            }
+
+            // is base index
+            boolean isBaseIndex = indexId == table.getBaseIndexId() ? true : false;
+
+            etlIndexes.add(new EtlIndex(indexId, etlColumns, schemaHash, indexType, isBaseIndex));
+        }
+
+        return etlIndexes;
+    }
+
+    private EtlColumn createEtlColumn(Column column) {
+        // column name
+        String name = column.getName();
+        // column type
+        PrimitiveType type = column.getDataType();
+        String columnType = column.getDataType().toString();
+        // is allow null
+        boolean isAllowNull = column.isAllowNull();
+        // is key
+        boolean isKey = column.isKey();
+
+        // aggregation type
+        String aggregationType = null;
+        if (column.getAggregationType() != null) {
+            aggregationType = column.getAggregationType().toString();
+        }
+
+        // default value
+        String defaultValue = null;
+        if (column.getDefaultValue() != null) {
+            defaultValue = column.getDefaultValue();
+        }
+        if (column.isAllowNull() && column.getDefaultValue() == null) {
+            defaultValue = "\\N";
+        }
+
+        // string length
+        int stringLength = 0;
+        if (type.isStringType()) {
+            stringLength = column.getStrLen();
+        }
+
+        // decimal precision scale
+        int precision = 0;
+        int scale = 0;
+        if (type.isDecimalType() || type.isDecimalV2Type()) {
+            precision = column.getPrecision();
+            scale = column.getScale();
+        }
+
+        return new EtlColumn(name, columnType, isAllowNull, isKey, aggregationType, defaultValue,
+                             stringLength, precision, scale);
+    }
+
+    private EtlPartitionInfo createEtlPartitionInfo(OlapTable table, Set<Long> partitionIds) throws LoadException {
+        PartitionType type = table.getPartitionInfo().getType();
+
+        List<String> partitionColumnRefs = Lists.newArrayList();
+        List<EtlPartition> etlPartitions = Lists.newArrayList();
+        if (type == PartitionType.RANGE) {
+            RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) table.getPartitionInfo();
+            for (Column column : rangePartitionInfo.getPartitionColumns()) {
+                partitionColumnRefs.add(column.getName());
+            }
+
+            for (Map.Entry<Long, Range<PartitionKey>> entry : rangePartitionInfo.getSortedRangeMap(false)) {
+                long partitionId = entry.getKey();
+                if (!partitionIds.contains(partitionId)) {
+                    continue;
+                }
+
+                Partition partition = table.getPartition(partitionId);
+                if (partition == null) {
+                    throw new LoadException("partition does not exist. id: " + partitionId);
+                }
+
+                // bucket num
+                int bucketNum = partition.getDistributionInfo().getBucketNum();
+
+                // is max partition
+                Range<PartitionKey> range = entry.getValue();
+                boolean isMaxPartition = range.upperEndpoint().isMaxValue();
+
+                // start keys
+                List<LiteralExpr> rangeKeyExprs = range.lowerEndpoint().getKeys();
+                List<Object> startKeys = Lists.newArrayList();
+                for (int i = 0; i < rangeKeyExprs.size(); ++i) {
+                    LiteralExpr literalExpr = rangeKeyExprs.get(i);
+                    Object keyValue = literalExpr.getRealValue();
+                    startKeys.add(keyValue);
+                }
+
+                // end keys
+                // is empty list when max partition
+                List<Object> endKeys = Lists.newArrayList();
+                if (!isMaxPartition) {
+                    rangeKeyExprs = range.upperEndpoint().getKeys();
+                    for (int i = 0; i < rangeKeyExprs.size(); ++i) {
+                        LiteralExpr literalExpr = rangeKeyExprs.get(i);
+                        Object keyValue = literalExpr.getRealValue();
+                        endKeys.add(keyValue);
+                    }
+                }
+
+                etlPartitions.add(new EtlPartition(partitionId, startKeys, endKeys, isMaxPartition, bucketNum));
+            }
+        } else {
+            Preconditions.checkState(type == PartitionType.UNPARTITIONED);
+            Preconditions.checkState(partitionIds.size() == 1);
+
+            for (Long partitionId : partitionIds) {
+                Partition partition = table.getPartition(partitionId);
+                if (partition == null) {
+                    throw new LoadException("partition does not exist. id: " + partitionId);
+                }
+
+                // bucket num
+                int bucketNum = partition.getDistributionInfo().getBucketNum();
+
+                etlPartitions.add(new EtlPartition(partitionId, Lists.newArrayList(), Lists.newArrayList(),
+                                                   true, bucketNum));
+            }
+        }
+
+        // distribution column refs
+        List<String> distributionColumnRefs = Lists.newArrayList();
+        DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
+        Preconditions.checkState(distributionInfo.getType() == DistributionInfoType.HASH);
+        for (Column column : ((HashDistributionInfo) distributionInfo).getDistributionColumns()) {
+            distributionColumnRefs.add(column.getName());
+        }
+
+        return new EtlPartitionInfo(type.typeString, partitionColumnRefs, distributionColumnRefs, etlPartitions);
+    }
+
+    private EtlFileGroup createEtlFileGroup(BrokerFileGroup fileGroup, Set<Long> tablePartitionIds,
+                                            Database db, OlapTable table) throws LoadException {
+        List<ImportColumnDesc> copiedColumnExprList = Lists.newArrayList(fileGroup.getColumnExprList());
+        Map<String, Expr> exprByName = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        for (ImportColumnDesc columnDesc : copiedColumnExprList) {
+            if (!columnDesc.isColumn()) {
+                exprByName.put(columnDesc.getColumnName(), columnDesc.getExpr());
+            }
+        }
+
+        // check columns
+        try {
+            Load.initColumns(table, copiedColumnExprList, fileGroup.getColumnToHadoopFunction());
+        } catch (UserException e) {
+            throw new LoadException(e.getMessage());
+        }
+        // add shadow column mapping when schema change
+        for (ImportColumnDesc columnDesc : Load.getSchemaChangeShadowColumnDesc(table, exprByName)) {
+            copiedColumnExprList.add(columnDesc);
+            exprByName.put(columnDesc.getColumnName(), columnDesc.getExpr());
+        }
+
+        // check negative for sum aggregate type
+        if (fileGroup.isNegative()) {
+            for (Column column : table.getBaseSchema()) {
+                if (!column.isKey() && column.getAggregationType() != AggregateType.SUM) {
+                    throw new LoadException("Column is not SUM AggreateType. column:" + column.getName());
+                }
+            }
+        }
+
+        // fill file field names if empty
+        List<String> fileFieldNames = fileGroup.getFileFieldNames();
+        if (fileFieldNames == null || fileFieldNames.isEmpty()) {
+            fileFieldNames = Lists.newArrayList();
+            for (Column column : table.getBaseSchema()) {
+                fileFieldNames.add(column.getName());
+            }
+        }
+
+        // column mappings
+        Map<String, Pair<String, List<String>>> columnToHadoopFunction = fileGroup.getColumnToHadoopFunction();
+        Map<String, EtlColumnMapping> columnMappings = Maps.newHashMap();
+        if (columnToHadoopFunction != null) {
+            for (Map.Entry<String, Pair<String, List<String>>> entry : columnToHadoopFunction.entrySet()) {
+                columnMappings.put(entry.getKey(),
+                                   new EtlColumnMapping(entry.getValue().first, entry.getValue().second));
+            }
+        }
+        for (ImportColumnDesc columnDesc : copiedColumnExprList) {
+            if (columnDesc.isColumn() || columnMappings.containsKey(columnDesc.getColumnName())) {
+                continue;
+            }
+            // the left must be column expr
+            columnMappings.put(columnDesc.getColumnName(), new EtlColumnMapping(columnDesc.getExpr().toSql()));
+        }
+
+        // partition ids
+        List<Long> partitionIds = fileGroup.getPartitionIds();
+        if (partitionIds == null || partitionIds.isEmpty()) {
+            partitionIds = Lists.newArrayList(tablePartitionIds);
+        }
+
+        // where
+        // TODO: check
+        String where = "";
+        if (fileGroup.getWhereExpr() != null) {
+            where = fileGroup.getWhereExpr().toSql();
+        }
+
+        // load from table
+        String hiveDbTableName = "";
+        Map<String, String> hiveTableProperties = Maps.newHashMap();
+        if (fileGroup.isLoadFromTable()) {
+            long srcTableId = fileGroup.getSrcTableId();
+            HiveTable srcHiveTable = (HiveTable) db.getTable(srcTableId);
+            if (srcHiveTable == null) {
+                throw new LoadException("table does not exist. id: " + srcTableId);
+            }
+            hiveDbTableName = srcHiveTable.getHiveDbTable();
+            hiveTableProperties.putAll(srcHiveTable.getHiveProperties());
+        }
+
+        // check hll and bitmap func
+        // TODO: more check
+        for (Column column : table.getBaseSchema()) {
+            String columnName = column.getName();
+            PrimitiveType columnType = column.getDataType();
+            Expr expr = exprByName.get(columnName);
+            if (columnType == PrimitiveType.HLL) {
+                checkHllMapping(columnName, expr);
+            }
+            if (columnType == PrimitiveType.BITMAP) {
+                checkBitmapMapping(columnName, expr, fileGroup.isLoadFromTable());
+            }
+        }
+
+        EtlFileGroup etlFileGroup = null;
+        if (fileGroup.isLoadFromTable()) {
+            etlFileGroup = new EtlFileGroup(SourceType.HIVE, hiveDbTableName, hiveTableProperties,
+                                            fileGroup.isNegative(), columnMappings, where, partitionIds);
+        } else {
+            etlFileGroup = new EtlFileGroup(SourceType.FILE, fileGroup.getFilePaths(), fileFieldNames,
+                                            fileGroup.getColumnsFromPath(), fileGroup.getValueSeparator(),
+                                            fileGroup.getLineDelimiter(), fileGroup.isNegative(),
+                                            fileGroup.getFileFormat(), columnMappings,
+                                            where, partitionIds);
+        }
+
+        return etlFileGroup;
+    }
+
+    private void checkHllMapping(String columnName, Expr expr) throws LoadException {
+        if (expr == null) {
+            throw new LoadException("HLL column func is not assigned. column:" + columnName);
+        }
+
+        String msg = "HLL column must use hll function, like " + columnName + "=hll_hash(xxx) or "
+                + columnName + "=hll_empty()";
+        if (!(expr instanceof FunctionCallExpr)) {
+            throw new LoadException(msg);
+        }
+        FunctionCallExpr fn = (FunctionCallExpr) expr;
+        String functionName = fn.getFnName().getFunction();
+        if (!functionName.equalsIgnoreCase("hll_hash")
+                && !functionName.equalsIgnoreCase("hll_empty")) {
+            throw new LoadException(msg);
+        }
+    }
+
+    private void checkBitmapMapping(String columnName, Expr expr, boolean isLoadFromTable) throws LoadException {
+        if (expr == null) {
+            throw new LoadException("BITMAP column func is not assigned. column:" + columnName);
+        }
+
+        String msg = "BITMAP column must use bitmap function, like " + columnName + "=to_bitmap(xxx) or "
+                + columnName + "=bitmap_hash() or " + columnName + "=bitmap_dict()";
+        if (!(expr instanceof FunctionCallExpr)) {
+            throw new LoadException(msg);
+        }
+        FunctionCallExpr fn = (FunctionCallExpr) expr;
+        String functionName = fn.getFnName().getFunction();
+        if (!functionName.equalsIgnoreCase("to_bitmap")
+                && !functionName.equalsIgnoreCase("bitmap_hash")
+                && !functionName.equalsIgnoreCase("bitmap_dict")) {
+            throw new LoadException(msg);
+        }
+
+        if (functionName.equalsIgnoreCase("bitmap_dict") && !isLoadFromTable) {
+            throw new LoadException("Bitmap global dict should load data from hive table");
+        }
+    }
+}
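
The checkHllMapping and checkBitmapMapping methods at the end of SparkLoadPendingTask encode a small rule for HLL and BITMAP column mappings. The toy class below, which is not FE code, restates that rule on plain function names just to make the accepted combinations explicit.

    import java.util.Locale;

    public class ColumnMappingRuleSketch {
        // HLL columns must be produced by hll_hash() or hll_empty().
        static boolean hllMappingOk(String fn) {
            String f = fn.toLowerCase(Locale.ROOT);
            return f.equals("hll_hash") || f.equals("hll_empty");
        }

        // BITMAP columns must use to_bitmap(), bitmap_hash() or bitmap_dict();
        // bitmap_dict() is only allowed when the source is a hive table.
        static boolean bitmapMappingOk(String fn, boolean loadFromHiveTable) {
            String f = fn.toLowerCase(Locale.ROOT);
            if (f.equals("bitmap_dict")) {
                return loadFromHiveTable;
            }
            return f.equals("to_bitmap") || f.equals("bitmap_hash");
        }

        public static void main(String[] args) {
            System.out.println(hllMappingOk("hll_hash"));              // true
            System.out.println(bitmapMappingOk("bitmap_dict", false)); // false: needs a hive source
            System.out.println(bitmapMappingOk("to_bitmap", false));   // true
        }
    }
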
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkPendingTaskAttachment.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkPendingTaskAttachment.java
new file mode 100644
index 0000000..311ca3b
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkPendingTaskAttachment.java
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.spark.launcher.SparkAppHandle;
+
+public class SparkPendingTaskAttachment extends TaskAttachment {
+    private SparkAppHandle handle;
+    private String appId;
+    private String outputPath;
+
+    public SparkPendingTaskAttachment(long taskId) {
+        super(taskId);
+    }
+
+    public SparkAppHandle getHandle() {
+        return handle;
+    }
+
+    public String getAppId() {
+        return appId;
+    }
+
+    public void setAppId(String appId) {
+        this.appId = appId;
+    }
+
+    public void setHandle(SparkAppHandle handle) {
+        this.handle = handle;
+    }
+
+    public String getOutputPath() {
+        return outputPath;
+    }
+
+    public void setOutputPath(String outputPath) {
+        this.outputPath = outputPath;
+    }
+
+    @Override
+    public String toString() {
+        return "SparkPendingTaskAttachment{" +
+                "appId='" + appId + '\'' +
+                ", outputPath='" + outputPath + '\'' +
+                '}';
+    }
+}
diff --git a/fe/src/main/java/org/apache/doris/persist/EditLog.java b/fe/src/main/java/org/apache/doris/persist/EditLog.java
index fa3ef9e..7f3da6c 100644
--- a/fe/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/src/main/java/org/apache/doris/persist/EditLog.java
@@ -53,6 +53,7 @@ import org.apache.doris.load.ExportMgr;
 import org.apache.doris.load.Load;
 import org.apache.doris.load.LoadErrorHub;
 import org.apache.doris.load.LoadJob;
+import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
 import org.apache.doris.load.loadv2.LoadJobFinalOperation;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.meta.MetaContext;
@@ -684,6 +685,11 @@ public class EditLog {
                     catalog.getLoadManager().replayEndLoadJob(operation);
                     break;
                 }
+                case OperationType.OP_UPDATE_LOAD_JOB: {
+                    LoadJobStateUpdateInfo info = (LoadJobStateUpdateInfo) journal.getData();
+                    catalog.getLoadManager().replayUpdateLoadJobStateInfo(info);
+                    break;
+                }
                 case OperationType.OP_CREATE_RESOURCE: {
                     final Resource resource = (Resource) journal.getData();
                     catalog.getResourceMgr().replayCreateResource(resource);
@@ -1266,6 +1272,10 @@ public class EditLog {
         logEdit(OperationType.OP_END_LOAD_JOB, loadJobFinalOperation);
     }
 
+    public void logUpdateLoadJob(LoadJobStateUpdateInfo info) {
+        logEdit(OperationType.OP_UPDATE_LOAD_JOB, info);
+    }
+
     public void logCreateResource(Resource resource) {
         // TODO(wyb): spark-load
         //logEdit(OperationType.OP_CREATE_RESOURCE, resource);
diff --git a/fe/src/main/java/org/apache/doris/persist/OperationType.java b/fe/src/main/java/org/apache/doris/persist/OperationType.java
index d06fcba..01171e6 100644
--- a/fe/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/src/main/java/org/apache/doris/persist/OperationType.java
@@ -156,7 +156,7 @@ public class OperationType {
     // this finish op include finished and cancelled
     public static final short OP_END_LOAD_JOB = 231;
     // update job info, used by spark load
-    //public static final short OP_UPDATE_LOAD_JOB = 232;
+    public static final short OP_UPDATE_LOAD_JOB = 232;
 
     // small files 251~260
     public static final short OP_CREATE_SMALL_FILE = 251;
diff --git a/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 10e1ad7..8e9cbe2 100644
--- a/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -26,6 +26,8 @@ import org.apache.doris.catalog.HashDistributionInfo;
 import org.apache.doris.catalog.RandomDistributionInfo;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.catalog.SparkResource;
+import org.apache.doris.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
+import org.apache.doris.load.loadv2.SparkLoadJob.SparkLoadJobStateUpdateInfo;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ArrayListMultimap;
@@ -102,6 +104,12 @@ public class GsonUtils {
             .registerSubtype(RollupJobV2.class, RollupJobV2.class.getSimpleName())
             .registerSubtype(SchemaChangeJobV2.class, SchemaChangeJobV2.class.getSimpleName());
 
+    // runtime adapter for class "LoadJobStateUpdateInfo"
+    private static RuntimeTypeAdapterFactory<LoadJobStateUpdateInfo> loadJobStateUpdateInfoTypeAdapterFactory
+            = RuntimeTypeAdapterFactory
+            .of(LoadJobStateUpdateInfo.class, "clazz")
+            .registerSubtype(SparkLoadJobStateUpdateInfo.class, SparkLoadJobStateUpdateInfo.class.getSimpleName());
+
     // the builder of GSON instance.
     // Add any other adapters if necessary.
     private static final GsonBuilder GSON_BUILDER = new GsonBuilder()
@@ -113,7 +121,8 @@ public class GsonUtils {
             .registerTypeAdapterFactory(columnTypeAdapterFactory)
             .registerTypeAdapterFactory(distributionInfoTypeAdapterFactory)
             .registerTypeAdapterFactory(resourceTypeAdapterFactory)
-            .registerTypeAdapterFactory(alterJobV2TypeAdapterFactory);
+            .registerTypeAdapterFactory(alterJobV2TypeAdapterFactory)
+            .registerTypeAdapterFactory(loadJobStateUpdateInfoTypeAdapterFactory);
 
     // this instance is thread-safe.
     public static final Gson GSON = GSON_BUILDER.create();
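
The new loadJobStateUpdateInfoTypeAdapterFactory is what allows the edit log to deserialize a journal entry back into the concrete SparkLoadJobStateUpdateInfo even though replay code only sees the LoadJobStateUpdateInfo base type. Below is a minimal sketch of that mechanism with made-up classes, assuming RuntimeTypeAdapterFactory (which sits alongside GsonUtils in the FE source) is on the classpath.

    import com.google.gson.Gson;
    import com.google.gson.GsonBuilder;

    // RuntimeTypeAdapterFactory is assumed importable from the FE source tree
    // (the same class GsonUtils uses above).
    public class RuntimeTypeAdapterSketch {
        // Made-up hierarchy standing in for LoadJobStateUpdateInfo and its subtype.
        static abstract class Base {}
        static class Sub extends Base { String payload = "etl"; }

        public static void main(String[] args) {
            RuntimeTypeAdapterFactory<Base> factory = RuntimeTypeAdapterFactory
                    .of(Base.class, "clazz")                          // discriminator field name
                    .registerSubtype(Sub.class, Sub.class.getSimpleName());
            Gson gson = new GsonBuilder().registerTypeAdapterFactory(factory).create();

            String json = gson.toJson(new Sub(), Base.class);
            // {"clazz":"Sub","payload":"etl"} -- the "clazz" tag is what replay uses
            // to reconstruct the concrete subtype from the base-typed journal entry.
            Base back = gson.fromJson(json, Base.class);
            System.out.println(json + " -> " + back.getClass().getSimpleName());
        }
    }
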
diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index 5df36ae..a4427ed 100644
--- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -354,7 +354,7 @@ public class BrokerScanNode extends LoadScanNode {
             for (BrokerFileGroup fileGroup : fileGroups) {
                 List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
                 for (String path : fileGroup.getFilePaths()) {
-                    BrokerUtil.parseBrokerFile(path, brokerDesc, fileStatuses);
+                    BrokerUtil.parseFile(path, brokerDesc, fileStatuses);
                 }
                 fileStatusesList.add(fileStatuses);
                 filesAdded += fileStatuses.size();
diff --git a/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java b/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
index 55a6e00..4be5d0f 100644
--- a/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
+++ b/fe/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
@@ -20,12 +20,44 @@ package org.apache.doris.common.util;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.catalog.BrokerMgr;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.FsBroker;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.GenericPool;
 import org.apache.doris.common.UserException;
+import org.apache.doris.thrift.TBrokerCloseReaderRequest;
+import org.apache.doris.thrift.TBrokerCloseWriterRequest;
+import org.apache.doris.thrift.TBrokerDeletePathRequest;
+import org.apache.doris.thrift.TBrokerFD;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TBrokerListPathRequest;
+import org.apache.doris.thrift.TBrokerListResponse;
+import org.apache.doris.thrift.TBrokerOpenReaderRequest;
+import org.apache.doris.thrift.TBrokerOpenReaderResponse;
+import org.apache.doris.thrift.TBrokerOpenWriterRequest;
+import org.apache.doris.thrift.TBrokerOpenWriterResponse;
+import org.apache.doris.thrift.TBrokerOperationStatus;
+import org.apache.doris.thrift.TBrokerOperationStatusCode;
+import org.apache.doris.thrift.TBrokerPReadRequest;
+import org.apache.doris.thrift.TBrokerPWriteRequest;
+import org.apache.doris.thrift.TBrokerReadResponse;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPaloBrokerService;
 
 import com.google.common.collect.Lists;
-
+import com.google.common.collect.Maps;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.Mocked;
+import mockit.MockUp;
+import org.apache.thrift.TException;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.UnsupportedEncodingException;
 import java.util.Collections;
 import java.util.List;
 
@@ -122,4 +154,172 @@ public class BrokerUtilTest {
         }
 
     }
-}
+
+    @Test
+    public void testReadFile(@Mocked TPaloBrokerService.Client client, @Mocked Catalog catalog,
+                             @Injectable BrokerMgr brokerMgr)
+            throws TException, UserException, UnsupportedEncodingException {
+        // list response
+        TBrokerListResponse listResponse = new TBrokerListResponse();
+        TBrokerOperationStatus status = new TBrokerOperationStatus();
+        status.statusCode = TBrokerOperationStatusCode.OK;
+        listResponse.opStatus = status;
+        List<TBrokerFileStatus> files = Lists.newArrayList();
+        String filePath = "hdfs://127.0.0.1:10000/doris/jobs/1/label6/9/dpp_result.json";
+        files.add(new TBrokerFileStatus(filePath, false, 10, false));
+        listResponse.files = files;
+
+        // open reader response
+        TBrokerOpenReaderResponse openReaderResponse = new TBrokerOpenReaderResponse();
+        openReaderResponse.opStatus = status;
+        openReaderResponse.fd = new TBrokerFD(1, 2);
+
+        // read response
+        String dppResultStr = "{'normal_rows': 10, 'abnormal_rows': 0, 'failed_reason': 'etl job failed'}";
+        TBrokerReadResponse readResponse = new TBrokerReadResponse();
+        readResponse.opStatus = status;
+        readResponse.setData(dppResultStr.getBytes("UTF-8"));
+
+        FsBroker fsBroker = new FsBroker("127.0.0.1", 99999);
+
+        new MockUp<GenericPool<TPaloBrokerService.Client>>() {
+            @Mock
+            public TPaloBrokerService.Client borrowObject(TNetworkAddress address) throws Exception {
+                return client;
+            }
+
+            @Mock
+            public void returnObject(TNetworkAddress address, TPaloBrokerService.Client object) {
+                return;
+            }
+
+            @Mock
+            public void invalidateObject(TNetworkAddress address, TPaloBrokerService.Client object) {
+                return;
+            }
+        };
+
+        new Expectations() {
+            {
+                catalog.getBrokerMgr();
+                result = brokerMgr;
+                brokerMgr.getBroker(anyString, anyString);
+                result = fsBroker;
+                client.listPath((TBrokerListPathRequest) any);
+                result = listResponse;
+                client.openReader((TBrokerOpenReaderRequest) any);
+                result = openReaderResponse;
+                client.pread((TBrokerPReadRequest) any);
+                result = readResponse;
+                times = 1;
+                client.closeReader((TBrokerCloseReaderRequest) any);
+                result = status;
+            }
+        };
+
+        BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap());
+        byte[] data = BrokerUtil.readFile(filePath, brokerDesc);
+        String readStr = new String(data, "UTF-8");
+        Assert.assertEquals(dppResultStr, readStr);
+    }
+
+    @Test
+    public void testWriteFile(@Mocked TPaloBrokerService.Client client, @Mocked Catalog catalog,
+                              @Injectable BrokerMgr brokerMgr)
+            throws TException, UserException, UnsupportedEncodingException {
+        // open writer response
+        TBrokerOpenWriterResponse openWriterResponse = new TBrokerOpenWriterResponse();
+        TBrokerOperationStatus status = new TBrokerOperationStatus();
+        status.statusCode = TBrokerOperationStatusCode.OK;
+        openWriterResponse.opStatus = status;
+        openWriterResponse.fd = new TBrokerFD(1, 2);
+        FsBroker fsBroker = new FsBroker("127.0.0.1", 99999);
+
+        new MockUp<GenericPool<TPaloBrokerService.Client>>() {
+            @Mock
+            public TPaloBrokerService.Client borrowObject(TNetworkAddress address) throws Exception {
+                return client;
+            }
+
+            @Mock
+            public void returnObject(TNetworkAddress address, TPaloBrokerService.Client object) {
+                return;
+            }
+
+            @Mock
+            public void invalidateObject(TNetworkAddress address, TPaloBrokerService.Client object) {
+                return;
+            }
+        };
+
+        new Expectations() {
+            {
+                catalog.getBrokerMgr();
+                result = brokerMgr;
+                brokerMgr.getBroker(anyString, anyString);
+                result = fsBroker;
+                client.openWriter((TBrokerOpenWriterRequest) any);
+                result = openWriterResponse;
+                client.pwrite((TBrokerPWriteRequest) any);
+                result = status;
+                times = 1;
+                client.closeWriter((TBrokerCloseWriterRequest) any);
+                result = status;
+            }
+        };
+
+        BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap());
+        byte[] configs = "{'label': 'label0'}".getBytes("UTF-8");
+        String destFilePath = "hdfs://127.0.0.1:10000/doris/jobs/1/label6/9/configs/jobconfig.json";
+        try {
+            BrokerUtil.writeFile(configs, destFilePath, brokerDesc);
+        } catch (Exception e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+
+    @Test
+    public void testDeletePath(@Mocked TPaloBrokerService.Client client, @Mocked Catalog catalog,
+                               @Injectable BrokerMgr brokerMgr) throws AnalysisException, TException {
+        // delete response
+        TBrokerOperationStatus status = new TBrokerOperationStatus();
+        status.statusCode = TBrokerOperationStatusCode.OK;
+        FsBroker fsBroker = new FsBroker("127.0.0.1", 99999);
+
+        new MockUp<GenericPool<TPaloBrokerService.Client>>() {
+            @Mock
+            public TPaloBrokerService.Client borrowObject(TNetworkAddress address) throws Exception {
+                return client;
+            }
+
+            @Mock
+            public void returnObject(TNetworkAddress address, TPaloBrokerService.Client object) {
+                return;
+            }
+
+            @Mock
+            public void invalidateObject(TNetworkAddress address, TPaloBrokerService.Client object) {
+                return;
+            }
+        };
+
+        new Expectations() {
+            {
+                catalog.getBrokerMgr();
+                result = brokerMgr;
+                brokerMgr.getBroker(anyString, anyString);
+                result = fsBroker;
+                client.deletePath((TBrokerDeletePathRequest) any);
+                result = status;
+                times = 1;
+            }
+        };
+
+        try {
+            BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap());
+            BrokerUtil.deletePath("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9", brokerDesc);
+        } catch (Exception e) {
+            Assert.fail(e.getMessage());
+        }
+    }
+}
\ No newline at end of file
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java
index 9b15819..6e9f81e 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadPendingTaskTest.java
@@ -64,7 +64,7 @@ public class BrokerLoadPendingTaskTest {
         };
         new MockUp<BrokerUtil>() {
             @Mock
-            public void parseBrokerFile(String path, BrokerDesc brokerDesc, List<TBrokerFileStatus> fileStatuses) {
+            public void parseFile(String path, BrokerDesc brokerDesc, List<TBrokerFileStatus> fileStatuses) {
                 fileStatuses.add(tBrokerFileStatus);
             }
         };
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
index dd69932..f324b76 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/LoadJobTest.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.DuplicatedRequestException;
 import org.apache.doris.common.LabelAlreadyUsedException;
+import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.common.jmockit.Deencapsulation;
 import org.apache.doris.metric.LongCounterMetric;
@@ -122,7 +123,11 @@ public class LoadJobTest {
             }
         };
 
-        loadJob.execute();
+        try {
+            loadJob.execute();
+        } catch (LoadException e) {
+            Assert.fail(e.getMessage());
+        }
         Assert.assertEquals(JobState.LOADING, loadJob.getState());
         Assert.assertEquals(1, loadJob.getTransactionId());
 
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java
new file mode 100644
index 0000000..374bbb2
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/SparkLoadPendingTaskTest.java
@@ -0,0 +1,326 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.DataDescription;
+import org.apache.doris.analysis.PartitionKeyDesc;
+import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.analysis.SingleRangePartitionDesc;
+import org.apache.doris.catalog.AggregateType;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.DistributionInfo;
+import org.apache.doris.catalog.HashDistributionInfo;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.PartitionInfo;
+import org.apache.doris.catalog.RangePartitionInfo;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.SinglePartitionInfo;
+import org.apache.doris.catalog.SparkResource;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.LoadException;
+import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.load.BrokerFileGroupAggInfo;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlFileGroup;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlIndex;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartition;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlPartitionInfo;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig.EtlTable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+
+import java.util.List;
+import java.util.Map;
+
+public class SparkLoadPendingTaskTest {
+
+    @Test
+    public void testExecuteTask(@Injectable SparkLoadJob sparkLoadJob,
+                                @Injectable SparkResource resource,
+                                @Injectable BrokerDesc brokerDesc,
+                                @Mocked Catalog catalog,
+                                @Injectable Database database,
+                                @Injectable OlapTable table) throws LoadException {
+        long dbId = 0L;
+        long tableId = 1L;
+
+        // columns
+        List<Column> columns = Lists.newArrayList();
+        columns.add(new Column("c1", Type.BIGINT, true, null, false, null, ""));
+
+        // indexes
+        Map<Long, List<Column>> indexIdToSchema = Maps.newHashMap();
+        long indexId = 3L;
+        indexIdToSchema.put(indexId, columns);
+
+        // partition and distribution infos
+        long partitionId = 2L;
+        DistributionInfo distributionInfo = new HashDistributionInfo(2, Lists.newArrayList(columns.get(0)));
+        PartitionInfo partitionInfo = new SinglePartitionInfo();
+        Partition partition = new Partition(partitionId, "p1", null, distributionInfo);
+        List<Partition> partitions = Lists.newArrayList(partition);
+
+        // file group
+        Map<BrokerFileGroupAggInfo.FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
+        List<BrokerFileGroup> brokerFileGroups = Lists.newArrayList();
+        DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
+                                                   null, null, null, false, null);
+        BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc);
+        brokerFileGroups.add(brokerFileGroup);
+        BrokerFileGroupAggInfo.FileGroupAggKey aggKey = new BrokerFileGroupAggInfo.FileGroupAggKey(tableId, null);
+        aggKeyToFileGroups.put(aggKey, brokerFileGroups);
+
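+        // stub the catalog, database and table metadata lookups consumed while building the etl job config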
+        new Expectations() {
+            {
+                catalog.getDb(dbId);
+                result = database;
+                database.getTable(tableId);
+                result = table;
+                table.getPartitions();
+                result = partitions;
+                table.getIndexIdToSchema();
+                result = indexIdToSchema;
+                table.getDefaultDistributionInfo();
+                result = distributionInfo;
+                table.getSchemaHashByIndexId(indexId);
+                result = 123;
+                table.getPartitionInfo();
+                result = partitionInfo;
+                table.getPartition(partitionId);
+                result = partition;
+                table.getKeysTypeByIndexId(indexId);
+                result = KeysType.DUP_KEYS;
+                table.getBaseIndexId();
+                result = indexId;
+            }
+        };
+
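+        // mock SparkEtlJobHandler so that submitEtlJob only records the Spark application id on the attachment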
+        String appId = "application_15888888888_0088";
+        new MockUp<SparkEtlJobHandler>() {
+            @Mock
+            public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobConfig,
+                                     SparkResource resource, BrokerDesc brokerDesc,
+                                     SparkPendingTaskAttachment attachment) throws LoadException {
+                attachment.setAppId(appId);
+            }
+        };
+
+        SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc);
+        task.init();
+        SparkPendingTaskAttachment attachment = Deencapsulation.getField(task, "attachment");
+        Assert.assertNull(attachment.getAppId());
+        task.executeTask();
+        Assert.assertEquals(appId, attachment.getAppId());
+    }
+
+    @Test(expected = LoadException.class)
+    public void testNoDb(@Injectable SparkLoadJob sparkLoadJob,
+                         @Injectable SparkResource resource,
+                         @Injectable BrokerDesc brokerDesc,
+                         @Mocked Catalog catalog) throws LoadException {
+        long dbId = 0L;
+
+        new Expectations() {
+            {
+                catalog.getDb(dbId);
+                result = null;
+            }
+        };
+
+        SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, null, resource, brokerDesc);
+        task.init();
+    }
+
+    @Test(expected = LoadException.class)
+    public void testNoTable(@Injectable SparkLoadJob sparkLoadJob,
+                            @Injectable SparkResource resource,
+                            @Injectable BrokerDesc brokerDesc,
+                            @Mocked Catalog catalog,
+                            @Injectable Database database) throws LoadException {
+        long dbId = 0L;
+        long tableId = 1L;
+
+        Map<BrokerFileGroupAggInfo.FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
+        List<BrokerFileGroup> brokerFileGroups = Lists.newArrayList();
+        DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
+                                                   null, null, null, false, null);
+        BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc);
+        brokerFileGroups.add(brokerFileGroup);
+        BrokerFileGroupAggInfo.FileGroupAggKey aggKey = new BrokerFileGroupAggInfo.FileGroupAggKey(tableId, null);
+        aggKeyToFileGroups.put(aggKey, brokerFileGroups);
+
+        new Expectations() {
+            {
+                catalog.getDb(dbId);
+                result = database;
+                database.getTable(tableId);
+                result = null;
+            }
+        };
+
+        SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc);
+        task.init();
+    }
+
+    @Test
+    public void testRangePartitionHashDistribution(@Injectable SparkLoadJob sparkLoadJob,
+                                                   @Injectable SparkResource resource,
+                                                   @Injectable BrokerDesc brokerDesc,
+                                                   @Mocked Catalog catalog,
+                                                   @Injectable Database database,
+                                                   @Injectable OlapTable table) throws LoadException, DdlException, AnalysisException {
+        long dbId = 0L;
+        long tableId = 1L;
+
+        // c1 is partition column, c2 is distribution column
+        List<Column> columns = Lists.newArrayList();
+        columns.add(new Column("c1", Type.INT, true, null, false, null, ""));
+        columns.add(new Column("c2", ScalarType.createVarchar(10), true, null, false, null, ""));
+        columns.add(new Column("c3", Type.INT, false, AggregateType.SUM, false, null, ""));
+
+        // indexes
+        Map<Long, List<Column>> indexIdToSchema = Maps.newHashMap();
+        long index1Id = 3L;
+        indexIdToSchema.put(index1Id, columns);
+        long index2Id = 4L;
+        indexIdToSchema.put(index2Id, Lists.newArrayList(columns.get(0), columns.get(2)));
+
+        // partition and distribution info
+        long partition1Id = 2L;
+        long partition2Id = 5L;
+        int distributionColumnIndex = 1;
+        DistributionInfo distributionInfo = new HashDistributionInfo(3, Lists.newArrayList(columns.get(distributionColumnIndex)));
+        Partition partition1 = new Partition(partition1Id, "p1", null,
+                                             distributionInfo);
+        Partition partition2 = new Partition(partition2Id, "p2", null,
+                                             new HashDistributionInfo(4, Lists.newArrayList(columns.get(distributionColumnIndex))));
+        int partitionColumnIndex = 0;
+        List<Partition> partitions = Lists.newArrayList(partition1, partition2);
+        RangePartitionInfo partitionInfo = new RangePartitionInfo(Lists.newArrayList(columns.get(partitionColumnIndex)));
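+        // VALUES LESS THAN style ranges: p1 covers values below 10, p2 covers [10, 20)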
+        PartitionKeyDesc partitionKeyDesc1 = new PartitionKeyDesc(Lists.newArrayList(new PartitionValue("10")));
+        SingleRangePartitionDesc partitionDesc1 = new SingleRangePartitionDesc(false, "p1", partitionKeyDesc1, null);
+        partitionDesc1.analyze(1, null);
+        partitionInfo.handleNewSinglePartitionDesc(partitionDesc1, partition1Id, false);
+        PartitionKeyDesc partitionKeyDesc2 = new PartitionKeyDesc(Lists.newArrayList(new PartitionValue("20")));
+        SingleRangePartitionDesc partitionDesc2 = new SingleRangePartitionDesc(false, "p2", partitionKeyDesc2, null);
+        partitionDesc2.analyze(1, null);
+        partitionInfo.handleNewSinglePartitionDesc(partitionDesc2, partition2Id, false);
+
+        // file group
+        Map<BrokerFileGroupAggInfo.FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
+        List<BrokerFileGroup> brokerFileGroups = Lists.newArrayList();
+        DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"),
+                                                   null, null, null, false, null);
+        BrokerFileGroup brokerFileGroup = new BrokerFileGroup(desc);
+        brokerFileGroups.add(brokerFileGroup);
+        BrokerFileGroupAggInfo.FileGroupAggKey aggKey = new BrokerFileGroupAggInfo.FileGroupAggKey(tableId, null);
+        aggKeyToFileGroups.put(aggKey, brokerFileGroups);
+
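+        // stub metadata lookups for a table with two indexes and two range partitions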
+        new Expectations() {
+            {
+                catalog.getDb(dbId);
+                result = database;
+                database.getTable(tableId);
+                result = table;
+                table.getPartitions();
+                result = partitions;
+                table.getIndexIdToSchema();
+                result = indexIdToSchema;
+                table.getDefaultDistributionInfo();
+                result = distributionInfo;
+                table.getSchemaHashByIndexId(index1Id);
+                result = 123;
+                table.getSchemaHashByIndexId(index2Id);
+                result = 234;
+                table.getPartitionInfo();
+                result = partitionInfo;
+                table.getPartition(partition1Id);
+                result = partition1;
+                table.getPartition(partition2Id);
+                result = partition2;
+                table.getKeysTypeByIndexId(index1Id);
+                result = KeysType.AGG_KEYS;
+                table.getKeysTypeByIndexId(index2Id);
+                result = KeysType.AGG_KEYS;
+                table.getBaseIndexId();
+                result = index1Id;
+            }
+        };
+
+        SparkLoadPendingTask task = new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc);
+        EtlJobConfig etlJobConfig = Deencapsulation.getField(task, "etlJobConfig");
+        Assert.assertNull(etlJobConfig);
+        task.init();
+        etlJobConfig = Deencapsulation.getField(task, "etlJobConfig");
+        Assert.assertNotNull(etlJobConfig);
+
+        // check table id
+        Map<Long, EtlTable> idToEtlTable = etlJobConfig.tables;
+        Assert.assertEquals(1, idToEtlTable.size());
+        Assert.assertTrue(idToEtlTable.containsKey(tableId));
+
+        // check indexes
+        EtlTable etlTable = idToEtlTable.get(tableId);
+        List<EtlIndex> etlIndexes = etlTable.indexes;
+        Assert.assertEquals(2, etlIndexes.size());
+        Assert.assertEquals(index1Id, etlIndexes.get(0).indexId);
+        Assert.assertEquals(index2Id, etlIndexes.get(1).indexId);
+
+        // check base index columns
+        EtlIndex baseIndex = etlIndexes.get(0);
+        Assert.assertTrue(baseIndex.isBaseIndex);
+        Assert.assertEquals(3, baseIndex.columns.size());
+        for (int i = 0; i < columns.size(); i++) {
+            Assert.assertEquals(columns.get(i).getName(), baseIndex.columns.get(i).columnName);
+        }
+        Assert.assertEquals("AGGREGATE", baseIndex.indexType);
+
+        // check partitions
+        EtlPartitionInfo etlPartitionInfo = etlTable.partitionInfo;
+        Assert.assertEquals("RANGE", etlPartitionInfo.partitionType);
+        List<String> partitionColumns = etlPartitionInfo.partitionColumnRefs;
+        Assert.assertEquals(1, partitionColumns.size());
+        Assert.assertEquals(columns.get(partitionColumnIndex).getName(), partitionColumns.get(0));
+        List<String> distributionColumns = etlPartitionInfo.distributionColumnRefs;
+        Assert.assertEquals(1, distributionColumns.size());
+        Assert.assertEquals(columns.get(distributionColumnIndex).getName(), distributionColumns.get(0));
+        List<EtlPartition> etlPartitions = etlPartitionInfo.partitions;
+        Assert.assertEquals(2, etlPartitions.size());
+
+        // check file group
+        List<EtlFileGroup> etlFileGroups = etlTable.fileGroups;
+        Assert.assertEquals(1, etlFileGroups.size());
+    }
+}
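
For readers skimming the diff, the lifecycle exercised by SparkLoadPendingTaskTest can be
summarized in a short sketch. This is only an illustration assembled from the calls visible
in the test above (the constructor, init(), executeTask(), and the mocked
SparkEtlJobHandler.submitEtlJob signature); the surrounding scheduler wiring is assumed here,
not taken from the actual LoadTask scheduling code.

    // Hedged sketch of the flow covered by testExecuteTask(); names match the test,
    // but this is not the production scheduling path.
    SparkLoadPendingTask task =
            new SparkLoadPendingTask(sparkLoadJob, aggKeyToFileGroups, resource, brokerDesc);

    // init() resolves the db/table metadata through Catalog and builds the EtlJobConfig;
    // it throws LoadException when the db or table is missing (see testNoDb / testNoTable).
    task.init();

    // executeTask() hands the config to SparkEtlJobHandler.submitEtlJob(...), which on
    // success records the Spark application id on the SparkPendingTaskAttachment.
    task.executeTask();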

