Posted to commits@doris.apache.org by mo...@apache.org on 2020/06/22 11:58:46 UTC

[incubator-doris] branch master updated: [Spark Load]Using SparkDpp to complete some calculation in Spark Load (#3729)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8092aad  [Spark Load]Using SparkDpp to complete some calculation in Spark Load (#3729)
8092aad is described below

commit 8092aadc8313a3a5a3e3596221d1ea3418492657
Author: wangbo <50...@qq.com>
AuthorDate: Mon Jun 22 19:58:34 2020 +0800

    [Spark Load]Using SparkDpp to complete some calculation in Spark Load (#3729)
---
 .../apache/doris/load/loadv2/dpp/ColumnParser.java | 188 +++++
 .../load/loadv2/dpp/DorisKryoRegistrator.java      |  29 +
 .../load/loadv2/dpp/DorisRangePartitioner.java     |  87 ++
 .../apache/doris/load/loadv2/dpp/DppColumns.java   | 112 +++
 .../apache/doris/load/loadv2/dpp/DppResult.java    |  64 ++
 .../org/apache/doris/load/loadv2/dpp/DppUtils.java | 257 ++++++
 .../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 880 +++++++++++++++++++++
 .../doris/load/loadv2/dpp/SparkRDDAggregator.java  | 564 +++++++++++++
 .../doris/load/loadv2/dpp/StringAccumulator.java   |  64 ++
 .../load/loadv2/dpp/DorisRangePartitionerTest.java | 144 ++++
 .../apache/doris/load/loadv2/dpp/DppUtilsTest.java | 244 ++++++
 .../dpp/MinimumCoverageRollupTreeBuilderTest.java  | 111 +++
 12 files changed, 2744 insertions(+)

diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
new file mode 100644
index 0000000..c31ed1b
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.joda.time.DateTime;
+
+import java.io.Serializable;
+import java.util.Date;
+
+// Parser to validate values for the different column types
+public abstract class ColumnParser implements Serializable {
+    public static ColumnParser create(EtlJobConfig.EtlColumn etlColumn) throws UserException {
+        String columnType = etlColumn.columnType;
+        if (columnType.equalsIgnoreCase("TINYINT")) {
+            return new TinyIntParser();
+        } else if (columnType.equalsIgnoreCase("SMALLINT")) {
+            return new SmallIntParser();
+        } else if (columnType.equalsIgnoreCase("INT")) {
+            return new IntParser();
+        } else if (columnType.equalsIgnoreCase("BIGINT")) {
+            return new BigIntParser();
+        } else if (columnType.equalsIgnoreCase("FLOAT")) {
+            return new FloatParser();
+        } else if (columnType.equalsIgnoreCase("DOUBLE")) {
+            return new DoubleParser();
+        } else if (columnType.equalsIgnoreCase("BOOLEAN")) {
+            return new BooleanParser();
+        } else if (columnType.equalsIgnoreCase("DATE")) {
+            return new DateParser();
+        } else if (columnType.equalsIgnoreCase("DATETIME")) {
+            return new DatetimeParser();
+        } else if (columnType.equalsIgnoreCase("VARCHAR")
+                || columnType.equalsIgnoreCase("CHAR")
+                || columnType.equalsIgnoreCase("BITMAP")
+                || columnType.equalsIgnoreCase("HLL")) {
+            return new StringParser(etlColumn);
+        } else {
+            throw new UserException("unsupported type:" + columnType);
+        }
+    }
+
+    public abstract boolean parse(String value);
+}
+
+class TinyIntParser extends ColumnParser {
+    @Override
+    public boolean parse(String value) {
+        try {
+            Short parsed = Short.parseShort(value);
+            if (parsed > 127 || parsed < -128) {
+                return false;
+            }
+        } catch (NumberFormatException e) {
+            return false;
+        }
+        return true;
+    }
+}
+
+class SmallIntParser extends ColumnParser {
+    @Override
+    public boolean parse(String value) {
+        try {
+            Short.parseShort(value);
+        } catch (NumberFormatException e) {
+            return false;
+        }
+        return true;
+    }
+}
+
+class IntParser extends ColumnParser {
+    @Override
+    public boolean parse(String value) {
+        try {
+            Integer.parseInt(value);
+        } catch (NumberFormatException e) {
+            return false;
+        }
+        return true;
+    }
+}
+
+class BigIntParser extends ColumnParser {
+    @Override
+    public boolean parse(String value) {
+        try {
+            Long.parseLong(value);
+        } catch (NumberFormatException e) {
+            return false;
+        }
+        return true;
+    }
+}
+
+class FloatParser extends ColumnParser {
+    @Override
+    public boolean parse(String value) {
+        try {
+            Float.parseFloat(value);
+        } catch (NumberFormatException e) {
+            return false;
+        }
+        return true;
+    }
+}
+
+class DoubleParser extends ColumnParser {
+    @Override
+    public boolean parse(String value) {
+        try {
+            Double.parseDouble(value);
+        } catch (NumberFormatException e) {
+            return false;
+        }
+        return true;
+    }
+}
+
+class BooleanParser extends ColumnParser {
+    @Override
+    public boolean parse(String value) {
+        if (value.equalsIgnoreCase("true")
+                || value.equalsIgnoreCase("false")) {
+            return true;
+        }
+        return false;
+    }
+}
+
+class DateParser extends ColumnParser {
+    @Override
+    public boolean parse(String value) {
+        try {
+            Date.parse(value);
+        } catch (IllegalArgumentException e) {
+            return false;
+        }
+        return true;
+    }
+}
+
+class DatetimeParser extends ColumnParser {
+    @Override
+    public boolean parse(String value) {
+        try {
+            DateTime.parse(value);
+        } catch (IllegalArgumentException e) {
+            return false;
+        }
+        return true;
+    }
+}
+
+class StringParser extends ColumnParser {
+
+    private EtlJobConfig.EtlColumn etlColumn;
+
+    public StringParser(EtlJobConfig.EtlColumn etlColumn) {
+        this.etlColumn = etlColumn;
+    }
+
+    @Override
+    public boolean parse(String value) {
+        try {
+            return value.getBytes("UTF-8").length <= etlColumn.stringLength;
+        } catch (Exception e) {
+            throw new RuntimeException("string check failed ", e);
+        }
+    }
+}
\ No newline at end of file
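
A minimal sketch of how the factory above is intended to be used when validating raw values; the helper method name is hypothetical, and the EtlColumn instance is assumed to come from the parsed EtlJobConfig:

    // returns true if the raw string value fits the destination column type
    static boolean isValidValue(EtlJobConfig.EtlColumn column, String rawValue) throws UserException {
        ColumnParser parser = ColumnParser.create(column);
        return parser.parse(rawValue);
    }
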
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java
new file mode 100644
index 0000000..d2568bb
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.apache.spark.serializer.KryoRegistrator;
+
+public class DorisKryoRegistrator implements KryoRegistrator {
+
+    @Override
+    public void registerClasses(Kryo kryo) {
+        kryo.register(org.apache.doris.load.loadv2.Roaring64Map.class);
+    }
+}
\ No newline at end of file
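
A registrator like this is typically wired into the job through Spark configuration. A minimal sketch, assuming org.apache.spark.SparkConf and org.apache.spark.sql.SparkSession are imported (the actual Spark Load submission code is not part of this diff):

    // build a SparkSession whose Kryo serializer knows about Roaring64Map
    SparkConf conf = new SparkConf()
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryo.registrator", "org.apache.doris.load.loadv2.dpp.DorisKryoRegistrator");
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
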
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitioner.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitioner.java
new file mode 100644
index 0000000..c92529f
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitioner.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.spark.Partitioner;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class DorisRangePartitioner extends Partitioner {
+    private static final String UNPARTITIONED_TYPE = "UNPARTITIONED";
+    private EtlJobConfig.EtlPartitionInfo partitionInfo;
+    private List<PartitionRangeKey> partitionRangeKeys;
+    List<Integer> partitionKeyIndexes;
+    public DorisRangePartitioner(EtlJobConfig.EtlPartitionInfo partitionInfo,
+                                 List<Integer> partitionKeyIndexes,
+                                 List<PartitionRangeKey> partitionRangeKeys) {
+        this.partitionInfo = partitionInfo;
+        this.partitionKeyIndexes = partitionKeyIndexes;
+        this.partitionRangeKeys = partitionRangeKeys;
+    }
+
+    public int numPartitions() {
+        if (partitionInfo == null) {
+            return 0;
+        }
+        if (partitionInfo.partitionType.equalsIgnoreCase(UNPARTITIONED_TYPE)) {
+            return 1;
+        }
+        return partitionInfo.partitions.size();
+    }
+
+    public int getPartition(Object var1) {
+        if (partitionInfo.partitionType != null
+                && partitionInfo.partitionType.equalsIgnoreCase(UNPARTITIONED_TYPE)) {
+            return 0;
+        }
+        DppColumns key = (DppColumns)var1;
+        // get the partition columns from key as partition key
+        DppColumns partitionKey = new DppColumns(key, partitionKeyIndexes);
+        // TODO: optimize this by using binary search
+        for (int i = 0; i < partitionRangeKeys.size(); ++i) {
+            if (partitionRangeKeys.get(i).isRowContained(partitionKey)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    public static class PartitionRangeKey implements Serializable {
+        public boolean isMaxPartition;
+        public DppColumns startKeys;
+        public DppColumns endKeys;
+
+        public boolean isRowContained(DppColumns row) {
+            if (isMaxPartition) {
+                return startKeys.compareTo(row) <= 0;
+            } else {
+                return startKeys.compareTo(row) <= 0 && endKeys.compareTo(row) > 0;
+            }
+        }
+
+        public String toString() {
+            return "PartitionRangeKey{" +
+                    "isMaxPartition=" + isMaxPartition +
+                    ", startKeys=" + startKeys +
+                    ", endKeys=" + endKeys +
+                    '}';
+        }
+    }
+}
\ No newline at end of file
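
A small sketch of the range check above, assuming same-package access to DppColumns and an import of java.util.Arrays; the single-column integer bounds are hypothetical:

    DorisRangePartitioner.PartitionRangeKey rangeKey = new DorisRangePartitioner.PartitionRangeKey();
    rangeKey.isMaxPartition = false;
    rangeKey.startKeys = new DppColumns(Arrays.<Object>asList(0));
    rangeKey.endKeys = new DppColumns(Arrays.<Object>asList(100));
    rangeKey.isRowContained(new DppColumns(Arrays.<Object>asList(42)));  // true: 0 <= 42 < 100
    rangeKey.isRowContained(new DppColumns(Arrays.<Object>asList(100))); // false: the end key is exclusive
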
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppColumns.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppColumns.java
new file mode 100644
index 0000000..b5efb2c
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppColumns.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Objects;
+import java.util.Comparator;
+
+// DppColumns stores the column values of one row and supports comparison,
+// which is used for sorting and range partitioning.
+class DppColumns implements Comparable<DppColumns>, Serializable {
+    public List<Object> columns = new ArrayList<Object>();
+
+    public DppColumns(List<Object> keys){
+        this.columns = keys;
+    }
+
+    public DppColumns(DppColumns key, List<Integer> indexes){
+        for (int i = 0; i < indexes.size(); ++i) {
+            columns.add(key.columns.get(indexes.get(i)));
+        }
+    }
+
+    @Override
+    public int compareTo(DppColumns other) {
+        Preconditions.checkState(columns.size() == other.columns.size());
+
+        int cmp = 0;
+        for (int i = 0; i < columns.size(); i++) {
+            Object columnObj = columns.get(i);
+            Object otherColumn = other.columns.get(i);
+            if (columnObj == null && otherColumn == null) {
+                // both values are null: treat this column as equal and compare the next one
+                continue;
+            } else if (columnObj == null || otherColumn == null) {
+                if (columnObj == null) {
+                    return -1;
+                } else {
+                    return 1;
+                }
+            }
+            if (columns.get(i) instanceof Integer) {
+                cmp = ((Integer)(columns.get(i))).compareTo((Integer)(other.columns.get(i)));
+            } else if (columns.get(i) instanceof Long) {
+                cmp = ((Long)(columns.get(i))).compareTo((Long)(other.columns.get(i)));
+            }  else if (columns.get(i) instanceof  Boolean) {
+                cmp = ((Boolean)(columns.get(i))).compareTo((Boolean) (other.columns.get(i)));
+            } else if (columns.get(i) instanceof  Short) {
+                cmp = ((Short)(columns.get(i))).compareTo((Short)(other.columns.get(i)));
+            } else if (columns.get(i) instanceof  Float) {
+                cmp = ((Float)(columns.get(i))).compareTo((Float) (other.columns.get(i)));
+            } else if (columns.get(i) instanceof Double) {
+                cmp = ((Double)(columns.get(i))).compareTo((Double) (other.columns.get(i)));
+            } else if (columns.get(i) instanceof Date) {
+                cmp = ((Date)(columns.get(i))).compareTo((Date) (other.columns.get(i)));
+            } else if (columns.get(i) instanceof java.sql.Timestamp) {
+                cmp = ((java.sql.Timestamp)columns.get(i)).compareTo((java.sql.Timestamp)other.columns.get(i));
+            } else {
+                cmp = ((String)(columns.get(i))).compareTo((String) (other.columns.get(i)));
+            }
+            if (cmp != 0) {
+                return cmp;
+            }
+        }
+        return cmp;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        DppColumns dppColumns = (DppColumns) o;
+        return Objects.equals(columns, dppColumns.columns);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(columns);
+    }
+
+    @Override
+    public String toString() {
+        return "dppColumns{" +
+                "columns=" + columns +
+                '}';
+    }
+}
+
+class DppColumnsComparator implements Comparator<DppColumns> {
+    @Override
+    public int compare(DppColumns left, DppColumns right) {
+        return left.compareTo(right);
+    }
+}
\ No newline at end of file
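
For illustration, a minimal sketch of the ordering this comparator produces, assuming same-package access and the usual java.util imports; the row values are made up:

    List<DppColumns> rows = new ArrayList<>();
    rows.add(new DppColumns(Arrays.<Object>asList(2, "b")));
    rows.add(new DppColumns(Arrays.<Object>asList(1, "z")));
    rows.add(new DppColumns(Arrays.<Object>asList(1, "a")));
    rows.sort(new DppColumnsComparator());
    // rows are now ordered [1, a], [1, z], [2, b]: columns are compared left to right
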
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java
new file mode 100644
index 0000000..fa813ca
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.Serializable;
+
+public class DppResult implements Serializable {
+    DppResult() {
+        isSuccess = true;
+        failedReason = "";
+        scannedRows = 0;
+        fileNumber = 0;
+        fileSize = 0;
+        normalRows = 0;
+        abnormalRows = 0;
+        unselectRows = 0;
+        partialAbnormalRows = "";
+    }
+
+    @SerializedName("is_success")
+    public boolean isSuccess;
+
+    @SerializedName("failed_reason")
+    public String failedReason;
+
+    @SerializedName("scanned_rows")
+    public long scannedRows;
+
+    @SerializedName("file_number")
+    public long fileNumber;
+
+    @SerializedName("file_size")
+    public long fileSize;
+
+    @SerializedName("normal_rows")
+    public long normalRows;
+
+    @SerializedName("abnormal_rows")
+    public long abnormalRows;
+
+    @SerializedName("unselect_rows")
+    public long unselectRows;
+
+    // only a part of the abnormal rows will be returned
+    @SerializedName("partial_abnormal_rows")
+    public String partialAbnormalRows;
+}
\ No newline at end of file
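
A hedged sketch of how this result object turns into JSON (SparkDpp below imports Gson and names a dpp_result.json output file, so serialization presumably happens there); the counter values are made up and same-package access is assumed for the constructor:

    DppResult result = new DppResult();
    result.scannedRows = 1000;
    result.normalRows = 990;
    result.abnormalRows = 10;
    String json = new Gson().toJson(result);
    // json is roughly {"is_success":true,"failed_reason":"","scanned_rows":1000,...}
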
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppUtils.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppUtils.java
new file mode 100644
index 0000000..d0882ac
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppUtils.java
@@ -0,0 +1,257 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.Row;
+
+import com.google.common.collect.Lists;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.zip.CRC32;
+
+public class DppUtils {
+    public static final String BUCKET_ID = "__bucketId__";
+    public static Class getClassFromDataType(DataType dataType) {
+        if (dataType == null) {
+            return null;
+        }
+        if (dataType.equals(DataTypes.BooleanType)) {
+            return Boolean.class;
+        } else if (dataType.equals(DataTypes.ShortType)) {
+            return Short.class;
+        } else if (dataType.equals(DataTypes.IntegerType)) {
+            return Integer.class;
+        } else if (dataType.equals(DataTypes.LongType)) {
+            return Long.class;
+        } else if (dataType.equals(DataTypes.FloatType)) {
+            return Float.class;
+        } else if (dataType.equals(DataTypes.DoubleType)) {
+            return Double.class;
+        } else if (dataType.equals(DataTypes.DateType)) {
+            return Date.class;
+        } else if (dataType.equals(DataTypes.StringType)) {
+            return String.class;
+        } else if (dataType instanceof DecimalType) {
+            DecimalType decimalType = (DecimalType)dataType;
+            return BigDecimal.valueOf(decimalType.precision(), decimalType.scale()).getClass();
+        } else if (dataType.equals(DataTypes.TimestampType)) {
+            return Long.class;
+        }
+        return null;
+    }
+
+    public static Class getClassFromColumn(EtlJobConfig.EtlColumn column) throws UserException {
+        switch (column.columnType) {
+            case "BOOLEAN":
+                return Boolean.class;
+            case "TINYINT":
+            case "SMALLINT":
+                return Short.class;
+            case "INT":
+                return Integer.class;
+            case "DATETIME":
+                return java.sql.Timestamp.class;
+            case "BIGINT":
+                return Long.class;
+            case "LARGEINT":
+                throw new UserException("LARGEINT is not supported now");
+            case "FLOAT":
+                return Float.class;
+            case "DOUBLE":
+                return Double.class;
+            case "DATE":
+                return Date.class;
+            case "HLL":
+            case "CHAR":
+            case "VARCHAR":
+            case "BITMAP":
+            case "OBJECT":
+                return String.class;
+            case "DECIMALV2":
+                return BigDecimal.valueOf(column.precision, column.scale).getClass();
+            default:
+                return String.class;
+        }
+    }
+
+    public static DataType getDataTypeFromColumn(EtlJobConfig.EtlColumn column, boolean regardDistinctColumnAsBinary) {
+        DataType dataType = DataTypes.StringType;
+        switch (column.columnType) {
+            case "BOOLEAN":
+                dataType = DataTypes.StringType;
+                break;
+            case "TINYINT":
+                dataType = DataTypes.ByteType;
+                break;
+            case "SMALLINT":
+                dataType = DataTypes.ShortType;
+                break;
+            case "INT":
+                dataType = DataTypes.IntegerType;
+                break;
+            case "DATETIME":
+                dataType = DataTypes.TimestampType;
+                break;
+            case "BIGINT":
+                dataType = DataTypes.LongType;
+                break;
+            case "LARGEINT":
+                dataType = DataTypes.StringType;
+                break;
+            case "FLOAT":
+                dataType = DataTypes.FloatType;
+                break;
+            case "DOUBLE":
+                dataType = DataTypes.DoubleType;
+                break;
+            case "DATE":
+                dataType = DataTypes.DateType;
+                break;
+            case "CHAR":
+            case "VARCHAR":
+            case "OBJECT":
+                dataType = DataTypes.StringType;
+                break;
+            case "HLL":
+            case "BITMAP":
+                dataType = regardDistinctColumnAsBinary ? DataTypes.BinaryType : DataTypes.StringType;
+                break;
+            case "DECIMALV2":
+                dataType = DecimalType.apply(column.precision, column.scale);
+                break;
+            default:
+                throw new RuntimeException("Reason: invalid column type:" + column);
+        }
+        return dataType;
+    }
+
+    public static ByteBuffer getHashValue(Object o, DataType type) {
+        ByteBuffer buffer = ByteBuffer.allocate(8);
+        buffer.order(ByteOrder.LITTLE_ENDIAN);
+        if (o == null) {
+            buffer.putInt(0);
+            return buffer;
+        }
+        if (type.equals(DataTypes.ByteType)) {
+            buffer.put((byte)o);
+        } else if (type.equals(DataTypes.ShortType)) {
+            buffer.putShort((Short)o);
+        } else if (type.equals(DataTypes.IntegerType)) {
+            buffer.putInt((Integer) o);
+        } else if (type.equals(DataTypes.LongType)) {
+            buffer.putLong((Long)o);
+        } else if (type.equals(DataTypes.StringType)) {
+            try {
+                String str = String.valueOf(o);
+                buffer = ByteBuffer.wrap(str.getBytes("UTF-8"));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        } else if (type.equals(DataTypes.BooleanType)) {
+            Boolean b = (Boolean)o;
+            String str = b ? "1" : "0";
+            try {
+                buffer = ByteBuffer.wrap(str.getBytes("UTF-8"));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+        return buffer;
+    }
+
+    public static long getHashValue(Row row, List<String> distributeColumns, StructType dstTableSchema) {
+        CRC32 hashValue = new CRC32();
+        for (String distColumn : distributeColumns) {
+            Object columnObject = row.get(row.fieldIndex(distColumn));
+            ByteBuffer buffer = getHashValue(columnObject, dstTableSchema.apply(distColumn).dataType());
+            hashValue.update(buffer.array(), 0, buffer.limit());
+        }
+        return hashValue.getValue();
+    }
+
+    public static StructType createDstTableSchema(List<EtlJobConfig.EtlColumn> columns, boolean addBucketIdColumn, boolean regardDistinctColumnAsBinary) {
+        List<StructField> fields = new ArrayList<>();
+        if (addBucketIdColumn) {
+            StructField bucketIdField = DataTypes.createStructField(BUCKET_ID, DataTypes.StringType, true);
+            fields.add(bucketIdField);
+        }
+        for (EtlJobConfig.EtlColumn column : columns) {
+            DataType structColumnType = getDataTypeFromColumn(column, regardDistinctColumnAsBinary);
+            StructField field = DataTypes.createStructField(column.columnName, structColumnType, column.isAllowNull);
+            fields.add(field);
+        }
+        StructType dstSchema = DataTypes.createStructType(fields);
+        return dstSchema;
+    }
+
+    public static List<String> parseColumnsFromPath(String filePath, List<String> columnsFromPath) throws UserException {
+        if (columnsFromPath == null || columnsFromPath.isEmpty()) {
+            return Collections.emptyList();
+        }
+        String[] strings = filePath.split("/");
+        if (strings.length < 2) {
+            System.err.println("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
+            throw new UserException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
+        }
+        String[] columns = new String[columnsFromPath.size()];
+        int size = 0;
+        for (int i = strings.length - 2; i >= 0; i--) {
+            String str = strings[i];
+            if (str != null && str.isEmpty()) {
+                continue;
+            }
+            if (str == null || !str.contains("=")) {
+                System.err.println("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
+                throw new UserException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
+            }
+            String[] pair = str.split("=", 2);
+            if (pair.length != 2) {
+                System.err.println("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
+                throw new UserException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
+            }
+            int index = columnsFromPath.indexOf(pair[0]);
+            if (index == -1) {
+                continue;
+            }
+            columns[index] = pair[1];
+            size++;
+            if (size >= columnsFromPath.size()) {
+                break;
+            }
+        }
+        if (size != columnsFromPath.size()) {
+            System.err.println("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
+            throw new UserException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
+        }
+        return Lists.newArrayList(columns);
+    }
+}
\ No newline at end of file
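
A quick sketch of the path-column extraction above; the HDFS path and partition column names are hypothetical, and UserException must be handled by the caller:

    List<String> values = DppUtils.parseColumnsFromPath(
            "hdfs://host/warehouse/city=beijing/dt=2020-06-22/data.csv",
            Arrays.asList("city", "dt"));
    // values == ["beijing", "2020-06-22"]
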
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
new file mode 100644
index 0000000..80becc4
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -0,0 +1,880 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import com.google.common.base.Strings;
+import com.google.gson.Gson;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.function.ForeachPartitionFunction;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.LongAccumulator;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.TaskContext;
+
+import scala.Tuple2;
+import scala.collection.Seq;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import scala.collection.JavaConverters;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+// This class is a Spark-based data preprocessing program.
+// It makes use of Spark's distributed compute framework to run the
+// ETL, sort and pre-aggregation work inside a Spark job,
+// which speeds up loading large amounts of data.
+// The process steps are as follows:
+// 1. load data
+//     1.1 load data from path/hive table
+//     1.2 do the etl process
+// 2. repartition data by using doris data model(partition and bucket)
+// 3. process aggregation if needed
+// 4. write data to parquet file
+public final class SparkDpp implements java.io.Serializable {
+    private static final Logger LOG = LogManager.getLogger(SparkDpp.class);
+
+    private static final String NULL_FLAG = "\\N";
+    private static final String DPP_RESULT_FILE = "dpp_result.json";
+    private static final String BITMAP_TYPE = "bitmap";
+    private SparkSession spark = null;
+    private EtlJobConfig etlJobConfig = null;
+    private LongAccumulator abnormalRowAcc = null;
+    private LongAccumulator unselectedRowAcc = null;
+    private LongAccumulator scannedRowsAcc = null;
+    private LongAccumulator fileNumberAcc = null;
+    private LongAccumulator fileSizeAcc = null;
+    private Map<String, Integer> bucketKeyMap = new HashMap<>();
+    // accumulator to collect invalid rows
+    private StringAccumulator invalidRows = new StringAccumulator();
+
+    public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) {
+        this.spark = spark;
+        this.etlJobConfig = etlJobConfig;
+    }
+
+    public void init() {
+        abnormalRowAcc = spark.sparkContext().longAccumulator("abnormalRowAcc");
+        unselectedRowAcc = spark.sparkContext().longAccumulator("unselectedRowAcc");
+        scannedRowsAcc = spark.sparkContext().longAccumulator("scannedRowsAcc");
+        fileNumberAcc = spark.sparkContext().longAccumulator("fileNumberAcc");
+        fileSizeAcc = spark.sparkContext().longAccumulator("fileSizeAcc");
+        spark.sparkContext().register(invalidRows, "InvalidRowsAccumulator");
+    }
+
+    private Dataset<Row> processRDDAggAndRepartition(Dataset<Row> dataframe, EtlJobConfig.EtlIndex currentIndexMeta) throws UserException {
+        final boolean isDuplicateTable = !StringUtils.equalsIgnoreCase(currentIndexMeta.indexType, "AGGREGATE")
+                && !StringUtils.equalsIgnoreCase(currentIndexMeta.indexType, "UNIQUE");
+
+        // 1 make metadata for map/reduce
+        int keyLen = 0;
+        for (EtlJobConfig.EtlColumn etlColumn : currentIndexMeta.columns) {
+            keyLen = etlColumn.isKey ? keyLen + 1 : keyLen;
+        }
+
+        SparkRDDAggregator[] sparkRDDAggregators = new SparkRDDAggregator[currentIndexMeta.columns.size() - keyLen];
+
+        for (int i = 0 ; i < currentIndexMeta.columns.size(); i++) {
+            if (!currentIndexMeta.columns.get(i).isKey && !isDuplicateTable) {
+                sparkRDDAggregators[i - keyLen] = SparkRDDAggregator.buildAggregator(currentIndexMeta.columns.get(i));
+            }
+        }
+
+        PairFunction<Row, List<Object>, Object[]> encodePairFunction = isDuplicateTable ?
+                // add 1 to include bucketId
+                new EncodeDuplicateTableFunction(keyLen + 1, currentIndexMeta.columns.size() - keyLen)
+                : new EncodeAggregateTableFunction(sparkRDDAggregators, keyLen + 1);
+
+        // 2 convert dataframe to rdd and encode key and value
+        // TODO(wb) use rdd to avoid bitmap/hll serialization when calculating rollup
+        JavaPairRDD<List<Object>, Object[]> currentRollupRDD = dataframe.toJavaRDD().mapToPair(encodePairFunction);
+
+        // 3 do aggregate
+        // TODO(wb) set the reduce concurrency by statistic instead of hard code 200
+        int aggregateConcurrency = 200;
+        JavaPairRDD<List<Object>, Object[]> reduceResultRDD = isDuplicateTable ? currentRollupRDD
+                : currentRollupRDD.reduceByKey(new AggregateReduceFunction(sparkRDDAggregators), aggregateConcurrency);
+
+        // 4 repartition and finalize value column
+        JavaRDD<Row> finalRDD = reduceResultRDD
+                .repartitionAndSortWithinPartitions(new BucketPartitioner(bucketKeyMap), new BucketComparator())
+                .map(record -> {
+                    List<Object> keys = record._1;
+                    Object[] values = record._2;
+                    int size = keys.size() + values.length;
+                    Object[] result = new Object[size];
+
+                    for (int i = 0; i < keys.size(); i++) {
+                        result[i] = keys.get(i);
+                    }
+
+                    for (int i = keys.size(); i < size; i++) {
+                        int valueIdx = i - keys.size();
+                        result[i] = isDuplicateTable ? values[valueIdx] : sparkRDDAggregators[valueIdx].finalize(values[valueIdx]);
+                    }
+
+                    return RowFactory.create(result);
+                });
+
+        // 5 convert to dataframe
+        StructType tableSchemaWithBucketId = DppUtils.createDstTableSchema(currentIndexMeta.columns, true, true);
+        dataframe = spark.createDataFrame(finalRDD, tableSchemaWithBucketId);
+        return dataframe;
+
+    }
+
+    // write data to parquet files using Spark's parquet write support.
+    private void writePartitionedAndSortedDataframeToParquet(Dataset<Row> dataframe,
+                                                             String pathPattern,
+                                                             long tableId,
+                                                             EtlJobConfig.EtlIndex indexMeta) throws UserException {
+        StructType outputSchema = dataframe.schema();
+        StructType dstSchema = DataTypes.createStructType(
+                Arrays.asList(outputSchema.fields()).stream()
+                        .filter(field -> !field.name().equalsIgnoreCase(DppUtils.BUCKET_ID))
+                        .collect(Collectors.toList()));
+        ExpressionEncoder encoder = RowEncoder.apply(dstSchema);
+        dataframe.foreachPartition(new ForeachPartitionFunction<Row>() {
+            @Override
+            public void call(Iterator<Row> t) throws Exception {
+                // write the data to dst file
+                Configuration conf = new Configuration();
+                FileSystem fs = FileSystem.get(URI.create(etlJobConfig.outputPath), conf);
+                String lastBucketKey = null;
+                ParquetWriter<InternalRow> parquetWriter = null;
+                TaskContext taskContext = TaskContext.get();
+                long taskAttemptId = taskContext.taskAttemptId();
+                String dstPath = "";
+                String tmpPath = "";
+
+                while (t.hasNext()) {
+                    Row row = t.next();
+                    if (row.length() <= 1) {
+                        LOG.warn("invalid row:" + row);
+                        continue;
+                    }
+
+
+                    String curBucketKey = row.getString(0);
+                    List<Object> columnObjects = new ArrayList<>();
+                    for (int i = 1; i < row.length(); ++i) {
+                        Object columnValue = row.get(i);
+                        columnObjects.add(columnValue);
+                    }
+                    Row rowWithoutBucketKey = RowFactory.create(columnObjects.toArray());
+                    // if the bucket key is new, it will belong to a new tablet
+                    if (lastBucketKey == null || !curBucketKey.equals(lastBucketKey)) {
+                        if (parquetWriter != null) {
+                            parquetWriter.close();
+                            // rename tmpPath to path
+                            try {
+                                fs.rename(new Path(tmpPath), new Path(dstPath));
+                            } catch (IOException ioe) {
+                                LOG.warn("rename from tmpPath" + tmpPath + " to dstPath:" + dstPath + " failed. exception:" + ioe);
+                                throw ioe;
+                            }
+                        }
+                        // flush current writer and create a new writer
+                        String[] bucketKey = curBucketKey.split("_");
+                        if (bucketKey.length != 2) {
+                            LOG.warn("invalid bucket key:" + curBucketKey);
+                            continue;
+                        }
+                        int partitionId = Integer.parseInt(bucketKey[0]);
+                        int bucketId = Integer.parseInt(bucketKey[1]);
+                        dstPath = String.format(pathPattern, tableId, partitionId, indexMeta.indexId,
+                                bucketId, indexMeta.schemaHash);
+                        tmpPath = dstPath + "." + taskAttemptId;
+                        conf.setBoolean("spark.sql.parquet.writeLegacyFormat", false);
+                        conf.setBoolean("spark.sql.parquet.int64AsTimestampMillis", false);
+                        conf.setBoolean("spark.sql.parquet.int96AsTimestamp", true);
+                        conf.setBoolean("spark.sql.parquet.binaryAsString", false);
+                        conf.set("spark.sql.parquet.outputTimestampType", "INT96");
+                        ParquetWriteSupport.setSchema(dstSchema, conf);
+                        ParquetWriteSupport parquetWriteSupport = new ParquetWriteSupport();
+                        parquetWriter = new ParquetWriter<InternalRow>(new Path(tmpPath), parquetWriteSupport,
+                                CompressionCodecName.SNAPPY, 256 * 1024 * 1024, 16 * 1024, 1024 * 1024,
+                                true, false,
+                                ParquetProperties.WriterVersion.PARQUET_1_0, conf);
+                        if (parquetWriter != null) {
+                            LOG.info("[HdfsOperate]>> initialize writer succeed! path:" + tmpPath);
+                        }
+                        lastBucketKey = curBucketKey;
+                    }
+                    InternalRow internalRow = encoder.toRow(rowWithoutBucketKey);
+                    parquetWriter.write(internalRow);
+                }
+                if (parquetWriter != null) {
+                    parquetWriter.close();
+                    try {
+                        fs.rename(new Path(tmpPath), new Path(dstPath));
+                    } catch (IOException ioe) {
+                        LOG.warn("rename from tmpPath" + tmpPath + " to dstPath:" + dstPath + " failed. exception:" + ioe);
+                        throw ioe;
+                    }
+                }
+            }
+        });
+    }
+
+    // TODO(wb) use one shuffle to calculate the rollups at the same level
+    private void processRollupTree(RollupTreeNode rootNode,
+                                   Dataset<Row> rootDataframe,
+                                   long tableId, EtlJobConfig.EtlTable tableMeta,
+                                   EtlJobConfig.EtlIndex baseIndex) throws UserException {
+        Queue<RollupTreeNode> nodeQueue = new LinkedList<>();
+        nodeQueue.offer(rootNode);
+        int currentLevel = 0;
+        // traverse the tree level by level
+        Map<Long, Dataset<Row>> parentDataframeMap = new HashMap<>();
+        parentDataframeMap.put(baseIndex.indexId, rootDataframe);
+        Map<Long, Dataset<Row>> childrenDataframeMap = new HashMap<>();
+        String pathPattern = etlJobConfig.outputPath + "/" + etlJobConfig.outputFilePattern;
+        while (!nodeQueue.isEmpty()) {
+            RollupTreeNode curNode = nodeQueue.poll();
+            LOG.info("start to process index:" + curNode.indexId);
+            if (curNode.children != null) {
+                for (RollupTreeNode child : curNode.children) {
+                    nodeQueue.offer(child);
+                }
+            }
+            Dataset<Row> curDataFrame = null;
+            // column select for rollup
+            if (curNode.level != currentLevel) {
+                for (Dataset<Row> dataframe : parentDataframeMap.values()) {
+                    dataframe.unpersist();
+                }
+                currentLevel = curNode.level;
+                parentDataframeMap.clear();
+                parentDataframeMap = childrenDataframeMap;
+                childrenDataframeMap = new HashMap<>();
+            }
+
+            long parentIndexId = baseIndex.indexId;
+            if (curNode.parent != null) {
+                parentIndexId = curNode.parent.indexId;
+            }
+
+            Dataset<Row> parentDataframe = parentDataframeMap.get(parentIndexId);
+            List<Column> columns = new ArrayList<>();
+            List<Column> keyColumns = new ArrayList<>();
+            Column bucketIdColumn = new Column(DppUtils.BUCKET_ID);
+            keyColumns.add(bucketIdColumn);
+            columns.add(bucketIdColumn);
+            for (String keyName : curNode.keyColumnNames) {
+                columns.add(new Column(keyName));
+                keyColumns.add(new Column(keyName));
+            }
+            for (String valueName : curNode.valueColumnNames) {
+                columns.add(new Column(valueName));
+            }
+            Seq<Column> columnSeq = JavaConverters.asScalaIteratorConverter(columns.iterator()).asScala().toSeq();
+            curDataFrame = parentDataframe.select(columnSeq);
+            // aggregate and repartition
+            curDataFrame = processRDDAggAndRepartition(curDataFrame, curNode.indexMeta);
+
+            childrenDataframeMap.put(curNode.indexId, curDataFrame);
+
+            if (curNode.children != null && curNode.children.size() > 1) {
+                // if the children number larger than 1, persist the dataframe for performance
+                curDataFrame.persist();
+            }
+            writePartitionedAndSortedDataframeToParquet(curDataFrame, pathPattern, tableId, curNode.indexMeta);
+        }
+    }
+
+    // repartition dataframe by partitionid_bucketid
+    // so data in the same bucket will be consecutive.
+    private Dataset<Row> repartitionDataframeByBucketId(SparkSession spark, Dataset<Row> dataframe,
+                                                        EtlJobConfig.EtlPartitionInfo partitionInfo,
+                                                        List<Integer> partitionKeyIndex,
+                                                        List<Class> partitionKeySchema,
+                                                        List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys,
+                                                        List<String> keyColumnNames,
+                                                        List<String> valueColumnNames,
+                                                        StructType dstTableSchema,
+                                                        EtlJobConfig.EtlIndex baseIndex,
+                                                        List<Long> validPartitionIds) throws UserException {
+        List<String> distributeColumns = partitionInfo.distributionColumnRefs;
+        Partitioner partitioner = new DorisRangePartitioner(partitionInfo, partitionKeyIndex, partitionRangeKeys);
+        Set<Integer> validPartitionIndex = new HashSet<>();
+        if (validPartitionIds == null) {
+            for (int i = 0; i < partitionInfo.partitions.size(); ++i) {
+                validPartitionIndex.add(i);
+            }
+        } else {
+            for (int i = 0; i < partitionInfo.partitions.size(); ++i) {
+                if (validPartitionIds.contains(partitionInfo.partitions.get(i).partitionId)) {
+                    validPartitionIndex.add(i);
+                }
+            }
+        }
+        // use PairFlatMapFunction instead of PairFunction because there will be
+        // 0 or 1 output row for each input row
+        JavaPairRDD<String, DppColumns> pairRDD = dataframe.javaRDD().flatMapToPair(
+                new PairFlatMapFunction<Row, String, DppColumns>() {
+                    @Override
+                    public Iterator<Tuple2<String, DppColumns>> call(Row row) {
+                        List<Object> columns = new ArrayList<>();
+                        List<Object> keyColumns = new ArrayList<>();
+                        for (String columnName : keyColumnNames) {
+                            Object columnObject = row.get(row.fieldIndex(columnName));
+                            columns.add(columnObject);
+                            keyColumns.add(columnObject);
+                        }
+
+                        for (String columnName : valueColumnNames) {
+                            columns.add(row.get(row.fieldIndex(columnName)));
+                        }
+                        DppColumns dppColumns = new DppColumns(columns);
+                        DppColumns key = new DppColumns(keyColumns);
+                        List<Tuple2<String, DppColumns>> result = new ArrayList<>();
+                        int pid = partitioner.getPartition(key);
+                        if (!validPartitionIndex.contains(pid)) {
+                            LOG.warn("invalid partition for row:" + row + ", pid:" + pid);
+                            abnormalRowAcc.add(1);
+                            LOG.info("abnormalRowAcc:" + abnormalRowAcc);
+                            if (abnormalRowAcc.value() < 5) {
+                                LOG.info("add row to invalidRows:" + row.toString());
+                                invalidRows.add(row.toString());
+                                LOG.info("invalid rows contents:" + invalidRows.value());
+                            }
+                        } else {
+                            long hashValue = DppUtils.getHashValue(row, distributeColumns, dstTableSchema);
+                            int bucketId = (int) ((hashValue & 0xffffffff) % partitionInfo.partitions.get(pid).bucketNum);
+                            long partitionId = partitionInfo.partitions.get(pid).partitionId;
+                            // bucketKey is partitionId_bucketId
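+                            // e.g. partitionId 10003 and bucketId 2 (illustrative values) give bucketKey "10003_2";
+                            // bucketKeyMap later maps each bucketKey to a reduce partition index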
+                            String bucketKey = partitionId + "_" + bucketId;
+                            Tuple2<String, DppColumns> newTuple = new Tuple2<String, DppColumns>(bucketKey, dppColumns);
+                            result.add(newTuple);
+                        }
+                        return result.iterator();
+                    }
+                });
+        // TODO(wb): using rdd instead of dataframe from here
+        JavaRDD<Row> resultRdd = pairRDD.map(record -> {
+                    String bucketKey = record._1;
+                    List<Object> row = new ArrayList<>();
+                    // bucketKey as the first key
+                    row.add(bucketKey);
+                    row.addAll(record._2.columns);
+                    return RowFactory.create(row.toArray());
+                }
+        );
+
+        StructType tableSchemaWithBucketId = DppUtils.createDstTableSchema(baseIndex.columns, true, false);
+        dataframe = spark.createDataFrame(resultRdd, tableSchemaWithBucketId);
+        // use bucket number as the parallel number
+        int reduceNum = 0;
+        for (EtlJobConfig.EtlPartition partition : partitionInfo.partitions) {
+            for (int i = 0; i < partition.bucketNum; i++) {
+                bucketKeyMap.put(partition.partitionId + "_" + i, reduceNum);
+                reduceNum++;
+            }
+        }
+
+        // print to System.out so this info is easy to find in the log
+        System.out.println("print bucket key map:" + bucketKeyMap.toString());
+
+        return dataframe;
+    }
+
+    // do the etl process
+    private Dataset<Row> convertSrcDataframeToDstDataframe(EtlJobConfig.EtlIndex baseIndex,
+                                                           Dataset<Row> srcDataframe,
+                                                           StructType dstTableSchema,
+                                                           EtlJobConfig.EtlFileGroup fileGroup) throws UserException {
+        Dataset<Row> dataframe = srcDataframe;
+        StructType srcSchema = dataframe.schema();
+        Set<String> srcColumnNames = new HashSet<>();
+        for (StructField field : srcSchema.fields()) {
+            srcColumnNames.add(field.name());
+        }
+        Map<String, EtlJobConfig.EtlColumnMapping> columnMappings = fileGroup.columnMappings;
+        // 1. process simple columns
+        Set<String> mappingColumns = null;
+        if (columnMappings != null) {
+            mappingColumns = columnMappings.keySet();
+        }
+        List<String> dstColumnNames = new ArrayList<>();
+        for (StructField dstField : dstTableSchema.fields()) {
+            dstColumnNames.add(dstField.name());
+            EtlJobConfig.EtlColumn column = baseIndex.getColumn(dstField.name());
+            if (!srcColumnNames.contains(dstField.name())) {
+                if (mappingColumns != null && mappingColumns.contains(dstField.name())) {
+                    // mapping columns will be processed in next step
+                    continue;
+                }
+                if (column.defaultValue != null) {
+                    if (column.defaultValue.equals(NULL_FLAG)) {
+                        dataframe = dataframe.withColumn(dstField.name(), functions.lit(null));
+                    } else {
+                        dataframe = dataframe.withColumn(dstField.name(), functions.lit(column.defaultValue));
+                    }
+                } else if (column.isAllowNull) {
+                    dataframe = dataframe.withColumn(dstField.name(), functions.lit(null));
+                } else {
+                    throw new UserException("Reason: no data for column:" + dstField.name());
+                }
+            }
+            if (column.columnType.equalsIgnoreCase("DATE")) {
+                dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast("date"));
+            } else if (column.columnType.equalsIgnoreCase("BOOLEAN")) {
+                dataframe = dataframe.withColumn(dstField.name(),
+                        functions.when(dataframe.col(dstField.name()).equalTo("true"), "1")
+                                .otherwise("0"));
+            } else if (!column.columnType.equalsIgnoreCase(BITMAP_TYPE) && !dstField.dataType().equals(DataTypes.StringType)) {
+                dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast(dstField.dataType()));
+            }
+            if (fileGroup.isNegative && !column.isKey) {
+                // negative load
+                // the value will be converted to -1 * value
+                dataframe = dataframe.withColumn(dstField.name(), functions.expr("-1 *" + dstField.name()));
+            }
+        }
+        // 2. process the mapping columns
+        for (String mappingColumn : mappingColumns) {
+            String mappingDescription = columnMappings.get(mappingColumn).toDescription();
+            if (mappingDescription.toLowerCase().contains("hll_hash")) {
+                continue;
+            }
+            // cast the mapped value to the dst column's data type here
+            dataframe = dataframe.withColumn(mappingColumn,
+                    functions.expr(mappingDescription).cast(dstTableSchema.apply(mappingColumn).dataType()));
+        }
+        // project and reorder the columns to match the destination schema
+        dataframe.createOrReplaceTempView("src_table");
+        StringBuilder selectSqlBuilder = new StringBuilder();
+        selectSqlBuilder.append("select ");
+        for (String name : dstColumnNames) {
+            selectSqlBuilder.append(name + ",");
+        }
+        selectSqlBuilder.deleteCharAt(selectSqlBuilder.length() - 1);
+        selectSqlBuilder.append(" from src_table");
+        String selectSql = selectSqlBuilder.toString();
+        dataframe = spark.sql(selectSql);
+        return dataframe;
+    }
+
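+    // Read one csv file into a dataframe whose columns are all StringType.
+    // Rows are validated against the base index schema and columns parsed from the file path are appended at the tail.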
+    private Dataset<Row> loadDataFromPath(SparkSession spark,
+                                          EtlJobConfig.EtlFileGroup fileGroup,
+                                          String fileUrl,
+                                          EtlJobConfig.EtlIndex baseIndex,
+                                          List<EtlJobConfig.EtlColumn> columns) throws UserException {
+        List<String> columnValueFromPath = DppUtils.parseColumnsFromPath(fileUrl, fileGroup.columnsFromPath);
+        List<String> dataSrcColumns = fileGroup.fileFieldNames;
+        if (dataSrcColumns == null) {
+            // if there is no source column info,
+            // use the base index columns as source columns
+            dataSrcColumns = new ArrayList<>();
+            for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
+                dataSrcColumns.add(column.columnName);
+            }
+        }
+        List<String> dstTableNames = new ArrayList<>();
+        for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
+            dstTableNames.add(column.columnName);
+        }
+        List<String> srcColumnsWithColumnsFromPath = new ArrayList<>();
+        srcColumnsWithColumnsFromPath.addAll(dataSrcColumns);
+        if (fileGroup.columnsFromPath != null) {
+            srcColumnsWithColumnsFromPath.addAll(fileGroup.columnsFromPath);
+        }
+        StructType srcSchema = createScrSchema(srcColumnsWithColumnsFromPath);
+        JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrl).toJavaRDD();
+        int columnSize = dataSrcColumns.size();
+        List<ColumnParser> parsers = new ArrayList<>();
+        for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
+            parsers.add(ColumnParser.create(column));
+        }
+        // currently only csv files are supported
+        // TODO: support parquet and orc files
+        JavaRDD<Row> rowRDD = sourceDataRdd.flatMap(
+                record -> {
+                    scannedRowsAcc.add(1);
+                    String[] attributes = record.split(fileGroup.columnSeparator);
+                    List<Row> result = new ArrayList<>();
+                    boolean validRow = true;
+                    if (attributes.length != columnSize) {
+                        LOG.warn("invalid src schema, data columns:"
+                                + attributes.length + ", file group columns:"
+                                + columnSize + ", row:" + record);
+                        validRow = false;
+                    } else {
+                        for (int i = 0; i < attributes.length; ++i) {
+                            if (attributes[i].equals(NULL_FLAG)) {
+                                if (baseIndex.columns.get(i).isAllowNull) {
+                                    attributes[i] = null;
+                                } else {
+                                    LOG.warn("column:" + i + " cannot be null. row:" + record);
+                                    validRow = false;
+                                    break;
+                                }
+                            }
+                            boolean isStrictMode = (boolean) etlJobConfig.properties.strictMode;
+                            if (isStrictMode) {
+                                StructField field = srcSchema.apply(i);
+                                if (dstTableNames.contains(field.name())) {
+                                    String type = columns.get(i).columnType;
+                                    if (type.equalsIgnoreCase("CHAR")
+                                            || type.equalsIgnoreCase("VARCHAR")) {
+                                        continue;
+                                    }
+                                    ColumnParser parser = parsers.get(i);
+                                    boolean valid = parser.parse(attributes[i]);
+                                    if (!valid) {
+                                        validRow = false;
+                                        LOG.warn("invalid row:" + record
+                                                + ", attribute " + i + ": " + attributes[i] + " parsed failed");
+                                        break;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                    if (validRow) {
+                        Row row = null;
+                        if (fileGroup.columnsFromPath == null) {
+                            row = RowFactory.create(attributes);
+                        } else {
+                            // process columns from path
+                            // append columns from path to the tail
+                            List<String> columnAttributes = new ArrayList<>();
+                            columnAttributes.addAll(Arrays.asList(attributes));
+                            columnAttributes.addAll(columnValueFromPath);
+                            row = RowFactory.create(columnAttributes.toArray());
+                        }
+                        result.add(row);
+                    } else {
+                        abnormalRowAcc.add(1);
+                        // add at most 5 rows to invalidRows
+                        if (abnormalRowAcc.value() <= 5) {
+                            invalidRows.add(record);
+                        }
+                    }
+                    return result.iterator();
+                }
+        );
+
+        Dataset<Row> dataframe = spark.createDataFrame(rowRDD, srcSchema);
+        return dataframe;
+    }
+
+    private StructType createScrSchema(List<String> srcColumns) {
+        List<StructField> fields = new ArrayList<>();
+        for (String srcColumn : srcColumns) {
+            // use StringType to load source data
+            StructField field = DataTypes.createStructField(srcColumn, DataTypes.StringType, true);
+            fields.add(field);
+        }
+        StructType srcSchema = DataTypes.createStructType(fields);
+        return srcSchema;
+    }
+
+    // partition keys are parsed into double from json,
+    // so they need to be converted to the partition columns' types
+    private Object convertPartitionKey(Object srcValue, Class dstClass) throws UserException {
+        if (dstClass.equals(Float.class) || dstClass.equals(Double.class)) {
+            return null;
+        }
+        if (srcValue instanceof Double) {
+            if (dstClass.equals(Short.class)) {
+                return ((Double) srcValue).shortValue();
+            } else if (dstClass.equals(Integer.class)) {
+                return ((Double) srcValue).intValue();
+            } else if (dstClass.equals(Long.class)) {
+                return ((Double) srcValue).longValue();
+            } else if (dstClass.equals(BigInteger.class)) {
+                return new BigInteger(((Double) srcValue).toString());
+            } else if (dstClass.equals(java.sql.Date.class) || dstClass.equals(java.util.Date.class)) {
+                double srcValueDouble = (double)srcValue;
+                return convertToJavaDate((int) srcValueDouble);
+            } else if (dstClass.equals(java.sql.Timestamp.class)) {
+                double srcValueDouble = (double)srcValue;
+                return convertToJavaDatetime((long)srcValueDouble);
+            } else {
+                // dst type is string
+                return srcValue.toString();
+            }
+        } else {
+            LOG.warn("unsupport partition key:" + srcValue);
+            throw new UserException("unsupport partition key:" + srcValue);
+        }
+    }
+
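+    // The datetime partition key arrives as a 14-digit number in yyyyMMddHHmmss form.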
+    private java.sql.Timestamp convertToJavaDatetime(long src) {
+        String dateTimeStr = Long.valueOf(src).toString();
+        if (dateTimeStr.length() != 14) {
+            throw new RuntimeException("invalid input date format for SparkDpp");
+        }
+
+        String year = dateTimeStr.substring(0, 4);
+        String month = dateTimeStr.substring(4, 6);
+        String day = dateTimeStr.substring(6, 8);
+        String hour = dateTimeStr.substring(8, 10);
+        String min = dateTimeStr.substring(10, 12);
+        String sec = dateTimeStr.substring(12, 14);
+
+        return java.sql.Timestamp.valueOf(String.format("%s-%s-%s %s:%s:%s", year, month, day, hour, min, sec));
+    }
+
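+    // The date partition key is bit-packed: the low 5 bits are the day, the next 4 bits the month, the remaining bits the year.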
+    private java.sql.Date convertToJavaDate(int originDate) {
+        int day = originDate & 0x1f;
+        originDate >>= 5;
+        int month = originDate & 0x0f;
+        originDate >>= 4;
+        int year = originDate;
+        return java.sql.Date.valueOf(String.format("%04d-%02d-%02d", year, month, day));
+    }
+
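+    // Build the start/end range keys of every partition, converting the raw json values to the partition columns' Java types.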
+    private List<DorisRangePartitioner.PartitionRangeKey> createPartitionRangeKeys(
+            EtlJobConfig.EtlPartitionInfo partitionInfo, List<Class> partitionKeySchema) throws UserException {
+        List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys = new ArrayList<>();
+        for (EtlJobConfig.EtlPartition partition : partitionInfo.partitions) {
+            DorisRangePartitioner.PartitionRangeKey partitionRangeKey = new DorisRangePartitioner.PartitionRangeKey();
+            List<Object> startKeyColumns = new ArrayList<>();
+            for (int i = 0; i < partition.startKeys.size(); i++) {
+                Object value = partition.startKeys.get(i);
+                startKeyColumns.add(convertPartitionKey(value, partitionKeySchema.get(i)));
+            }
+            partitionRangeKey.startKeys = new DppColumns(startKeyColumns);
+            if (!partition.isMaxPartition) {
+                partitionRangeKey.isMaxPartition = false;
+                List<Object> endKeyColumns = new ArrayList<>();
+                for (int i = 0; i < partition.endKeys.size(); i++) {
+                    Object value = partition.endKeys.get(i);
+                    endKeyColumns.add(convertPartitionKey(value, partitionKeySchema.get(i)));
+                }
+                partitionRangeKey.endKeys = new DppColumns(endKeyColumns);
+            } else {
+                partitionRangeKey.isMaxPartition = true;
+            }
+            partitionRangeKeys.add(partitionRangeKey);
+        }
+        return partitionRangeKeys;
+    }
+
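+    // Load every file of the file group, record file number and size in accumulators, and union the per-file dataframes.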
+    private Dataset<Row> loadDataFromFilePaths(SparkSession spark,
+                                               EtlJobConfig.EtlIndex baseIndex,
+                                               List<String> filePaths,
+                                               EtlJobConfig.EtlFileGroup fileGroup,
+                                               StructType dstTableSchema)
+            throws UserException, IOException, URISyntaxException {
+        Dataset<Row> fileGroupDataframe = null;
+        for (String filePath : filePaths) {
+            fileNumberAcc.add(1);
+            try {
+                Configuration conf = new Configuration();
+                URI uri = new URI(filePath);
+                FileSystem fs = FileSystem.get(uri, conf);
+                FileStatus fileStatus = fs.getFileStatus(new Path(filePath));
+                fileSizeAcc.add(fileStatus.getLen());
+            } catch (Exception e) {
+                LOG.warn("parse path failed:" + filePath);
+                throw e;
+            }
+            if (fileGroup.columnSeparator == null) {
+                LOG.warn("invalid null column separator!");
+                throw new UserException("Reason: invalid null column separator!");
+            }
+            Dataset<Row> dataframe = null;
+
+            dataframe = loadDataFromPath(spark, fileGroup, filePath, baseIndex, baseIndex.columns);
+            dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
+            if (fileGroupDataframe == null) {
+                fileGroupDataframe = dataframe;
+            } else {
+                fileGroupDataframe = fileGroupDataframe.union(dataframe);
+            }
+        }
+        return fileGroupDataframe;
+    }
+
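+    // Load data from the intermediate hive table and convert it to the destination table schema.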
+    private Dataset<Row> loadDataFromHiveTable(SparkSession spark,
+                                               String hiveTableName,
+                                               EtlJobConfig.EtlIndex baseIndex,
+                                               EtlJobConfig.EtlFileGroup fileGroup,
+                                               StructType dstTableSchema) throws UserException {
+        Dataset<Row> dataframe = spark.sql("select * from " + hiveTableName);
+        dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
+        return dataframe;
+    }
+
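+    // Main ETL flow: for each table, load its file groups (from files or hive), apply the where filter,
+    // repartition by bucket id, process the rollup tree, and collect the statistics into a DppResult.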
+    private DppResult process() throws Exception {
+        DppResult dppResult = new DppResult();
+        try {
+            for (Map.Entry<Long, EtlJobConfig.EtlTable> entry : etlJobConfig.tables.entrySet()) {
+                Long tableId = entry.getKey();
+                EtlJobConfig.EtlTable etlTable = entry.getValue();
+
+                // get the base index meta
+                EtlJobConfig.EtlIndex baseIndex = null;
+                for (EtlJobConfig.EtlIndex indexMeta : etlTable.indexes) {
+                    if (indexMeta.isBaseIndex) {
+                        baseIndex = indexMeta;
+                        break;
+                    }
+                }
+
+                // get key column names and value column names separately
+                List<String> keyColumnNames = new ArrayList<>();
+                List<String> valueColumnNames = new ArrayList<>();
+                for (EtlJobConfig.EtlColumn etlColumn : baseIndex.columns) {
+                    if (etlColumn.isKey) {
+                        keyColumnNames.add(etlColumn.columnName);
+                    } else {
+                        valueColumnNames.add(etlColumn.columnName);
+                    }
+                }
+
+                EtlJobConfig.EtlPartitionInfo partitionInfo = etlTable.partitionInfo;
+                List<Integer> partitionKeyIndex = new ArrayList<Integer>();
+                List<Class> partitionKeySchema = new ArrayList<>();
+                for (String key : partitionInfo.partitionColumnRefs) {
+                    for (int i = 0; i < baseIndex.columns.size(); ++i) {
+                        EtlJobConfig.EtlColumn column = baseIndex.columns.get(i);
+                        if (column.columnName.equals(key)) {
+                            partitionKeyIndex.add(i);
+                            partitionKeySchema.add(DppUtils.getClassFromColumn(column));
+                            break;
+                        }
+                    }
+                }
+                List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys = createPartitionRangeKeys(partitionInfo, partitionKeySchema);
+                StructType dstTableSchema = DppUtils.createDstTableSchema(baseIndex.columns, false, false);
+                RollupTreeBuilder rollupTreeParser = new MinimumCoverageRollupTreeBuilder();
+                RollupTreeNode rootNode = rollupTreeParser.build(etlTable);
+                LOG.info("Start to process rollup tree:" + rootNode);
+
+                Dataset<Row> tableDataframe = null;
+                for (EtlJobConfig.EtlFileGroup fileGroup : etlTable.fileGroups) {
+                    List<String> filePaths = fileGroup.filePaths;
+                    Dataset<Row> fileGroupDataframe = null;
+                    if (Strings.isNullOrEmpty(fileGroup.hiveDbTableName)) {
+                        fileGroupDataframe = loadDataFromFilePaths(spark, baseIndex, filePaths, fileGroup, dstTableSchema);
+                    } else {
+                        String taskId = etlJobConfig.outputPath.substring(etlJobConfig.outputPath.lastIndexOf("/") + 1);
+                        String dorisIntermediateHiveTable = String.format(EtlJobConfig.DORIS_INTERMEDIATE_HIVE_TABLE_NAME,
+                                                                          tableId, taskId);
+                        fileGroupDataframe = loadDataFromHiveTable(spark, dorisIntermediateHiveTable, baseIndex, fileGroup, dstTableSchema);
+                    }
+                    if (fileGroupDataframe == null) {
+                        LOG.info("no data for file group:" + fileGroup);
+                        continue;
+                    }
+                    if (!Strings.isNullOrEmpty(fileGroup.where)) {
+                        long originalSize = fileGroupDataframe.count();
+                        fileGroupDataframe = fileGroupDataframe.filter(fileGroup.where);
+                        long currentSize = fileGroupDataframe.count();
+                        unselectedRowAcc.add(originalSize - currentSize);
+                    }
+
+                    fileGroupDataframe = repartitionDataframeByBucketId(spark, fileGroupDataframe,
+                            partitionInfo, partitionKeyIndex,
+                            partitionKeySchema, partitionRangeKeys,
+                            keyColumnNames, valueColumnNames,
+                            dstTableSchema, baseIndex, fileGroup.partitions);
+                    if (tableDataframe == null) {
+                        tableDataframe = fileGroupDataframe;
+                    } else {
+                        tableDataframe = tableDataframe.union(fileGroupDataframe);
+                    }
+                }
+                processRollupTree(rootNode, tableDataframe, tableId, etlTable, baseIndex);
+            }
+            spark.stop();
+        } catch (Exception exception) {
+            LOG.warn("spark dpp failed for exception:" + exception);
+            dppResult.isSuccess = false;
+            dppResult.failedReason = exception.getMessage();
+            dppResult.normalRows = scannedRowsAcc.value() - abnormalRowAcc.value();
+            dppResult.scannedRows = scannedRowsAcc.value();
+            dppResult.fileNumber = fileNumberAcc.value();
+            dppResult.fileSize = fileSizeAcc.value();
+            dppResult.abnormalRows = abnormalRowAcc.value();
+            dppResult.partialAbnormalRows = invalidRows.value();
+            throw exception;
+        }
+        LOG.info("invalid rows contents:" + invalidRows.value());
+        dppResult.isSuccess = true;
+        dppResult.failedReason = "";
+        dppResult.normalRows = scannedRowsAcc.value() - abnormalRowAcc.value();
+        dppResult.scannedRows = scannedRowsAcc.value();
+        dppResult.fileNumber = fileNumberAcc.value();
+        dppResult.fileSize = fileSizeAcc.value();
+        dppResult.abnormalRows = abnormalRowAcc.value();
+        dppResult.partialAbnormalRows = invalidRows.value();
+        return dppResult;
+    }
+
+    public void doDpp() throws Exception {
+        // write dpp result to output
+        DppResult dppResult = process();
+        String outputPath = etlJobConfig.getOutputPath();
+        String resultFilePath = outputPath + "/" + DPP_RESULT_FILE;
+        Configuration conf = new Configuration();
+        URI uri = new URI(outputPath);
+        FileSystem fs = FileSystem.get(uri, conf);
+        Path filePath = new Path(resultFilePath);
+        FSDataOutputStream outputStream = fs.create(filePath);
+        Gson gson = new Gson();
+        outputStream.write(gson.toJson(dppResult).getBytes());
+        outputStream.write('\n');
+        outputStream.close();
+    }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java
new file mode 100644
index 0000000..adfca9f
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.loadv2.BitmapValue;
+import org.apache.doris.load.loadv2.Hll;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Row;
+import scala.Tuple2;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+// contains all classes related to spark aggregation
+
+public abstract class SparkRDDAggregator<T> implements Serializable {
+
+    T init(Object value) {
+        return (T) value;
+    }
+
+    abstract T update(T v1, T v2);
+
+    Object finalize(Object value) {
+        return value;
+    }
+
+    // TODO(wb) support more datatype:decimal,date,datetime
+    public static SparkRDDAggregator buildAggregator(EtlJobConfig.EtlColumn column) throws UserException {
+        String aggType = StringUtils.lowerCase(column.aggregationType);
+        String columnType = StringUtils.lowerCase(column.columnType);
+        switch (aggType) {
+            case "bitmap_union" :
+                return new BitmapUnionAggregator();
+            case "hll_union" :
+                return new HllUnionAggregator();
+            case "max":
+                switch (columnType) {
+                    case "tinyint":
+                    case "smallint":
+                    case "int":
+                    case "bigint":
+                    case "float":
+                    case "double":
+                        return new NumberMaxAggregator();
+                    case "char":
+                    case "varchar":
+                        return new StringMaxAggregator();
+                    case "largeint":
+                        return new LargeIntMaxAggregator();
+                    default:
+                        throw new UserException(String.format("unsupported max aggregator for column type:%s", columnType));
+                }
+            case "min":
+                switch (columnType) {
+                    case "tinyint":
+                    case "smallint":
+                    case "int":
+                    case "bigint":
+                    case "float":
+                    case "double":
+                        return new NumberMinAggregator();
+                    case "char":
+                    case "varchar":
+                        return new StringMinAggregator();
+                    case "largeint":
+                        return new LargeIntMinAggregator();
+                    default:
+                        throw new UserException(String.format("unsupported min aggregator for column type:%s", columnType));
+                }
+            case "sum":
+                switch (columnType) {
+                    case "tinyint":
+                        return new ByteSumAggregator();
+                    case "smallint":
+                        return new ShortSumAggregator();
+                    case "int":
+                        return new IntSumAggregator();
+                    case "bigint":
+                        return new LongSumAggregator();
+                    case "float":
+                        return new FloatSumAggregator();
+                    case "double":
+                        return new DoubleSumAggregator();
+                    case "largeint":
+                        return new LargeIntSumAggregator();
+                    default:
+                        throw new UserException(String.format("unsupported sum aggregator for column type:%s", columnType));
+                }
+            case "replace_if_not_null":
+                return new ReplaceIfNotNullAggregator();
+            case "replace":
+                return new ReplaceAggregator();
+            default:
+                throw new UserException(String.format("unsupported aggregate type %s", aggType));
+        }
+    }
+
+}
+
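+// Encodes a row of a duplicate key table into a (bucket keys, values) pair; values are copied as-is.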
+class EncodeDuplicateTableFunction extends EncodeAggregateTableFunction {
+
+    private int valueLen;
+
+    public EncodeDuplicateTableFunction(int keyLen, int valueLen) {
+        super(keyLen);
+        this.valueLen = valueLen;
+    }
+
+    @Override
+    public Tuple2<List<Object>, Object[]> call(Row row) throws Exception {
+        List<Object> keys = new ArrayList<>(keyLen);
+        Object[] values = new Object[valueLen];
+
+        for (int i = 0; i < keyLen; i++) {
+            keys.add(row.get(i));
+        }
+
+        for (int i = keyLen; i < row.length(); i++) {
+            values[i - keyLen] = row.get(i);
+        }
+
+        return new Tuple2<>(keys, values);
+    }
+}
+
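+// Encodes a row of an aggregate key table into a (bucket keys, values) pair; each value is initialized by its aggregator.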
+class EncodeAggregateTableFunction implements PairFunction<Row, List<Object>, Object[]> {
+
+    private SparkRDDAggregator[] valueAggregators;
+    // keyLen includes the bucket id column
+    protected int keyLen;
+
+    public EncodeAggregateTableFunction(int keyLen) {
+        this.keyLen = keyLen;
+    }
+
+    public EncodeAggregateTableFunction(SparkRDDAggregator[] valueAggregators, int keyLen) {
+        this.valueAggregators = valueAggregators;
+        this.keyLen = keyLen;
+    }
+
+    // TODO(wb): use a custom class as key instead of List to save space
+    @Override
+    public Tuple2<List<Object>, Object[]> call(Row row) throws Exception {
+        List<Object> keys = new ArrayList<>(keyLen);
+        Object[] values = new Object[valueAggregators.length];
+
+        for (int i = 0; i < keyLen; i++) {
+            keys.add(row.get(i));
+        }
+
+        for (int i = keyLen; i < row.size(); i++) {
+            int valueIdx = i - keyLen;
+            values[valueIdx] = valueAggregators[valueIdx].init(row.get(i));
+        }
+        return new Tuple2<>(keys, values);
+    }
+}
+
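+// Merges two value arrays column by column using the corresponding aggregators.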
+class AggregateReduceFunction implements Function2<Object[], Object[], Object[]> {
+
+    private SparkRDDAggregator[] valueAggregators;
+
+    public AggregateReduceFunction(SparkRDDAggregator[] sparkDppAggregators) {
+        this.valueAggregators = sparkDppAggregators;
+    }
+
+    @Override
+    public Object[] call(Object[] v1, Object[] v2) throws Exception {
+        Object[] result = new Object[valueAggregators.length];
+        for (int i = 0; i < v1.length; i++) {
+            result[i] = valueAggregators[i].update(v1[i], v2[i]);
+        }
+        return result;
+    }
+}
+
+class ReplaceAggregator extends SparkRDDAggregator<Object> {
+
+    @Override
+    Object update(Object dst, Object src) {
+        return src;
+    }
+}
+
+class ReplaceIfNotNullAggregator extends SparkRDDAggregator<Object> {
+
+    @Override
+    Object update(Object dst, Object src) {
+        return src == null ? dst : src;
+    }
+}
+
+class BitmapUnionAggregator extends SparkRDDAggregator<BitmapValue> {
+
+    @Override
+    BitmapValue init(Object value) {
+        try {
+            BitmapValue bitmapValue = new BitmapValue();
+            if (value instanceof byte[]) {
+                bitmapValue.deserialize(new DataInputStream(new ByteArrayInputStream((byte[]) value)));
+            } else {
+                bitmapValue.add(value == null ? 0L : Long.valueOf(value.toString()));
+            }
+            return bitmapValue;
+        } catch (Exception e) {
+            throw new RuntimeException("build bitmap value failed", e);
+        }
+    }
+
+    @Override
+    BitmapValue update(BitmapValue v1, BitmapValue v2) {
+        BitmapValue newBitmapValue = new BitmapValue();
+        if (v1 != null) {
+            newBitmapValue.or(v1);
+        }
+        if (v2 != null) {
+            newBitmapValue.or(v2);
+        }
+        return newBitmapValue;
+    }
+
+    @Override
+    byte[] finalize(Object value) {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream outputStream = new DataOutputStream(bos);
+            ((BitmapValue)value).serialize(outputStream);
+            return bos.toByteArray();
+        } catch (IOException ioException) {
+            ioException.printStackTrace();
+            throw new RuntimeException(ioException);
+        }
+    }
+
+}
+
+class HllUnionAggregator extends SparkRDDAggregator<Hll> {
+
+    @Override
+    Hll init(Object value) {
+        try {
+            Hll hll = new Hll();
+            if (value instanceof byte[]) {
+                hll.deserialize(new DataInputStream(new ByteArrayInputStream((byte[]) value)));
+            } else {
+                hll.updateWithHash(value == null ? 0 : value);
+            }
+            return hll;
+        } catch (Exception e) {
+            throw new RuntimeException("build hll value failed", e);
+        }
+    }
+
+    @Override
+    Hll update(Hll v1, Hll v2) {
+        Hll newHll = new Hll();
+        if (v1 != null) {
+            newHll.merge(v1);
+        }
+        if (v2 != null) {
+            newHll.merge(v2);
+        }
+        return newHll;
+    }
+
+    @Override
+    byte[] finalize(Object value) {
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream outputStream = new DataOutputStream(bos);
+            ((Hll)value).serialize(outputStream);
+            return bos.toByteArray();
+        } catch (IOException ioException) {
+            ioException.printStackTrace();
+            throw new RuntimeException(ioException);
+        }
+    }
+
+}
+
+class LargeIntMaxAggregator extends SparkRDDAggregator<BigInteger> {
+
+    BigInteger init(Object value) {
+        if (value == null) {
+            return null;
+        }
+        return new BigInteger(value.toString());
+    }
+
+    @Override
+    BigInteger update(BigInteger dst, BigInteger src) {
+        if (src == null) {
+            return dst;
+        }
+        if (dst == null) {
+            return src;
+        }
+        return dst.compareTo(src) > 0 ? dst : src;
+    }
+}
+
+class LargeIntMinAggregator extends LargeIntMaxAggregator {
+
+    @Override
+    BigInteger update(BigInteger dst, BigInteger src) {
+        if (src == null) {
+            return dst;
+        }
+        if (dst == null) {
+            return src;
+        }
+        return dst.compareTo(src) < 0 ? dst : src;
+    }
+}
+
+class LargeIntSumAggregator extends LargeIntMaxAggregator {
+
+    @Override
+    BigInteger update(BigInteger dst, BigInteger src) {
+        if (src == null) {
+            return dst;
+        }
+        if (dst == null) {
+            return src;
+        }
+        return dst.add(src);
+    }
+}
+
+
+class NumberMaxAggregator extends SparkRDDAggregator {
+
+    @Override
+    Object update(Object dst, Object src) {
+        if (src == null) {
+            return dst;
+        }
+        if (dst == null) {
+            return src;
+        }
+        return ((Comparable)dst).compareTo(src) > 0 ? dst : src;
+    }
+}
+
+
+class NumberMinAggregator extends SparkRDDAggregator {
+
+    @Override
+    Object update(Object dst, Object src) {
+        if (src == null) {
+            return dst;
+        }
+        if (dst == null) {
+            return src;
+        }
+        return ((Comparable)dst).compareTo(src) < 0 ? dst : src;
+    }
+}
+
+class LongSumAggregator extends SparkRDDAggregator<Long> {
+
+    @Override
+    Long update(Long dst, Long src) {
+        if (src == null) {
+            return dst;
+        }
+        if (dst == null) {
+            return src;
+        }
+        // TODO(wb) check overflow of long type
+        return dst + src;
+    }
+}
+
+class ShortSumAggregator extends SparkRDDAggregator<Short> {
+
+    @Override
+    Short update(Short dst, Short src) {
+        if (src == null) {
+            return dst;
+        }
+        if (dst == null) {
+            return src;
+        }
+        Integer ret = dst + src;
+        if  (ret > Short.MAX_VALUE || ret < Short.MIN_VALUE) {
+            throw new RuntimeException("short column sum size exceeds Short.MAX_VALUE or Short.MIN_VALUE");
+        }
+        return Short.valueOf(ret.toString());
+    }
+}
+
+class IntSumAggregator extends SparkRDDAggregator<Integer> {
+
+    @Override
+    Integer update(Integer dst, Integer src) {
+        if (src == null) {
+            return dst;
+        }
+        if (dst == null) {
+            return src;
+        }
+        long ret = Long.sum(dst, src);
+        if  (ret > Integer.MAX_VALUE || ret < Integer.MIN_VALUE) {
+            throw new RuntimeException("int column sum size exceeds Integer.MAX_VALUE or Integer.MIN_VALUE");
+        }
+        return (int) ret;
+    }
+}
+
+class ByteSumAggregator extends SparkRDDAggregator<Byte> {
+
+    @Override
+    Byte update(Byte dst, Byte src) {
+        if (src == null) {
+            return dst;
+        }
+        if (dst == null) {
+            return src;
+        }
+        Integer ret = dst + src;
+        if  (ret > Byte.MAX_VALUE || ret < Byte.MIN_VALUE) {
+            throw new RuntimeException("byte column sum size exceeds Byte.MAX_VALUE or Byte.MIN_VALUE");
+        }
+        return Byte.valueOf(ret.toString());
+    }
+}
+
+class DoubleSumAggregator extends SparkRDDAggregator<Double> {
+
+    @Override
+    strictfp Double update(Double dst, Double src) {
+        if (src == null) {
+            return dst;
+        }
+        if (dst == null) {
+            return src;
+        }
+        return dst + src;
+    }
+}
+
+// TODO(wb) add bound check for float/double
+class FloatSumAggregator extends SparkRDDAggregator<Float> {
+
+    @Override
+    strictfp Float update(Float dst, Float src) {
+        if (src == null) {
+            return dst;
+        }
+        if (dst == null) {
+            return src;
+        }
+        return dst + src;
+    }
+}
+
+class StringMaxAggregator extends SparkRDDAggregator<String> {
+
+    @Override
+    String update(String dst, String src) {
+        if (src == null) {
+            return dst;
+        }
+        if (dst == null) {
+            return src;
+        }
+        return dst.compareTo(src) > 0 ? dst : src;
+    }
+}
+
+class StringMinAggregator extends SparkRDDAggregator<String> {
+
+    @Override
+    String update(String dst, String src) {
+        if (src == null) {
+            return dst;
+        }
+        if (dst == null) {
+            return src;
+        }
+        return dst.compareTo(src) < 0 ? dst : src;
+    }
+}
+
+
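+// Compares composite bucket keys element by element; a null key sorts before a non-null one.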
+class BucketComparator implements Comparator<List<Object>>, Serializable {
+
+    @Override
+    public int compare(List<Object> keyArray1, List<Object> keyArray2) {
+        int cmp = 0;
+
+        for (int i = 0; i < keyArray1.size(); i++) {
+            Object key1 = keyArray1.get(i);
+            Object key2 = keyArray2.get(i);
+            if (key1 == key2) {
+                continue;
+            }
+            if (key1 == null || key2 == null) {
+                return key1 == null ? -1 : 1;
+            }
+            if (key1 instanceof Comparable && key2 instanceof Comparable) {
+                cmp = ((Comparable) key1).compareTo(key2);
+            } else {
+                throw new RuntimeException(String.format("uncomparable column type %s", key1.getClass().toString()));
+            }
+            if (cmp != 0) {
+                return cmp;
+            }
+        }
+
+        return cmp;
+    }
+}
+
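+// Routes each record to its reduce task by looking up the first element of the composite key (the bucket key) in bucketKeyMap.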
+class BucketPartitioner extends Partitioner {
+
+    private Map<String, Integer> bucketKeyMap;
+
+    public BucketPartitioner(Map<String, Integer> bucketKeyMap) {
+        this.bucketKeyMap = bucketKeyMap;
+    }
+
+    @Override
+    public int numPartitions() {
+        return bucketKeyMap.size();
+    }
+
+    @Override
+    public int getPartition(Object key) {
+        List<Object> rddKey = (List<Object>) key;
+        return bucketKeyMap.get(String.valueOf(rddKey.get(0)));
+    }
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/StringAccumulator.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/StringAccumulator.java
new file mode 100644
index 0000000..02099de
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/StringAccumulator.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.spark.util.AccumulatorV2;
+import java.util.List;
+import java.util.ArrayList;
+
+// This class is a string accumulator based on AccumulatorV2
+// (https://spark.apache.org/docs/latest/api/java/org/apache/spark/util/AccumulatorV2.html).
+// Spark does not provide a string accumulator out of the box.
+//
+// This class is used to collect the invalid rows when doing etl.
+public class StringAccumulator extends AccumulatorV2<String, String> {
+    private List<String> strs = new ArrayList<>();
+
+    @Override
+    public boolean isZero() {
+        return strs.isEmpty();
+    }
+
+    @Override
+    public AccumulatorV2<String, String> copy() {
+        StringAccumulator newAccumulator = new StringAccumulator();
+        newAccumulator.strs.addAll(this.strs);
+        return newAccumulator;
+    }
+
+    @Override
+    public void reset() {
+        strs.clear();
+    }
+
+    @Override
+    public void add(String v) {
+        strs.add(v);
+    }
+
+    @Override
+    public void merge(AccumulatorV2<String, String> other) {
+        StringAccumulator o = (StringAccumulator)other;
+        strs.addAll(o.strs);
+    }
+
+    @Override
+    public String value() {
+        return strs.toString();
+    }
+}
\ No newline at end of file
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitionerTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitionerTest.java
new file mode 100644
index 0000000..37addb7
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitionerTest.java
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.doris.load.loadv2.dpp.DorisRangePartitioner;
+
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DorisRangePartitionerTest {
+
+    @Test
+    public void testRangePartitioner() {
+        List<Object> startKeys = new ArrayList<>();
+        startKeys.add(new Integer(0));
+        List<Object> endKeys = new ArrayList<>();
+        endKeys.add(new Integer(100));
+        EtlJobConfig.EtlPartition partition1 = new EtlJobConfig.EtlPartition(
+                10000, startKeys, endKeys, false, 3);
+
+        List<Object> startKeys2 = new ArrayList<>();
+        startKeys2.add(new Integer(100));
+        List<Object> endKeys2 = new ArrayList<>();
+        endKeys2.add(new Integer(200));
+        EtlJobConfig.EtlPartition partition2 = new EtlJobConfig.EtlPartition(
+                10001, startKeys2, endKeys2, false, 4);
+
+        List<Object> startKeys3 = new ArrayList<>();
+        startKeys3.add(new Integer(200));
+        List<Object> endKeys3 = new ArrayList<>();
+        endKeys3.add(new Integer(300));
+        EtlJobConfig.EtlPartition partition3 = new EtlJobConfig.EtlPartition(
+                10002, startKeys3, endKeys3, false, 5);
+
+        List<EtlJobConfig.EtlPartition> partitions = new ArrayList<>();
+        partitions.add(partition1);
+        partitions.add(partition2);
+        partitions.add(partition3);
+
+        List<String> partitionColumns = new ArrayList<>();
+        partitionColumns.add("id");
+        List<String> bucketColumns = new ArrayList<>();
+        bucketColumns.add("key");
+        EtlJobConfig.EtlPartitionInfo partitionInfo = new EtlJobConfig.EtlPartitionInfo(
+                "RANGE", partitionColumns, bucketColumns, partitions);
+        List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys = new ArrayList<>();
+        for (EtlJobConfig.EtlPartition partition : partitions) {
+            DorisRangePartitioner.PartitionRangeKey partitionRangeKey = new DorisRangePartitioner.PartitionRangeKey();
+            partitionRangeKey.isMaxPartition = false;
+            partitionRangeKey.startKeys = new DppColumns(partition.startKeys);
+            partitionRangeKey.endKeys = new DppColumns(partition.endKeys);
+            partitionRangeKeys.add(partitionRangeKey);
+        }
+        List<Integer> partitionKeyIndexes = new ArrayList<>();
+        partitionKeyIndexes.add(0);
+        DorisRangePartitioner rangePartitioner = new DorisRangePartitioner(partitionInfo, partitionKeyIndexes, partitionRangeKeys);
+        int num = rangePartitioner.numPartitions();
+        Assert.assertEquals(3, num);
+
+        List<Object> fields1 = new ArrayList<>();
+        fields1.add(-100);
+        fields1.add("name");
+        DppColumns record1 = new DppColumns(fields1);
+        int id1 = rangePartitioner.getPartition(record1);
+        Assert.assertEquals(-1, id1);
+
+        List<Object> fields2 = new ArrayList<>();
+        fields2.add(10);
+        fields2.add("name");
+        DppColumns record2 = new DppColumns(fields2);
+        int id2 = rangePartitioner.getPartition(record2);
+        Assert.assertEquals(0, id2);
+
+        List<Object> fields3 = new ArrayList<>();
+        fields3.add(110);
+        fields3.add("name");
+        DppColumns record3 = new DppColumns(fields3);
+        int id3 = rangePartitioner.getPartition(record3);
+        Assert.assertEquals(1, id3);
+
+        List<Object> fields4 = new ArrayList<>();
+        fields4.add(210);
+        fields4.add("name");
+        DppColumns record4 = new DppColumns(fields4);
+        int id4 = rangePartitioner.getPartition(record4);
+        Assert.assertEquals(2, id4);
+
+        List<Object> fields5 = new ArrayList<>();
+        fields5.add(310);
+        fields5.add("name");
+        DppColumns record5 = new DppColumns(fields5);
+        int id5 = rangePartitioner.getPartition(record5);
+        Assert.assertEquals(-1, id5);
+    }
+
+    @Test
+    public void testUnpartitionedPartitioner() {
+        List<String> partitionColumns = new ArrayList<>();
+        List<String> bucketColumns = new ArrayList<>();
+        bucketColumns.add("key");
+        EtlJobConfig.EtlPartitionInfo partitionInfo = new EtlJobConfig.EtlPartitionInfo(
+                "UNPARTITIONED", null, bucketColumns, null);
+        List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys = new ArrayList<>();
+        List<Class> partitionSchema = new ArrayList<>();
+        partitionSchema.add(Integer.class);
+        List<Integer> partitionKeyIndexes = new ArrayList<>();
+        partitionKeyIndexes.add(0);
+        DorisRangePartitioner rangePartitioner = new DorisRangePartitioner(partitionInfo, partitionKeyIndexes, null);
+        int num = rangePartitioner.numPartitions();
+        Assert.assertEquals(1, num);
+
+        List<Object> fields = new ArrayList<>();
+        fields.add(100);
+        fields.add("name");
+        DppColumns record = new DppColumns(fields);
+        int id = rangePartitioner.getPartition(record);
+        Assert.assertEquals(0, id);
+    }
+}
\ No newline at end of file
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/dpp/DppUtilsTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/dpp/DppUtilsTest.java
new file mode 100644
index 0000000..4db033f
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/dpp/DppUtilsTest.java
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Test;
+import java.lang.Exception;
+
+public class DppUtilsTest {
+
+    @Test
+    public void testGetClassFromDataType() {
+        DppUtils dppUtils = new DppUtils();
+
+        Class stringResult = dppUtils.getClassFromDataType(DataTypes.StringType);
+        Assert.assertEquals(String.class, stringResult);
+
+        Class booleanResult = dppUtils.getClassFromDataType(DataTypes.BooleanType);
+        Assert.assertEquals(Boolean.class, booleanResult);
+
+        Class shortResult = dppUtils.getClassFromDataType(DataTypes.ShortType);
+        Assert.assertEquals(Short.class, shortResult);
+
+        Class integerResult = dppUtils.getClassFromDataType(DataTypes.IntegerType);
+        Assert.assertEquals(Integer.class, integerResult);
+
+        Class longResult = dppUtils.getClassFromDataType(DataTypes.LongType);
+        Assert.assertEquals(Long.class, longResult);
+
+        Class floatResult = dppUtils.getClassFromDataType(DataTypes.FloatType);
+        Assert.assertEquals(Float.class, floatResult);
+
+        Class doubleResult = dppUtils.getClassFromDataType(DataTypes.DoubleType);
+        Assert.assertEquals(Double.class, doubleResult);
+
+        Class dateResult = dppUtils.getClassFromDataType(DataTypes.DateType);
+        Assert.assertEquals(Date.class, dateResult);
+    }
+
+    @Test
+    public void testGetClassFromColumn() {
+        DppUtils dppUtils = new DppUtils();
+
+        try {
+            EtlJobConfig.EtlColumn column = new EtlJobConfig.EtlColumn();
+            column.columnType = "CHAR";
+            Class charResult = dppUtils.getClassFromColumn(column);
+            Assert.assertEquals(String.class, charResult);
+
+            column.columnType = "HLL";
+            Class hllResult = dppUtils.getClassFromColumn(column);
+            Assert.assertEquals(String.class, hllResult);
+
+            column.columnType = "OBJECT";
+            Class objectResult = dppUtils.getClassFromColumn(column);
+            Assert.assertEquals(String.class, objectResult);
+
+            column.columnType = "BOOLEAN";
+            Class booleanResult = dppUtils.getClassFromColumn(column);
+            Assert.assertEquals(Boolean.class, booleanResult);
+
+            column.columnType = "TINYINT";
+            Class tinyResult = dppUtils.getClassFromColumn(column);
+            Assert.assertEquals(Short.class, tinyResult);
+
+            column.columnType = "SMALLINT";
+            Class smallResult = dppUtils.getClassFromColumn(column);
+            Assert.assertEquals(Short.class, smallResult);
+
+            column.columnType = "INT";
+            Class integerResult = dppUtils.getClassFromColumn(column);
+            Assert.assertEquals(Integer.class, integerResult);
+
+            column.columnType = "DATETIME";
+            Class datetimeResult = dppUtils.getClassFromColumn(column);
+            Assert.assertEquals(java.sql.Timestamp.class, datetimeResult);
+
+            column.columnType = "FLOAT";
+            Class floatResult = dppUtils.getClassFromColumn(column);
+            Assert.assertEquals(Float.class, floatResult);
+
+            column.columnType = "DOUBLE";
+            Class doubleResult = dppUtils.getClassFromColumn(column);
+            Assert.assertEquals(Double.class, doubleResult);
+
+            column.columnType = "DATE";
+            Class dateResult = dppUtils.getClassFromColumn(column);
+            Assert.assertEquals(Date.class, dateResult);
+
+            column.columnType = "DECIMALV2";
+            column.precision = 10;
+            column.scale = 2;
+            Class decimalResult = dppUtils.getClassFromColumn(column);
+            Assert.assertEquals(BigDecimal.valueOf(10, 2).getClass(), decimalResult);
+        } catch (Exception e) {
+            Assert.assertTrue(false);
+        }
+
+    }
+
+    @Test
+    public void testGetDataTypeFromColumn() {
+        DppUtils dppUtils = new DppUtils();
+
+        try {
+            EtlJobConfig.EtlColumn column = new EtlJobConfig.EtlColumn();
+            column.columnType = "VARCHAR";
+            DataType stringResult = dppUtils.getDataTypeFromColumn(column, false);
+            Assert.assertEquals(DataTypes.StringType, stringResult);
+
+            column.columnType = "CHAR";
+            DataType charResult = dppUtils.getDataTypeFromColumn(column, false);
+            Assert.assertEquals(DataTypes.StringType, charResult);
+
+            column.columnType = "HLL";
+            DataType hllResult = dppUtils.getDataTypeFromColumn(column, false);
+            Assert.assertEquals(DataTypes.StringType, hllResult);
+
+            column.columnType = "OBJECT";
+            DataType objectResult = dppUtils.getDataTypeFromColumn(column, false);
+            Assert.assertEquals(DataTypes.StringType, objectResult);
+
+            column.columnType = "BOOLEAN";
+            DataType booleanResult = dppUtils.getDataTypeFromColumn(column, false);
+            Assert.assertEquals(DataTypes.StringType, booleanResult);
+
+            column.columnType = "TINYINT";
+            DataType tinyResult = dppUtils.getDataTypeFromColumn(column, false);
+            Assert.assertEquals(DataTypes.ByteType, tinyResult);
+
+            column.columnType = "SMALLINT";
+            DataType smallResult = dppUtils.getDataTypeFromColumn(column, false);
+            Assert.assertEquals(DataTypes.ShortType, smallResult);
+
+            column.columnType = "INT";
+            DataType integerResult = dppUtils.getDataTypeFromColumn(column, false);
+            Assert.assertEquals(DataTypes.IntegerType, integerResult);
+
+            column.columnType = "BIGINT";
+            DataType longResult = dppUtils.getDataTypeFromColumn(column, false);
+            Assert.assertEquals(DataTypes.LongType, longResult);
+
+            column.columnType = "DATETIME";
+            DataType datetimeResult = dppUtils.getDataTypeFromColumn(column, false);
+            Assert.assertEquals(DataTypes.TimestampType, datetimeResult);
+
+            column.columnType = "FLOAT";
+            DataType floatResult = dppUtils.getDataTypeFromColumn(column, false);
+            Assert.assertEquals(DataTypes.FloatType, floatResult);
+
+            column.columnType = "DOUBLE";
+            DataType doubleResult = dppUtils.getDataTypeFromColumn(column, false);
+            Assert.assertEquals(DataTypes.DoubleType, doubleResult);
+
+            column.columnType = "DATE";
+            DataType dateResult = dppUtils.getDataTypeFromColumn(column, false);
+            Assert.assertEquals(DataTypes.DateType, dateResult);
+        } catch (Exception e) {
+            Assert.assertTrue(false);
+        }
+    }
+
+    @Test
+    public void testCreateDstTableSchema() {
+        DppUtils dppUtils = new DppUtils();
+
+        EtlJobConfig.EtlColumn column1 = new EtlJobConfig.EtlColumn(
+                "column1", "INT",
+                true, true,
+                "NONE", "0",
+                0, 0, 0);
+        EtlJobConfig.EtlColumn column2 = new EtlJobConfig.EtlColumn(
+                "column2", "SMALLINT",
+                true, true,
+                "NONE", "0",
+                0, 0, 0);
+        List<EtlJobConfig.EtlColumn> columns = new ArrayList<>();
+        columns.add(column1);
+        columns.add(column2);
+
+        try {
+            StructType schema = dppUtils.createDstTableSchema(columns, false, false);
+            Assert.assertEquals(2, schema.fieldNames().length);
+            Assert.assertEquals("column1", schema.fieldNames()[0]);
+            Assert.assertEquals("column2", schema.fieldNames()[1]);
+
+            StructType schema2 = dppUtils.createDstTableSchema(columns, true, false);
+            Assert.assertEquals(3, schema2.fieldNames().length);
+            Assert.assertEquals("__bucketId__", schema2.fieldNames()[0]);
+            Assert.assertEquals("column1", schema2.fieldNames()[1]);
+            Assert.assertEquals("column2", schema2.fieldNames()[2]);
+        } catch (Exception e) {
+            Assert.assertTrue(false);
+        }
+    }
+
+    @Test
+    public void testParseColumnsFromPath() {
+        DppUtils dppUtils = new DppUtils();
+
+        String path = "/path/to/file/city=beijing/date=2020-04-10/data";
+        List<String> columnFromPaths = new ArrayList<>();
+        columnFromPaths.add("city");
+        columnFromPaths.add("date");
+        try {
+            List<String> columnFromPathValues = dppUtils.parseColumnsFromPath(path, columnFromPaths);
+            Assert.assertEquals(2, columnFromPathValues.size());
+            Assert.assertEquals("beijing", columnFromPathValues.get(0));
+            Assert.assertEquals("2020-04-10", columnFromPathValues.get(1));
+        } catch (Exception e) {
+            Assert.assertTrue(false);
+        }
+    }
+}
\ No newline at end of file
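
The testParseColumnsFromPath case above checks that Hive-style "key=value" segments
embedded in a load path are extracted in the order the columns are requested. As a rough,
self-contained illustration of that idea (hypothetical class name and error handling,
not the actual DppUtils.parseColumnsFromPath implementation):

import java.util.ArrayList;
import java.util.List;

// Sketch only: pulls the value for each requested column out of Hive-style
// path segments such as "city=beijing/date=2020-04-10".
public class PathColumnSketch {

    public static List<String> parseColumnsFromPath(String filePath, List<String> columnsFromPath) {
        String[] segments = filePath.split("/");
        List<String> values = new ArrayList<>();
        for (String column : columnsFromPath) {
            String prefix = column + "=";
            String value = null;
            for (String segment : segments) {
                if (segment.startsWith(prefix)) {
                    value = segment.substring(prefix.length());
                    break;
                }
            }
            if (value == null || value.isEmpty()) {
                throw new IllegalArgumentException("column " + column + " not found in path " + filePath);
            }
            values.add(value);
        }
        return values;
    }

    public static void main(String[] args) {
        List<String> columns = new ArrayList<>();
        columns.add("city");
        columns.add("date");
        // Prints [beijing, 2020-04-10] for the same path used in the test above.
        System.out.println(parseColumnsFromPath("/path/to/file/city=beijing/date=2020-04-10/data", columns));
    }
}
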
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilderTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilderTest.java
new file mode 100644
index 0000000..6e0e93e
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilderTest.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.doris.load.loadv2.dpp.MinimumCoverageRollupTreeBuilder;
+import org.apache.doris.load.loadv2.dpp.RollupTreeNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MinimumCoverageRollupTreeBuilderTest {
+
+    @Test
+    public void testBuild() {
+        EtlJobConfig.EtlColumn column1 = new EtlJobConfig.EtlColumn(
+                "column1", "INT",
+                true, true,
+                "NONE", "0",
+                0, 0, 0);
+        EtlJobConfig.EtlColumn column2 = new EtlJobConfig.EtlColumn(
+                "column2", "SMALLINT",
+                true, true,
+                "NONE", "0",
+                0, 0, 0);
+        EtlJobConfig.EtlColumn column3 = new EtlJobConfig.EtlColumn(
+                "column3", "VARCHAR",
+                true, true,
+                "NONE", "",
+                0, 0, 0);
+        EtlJobConfig.EtlColumn column4 = new EtlJobConfig.EtlColumn(
+                "column4", "INT",
+                true, false,
+                "SUM", "",
+                0, 0, 0);
+        List<EtlJobConfig.EtlColumn> baseColumns = new ArrayList<>();
+        baseColumns.add(column1);
+        baseColumns.add(column2);
+        baseColumns.add(column3);
+        baseColumns.add(column4);
+        EtlJobConfig.EtlIndex baseIndex = new EtlJobConfig.EtlIndex(10000,
+                baseColumns, 12345, "DUPLICATE", true);
+        List<EtlJobConfig.EtlColumn> roll1Columns = new ArrayList<>();
+        roll1Columns.add(column1);
+        roll1Columns.add(column2);
+        roll1Columns.add(column4);
+        EtlJobConfig.EtlIndex roll1Index = new EtlJobConfig.EtlIndex(10001,
+                roll1Columns, 12346, "AGGREGATE", false);
+        List<EtlJobConfig.EtlColumn> roll2Columns = new ArrayList<>();
+        roll2Columns.add(column1);
+        roll2Columns.add(column4);
+        EtlJobConfig.EtlIndex roll2Index = new EtlJobConfig.EtlIndex(10002,
+                roll2Columns, 12347, "AGGREGATE", false);
+
+        List<EtlJobConfig.EtlColumn> roll3Columns = new ArrayList<>();
+        roll3Columns.add(column3);
+        roll3Columns.add(column4);
+        EtlJobConfig.EtlIndex roll3Index = new EtlJobConfig.EtlIndex(10003,
+                roll3Columns, 12348, "AGGREGATE", false);
+
+        List<EtlJobConfig.EtlIndex> indexes = new ArrayList<>();
+        indexes.add(baseIndex);
+        indexes.add(roll1Index);
+        indexes.add(roll2Index);
+        indexes.add(roll3Index);
+        EtlJobConfig.EtlTable table = new EtlJobConfig.EtlTable(indexes, null);
+
+        MinimumCoverageRollupTreeBuilder builder = new MinimumCoverageRollupTreeBuilder();
+        RollupTreeNode resultNode = builder.build(table);
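+        // Expected minimum-coverage shape: rollup 10001 ({column1, column2, column4}) and
+        // rollup 10003 ({column3, column4}) hang directly off base 10000, because no smaller
+        // placed index contains column3; rollup 10002 ({column1, column4}) nests under 10001,
+        // the smallest index that covers its columns.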
+        Assert.assertNull(resultNode.parent);
+        Assert.assertEquals(10000, resultNode.indexId);
+        Assert.assertEquals(0, resultNode.level);
+        Assert.assertEquals(2, resultNode.children.size());
+
+        RollupTreeNode index1Node = resultNode.children.get(0);
+        Assert.assertEquals(10000, index1Node.parent.indexId);
+        Assert.assertEquals(10001, index1Node.indexId);
+        Assert.assertEquals(1, index1Node.level);
+        Assert.assertEquals(1, index1Node.children.size());
+
+        RollupTreeNode index3Node = resultNode.children.get(1);
+        Assert.assertEquals(10000, index3Node.parent.indexId);
+        Assert.assertEquals(10003, index3Node.indexId);
+        Assert.assertEquals(1, index3Node.level);
+        Assert.assertNull(index3Node.children);
+
+        RollupTreeNode index2Node = index1Node.children.get(0);
+        Assert.assertEquals(10001, index2Node.parent.indexId);
+        Assert.assertEquals(10002, index2Node.indexId);
+        Assert.assertEquals(2, index2Node.level);
+        Assert.assertNull(index2Node.children);
+    }
+}
\ No newline at end of file
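
The expected tree in testBuild reflects the minimum-coverage idea suggested by the
builder's name: each rollup index is attached beneath the already-placed index with the
fewest columns that still contains all of the rollup's columns, falling back to the base
index when nothing narrower covers it. A compact, self-contained sketch of that
parent-selection rule (simplified types and hypothetical names, not the builder's actual code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch only: builds a rollup tree by attaching every rollup under the smallest
// already-placed index whose column set covers the rollup's columns.
public class MinimumCoverageSketch {

    static final class Node {
        final long indexId;
        final Set<String> columns;
        final List<Node> children = new ArrayList<>();
        Node parent;

        Node(long indexId, Set<String> columns) {
            this.indexId = indexId;
            this.columns = columns;
        }
    }

    // The first element must be the base index; the remaining elements are rollups.
    static Node build(List<Node> indexes) {
        Node root = indexes.get(0);
        List<Node> placed = new ArrayList<>();
        placed.add(root);
        // Visit wider rollups first so narrower ones can nest beneath them.
        List<Node> rollups = new ArrayList<>(indexes.subList(1, indexes.size()));
        rollups.sort((a, b) -> Integer.compare(b.columns.size(), a.columns.size()));
        for (Node rollup : rollups) {
            Node best = null;
            for (Node candidate : placed) {
                if (candidate.columns.containsAll(rollup.columns)
                        && (best == null || candidate.columns.size() < best.columns.size())) {
                    best = candidate;
                }
            }
            rollup.parent = best;      // the base index always covers, so best is never null here
            best.children.add(rollup);
            placed.add(rollup);
        }
        return root;
    }

    public static void main(String[] args) {
        Node base = new Node(10000, new HashSet<>(Arrays.asList("column1", "column2", "column3", "column4")));
        Node roll1 = new Node(10001, new HashSet<>(Arrays.asList("column1", "column2", "column4")));
        Node roll2 = new Node(10002, new HashSet<>(Arrays.asList("column1", "column4")));
        Node roll3 = new Node(10003, new HashSet<>(Arrays.asList("column3", "column4")));
        Node root = build(Arrays.asList(base, roll1, roll2, roll3));
        // Reproduces the shape asserted above: 10001 and 10003 under 10000, 10002 under 10001.
        for (Node child : root.children) {
            System.out.println(child.indexId + " under " + root.indexId);
            for (Node grand : child.children) {
                System.out.println("  " + grand.indexId + " under " + child.indexId);
            }
        }
    }
}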


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org