Posted to commits@doris.apache.org by mo...@apache.org on 2020/06/22 11:58:46 UTC
[incubator-doris] branch master updated: [Spark Load]Using SparkDpp
to complete some calculation in Spark Load (#3729)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8092aad [Spark Load]Using SparkDpp to complete some calculation in Spark Load (#3729)
8092aad is described below
commit 8092aadc8313a3a5a3e3596221d1ea3418492657
Author: wangbo <50...@qq.com>
AuthorDate: Mon Jun 22 19:58:34 2020 +0800
[Spark Load]Using SparkDpp to complete some calculation in Spark Load (#3729)
---
.../apache/doris/load/loadv2/dpp/ColumnParser.java | 188 +++++
.../load/loadv2/dpp/DorisKryoRegistrator.java | 29 +
.../load/loadv2/dpp/DorisRangePartitioner.java | 87 ++
.../apache/doris/load/loadv2/dpp/DppColumns.java | 112 +++
.../apache/doris/load/loadv2/dpp/DppResult.java | 64 ++
.../org/apache/doris/load/loadv2/dpp/DppUtils.java | 257 ++++++
.../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 880 +++++++++++++++++++++
.../doris/load/loadv2/dpp/SparkRDDAggregator.java | 564 +++++++++++++
.../doris/load/loadv2/dpp/StringAccumulator.java | 64 ++
.../load/loadv2/dpp/DorisRangePartitionerTest.java | 144 ++++
.../apache/doris/load/loadv2/dpp/DppUtilsTest.java | 244 ++++++
.../dpp/MinimumCoverageRollupTreeBuilderTest.java | 111 +++
12 files changed, 2744 insertions(+)
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
new file mode 100644
index 0000000..c31ed1b
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.joda.time.DateTime;
+
+import java.io.Serializable;
+import java.util.Date;
+
+// Parser to validate values for different column types
+public abstract class ColumnParser implements Serializable {
+ public static ColumnParser create(EtlJobConfig.EtlColumn etlColumn) throws UserException {
+ String columnType = etlColumn.columnType;
+ if (columnType.equalsIgnoreCase("TINYINT")) {
+ return new TinyIntParser();
+ } else if (columnType.equalsIgnoreCase("SMALLINT")) {
+ return new SmallIntParser();
+ } else if (columnType.equalsIgnoreCase("INT")) {
+ return new IntParser();
+ } else if (columnType.equalsIgnoreCase("BIGINT")) {
+ return new BigIntParser();
+ } else if (columnType.equalsIgnoreCase("FLOAT")) {
+ return new FloatParser();
+ } else if (columnType.equalsIgnoreCase("DOUBLE")) {
+ return new DoubleParser();
+ } else if (columnType.equalsIgnoreCase("BOOLEAN")) {
+ return new BooleanParser();
+ } else if (columnType.equalsIgnoreCase("DATE")) {
+ return new DateParser();
+ } else if (columnType.equalsIgnoreCase("DATETIME")) {
+ return new DatetimeParser();
+ } else if (columnType.equalsIgnoreCase("VARCHAR")
+ || columnType.equalsIgnoreCase("CHAR")
+ || columnType.equalsIgnoreCase("BITMAP")
+ || columnType.equalsIgnoreCase("HLL")) {
+ return new StringParser(etlColumn);
+ } else {
+ throw new UserException("unsupported type:" + columnType);
+ }
+ }
+
+ public abstract boolean parse(String value);
+}
+
+class TinyIntParser extends ColumnParser {
+ @Override
+ public boolean parse(String value) {
+ try {
+ Short parsed = Short.parseShort(value);
+ if (parsed > 127 || parsed < -128) {
+ return false;
+ }
+ } catch (NumberFormatException e) {
+ return false;
+ }
+ return true;
+ }
+}
+
+class SmallIntParser extends ColumnParser {
+ @Override
+ public boolean parse(String value) {
+ try {
+ Short.parseShort(value);
+ } catch (NumberFormatException e) {
+ return false;
+ }
+ return true;
+ }
+}
+
+class IntParser extends ColumnParser {
+ @Override
+ public boolean parse(String value) {
+ try {
+ Integer.parseInt(value);
+ } catch (NumberFormatException e) {
+ return false;
+ }
+ return true;
+ }
+}
+
+class BigIntParser extends ColumnParser {
+ @Override
+ public boolean parse(String value) {
+ try {
+ Long.parseLong(value);
+ } catch (NumberFormatException e) {
+ return false;
+ }
+ return true;
+ }
+}
+
+class FloatParser extends ColumnParser {
+ @Override
+ public boolean parse(String value) {
+ try {
+ Float.parseFloat(value);
+ } catch (NumberFormatException e) {
+ return false;
+ }
+ return true;
+ }
+}
+
+class DoubleParser extends ColumnParser {
+ @Override
+ public boolean parse(String value) {
+ try {
+ Double.parseDouble(value);
+ } catch (NumberFormatException e) {
+ return false;
+ }
+ return true;
+ }
+}
+
+class BooleanParser extends ColumnParser {
+ @Override
+ public boolean parse(String value) {
+ if (value.equalsIgnoreCase("true")
+ || value.equalsIgnoreCase("false")) {
+ return true;
+ }
+ return false;
+ }
+}
+
+class DateParser extends ColumnParser {
+ @Override
+ public boolean parse(String value) {
+ try {
+ Date.parse(value);
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ return true;
+ }
+}
+
+class DatetimeParser extends ColumnParser {
+ @Override
+ public boolean parse(String value) {
+ try {
+ DateTime.parse(value);
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ return true;
+ }
+}
+
+class StringParser extends ColumnParser {
+
+ private EtlJobConfig.EtlColumn etlColumn;
+
+ public StringParser(EtlJobConfig.EtlColumn etlColumn) {
+ this.etlColumn = etlColumn;
+ }
+
+ @Override
+ public boolean parse(String value) {
+ try {
+ return value.getBytes("UTF-8").length <= etlColumn.stringLength;
+ } catch (Exception e) {
+ throw new RuntimeException("string check failed ", e);
+ }
+ }
+}
\ No newline at end of file
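
A quick usage sketch of the parser factory above, as it might be called from the same package (the field-by-field EtlColumn setup is illustrative; the real EtlJobConfig.EtlColumn construction may differ):

    // build a parser for an INT column and validate raw CSV fields with it
    EtlJobConfig.EtlColumn intColumn = new EtlJobConfig.EtlColumn();  // hypothetical no-arg construction
    intColumn.columnType = "INT";
    ColumnParser parser = ColumnParser.create(intColumn);  // throws UserException for unknown types
    parser.parse("12345");       // true
    parser.parse("12.5");        // false, not a valid INT literal
    parser.parse("9999999999");  // false, overflows a 32-bit int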
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java
new file mode 100644
index 0000000..d2568bb
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisKryoRegistrator.java
@@ -0,0 +1,29 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.apache.spark.serializer.KryoRegistrator;
+
+public class DorisKryoRegistrator implements KryoRegistrator {
+
+ @Override
+ public void registerClasses(Kryo kryo) {
+ kryo.register(org.apache.doris.load.loadv2.Roaring64Map.class);
+ }
+}
\ No newline at end of file
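
The registrator only takes effect if the Spark job is started with Kryo serialization and this class configured as the registrator. How the spark load job wires this up is outside this patch; a minimal sketch using the standard Spark configuration keys:

    import org.apache.spark.SparkConf;
    import org.apache.spark.sql.SparkSession;

    // enable Kryo and point it at the Doris registrator so Roaring64Map is registered
    SparkConf conf = new SparkConf()
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryo.registrator", "org.apache.doris.load.loadv2.dpp.DorisKryoRegistrator");
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();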
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitioner.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitioner.java
new file mode 100644
index 0000000..c92529f
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitioner.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.spark.Partitioner;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class DorisRangePartitioner extends Partitioner {
+ private static final String UNPARTITIONED_TYPE = "UNPARTITIONED";
+ private EtlJobConfig.EtlPartitionInfo partitionInfo;
+ private List<PartitionRangeKey> partitionRangeKeys;
+ List<Integer> partitionKeyIndexes;
+ public DorisRangePartitioner(EtlJobConfig.EtlPartitionInfo partitionInfo,
+ List<Integer> partitionKeyIndexes,
+ List<PartitionRangeKey> partitionRangeKeys) {
+ this.partitionInfo = partitionInfo;
+ this.partitionKeyIndexes = partitionKeyIndexes;
+ this.partitionRangeKeys = partitionRangeKeys;
+ }
+
+ public int numPartitions() {
+ if (partitionInfo == null) {
+ return 0;
+ }
+ if (partitionInfo.partitionType.equalsIgnoreCase(UNPARTITIONED_TYPE)) {
+ return 1;
+ }
+ return partitionInfo.partitions.size();
+ }
+
+ public int getPartition(Object var1) {
+ if (partitionInfo.partitionType != null
+ && partitionInfo.partitionType.equalsIgnoreCase(UNPARTITIONED_TYPE)) {
+ return 0;
+ }
+ DppColumns key = (DppColumns)var1;
+ // get the partition columns from key as partition key
+ DppColumns partitionKey = new DppColumns(key, partitionKeyIndexes);
+ // TODO: optimize this by using binary search
+ for (int i = 0; i < partitionRangeKeys.size(); ++i) {
+ if (partitionRangeKeys.get(i).isRowContained(partitionKey)) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ public static class PartitionRangeKey implements Serializable {
+ public boolean isMaxPartition;
+ public DppColumns startKeys;
+ public DppColumns endKeys;
+
+ public boolean isRowContained(DppColumns row) {
+ if (isMaxPartition) {
+ return startKeys.compareTo(row) <= 0;
+ } else {
+ return startKeys.compareTo(row) <= 0 && endKeys.compareTo(row) > 0;
+ }
+ }
+
+ public String toString() {
+ return "PartitionRangeKey{" +
+ "isMaxPartition=" + isMaxPartition +
+ ", startKeys=" + startKeys +
+ ", endKeys=" + endKeys +
+ '}';
+ }
+ }
+}
\ No newline at end of file
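
getPartition linearly scans the range keys and returns -1 when the partition key falls outside every range, which callers treat as an invalid row. A small same-package sketch of the containment check with one int partition column (values are made up):

    // partition p0 covers [0, 10); p1 is the unbounded max partition starting at 10
    DorisRangePartitioner.PartitionRangeKey p0 = new DorisRangePartitioner.PartitionRangeKey();
    p0.isMaxPartition = false;
    p0.startKeys = new DppColumns(java.util.Arrays.asList((Object) 0));
    p0.endKeys = new DppColumns(java.util.Arrays.asList((Object) 10));

    DorisRangePartitioner.PartitionRangeKey p1 = new DorisRangePartitioner.PartitionRangeKey();
    p1.isMaxPartition = true;
    p1.startKeys = new DppColumns(java.util.Arrays.asList((Object) 10));

    DppColumns key = new DppColumns(java.util.Arrays.asList((Object) 7));
    p0.isRowContained(key);  // true:  0 <= 7 and 7 < 10
    p1.isRowContained(key);  // false: 10 <= 7 does not hold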
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppColumns.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppColumns.java
new file mode 100644
index 0000000..b5efb2c
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppColumns.java
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import com.google.common.base.Preconditions;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Objects;
+import java.util.Comparator;
+
+// DppColumns is used to store the column values of a row
+class DppColumns implements Comparable<DppColumns>, Serializable {
+ public List<Object> columns = new ArrayList<Object>();
+
+ public DppColumns(List<Object> keys){
+ this.columns = keys;
+ }
+
+ public DppColumns(DppColumns key, List<Integer> indexes){
+ for (int i = 0; i < indexes.size(); ++i) {
+ columns.add(key.columns.get(indexes.get(i)));
+ }
+ }
+
+ @Override
+ public int compareTo(DppColumns other) {
+ Preconditions.checkState(columns.size() == other.columns.size());
+
+ int cmp = 0;
+ for (int i = 0; i < columns.size(); i++) {
+ Object columnObj = columns.get(i);
+ Object otherColumn = other.columns.get(i);
+ if (columnObj == null && otherColumn == null) {
+ continue;
+ } else if (columnObj == null || otherColumn == null) {
+ if (columnObj == null) {
+ return -1;
+ } else {
+ return 1;
+ }
+ }
+ if (columns.get(i) instanceof Integer) {
+ cmp = ((Integer)(columns.get(i))).compareTo((Integer)(other.columns.get(i)));
+ } else if (columns.get(i) instanceof Long) {
+ cmp = ((Long)(columns.get(i))).compareTo((Long)(other.columns.get(i)));
+ } else if (columns.get(i) instanceof Boolean) {
+ cmp = ((Boolean)(columns.get(i))).compareTo((Boolean) (other.columns.get(i)));
+ } else if (columns.get(i) instanceof Short) {
+ cmp = ((Short)(columns.get(i))).compareTo((Short)(other.columns.get(i)));
+ } else if (columns.get(i) instanceof Float) {
+ cmp = ((Float)(columns.get(i))).compareTo((Float) (other.columns.get(i)));
+ } else if (columns.get(i) instanceof Double) {
+ cmp = ((Double)(columns.get(i))).compareTo((Double) (other.columns.get(i)));
+ } else if (columns.get(i) instanceof Date) {
+ cmp = ((Date)(columns.get(i))).compareTo((Date) (other.columns.get(i)));
+ } else if (columns.get(i) instanceof java.sql.Timestamp) {
+ cmp = ((java.sql.Timestamp)columns.get(i)).compareTo((java.sql.Timestamp)other.columns.get(i));
+ } else {
+ cmp = ((String)(columns.get(i))).compareTo((String) (other.columns.get(i)));
+ }
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+ return cmp;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ DppColumns dppColumns = (DppColumns) o;
+ return Objects.equals(columns, dppColumns.columns);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(columns);
+ }
+
+ @Override
+ public String toString() {
+ return "dppColumns{" +
+ "columns=" + columns +
+ '}';
+ }
+}
+
+class DppColumnsComparator implements Comparator<DppColumns> {
+ @Override
+ public int compare(DppColumns left, DppColumns right) {
+ return left.compareTo(right);
+ }
+}
\ No newline at end of file
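
compareTo walks both column lists in order, treats null as smaller than any value, and dispatches on the runtime type of each element, so both sides are expected to hold the same types position by position. A tiny same-package example:

    DppColumns a = new DppColumns(java.util.Arrays.asList((Object) 1, "beijing"));
    DppColumns b = new DppColumns(java.util.Arrays.asList((Object) 1, "shanghai"));
    a.compareTo(b);  // negative: the first elements tie, then "beijing" < "shanghai"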
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java
new file mode 100644
index 0000000..fa813ca
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppResult.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.io.Serializable;
+
+public class DppResult implements Serializable {
+ DppResult() {
+ isSuccess = true;
+ failedReason = "";
+ scannedRows = 0;
+ fileNumber = 0;
+ fileSize = 0;
+ normalRows = 0;
+ abnormalRows = 0;
+ unselectRows = 0;
+ partialAbnormalRows = "";
+ }
+
+ @SerializedName("is_success")
+ public boolean isSuccess;
+
+ @SerializedName("failed_reason")
+ public String failedReason;
+
+ @SerializedName("scanned_rows")
+ public long scannedRows;
+
+ @SerializedName("file_number")
+ public long fileNumber;
+
+ @SerializedName("file_size")
+ public long fileSize;
+
+ @SerializedName("normal_rows")
+ public long normalRows;
+
+ @SerializedName("abnormal_rows")
+ public long abnormalRows;
+
+ @SerializedName("unselect_rows")
+ public long unselectRows;
+
+ // only part of abnormal rows will be returned
+ @SerializedName("partial_abnormal_rows")
+ public String partialAbnormalRows;
+}
\ No newline at end of file
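
DppResult is a Gson-annotated summary of the job; SparkDpp below defines DPP_RESULT_FILE = "dpp_result.json" and fills this object in from its accumulators. A short serialization sketch (the numbers are made up, and since the constructor is package-private this runs from the same package):

    DppResult result = new DppResult();
    result.scannedRows = 1000;
    result.normalRows = 990;
    result.abnormalRows = 10;
    String json = new com.google.gson.Gson().toJson(result);
    // e.g. {"is_success":true,"failed_reason":"","scanned_rows":1000,...,"normal_rows":990,"abnormal_rows":10,...}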
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppUtils.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppUtils.java
new file mode 100644
index 0000000..d0882ac
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/DppUtils.java
@@ -0,0 +1,257 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.Row;
+
+import com.google.common.collect.Lists;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.zip.CRC32;
+
+public class DppUtils {
+ public static final String BUCKET_ID = "__bucketId__";
+ public static Class getClassFromDataType(DataType dataType) {
+ if (dataType == null) {
+ return null;
+ }
+ if (dataType.equals(DataTypes.BooleanType)) {
+ return Boolean.class;
+ } else if (dataType.equals(DataTypes.ShortType)) {
+ return Short.class;
+ } else if (dataType.equals(DataTypes.IntegerType)) {
+ return Integer.class;
+ } else if (dataType.equals(DataTypes.LongType)) {
+ return Long.class;
+ } else if (dataType.equals(DataTypes.FloatType)) {
+ return Float.class;
+ } else if (dataType.equals(DataTypes.DoubleType)) {
+ return Double.class;
+ } else if (dataType.equals(DataTypes.DateType)) {
+ return Date.class;
+ } else if (dataType.equals(DataTypes.StringType)) {
+ return String.class;
+ } else if (dataType instanceof DecimalType) {
+ DecimalType decimalType = (DecimalType)dataType;
+ return BigDecimal.valueOf(decimalType.precision(), decimalType.scale()).getClass();
+ } else if (dataType.equals(DataTypes.TimestampType)) {
+ return Long.class;
+ }
+ return null;
+ }
+
+ public static Class getClassFromColumn(EtlJobConfig.EtlColumn column) throws UserException {
+ switch (column.columnType) {
+ case "BOOLEAN":
+ return Boolean.class;
+ case "TINYINT":
+ case "SMALLINT":
+ return Short.class;
+ case "INT":
+ return Integer.class;
+ case "DATETIME":
+ return java.sql.Timestamp.class;
+ case "BIGINT":
+ return Long.class;
+ case "LARGEINT":
+ throw new UserException("LARGEINT is not supported now");
+ case "FLOAT":
+ return Float.class;
+ case "DOUBLE":
+ return Double.class;
+ case "DATE":
+ return Date.class;
+ case "HLL":
+ case "CHAR":
+ case "VARCHAR":
+ case "BITMAP":
+ case "OBJECT":
+ return String.class;
+ case "DECIMALV2":
+ return BigDecimal.valueOf(column.precision, column.scale).getClass();
+ default:
+ return String.class;
+ }
+ }
+
+ public static DataType getDataTypeFromColumn(EtlJobConfig.EtlColumn column, boolean regardDistinctColumnAsBinary) {
+ DataType dataType = DataTypes.StringType;
+ switch (column.columnType) {
+ case "BOOLEAN":
+ dataType = DataTypes.StringType;
+ break;
+ case "TINYINT":
+ dataType = DataTypes.ByteType;
+ break;
+ case "SMALLINT":
+ dataType = DataTypes.ShortType;
+ break;
+ case "INT":
+ dataType = DataTypes.IntegerType;
+ break;
+ case "DATETIME":
+ dataType = DataTypes.TimestampType;
+ break;
+ case "BIGINT":
+ dataType = DataTypes.LongType;
+ break;
+ case "LARGEINT":
+ dataType = DataTypes.StringType;
+ break;
+ case "FLOAT":
+ dataType = DataTypes.FloatType;
+ break;
+ case "DOUBLE":
+ dataType = DataTypes.DoubleType;
+ break;
+ case "DATE":
+ dataType = DataTypes.DateType;
+ break;
+ case "CHAR":
+ case "VARCHAR":
+ case "OBJECT":
+ dataType = DataTypes.StringType;
+ break;
+ case "HLL":
+ case "BITMAP":
+ dataType = regardDistinctColumnAsBinary ? DataTypes.BinaryType : DataTypes.StringType;
+ break;
+ case "DECIMALV2":
+ dataType = DecimalType.apply(column.precision, column.scale);
+ break;
+ default:
+ throw new RuntimeException("Reason: invalid column type:" + column);
+ }
+ return dataType;
+ }
+
+ public static ByteBuffer getHashValue(Object o, DataType type) {
+ ByteBuffer buffer = ByteBuffer.allocate(8);
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
+ if (o == null) {
+ buffer.putInt(0);
+ return buffer;
+ }
+ if (type.equals(DataTypes.ByteType)) {
+ buffer.put((byte)o);
+ } else if (type.equals(DataTypes.ShortType)) {
+ buffer.putShort((Short)o);
+ } else if (type.equals(DataTypes.IntegerType)) {
+ buffer.putInt((Integer) o);
+ } else if (type.equals(DataTypes.LongType)) {
+ buffer.putLong((Long)o);
+ } else if (type.equals(DataTypes.StringType)) {
+ try {
+ String str = String.valueOf(o);
+ buffer = ByteBuffer.wrap(str.getBytes("UTF-8"));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } else if (type.equals(DataTypes.BooleanType)) {
+ Boolean b = (Boolean)o;
+ String str = b ? "1" : "0";
+ try {
+ buffer = ByteBuffer.wrap(str.getBytes("UTF-8"));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return buffer;
+ }
+
+ public static long getHashValue(Row row, List<String> distributeColumns, StructType dstTableSchema) {
+ CRC32 hashValue = new CRC32();
+ for (String distColumn : distributeColumns) {
+ Object columnObject = row.get(row.fieldIndex(distColumn));
+ ByteBuffer buffer = getHashValue(columnObject, dstTableSchema.apply(distColumn).dataType());
+ hashValue.update(buffer.array(), 0, buffer.limit());
+ }
+ return hashValue.getValue();
+ }
+
+ public static StructType createDstTableSchema(List<EtlJobConfig.EtlColumn> columns, boolean addBucketIdColumn, boolean regardDistinctColumnAsBinary) {
+ List<StructField> fields = new ArrayList<>();
+ if (addBucketIdColumn) {
+ StructField bucketIdField = DataTypes.createStructField(BUCKET_ID, DataTypes.StringType, true);
+ fields.add(bucketIdField);
+ }
+ for (EtlJobConfig.EtlColumn column : columns) {
+ DataType structColumnType = getDataTypeFromColumn(column, regardDistinctColumnAsBinary);
+ StructField field = DataTypes.createStructField(column.columnName, structColumnType, column.isAllowNull);
+ fields.add(field);
+ }
+ StructType dstSchema = DataTypes.createStructType(fields);
+ return dstSchema;
+ }
+
+ public static List<String> parseColumnsFromPath(String filePath, List<String> columnsFromPath) throws UserException {
+ if (columnsFromPath == null || columnsFromPath.isEmpty()) {
+ return Collections.emptyList();
+ }
+ String[] strings = filePath.split("/");
+ if (strings.length < 2) {
+ System.err.println("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
+ throw new UserException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
+ }
+ String[] columns = new String[columnsFromPath.size()];
+ int size = 0;
+ for (int i = strings.length - 2; i >= 0; i--) {
+ String str = strings[i];
+ if (str != null && str.isEmpty()) {
+ continue;
+ }
+ if (str == null || !str.contains("=")) {
+ System.err.println("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
+ throw new UserException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
+ }
+ String[] pair = str.split("=", 2);
+ if (pair.length != 2) {
+ System.err.println("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
+ throw new UserException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
+ }
+ int index = columnsFromPath.indexOf(pair[0]);
+ if (index == -1) {
+ continue;
+ }
+ columns[index] = pair[1];
+ size++;
+ if (size >= columnsFromPath.size()) {
+ break;
+ }
+ }
+ if (size != columnsFromPath.size()) {
+ System.err.println("Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
+ throw new UserException("Reason: Fail to parse columnsFromPath, expected: " + columnsFromPath + ", filePath: " + filePath);
+ }
+ return Lists.newArrayList(columns);
+ }
+}
\ No newline at end of file
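
parseColumnsFromPath scans the directory components of the file path from right to left, keeps the key=value segments whose key appears in columnsFromPath, and returns the values ordered to match columnsFromPath. For example (hypothetical path; the method throws UserException when a requested key is missing):

    List<String> values = DppUtils.parseColumnsFromPath(
            "hdfs://host/user/doris/city=beijing/date=2020-06-22/part-0000.csv",
            Lists.newArrayList("date", "city"));
    // values == ["2020-06-22", "beijing"]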
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
new file mode 100644
index 0000000..80becc4
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -0,0 +1,880 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import com.google.common.base.Strings;
+import com.google.gson.Gson;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.function.ForeachPartitionFunction;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.LongAccumulator;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.TaskContext;
+
+import scala.Tuple2;
+import scala.collection.Seq;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.HashSet;
+import java.util.Arrays;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import scala.collection.JavaConverters;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+// This class is a Spark-based data preprocessing program.
+// It uses Spark's distributed compute framework to run the ETL,
+// sort and pre-aggregation work inside a Spark job,
+// which speeds up loading large amounts of data.
+// The processing steps are as follows:
+// 1. load data
+// 1.1 load data from path/hive table
+// 1.2 do the etl process
+// 2. repartition data using the doris data model (partition and bucket)
+// 3. process aggregation if needed
+// 4. write data to parquet file
+public final class SparkDpp implements java.io.Serializable {
+ private static final Logger LOG = LogManager.getLogger(SparkDpp.class);
+
+ private static final String NULL_FLAG = "\\N";
+ private static final String DPP_RESULT_FILE = "dpp_result.json";
+ private static final String BITMAP_TYPE = "bitmap";
+ private SparkSession spark = null;
+ private EtlJobConfig etlJobConfig = null;
+ private LongAccumulator abnormalRowAcc = null;
+ private LongAccumulator unselectedRowAcc = null;
+ private LongAccumulator scannedRowsAcc = null;
+ private LongAccumulator fileNumberAcc = null;
+ private LongAccumulator fileSizeAcc = null;
+ private Map<String, Integer> bucketKeyMap = new HashMap<>();
+ // accumulator to collect invalid rows
+ private StringAccumulator invalidRows = new StringAccumulator();
+
+ public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) {
+ this.spark = spark;
+ this.etlJobConfig = etlJobConfig;
+ }
+
+ public void init() {
+ abnormalRowAcc = spark.sparkContext().longAccumulator("abnormalRowAcc");
+ unselectedRowAcc = spark.sparkContext().longAccumulator("unselectedRowAcc");
+ scannedRowsAcc = spark.sparkContext().longAccumulator("scannedRowsAcc");
+ fileNumberAcc = spark.sparkContext().longAccumulator("fileNumberAcc");
+ fileSizeAcc = spark.sparkContext().longAccumulator("fileSizeAcc");
+ spark.sparkContext().register(invalidRows, "InvalidRowsAccumulator");
+ }
+
+ private Dataset<Row> processRDDAggAndRepartition(Dataset<Row> dataframe, EtlJobConfig.EtlIndex currentIndexMeta) throws UserException {
+ final boolean isDuplicateTable = !StringUtils.equalsIgnoreCase(currentIndexMeta.indexType, "AGGREGATE")
+ && !StringUtils.equalsIgnoreCase(currentIndexMeta.indexType, "UNIQUE");
+
+ // 1 make metadata for map/reduce
+ int keyLen = 0;
+ for (EtlJobConfig.EtlColumn etlColumn : currentIndexMeta.columns) {
+ keyLen = etlColumn.isKey ? keyLen + 1 : keyLen;
+ }
+
+ SparkRDDAggregator[] sparkRDDAggregators = new SparkRDDAggregator[currentIndexMeta.columns.size() - keyLen];
+
+ for (int i = 0 ; i < currentIndexMeta.columns.size(); i++) {
+ if (!currentIndexMeta.columns.get(i).isKey && !isDuplicateTable) {
+ sparkRDDAggregators[i - keyLen] = SparkRDDAggregator.buildAggregator(currentIndexMeta.columns.get(i));
+ }
+ }
+
+ PairFunction<Row, List<Object>, Object[]> encodePairFunction = isDuplicateTable ?
+ // add 1 to include bucketId
+ new EncodeDuplicateTableFunction(keyLen + 1, currentIndexMeta.columns.size() - keyLen)
+ : new EncodeAggregateTableFunction(sparkRDDAggregators, keyLen + 1);
+
+ // 2 convert dataframe to rdd and encode key and value
+ // TODO(wb) use rdd to avoid bitmap/hll serialization when calculating rollup
+ JavaPairRDD<List<Object>, Object[]> currentRollupRDD = dataframe.toJavaRDD().mapToPair(encodePairFunction);
+
+ // 3 do aggregate
+ // TODO(wb) set the reduce concurrency by statistic instead of hard code 200
+ int aggregateConcurrency = 200;
+ JavaPairRDD<List<Object>, Object[]> reduceResultRDD = isDuplicateTable ? currentRollupRDD
+ : currentRollupRDD.reduceByKey(new AggregateReduceFunction(sparkRDDAggregators), aggregateConcurrency);
+
+ // 4 repartition and finalize value column
+ JavaRDD<Row> finalRDD = reduceResultRDD
+ .repartitionAndSortWithinPartitions(new BucketPartitioner(bucketKeyMap), new BucketComparator())
+ .map(record -> {
+ List<Object> keys = record._1;
+ Object[] values = record._2;
+ int size = keys.size() + values.length;
+ Object[] result = new Object[size];
+
+ for (int i = 0; i < keys.size(); i++) {
+ result[i] = keys.get(i);
+ }
+
+ for (int i = keys.size(); i < size; i++) {
+ int valueIdx = i - keys.size();
+ result[i] = isDuplicateTable ? values[valueIdx] : sparkRDDAggregators[valueIdx].finalize(values[valueIdx]);
+ }
+
+ return RowFactory.create(result);
+ });
+
+ // 5 convert to dataframe
+ StructType tableSchemaWithBucketId = DppUtils.createDstTableSchema(currentIndexMeta.columns, true, true);
+ dataframe = spark.createDataFrame(finalRDD, tableSchemaWithBucketId);
+ return dataframe;
+
+ }
+
+ // write data to parquet files using Spark's parquet write support.
+ private void writePartitionedAndSortedDataframeToParquet(Dataset<Row> dataframe,
+ String pathPattern,
+ long tableId,
+ EtlJobConfig.EtlIndex indexMeta) throws UserException {
+ StructType outputSchema = dataframe.schema();
+ StructType dstSchema = DataTypes.createStructType(
+ Arrays.asList(outputSchema.fields()).stream()
+ .filter(field -> !field.name().equalsIgnoreCase(DppUtils.BUCKET_ID))
+ .collect(Collectors.toList()));
+ ExpressionEncoder encoder = RowEncoder.apply(dstSchema);
+ dataframe.foreachPartition(new ForeachPartitionFunction<Row>() {
+ @Override
+ public void call(Iterator<Row> t) throws Exception {
+ // write the data to dst file
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(URI.create(etlJobConfig.outputPath), conf);
+ String lastBucketKey = null;
+ ParquetWriter<InternalRow> parquetWriter = null;
+ TaskContext taskContext = TaskContext.get();
+ long taskAttemptId = taskContext.taskAttemptId();
+ String dstPath = "";
+ String tmpPath = "";
+
+ while (t.hasNext()) {
+ Row row = t.next();
+ if (row.length() <= 1) {
+ LOG.warn("invalid row:" + row);
+ continue;
+ }
+
+
+ String curBucketKey = row.getString(0);
+ List<Object> columnObjects = new ArrayList<>();
+ for (int i = 1; i < row.length(); ++i) {
+ Object columnValue = row.get(i);
+ columnObjects.add(columnValue);
+ }
+ Row rowWithoutBucketKey = RowFactory.create(columnObjects.toArray());
+ // if the bucket key is new, it will belong to a new tablet
+ if (lastBucketKey == null || !curBucketKey.equals(lastBucketKey)) {
+ if (parquetWriter != null) {
+ parquetWriter.close();
+ // rename tmpPath to path
+ try {
+ fs.rename(new Path(tmpPath), new Path(dstPath));
+ } catch (IOException ioe) {
+ LOG.warn("rename from tmpPath" + tmpPath + " to dstPath:" + dstPath + " failed. exception:" + ioe);
+ throw ioe;
+ }
+ }
+ // flush current writer and create a new writer
+ String[] bucketKey = curBucketKey.split("_");
+ if (bucketKey.length != 2) {
+ LOG.warn("invalid bucket key:" + curBucketKey);
+ continue;
+ }
+ int partitionId = Integer.parseInt(bucketKey[0]);
+ int bucketId = Integer.parseInt(bucketKey[1]);
+ dstPath = String.format(pathPattern, tableId, partitionId, indexMeta.indexId,
+ bucketId, indexMeta.schemaHash);
+ tmpPath = dstPath + "." + taskAttemptId;
+ conf.setBoolean("spark.sql.parquet.writeLegacyFormat", false);
+ conf.setBoolean("spark.sql.parquet.int64AsTimestampMillis", false);
+ conf.setBoolean("spark.sql.parquet.int96AsTimestamp", true);
+ conf.setBoolean("spark.sql.parquet.binaryAsString", false);
+ conf.set("spark.sql.parquet.outputTimestampType", "INT96");
+ ParquetWriteSupport.setSchema(dstSchema, conf);
+ ParquetWriteSupport parquetWriteSupport = new ParquetWriteSupport();
+ parquetWriter = new ParquetWriter<InternalRow>(new Path(tmpPath), parquetWriteSupport,
+ CompressionCodecName.SNAPPY, 256 * 1024 * 1024, 16 * 1024, 1024 * 1024,
+ true, false,
+ ParquetProperties.WriterVersion.PARQUET_1_0, conf);
+ if (parquetWriter != null) {
+ LOG.info("[HdfsOperate]>> initialize writer succeed! path:" + tmpPath);
+ }
+ lastBucketKey = curBucketKey;
+ }
+ InternalRow internalRow = encoder.toRow(rowWithoutBucketKey);
+ parquetWriter.write(internalRow);
+ }
+ if (parquetWriter != null) {
+ parquetWriter.close();
+ try {
+ fs.rename(new Path(tmpPath), new Path(dstPath));
+ } catch (IOException ioe) {
+ LOG.warn("rename from tmpPath" + tmpPath + " to dstPath:" + dstPath + " failed. exception:" + ioe);
+ throw ioe;
+ }
+ }
+ }
+ });
+ }
+
+ // TODO(wb) one shuffle to calculate the rollup in the same level
+ private void processRollupTree(RollupTreeNode rootNode,
+ Dataset<Row> rootDataframe,
+ long tableId, EtlJobConfig.EtlTable tableMeta,
+ EtlJobConfig.EtlIndex baseIndex) throws UserException {
+ Queue<RollupTreeNode> nodeQueue = new LinkedList<>();
+ nodeQueue.offer(rootNode);
+ int currentLevel = 0;
+ // traverse the tree level by level
+ Map<Long, Dataset<Row>> parentDataframeMap = new HashMap<>();
+ parentDataframeMap.put(baseIndex.indexId, rootDataframe);
+ Map<Long, Dataset<Row>> childrenDataframeMap = new HashMap<>();
+ String pathPattern = etlJobConfig.outputPath + "/" + etlJobConfig.outputFilePattern;
+ while (!nodeQueue.isEmpty()) {
+ RollupTreeNode curNode = nodeQueue.poll();
+ LOG.info("start to process index:" + curNode.indexId);
+ if (curNode.children != null) {
+ for (RollupTreeNode child : curNode.children) {
+ nodeQueue.offer(child);
+ }
+ }
+ Dataset<Row> curDataFrame = null;
+ // column select for rollup
+ if (curNode.level != currentLevel) {
+ for (Dataset<Row> dataframe : parentDataframeMap.values()) {
+ dataframe.unpersist();
+ }
+ currentLevel = curNode.level;
+ parentDataframeMap.clear();
+ parentDataframeMap = childrenDataframeMap;
+ childrenDataframeMap = new HashMap<>();
+ }
+
+ long parentIndexId = baseIndex.indexId;
+ if (curNode.parent != null) {
+ parentIndexId = curNode.parent.indexId;
+ }
+
+ Dataset<Row> parentDataframe = parentDataframeMap.get(parentIndexId);
+ List<Column> columns = new ArrayList<>();
+ List<Column> keyColumns = new ArrayList<>();
+ Column bucketIdColumn = new Column(DppUtils.BUCKET_ID);
+ keyColumns.add(bucketIdColumn);
+ columns.add(bucketIdColumn);
+ for (String keyName : curNode.keyColumnNames) {
+ columns.add(new Column(keyName));
+ keyColumns.add(new Column(keyName));
+ }
+ for (String valueName : curNode.valueColumnNames) {
+ columns.add(new Column(valueName));
+ }
+ Seq<Column> columnSeq = JavaConverters.asScalaIteratorConverter(columns.iterator()).asScala().toSeq();
+ curDataFrame = parentDataframe.select(columnSeq);
+ // aggregate and repartition
+ curDataFrame = processRDDAggAndRepartition(curDataFrame, curNode.indexMeta);
+
+ childrenDataframeMap.put(curNode.indexId, curDataFrame);
+
+ if (curNode.children != null && curNode.children.size() > 1) {
+ // if there is more than one child, persist the dataframe for performance
+ curDataFrame.persist();
+ }
+ writePartitionedAndSortedDataframeToParquet(curDataFrame, pathPattern, tableId, curNode.indexMeta);
+ }
+ }
+
+ // repartition dataframe by partitionid_bucketid
+ // so data in the same bucket will be consecutive.
+ private Dataset<Row> repartitionDataframeByBucketId(SparkSession spark, Dataset<Row> dataframe,
+ EtlJobConfig.EtlPartitionInfo partitionInfo,
+ List<Integer> partitionKeyIndex,
+ List<Class> partitionKeySchema,
+ List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys,
+ List<String> keyColumnNames,
+ List<String> valueColumnNames,
+ StructType dstTableSchema,
+ EtlJobConfig.EtlIndex baseIndex,
+ List<Long> validPartitionIds) throws UserException {
+ List<String> distributeColumns = partitionInfo.distributionColumnRefs;
+ Partitioner partitioner = new DorisRangePartitioner(partitionInfo, partitionKeyIndex, partitionRangeKeys);
+ Set<Integer> validPartitionIndex = new HashSet<>();
+ if (validPartitionIds == null) {
+ for (int i = 0; i < partitionInfo.partitions.size(); ++i) {
+ validPartitionIndex.add(i);
+ }
+ } else {
+ for (int i = 0; i < partitionInfo.partitions.size(); ++i) {
+ if (validPartitionIds.contains(partitionInfo.partitions.get(i).partitionId)) {
+ validPartitionIndex.add(i);
+ }
+ }
+ }
+ // use PairFlatMapFunction instead of PairFunction because there will be
+ // 0 or 1 output rows for 1 input row
+ JavaPairRDD<String, DppColumns> pairRDD = dataframe.javaRDD().flatMapToPair(
+ new PairFlatMapFunction<Row, String, DppColumns>() {
+ @Override
+ public Iterator<Tuple2<String, DppColumns>> call(Row row) {
+ List<Object> columns = new ArrayList<>();
+ List<Object> keyColumns = new ArrayList<>();
+ for (String columnName : keyColumnNames) {
+ Object columnObject = row.get(row.fieldIndex(columnName));
+ columns.add(columnObject);
+ keyColumns.add(columnObject);
+ }
+
+ for (String columnName : valueColumnNames) {
+ columns.add(row.get(row.fieldIndex(columnName)));
+ }
+ DppColumns dppColumns = new DppColumns(columns);
+ DppColumns key = new DppColumns(keyColumns);
+ List<Tuple2<String, DppColumns>> result = new ArrayList<>();
+ int pid = partitioner.getPartition(key);
+ if (!validPartitionIndex.contains(pid)) {
+ LOG.warn("invalid partition for row:" + row + ", pid:" + pid);
+ abnormalRowAcc.add(1);
+ LOG.info("abnormalRowAcc:" + abnormalRowAcc);
+ if (abnormalRowAcc.value() < 5) {
+ LOG.info("add row to invalidRows:" + row.toString());
+ invalidRows.add(row.toString());
+ LOG.info("invalid rows contents:" + invalidRows.value());
+ }
+ } else {
+ long hashValue = DppUtils.getHashValue(row, distributeColumns, dstTableSchema);
+ int bucketId = (int) ((hashValue & 0xffffffff) % partitionInfo.partitions.get(pid).bucketNum);
+ long partitionId = partitionInfo.partitions.get(pid).partitionId;
+ // bucketKey is partitionId_bucketId
+ String bucketKey = partitionId + "_" + bucketId;
+ Tuple2<String, DppColumns> newTuple = new Tuple2<String, DppColumns>(bucketKey, dppColumns);
+ result.add(newTuple);
+ }
+ return result.iterator();
+ }
+ });
+ // TODO(wb): using rdd instead of dataframe from here
+ JavaRDD<Row> resultRdd = pairRDD.map(record -> {
+ String bucketKey = record._1;
+ List<Object> row = new ArrayList<>();
+ // bucketKey as the first key
+ row.add(bucketKey);
+ row.addAll(record._2.columns);
+ return RowFactory.create(row.toArray());
+ }
+ );
+
+ StructType tableSchemaWithBucketId = DppUtils.createDstTableSchema(baseIndex.columns, true, false);
+ dataframe = spark.createDataFrame(resultRdd, tableSchemaWithBucketId);
+ // use bucket number as the parallel number
+ int reduceNum = 0;
+ for (EtlJobConfig.EtlPartition partition : partitionInfo.partitions) {
+ for (int i = 0; i < partition.bucketNum; i++) {
+ bucketKeyMap.put(partition.partitionId + "_" + i, reduceNum);
+ reduceNum++;
+ }
+ }
+
+ // print to System.out so the bucket key map is easy to find in the log
+ System.out.println("print bucket key map:" + bucketKeyMap.toString());
+
+ return dataframe;
+ }
+
+ // do the etl process
+ private Dataset<Row> convertSrcDataframeToDstDataframe(EtlJobConfig.EtlIndex baseIndex,
+ Dataset<Row> srcDataframe,
+ StructType dstTableSchema,
+ EtlJobConfig.EtlFileGroup fileGroup) throws UserException {
+ Dataset<Row> dataframe = srcDataframe;
+ StructType srcSchema = dataframe.schema();
+ Set<String> srcColumnNames = new HashSet<>();
+ for (StructField field : srcSchema.fields()) {
+ srcColumnNames.add(field.name());
+ }
+ Map<String, EtlJobConfig.EtlColumnMapping> columnMappings = fileGroup.columnMappings;
+ // 1. process simple columns
+ Set<String> mappingColumns = new HashSet<>();
+ if (columnMappings != null) {
+ mappingColumns = columnMappings.keySet();
+ }
+ List<String> dstColumnNames = new ArrayList<>();
+ for (StructField dstField : dstTableSchema.fields()) {
+ dstColumnNames.add(dstField.name());
+ EtlJobConfig.EtlColumn column = baseIndex.getColumn(dstField.name());
+ if (!srcColumnNames.contains(dstField.name())) {
+ if (mappingColumns != null && mappingColumns.contains(dstField.name())) {
+ // mapping columns will be processed in next step
+ continue;
+ }
+ if (column.defaultValue != null) {
+ if (column.defaultValue.equals(NULL_FLAG)) {
+ dataframe = dataframe.withColumn(dstField.name(), functions.lit(null));
+ } else {
+ dataframe = dataframe.withColumn(dstField.name(), functions.lit(column.defaultValue));
+ }
+ } else if (column.isAllowNull) {
+ dataframe = dataframe.withColumn(dstField.name(), functions.lit(null));
+ } else {
+ throw new UserException("Reason: no data for column:" + dstField.name());
+ }
+ }
+ if (column.columnType.equalsIgnoreCase("DATE")) {
+ dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast("date"));
+ } else if (column.columnType.equalsIgnoreCase("BOOLEAN")) {
+ dataframe = dataframe.withColumn(dstField.name(),
+ functions.when(dataframe.col(dstField.name()).equalTo("true"), "1")
+ .otherwise("0"));
+ } else if (!column.columnType.equalsIgnoreCase(BITMAP_TYPE) && !dstField.dataType().equals(DataTypes.StringType)) {
+ dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast(dstField.dataType()));
+ }
+ if (fileGroup.isNegative && !column.isKey) {
+ // negative load
+ // the value will be converted to -1 * value
+ dataframe = dataframe.withColumn(dstField.name(), functions.expr("-1 *" + dstField.name()));
+ }
+ }
+ // 2. process the mapping columns
+ for (String mappingColumn : mappingColumns) {
+ String mappingDescription = columnMappings.get(mappingColumn).toDescription();
+ if (mappingDescription.toLowerCase().contains("hll_hash")) {
+ continue;
+ }
+ // here should cast data type to dst column type
+ dataframe = dataframe.withColumn(mappingColumn,
+ functions.expr(mappingDescription).cast(dstTableSchema.apply(mappingColumn).dataType()));
+ }
+ // projection and reorder the columns
+ dataframe.createOrReplaceTempView("src_table");
+ StringBuilder selectSqlBuilder = new StringBuilder();
+ selectSqlBuilder.append("select ");
+ for (String name : dstColumnNames) {
+ selectSqlBuilder.append(name + ",");
+ }
+ selectSqlBuilder.deleteCharAt(selectSqlBuilder.length() - 1);
+ selectSqlBuilder.append(" from src_table");
+ String selectSql = selectSqlBuilder.toString();
+ dataframe = spark.sql(selectSql);
+ return dataframe;
+ }
+
+ private Dataset<Row> loadDataFromPath(SparkSession spark,
+ EtlJobConfig.EtlFileGroup fileGroup,
+ String fileUrl,
+ EtlJobConfig.EtlIndex baseIndex,
+ List<EtlJobConfig.EtlColumn> columns) throws UserException {
+ List<String> columnValueFromPath = DppUtils.parseColumnsFromPath(fileUrl, fileGroup.columnsFromPath);
+ List<String> dataSrcColumns = fileGroup.fileFieldNames;
+ if (dataSrcColumns == null) {
+ // if there is no source columns info
+ // use base index columns as source columns
+ dataSrcColumns = new ArrayList<>();
+ for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
+ dataSrcColumns.add(column.columnName);
+ }
+ }
+ List<String> dstTableNames = new ArrayList<>();
+ for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
+ dstTableNames.add(column.columnName);
+ }
+ List<String> srcColumnsWithColumnsFromPath = new ArrayList<>();
+ srcColumnsWithColumnsFromPath.addAll(dataSrcColumns);
+ if (fileGroup.columnsFromPath != null) {
+ srcColumnsWithColumnsFromPath.addAll(fileGroup.columnsFromPath);
+ }
+ StructType srcSchema = createScrSchema(srcColumnsWithColumnsFromPath);
+ JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrl).toJavaRDD();
+ int columnSize = dataSrcColumns.size();
+ List<ColumnParser> parsers = new ArrayList<>();
+ for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
+ parsers.add(ColumnParser.create(column));
+ }
+ // for now only csv files are supported
+ // TODO: support parquet file and orc file
+ JavaRDD<Row> rowRDD = sourceDataRdd.flatMap(
+ record -> {
+ scannedRowsAcc.add(1);
+ String[] attributes = record.split(fileGroup.columnSeparator);
+ List<Row> result = new ArrayList<>();
+ boolean validRow = true;
+ if (attributes.length != columnSize) {
+ LOG.warn("invalid src schema, data columns:"
+ + attributes.length + ", file group columns:"
+ + columnSize + ", row:" + record);
+ validRow = false;
+ } else {
+ for (int i = 0; i < attributes.length; ++i) {
+ if (attributes[i].equals(NULL_FLAG)) {
+ if (baseIndex.columns.get(i).isAllowNull) {
+ attributes[i] = null;
+ } else {
+ LOG.warn("colunm:" + i + " can not be null. row:" + record);
+ validRow = false;
+ break;
+ }
+ }
+ boolean isStrictMode = (boolean) etlJobConfig.properties.strictMode;
+ if (isStrictMode) {
+ StructField field = srcSchema.apply(i);
+ if (dstTableNames.contains(field.name())) {
+ String type = columns.get(i).columnType;
+ if (type.equalsIgnoreCase("CHAR")
+ || type.equalsIgnoreCase("VARCHAR")) {
+ continue;
+ }
+ ColumnParser parser = parsers.get(i);
+ boolean valid = parser.parse(attributes[i]);
+ if (!valid) {
+ validRow = false;
+ LOG.warn("invalid row:" + record
+ + ", attribute " + i + ": " + attributes[i] + " parsed failed");
+ break;
+ }
+ }
+ }
+ }
+ }
+ if (validRow) {
+ Row row = null;
+ if (fileGroup.columnsFromPath == null) {
+ row = RowFactory.create(attributes);
+ } else {
+ // process columns from path
+ // append columns from path to the tail
+ List<String> columnAttributes = new ArrayList<>();
+ columnAttributes.addAll(Arrays.asList(attributes));
+ columnAttributes.addAll(columnValueFromPath);
+ row = RowFactory.create(columnAttributes.toArray());
+ }
+ result.add(row);
+ } else {
+ abnormalRowAcc.add(1);
+ // at most add 5 rows to invalidRows
+ if (abnormalRowAcc.value() <= 5) {
+ invalidRows.add(record);
+ }
+ }
+ return result.iterator();
+ }
+ );
+
+ Dataset<Row> dataframe = spark.createDataFrame(rowRDD, srcSchema);
+ return dataframe;
+ }
+
+ private StructType createScrSchema(List<String> srcColumns) {
+ List<StructField> fields = new ArrayList<>();
+ for (String srcColumn : srcColumns) {
+ // use StringType to load source data
+ StructField field = DataTypes.createStructField(srcColumn, DataTypes.StringType, true);
+ fields.add(field);
+ }
+ StructType srcSchema = DataTypes.createStructType(fields);
+ return srcSchema;
+ }
+
+ // partition keys are parsed into double from json,
+ // so they need to be converted to the partition columns' types
+ private Object convertPartitionKey(Object srcValue, Class dstClass) throws UserException {
+ if (dstClass.equals(Float.class) || dstClass.equals(Double.class)) {
+ return null;
+ }
+ if (srcValue instanceof Double) {
+ if (dstClass.equals(Short.class)) {
+ return ((Double) srcValue).shortValue();
+ } else if (dstClass.equals(Integer.class)) {
+ return ((Double) srcValue).intValue();
+ } else if (dstClass.equals(Long.class)) {
+ return ((Double) srcValue).longValue();
+ } else if (dstClass.equals(BigInteger.class)) {
+ return new BigInteger(((Double) srcValue).toString());
+ } else if (dstClass.equals(java.sql.Date.class) || dstClass.equals(java.util.Date.class)) {
+ double srcValueDouble = (double)srcValue;
+ return convertToJavaDate((int) srcValueDouble);
+ } else if (dstClass.equals(java.sql.Timestamp.class)) {
+ double srcValueDouble = (double)srcValue;
+ return convertToJavaDatetime((long)srcValueDouble);
+ } else {
+ // dst type is string
+ return srcValue.toString();
+ }
+ } else {
+ LOG.warn("unsupport partition key:" + srcValue);
+ throw new UserException("unsupport partition key:" + srcValue);
+ }
+ }
+
+ private java.sql.Timestamp convertToJavaDatetime(long src) {
+ String dateTimeStr = Long.valueOf(src).toString();
+ if (dateTimeStr.length() != 14) {
+ throw new RuntimeException("invalid input date format for SparkDpp");
+ }
+
+ String year = dateTimeStr.substring(0, 4);
+ String month = dateTimeStr.substring(4, 6);
+ String day = dateTimeStr.substring(6, 8);
+ String hour = dateTimeStr.substring(8, 10);
+ String min = dateTimeStr.substring(10, 12);
+ String sec = dateTimeStr.substring(12, 14);
+
+ return java.sql.Timestamp.valueOf(String.format("%s-%s-%s %s:%s:%s", year, month, day, hour, min, sec));
+ }
+
+ private java.sql.Date convertToJavaDate(int originDate) {
+ int day = originDate & 0x1f;
+ originDate >>= 5;
+ int month = originDate & 0x0f;
+ originDate >>= 4;
+ int year = originDate;
+ return java.sql.Date.valueOf(String.format("%04d-%02d-%02d", year, month, day));
+ }
+
+ private List<DorisRangePartitioner.PartitionRangeKey> createPartitionRangeKeys(
+ EtlJobConfig.EtlPartitionInfo partitionInfo, List<Class> partitionKeySchema) throws UserException {
+ List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys = new ArrayList<>();
+ for (EtlJobConfig.EtlPartition partition : partitionInfo.partitions) {
+ DorisRangePartitioner.PartitionRangeKey partitionRangeKey = new DorisRangePartitioner.PartitionRangeKey();
+ List<Object> startKeyColumns = new ArrayList<>();
+ for (int i = 0; i < partition.startKeys.size(); i++) {
+ Object value = partition.startKeys.get(i);
+ startKeyColumns.add(convertPartitionKey(value, partitionKeySchema.get(i)));
+ }
+ partitionRangeKey.startKeys = new DppColumns(startKeyColumns);
+ if (!partition.isMaxPartition) {
+ partitionRangeKey.isMaxPartition = false;
+ List<Object> endKeyColumns = new ArrayList<>();
+ for (int i = 0; i < partition.endKeys.size(); i++) {
+ Object value = partition.endKeys.get(i);
+ endKeyColumns.add(convertPartitionKey(value, partitionKeySchema.get(i)));
+ }
+ partitionRangeKey.endKeys = new DppColumns(endKeyColumns);
+ } else {
+ partitionRangeKey.isMaxPartition = true;
+ }
+ partitionRangeKeys.add(partitionRangeKey);
+ }
+ return partitionRangeKeys;
+ }
+
+ private Dataset<Row> loadDataFromFilePaths(SparkSession spark,
+ EtlJobConfig.EtlIndex baseIndex,
+ List<String> filePaths,
+ EtlJobConfig.EtlFileGroup fileGroup,
+ StructType dstTableSchema)
+ throws UserException, IOException, URISyntaxException {
+ Dataset<Row> fileGroupDataframe = null;
+ for (String filePath : filePaths) {
+ fileNumberAcc.add(1);
+ try {
+ Configuration conf = new Configuration();
+ URI uri = new URI(filePath);
+ FileSystem fs = FileSystem.get(uri, conf);
+ FileStatus fileStatus = fs.getFileStatus(new Path(filePath));
+ fileSizeAcc.add(fileStatus.getLen());
+ } catch (Exception e) {
+ LOG.warn("parse path failed:" + filePath);
+ throw e;
+ }
+ if (fileGroup.columnSeparator == null) {
+ LOG.warn("invalid null column separator!");
+ throw new UserException("Reason: invalid null column separator!");
+ }
+ Dataset<Row> dataframe = null;
+
+ dataframe = loadDataFromPath(spark, fileGroup, filePath, baseIndex, baseIndex.columns);
+ dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
+ if (fileGroupDataframe == null) {
+ fileGroupDataframe = dataframe;
+ } else {
+ fileGroupDataframe = fileGroupDataframe.union(dataframe);
+ }
+ }
+ return fileGroupDataframe;
+ }
+
+ private Dataset<Row> loadDataFromHiveTable(SparkSession spark,
+ String hiveTableName,
+ EtlJobConfig.EtlIndex baseIndex,
+ EtlJobConfig.EtlFileGroup fileGroup,
+ StructType dstTableSchema) throws UserException {
+ Dataset<Row> dataframe = spark.sql("select * from " + hiveTableName);
+ dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
+ return dataframe;
+ }
+
+ private DppResult process() throws Exception {
+ DppResult dppResult = new DppResult();
+ try {
+ for (Map.Entry<Long, EtlJobConfig.EtlTable> entry : etlJobConfig.tables.entrySet()) {
+ Long tableId = entry.getKey();
+ EtlJobConfig.EtlTable etlTable = entry.getValue();
+
+ // get the base index meta
+ EtlJobConfig.EtlIndex baseIndex = null;
+ for (EtlJobConfig.EtlIndex indexMeta : etlTable.indexes) {
+ if (indexMeta.isBaseIndex) {
+ baseIndex = indexMeta;
+ break;
+ }
+ }
+
+ // get key column names and value column names separately
+ List<String> keyColumnNames = new ArrayList<>();
+ List<String> valueColumnNames = new ArrayList<>();
+ for (EtlJobConfig.EtlColumn etlColumn : baseIndex.columns) {
+ if (etlColumn.isKey) {
+ keyColumnNames.add(etlColumn.columnName);
+ } else {
+ valueColumnNames.add(etlColumn.columnName);
+ }
+ }
+
+ EtlJobConfig.EtlPartitionInfo partitionInfo = etlTable.partitionInfo;
+ List<Integer> partitionKeyIndex = new ArrayList<Integer>();
+ List<Class> partitionKeySchema = new ArrayList<>();
+ for (String key : partitionInfo.partitionColumnRefs) {
+ for (int i = 0; i < baseIndex.columns.size(); ++i) {
+ EtlJobConfig.EtlColumn column = baseIndex.columns.get(i);
+ if (column.columnName.equals(key)) {
+ partitionKeyIndex.add(i);
+ partitionKeySchema.add(DppUtils.getClassFromColumn(column));
+ break;
+ }
+ }
+ }
+ List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys = createPartitionRangeKeys(partitionInfo, partitionKeySchema);
+ StructType dstTableSchema = DppUtils.createDstTableSchema(baseIndex.columns, false, false);
+ RollupTreeBuilder rollupTreeParser = new MinimumCoverageRollupTreeBuilder();
+ RollupTreeNode rootNode = rollupTreeParser.build(etlTable);
+ LOG.info("Start to process rollup tree:" + rootNode);
+
+ Dataset<Row> tableDataframe = null;
+ for (EtlJobConfig.EtlFileGroup fileGroup : etlTable.fileGroups) {
+ List<String> filePaths = fileGroup.filePaths;
+ Dataset<Row> fileGroupDataframe = null;
+ if (Strings.isNullOrEmpty(fileGroup.hiveDbTableName)) {
+ fileGroupDataframe = loadDataFromFilePaths(spark, baseIndex, filePaths, fileGroup, dstTableSchema);
+ } else {
+ String taskId = etlJobConfig.outputPath.substring(etlJobConfig.outputPath.lastIndexOf("/") + 1);
+ String dorisIntermediateHiveTable = String.format(EtlJobConfig.DORIS_INTERMEDIATE_HIVE_TABLE_NAME,
+ tableId, taskId);
+ fileGroupDataframe = loadDataFromHiveTable(spark, dorisIntermediateHiveTable, baseIndex, fileGroup, dstTableSchema);
+ }
+ if (fileGroupDataframe == null) {
+ LOG.info("no data for file file group:" + fileGroup);
+ continue;
+ }
+ if (!Strings.isNullOrEmpty(fileGroup.where)) {
+ long originalSize = fileGroupDataframe.count();
+ fileGroupDataframe = fileGroupDataframe.filter(fileGroup.where);
+ long currentSize = fileGroupDataframe.count();
+ // rows filtered out by the where clause are counted as unselected
+ unselectedRowAcc.add(originalSize - currentSize);
+ }
+
+ fileGroupDataframe = repartitionDataframeByBucketId(spark, fileGroupDataframe,
+ partitionInfo, partitionKeyIndex,
+ partitionKeySchema, partitionRangeKeys,
+ keyColumnNames, valueColumnNames,
+ dstTableSchema, baseIndex, fileGroup.partitions);
+ if (tableDataframe == null) {
+ tableDataframe = fileGroupDataframe;
+ } else {
+ // re-assign: union does not modify the Dataset in place
+ tableDataframe = tableDataframe.union(fileGroupDataframe);
+ }
+ }
+ processRollupTree(rootNode, tableDataframe, tableId, etlTable, baseIndex);
+ }
+ spark.stop();
+ } catch (Exception exception) {
+ LOG.warn("spark dpp failed for exception:" + exception);
+ dppResult.isSuccess = false;
+ dppResult.failedReason = exception.getMessage();
+ dppResult.normalRows = scannedRowsAcc.value() - abnormalRowAcc.value();
+ dppResult.scannedRows = scannedRowsAcc.value();
+ dppResult.fileNumber = fileNumberAcc.value();
+ dppResult.fileSize = fileSizeAcc.value();
+ dppResult.abnormalRows = abnormalRowAcc.value();
+ dppResult.partialAbnormalRows = invalidRows.value();
+ throw exception;
+ }
+ LOG.info("invalid rows contents:" + invalidRows.value());
+ dppResult.isSuccess = true;
+ dppResult.failedReason = "";
+ dppResult.normalRows = scannedRowsAcc.value() - abnormalRowAcc.value();
+ dppResult.scannedRows = scannedRowsAcc.value();
+ dppResult.fileNumber = fileNumberAcc.value();
+ dppResult.fileSize = fileSizeAcc.value();
+ dppResult.abnormalRows = abnormalRowAcc.value();
+ dppResult.partialAbnormalRows = invalidRows.value();
+ return dppResult;
+ }
+
+ public void doDpp() throws Exception {
+ // write dpp result to output
+ DppResult dppResult = process();
+ String outputPath = etlJobConfig.getOutputPath();
+ String resultFilePath = outputPath + "/" + DPP_RESULT_FILE;
+ Configuration conf = new Configuration();
+ URI uri = new URI(outputPath);
+ FileSystem fs = FileSystem.get(uri, conf);
+ Path filePath = new Path(resultFilePath);
+ FSDataOutputStream outputStream = fs.create(filePath);
+ Gson gson = new Gson();
+ outputStream.write(gson.toJson(dppResult).getBytes());
+ outputStream.write('\n');
+ outputStream.close();
+ }
+}
\ No newline at end of file
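
For readers tracing the date handling: the convertToJavaDate helper above decodes Doris' packed integer DATE encoding, where (judging from the bit shifts) the low 5 bits hold the day, the next 4 bits the month, and the remaining high bits the year. A minimal standalone sketch of that layout follows; the pack() helper and the class name are made up for illustration and are not part of this patch.

public class PackedDateSketch {
    // assumed layout, from the shifts above: value = (year << 9) | (month << 5) | day
    static int pack(int year, int month, int day) {
        return (year << 9) | (month << 5) | day;
    }

    static String unpack(int packed) {
        int day = packed & 0x1f;    // low 5 bits
        packed >>= 5;
        int month = packed & 0x0f;  // next 4 bits
        packed >>= 4;
        int year = packed;          // remaining high bits
        return String.format("%04d-%02d-%02d", year, month, day);
    }

    public static void main(String[] args) {
        int packed = pack(2020, 4, 10);
        System.out.println(packed);          // 1034378
        System.out.println(unpack(packed));  // 2020-04-10
    }
}
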
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java
new file mode 100644
index 0000000..adfca9f
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java
@@ -0,0 +1,564 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.doris.common.UserException;
+import org.apache.doris.load.loadv2.BitmapValue;
+import org.apache.doris.load.loadv2.Hll;
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.sql.Row;
+import scala.Tuple2;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+// Contains all classes used for Spark-side aggregation.
+
+public abstract class SparkRDDAggregator<T> implements Serializable {
+
+ T init(Object value) {
+ return (T) value;
+ }
+
+ abstract T update(T v1, T v2);
+
+ Object finalize(Object value) {
+ return value;
+ }
+
+ // TODO(wb) support more datatype:decimal,date,datetime
+ public static SparkRDDAggregator buildAggregator(EtlJobConfig.EtlColumn column) throws UserException {
+ String aggType = StringUtils.lowerCase(column.aggregationType);
+ String columnType = StringUtils.lowerCase(column.columnType);
+ switch (aggType) {
+ case "bitmap_union" :
+ return new BitmapUnionAggregator();
+ case "hll_union" :
+ return new HllUnionAggregator();
+ case "max":
+ switch (columnType) {
+ case "tinyint":
+ case "smallint":
+ case "int":
+ case "bigint":
+ case "float":
+ case "double":
+ return new NumberMaxAggregator();
+ case "char":
+ case "varchar":
+ return new StringMaxAggregator();
+ case "largeint":
+ return new LargeIntMaxAggregator();
+ default:
+ throw new UserException(String.format("unsupported max aggregator for column type:%s", columnType));
+ }
+ case "min":
+ switch (columnType) {
+ case "tinyint":
+ case "smallint":
+ case "int":
+ case "bigint":
+ case "float":
+ case "double":
+ return new NumberMinAggregator();
+ case "char":
+ case "varchar":
+ return new StringMinAggregator();
+ case "largeint":
+ return new LargeIntMinAggregator();
+ default:
+ throw new UserException(String.format("unsupported min aggregator for column type:%s", columnType));
+ }
+ case "sum":
+ switch (columnType) {
+ case "tinyint":
+ return new ByteSumAggregator();
+ case "smallint":
+ return new ShortSumAggregator();
+ case "int":
+ return new IntSumAggregator();
+ case "bigint":
+ return new LongSumAggregator();
+ case "float":
+ return new FloatSumAggregator();
+ case "double":
+ return new DoubleSumAggregator();
+ case "largeint":
+ return new LargeIntSumAggregator();
+ default:
+ throw new UserException(String.format("unsupported sum aggregator for column type:%s", columnType));
+ }
+ case "replace_if_not_null":
+ return new ReplaceIfNotNullAggregator();
+ case "replace":
+ return new ReplaceAggregator();
+ default:
+ throw new UserException(String.format("unsupported aggregate type %s", aggType));
+ }
+ }
+
+}
+
+class EncodeDuplicateTableFunction extends EncodeAggregateTableFunction {
+
+ private int valueLen;
+
+ public EncodeDuplicateTableFunction(int keyLen, int valueLen) {
+ super(keyLen);
+ this.valueLen = valueLen;
+ }
+
+ @Override
+ public Tuple2<List<Object>, Object[]> call(Row row) throws Exception {
+ List<Object> keys = new ArrayList<>(keyLen);
+ Object[] values = new Object[valueLen];
+
+ for (int i = 0; i < keyLen; i++) {
+ keys.add(row.get(i));
+ }
+
+ for (int i = keyLen; i < row.length(); i++) {
+ values[i - keyLen] = row.get(i);
+ }
+
+ return new Tuple2<>(keys, values);
+ }
+}
+
+class EncodeAggregateTableFunction implements PairFunction<Row, List<Object>, Object[]> {
+
+ private SparkRDDAggregator[] valueAggregators;
+ // include bucket id
+ protected int keyLen;
+
+ public EncodeAggregateTableFunction(int keyLen) {
+ this.keyLen = keyLen;
+ }
+
+ public EncodeAggregateTableFunction(SparkRDDAggregator[] valueAggregators, int keyLen) {
+ this.valueAggregators = valueAggregators;
+ this.keyLen = keyLen;
+ }
+
+ // TODO(wb): use a custom class as key to instead of List to save space
+ @Override
+ public Tuple2<List<Object>, Object[]> call(Row row) throws Exception {
+ List<Object> keys = new ArrayList<>(keyLen);
+ Object[] values = new Object[valueAggregators.length];
+
+ for (int i = 0; i < keyLen; i++) {
+ keys.add(row.get(i));
+ }
+
+ for (int i = keyLen; i < row.size(); i++) {
+ int valueIdx = i - keyLen;
+ values[valueIdx] = valueAggregators[valueIdx].init(row.get(i));
+ }
+ return new Tuple2<>(keys, values);
+ }
+}
+
+class AggregateReduceFunction implements Function2<Object[], Object[], Object[]> {
+
+ private SparkRDDAggregator[] valueAggregators;
+
+ public AggregateReduceFunction(SparkRDDAggregator[] sparkDppAggregators) {
+ this.valueAggregators = sparkDppAggregators;
+ }
+
+ @Override
+ public Object[] call(Object[] v1, Object[] v2) throws Exception {
+ Object[] result = new Object[valueAggregators.length];
+ for (int i = 0; i < v1.length; i++) {
+ result[i] = valueAggregators[i].update(v1[i], v2[i]);
+ }
+ return result;
+ }
+}
+
+class ReplaceAggregator extends SparkRDDAggregator<Object> {
+
+ @Override
+ Object update(Object dst, Object src) {
+ return src;
+ }
+}
+
+class ReplaceIfNotNullAggregator extends SparkRDDAggregator<Object> {
+
+ @Override
+ Object update(Object dst, Object src) {
+ return src == null ? dst : src;
+ }
+}
+
+class BitmapUnionAggregator extends SparkRDDAggregator<BitmapValue> {
+
+ @Override
+ BitmapValue init(Object value) {
+ try {
+ BitmapValue bitmapValue = new BitmapValue();
+ if (value instanceof byte[]) {
+ bitmapValue.deserialize(new DataInputStream(new ByteArrayInputStream((byte[]) value)));
+ } else {
+ bitmapValue.add(value == null ? 0L : Long.valueOf(value.toString()));
+ }
+ return bitmapValue;
+ } catch (Exception e) {
+ throw new RuntimeException("build bitmap value failed", e);
+ }
+ }
+
+ @Override
+ BitmapValue update(BitmapValue v1, BitmapValue v2) {
+ BitmapValue newBitmapValue = new BitmapValue();
+ if (v1 != null) {
+ newBitmapValue.or(v1);
+ }
+ if (v2 != null) {
+ newBitmapValue.or(v2);
+ }
+ return newBitmapValue;
+ }
+
+ @Override
+ byte[] finalize(Object value) {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream outputStream = new DataOutputStream(bos);
+ ((BitmapValue)value).serialize(outputStream);
+ return bos.toByteArray();
+ } catch (IOException ioException) {
+ ioException.printStackTrace();
+ throw new RuntimeException(ioException);
+ }
+ }
+
+}
+
+class HllUnionAggregator extends SparkRDDAggregator<Hll> {
+
+ @Override
+ Hll init(Object value) {
+ try {
+ Hll hll = new Hll();
+ if (value instanceof byte[]) {
+ hll.deserialize(new DataInputStream(new ByteArrayInputStream((byte[]) value)));
+ } else {
+ hll.updateWithHash(value == null ? 0 : value);
+ }
+ return hll;
+ } catch (Exception e) {
+ throw new RuntimeException("build hll value failed", e);
+ }
+ }
+
+ @Override
+ Hll update(Hll v1, Hll v2) {
+ Hll newHll = new Hll();
+ if (v1 != null) {
+ newHll.merge(v1);
+ }
+ if (v2 != null) {
+ newHll.merge(v2);
+ }
+ return newHll;
+ }
+
+ @Override
+ byte[] finalize(Object value) {
+ try {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream outputStream = new DataOutputStream(bos);
+ ((Hll)value).serialize(outputStream);
+ return bos.toByteArray();
+ } catch (IOException ioException) {
+ ioException.printStackTrace();
+ throw new RuntimeException(ioException);
+ }
+ }
+
+}
+
+class LargeIntMaxAggregator extends SparkRDDAggregator<BigInteger> {
+
+ BigInteger init(Object value) {
+ if (value == null) {
+ return null;
+ }
+ return new BigInteger(value.toString());
+ }
+
+ @Override
+ BigInteger update(BigInteger dst, BigInteger src) {
+ if (src == null) {
+ return dst;
+ }
+ if (dst == null) {
+ return src;
+ }
+ return dst.compareTo(src) > 0 ? dst : src;
+ }
+}
+
+class LargeIntMinAggregator extends LargeIntMaxAggregator {
+
+ @Override
+ BigInteger update(BigInteger dst, BigInteger src) {
+ if (src == null) {
+ return dst;
+ }
+ if (dst == null) {
+ return src;
+ }
+ return dst.compareTo(src) < 0 ? dst : src;
+ }
+}
+
+class LargeIntSumAggregator extends LargeIntMaxAggregator {
+
+ @Override
+ BigInteger update(BigInteger dst, BigInteger src) {
+ if (src == null) {
+ return dst;
+ }
+ if (dst == null) {
+ return src;
+ }
+ return dst.add(src);
+ }
+}
+
+
+class NumberMaxAggregator extends SparkRDDAggregator {
+
+ @Override
+ Object update(Object dst, Object src) {
+ if (src == null) {
+ return dst;
+ }
+ if (dst == null) {
+ return src;
+ }
+ return ((Comparable)dst).compareTo(src) > 0 ? dst : src;
+ }
+}
+
+
+class NumberMinAggregator extends SparkRDDAggregator {
+
+ @Override
+ Object update(Object dst, Object src) {
+ if (src == null) {
+ return dst;
+ }
+ if (dst == null) {
+ return src;
+ }
+ return ((Comparable)dst).compareTo(src) < 0 ? dst : src;
+ }
+}
+
+class LongSumAggregator extends SparkRDDAggregator<Long> {
+
+ @Override
+ Long update(Long dst, Long src) {
+ if (src == null) {
+ return dst;
+ }
+ if (dst == null) {
+ return src;
+ }
+ // TODO(wb) check overflow of long type
+ return dst + src;
+ }
+}
+
+class ShortSumAggregator extends SparkRDDAggregator<Short> {
+
+ @Override
+ Short update(Short dst, Short src) {
+ if (src == null) {
+ return dst;
+ }
+ if (dst == null) {
+ return src;
+ }
+ int ret = dst + src;
+ if (ret > Short.MAX_VALUE || ret < Short.MIN_VALUE) {
+ throw new RuntimeException("short column sum overflows the range of Short");
+ }
+ return (short) ret;
+ }
+}
+
+class IntSumAggregator extends SparkRDDAggregator<Integer> {
+
+ @Override
+ Integer update(Integer dst, Integer src) {
+ if (src == null) {
+ return dst;
+ }
+ if (dst == null) {
+ return src;
+ }
+ long ret = Long.sum(dst, src);
+ if (ret > Integer.MAX_VALUE || ret < Integer.MIN_VALUE) {
+ throw new RuntimeException("int column sum size exceeds Integer.MAX_VALUE or Integer.MIN_VALUE");
+ }
+ return (int) ret;
+ }
+}
+
+class ByteSumAggregator extends SparkRDDAggregator<Byte> {
+
+ @Override
+ Byte update(Byte dst, Byte src) {
+ if (src == null) {
+ return dst;
+ }
+ if (dst == null) {
+ return src;
+ }
+ int ret = dst + src;
+ if (ret > Byte.MAX_VALUE || ret < Byte.MIN_VALUE) {
+ throw new RuntimeException("byte column sum overflows the range of Byte");
+ }
+ return (byte) ret;
+ }
+}
+
+class DoubleSumAggregator extends SparkRDDAggregator<Double> {
+
+ @Override
+ strictfp Double update(Double dst, Double src) {
+ if (src == null) {
+ return dst;
+ }
+ if (dst == null) {
+ return src;
+ }
+ return dst + src;
+ }
+}
+
+// TODO(wb) add bound check for float/double
+class FloatSumAggregator extends SparkRDDAggregator<Float> {
+
+ @Override
+ strictfp Float update(Float dst, Float src) {
+ if (src == null) {
+ return dst;
+ }
+ if (dst == null) {
+ return src;
+ }
+ return dst + src;
+ }
+}
+
+class StringMaxAggregator extends SparkRDDAggregator<String> {
+
+ @Override
+ String update(String dst, String src) {
+ if (src == null) {
+ return dst;
+ }
+ if (dst == null) {
+ return src;
+ }
+ return dst.compareTo(src) > 0 ? dst : src;
+ }
+}
+
+class StringMinAggregator extends SparkRDDAggregator<String> {
+
+ @Override
+ String update(String dst, String src) {
+ if (src == null) {
+ return dst;
+ }
+ if (dst == null) {
+ return src;
+ }
+ return dst.compareTo(src) < 0 ? dst : src;
+ }
+}
+
+
+class BucketComparator implements Comparator<List<Object>>, Serializable {
+
+ @Override
+ public int compare(List<Object> keyArray1, List<Object> keyArray2) {
+ int cmp = 0;
+
+ for (int i = 0; i < keyArray1.size(); i++) {
+ Object key1 = keyArray1.get(i);
+ Object key2 = keyArray2.get(i);
+ if (key1 == key2) {
+ continue;
+ }
+ if (key1 == null || key2 == null) {
+ return key1 == null ? -1 : 1;
+ }
+ if (key1 instanceof Comparable && key2 instanceof Comparable) {
+ cmp = ((Comparable) key1).compareTo(key2);
+ } else {
+ throw new RuntimeException(String.format("uncomparable column type %s", key1.getClass().toString()));
+ }
+ if (cmp != 0) {
+ return cmp;
+ }
+ }
+
+ return cmp;
+ }
+}
+
+class BucketPartitioner extends Partitioner {
+
+ private Map<String, Integer> bucketKeyMap;
+
+ public BucketPartitioner(Map<String, Integer> bucketKeyMap) {
+ this.bucketKeyMap = bucketKeyMap;
+ }
+
+ @Override
+ public int numPartitions() {
+ return bucketKeyMap.size();
+ }
+
+ @Override
+ public int getPartition(Object key) {
+ List<Object> rddKey = (List<Object>) key;
+ return bucketKeyMap.get(String.valueOf(rddKey.get(0)));
+ }
+}
\ No newline at end of file
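
A usage note on the aggregator framework defined in this file: each SparkRDDAggregator adapts a raw column value with init(), merges two partial values with update() (which is what a reduce function such as AggregateReduceFunction applies per value column), and may convert the final value with finalize(). Below is a minimal sketch of that contract, assuming it compiles in the same package so the package-private classes are visible; the class name AggregatorContractSketch is hypothetical.

package org.apache.doris.load.loadv2.dpp;

public class AggregatorContractSketch {
    public static void main(String[] args) {
        SparkRDDAggregator<Long> sum = new LongSumAggregator();
        // init() adapts the raw value read from a Spark Row into the aggregator's type
        Long acc = sum.init(Long.valueOf(3L));
        // update() merges two partial values, as a reduce-by-key step does per key
        acc = sum.update(acc, sum.init(Long.valueOf(4L)));
        acc = sum.update(acc, null);                        // null partials are ignored
        System.out.println(acc);                            // 7

        SparkRDDAggregator<Object> replace = new ReplaceAggregator();
        // REPLACE semantics: the later value wins
        System.out.println(replace.update("old", "new"));   // new
    }
}
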
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/dpp/StringAccumulator.java b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/StringAccumulator.java
new file mode 100644
index 0000000..02099de
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/dpp/StringAccumulator.java
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.spark.util.AccumulatorV2;
+import java.util.List;
+import java.util.ArrayList;
+
+// This class is a string accumulator based on AccumulatorV2
+// (https://spark.apache.org/docs/latest/api/java/org/apache/spark/util/AccumulatorV2.html),
+// because Spark does not provide a built-in string accumulator.
+//
+// It is used to collect the invalid rows encountered during ETL.
+public class StringAccumulator extends AccumulatorV2<String, String> {
+ private List<String> strs = new ArrayList<>();
+
+ @Override
+ public boolean isZero() {
+ return strs.isEmpty();
+ }
+
+ @Override
+ public AccumulatorV2<String, String> copy() {
+ StringAccumulator newAccumulator = new StringAccumulator();
+ newAccumulator.strs.addAll(this.strs);
+ return newAccumulator;
+ }
+
+ @Override
+ public void reset() {
+ strs.clear();
+ }
+
+ @Override
+ public void add(String v) {
+ strs.add(v);
+ }
+
+ @Override
+ public void merge(AccumulatorV2<String, String> other) {
+ StringAccumulator o = (StringAccumulator)other;
+ strs.addAll(o.strs);
+ }
+
+ @Override
+ public String value() {
+ return strs.toString();
+ }
+}
\ No newline at end of file
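
A brief usage sketch for this accumulator (not taken from SparkDpp itself; the master URL, sample rows, and accumulator name below are illustrative assumptions): the accumulator is registered on the driver, executors add offending rows to it inside actions, and the driver reads the merged result via value().

import java.util.Arrays;

import org.apache.doris.load.loadv2.dpp.StringAccumulator;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class StringAccumulatorSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("string-accumulator-sketch")
                .master("local[1]")
                .getOrCreate();

        StringAccumulator invalidRows = new StringAccumulator();
        // register on the driver so executor-side copies are merged back
        spark.sparkContext().register(invalidRows, "invalidRows");

        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        jsc.parallelize(Arrays.asList("1,ok", "x,bad", "2,ok"))
                .foreach(row -> {
                    if (row.startsWith("x")) {
                        invalidRows.add(row);  // collected on executors
                    }
                });

        System.out.println(invalidRows.value());  // [x,bad]
        spark.stop();
    }
}
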
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitionerTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitionerTest.java
new file mode 100644
index 0000000..37addb7
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/dpp/DorisRangePartitionerTest.java
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.doris.load.loadv2.dpp.DorisRangePartitioner;
+
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DorisRangePartitionerTest {
+
+ @Test
+ public void testRangePartitioner() {
+ List<Object> startKeys = new ArrayList<>();
+ startKeys.add(new Integer(0));
+ List<Object> endKeys = new ArrayList<>();
+ endKeys.add(new Integer(100));
+ EtlJobConfig.EtlPartition partition1 = new EtlJobConfig.EtlPartition(
+ 10000, startKeys, endKeys, false, 3);
+
+ List<Object> startKeys2 = new ArrayList<>();
+ startKeys2.add(new Integer(100));
+ List<Object> endKeys2 = new ArrayList<>();
+ endKeys2.add(new Integer(200));
+ EtlJobConfig.EtlPartition partition2 = new EtlJobConfig.EtlPartition(
+ 10001, startKeys2, endKeys2, false, 4);
+
+ List<Object> startKeys3 = new ArrayList<>();
+ startKeys3.add(new Integer(200));
+ List<Object> endKeys3 = new ArrayList<>();
+ endKeys3.add(new Integer(300));
+ EtlJobConfig.EtlPartition partition3 = new EtlJobConfig.EtlPartition(
+ 10002, startKeys3, endKeys3, false, 5);
+
+ List<EtlJobConfig.EtlPartition> partitions = new ArrayList<>();
+ partitions.add(partition1);
+ partitions.add(partition2);
+ partitions.add(partition3);
+
+ List<String> partitionColumns = new ArrayList<>();
+ partitionColumns.add("id");
+ List<String> bucketColumns = new ArrayList<>();
+ bucketColumns.add("key");
+ EtlJobConfig.EtlPartitionInfo partitionInfo = new EtlJobConfig.EtlPartitionInfo(
+ "RANGE", partitionColumns, bucketColumns, partitions);
+ List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys = new ArrayList<>();
+ for (EtlJobConfig.EtlPartition partition : partitions) {
+ DorisRangePartitioner.PartitionRangeKey partitionRangeKey = new DorisRangePartitioner.PartitionRangeKey();
+ partitionRangeKey.isMaxPartition = false;
+ partitionRangeKey.startKeys = new DppColumns(partition.startKeys);
+ partitionRangeKey.endKeys = new DppColumns(partition.endKeys);
+ partitionRangeKeys.add(partitionRangeKey);
+ }
+ List<Integer> partitionKeyIndexes = new ArrayList<>();
+ partitionKeyIndexes.add(0);
+ DorisRangePartitioner rangePartitioner = new DorisRangePartitioner(partitionInfo, partitionKeyIndexes, partitionRangeKeys);
+ int num = rangePartitioner.numPartitions();
+ Assert.assertEquals(3, num);
+
+ List<Object> fields1 = new ArrayList<>();
+ fields1.add(-100);
+ fields1.add("name");
+ DppColumns record1 = new DppColumns(fields1);
+ int id1 = rangePartitioner.getPartition(record1);
+ Assert.assertEquals(-1, id1);
+
+ List<Object> fields2 = new ArrayList<>();
+ fields2.add(10);
+ fields2.add("name");
+ DppColumns record2 = new DppColumns(fields2);
+ int id2 = rangePartitioner.getPartition(record2);
+ Assert.assertEquals(0, id2);
+
+ List<Object> fields3 = new ArrayList<>();
+ fields3.add(110);
+ fields3.add("name");
+ DppColumns record3 = new DppColumns(fields3);
+ int id3 = rangePartitioner.getPartition(record3);
+ Assert.assertEquals(1, id3);
+
+ List<Object> fields4 = new ArrayList<>();
+ fields4.add(210);
+ fields4.add("name");
+ DppColumns record4 = new DppColumns(fields4);
+ int id4 = rangePartitioner.getPartition(record4);
+ Assert.assertEquals(2, id4);
+
+ List<Object> fields5 = new ArrayList<>();
+ fields5.add(310);
+ fields5.add("name");
+ DppColumns record5 = new DppColumns(fields5);
+ int id5 = rangePartitioner.getPartition(record5);
+ Assert.assertEquals(-1, id5);
+ }
+
+ @Test
+ public void testUnpartitionedPartitioner() {
+ List<String> partitionColumns = new ArrayList<>();
+ List<String> bucketColumns = new ArrayList<>();
+ bucketColumns.add("key");
+ EtlJobConfig.EtlPartitionInfo partitionInfo = new EtlJobConfig.EtlPartitionInfo(
+ "UNPARTITIONED", null, bucketColumns, null);
+ List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys = new ArrayList<>();
+ List<Class> partitionSchema = new ArrayList<>();
+ partitionSchema.add(Integer.class);
+ List<Integer> partitionKeyIndexes = new ArrayList<>();
+ partitionKeyIndexes.add(0);
+ DorisRangePartitioner rangePartitioner = new DorisRangePartitioner(partitionInfo, partitionKeyIndexes, null);
+ int num = rangePartitioner.numPartitions();
+ Assert.assertEquals(1, num);
+
+ List<Object> fields = new ArrayList<>();
+ fields.add(100);
+ fields.add("name");
+ DppColumns record = new DppColumns(fields);
+ int id = rangePartitioner.getPartition(record);
+ Assert.assertEquals(0, id);
+ }
+}
\ No newline at end of file
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/dpp/DppUtilsTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/dpp/DppUtilsTest.java
new file mode 100644
index 0000000..4db033f
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/dpp/DppUtilsTest.java
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Test;
+import java.lang.Exception;
+
+public class DppUtilsTest {
+
+ @Test
+ public void testGetClassFromDataType() {
+ DppUtils dppUtils = new DppUtils();
+
+ Class stringResult = dppUtils.getClassFromDataType(DataTypes.StringType);
+ Assert.assertEquals(String.class, stringResult);
+
+ Class booleanResult = dppUtils.getClassFromDataType(DataTypes.BooleanType);
+ Assert.assertEquals(Boolean.class, booleanResult);
+
+ Class shortResult = dppUtils.getClassFromDataType(DataTypes.ShortType);
+ Assert.assertEquals(Short.class, shortResult);
+
+ Class integerResult = dppUtils.getClassFromDataType(DataTypes.IntegerType);
+ Assert.assertEquals(Integer.class, integerResult);
+
+ Class longResult = dppUtils.getClassFromDataType(DataTypes.LongType);
+ Assert.assertEquals(Long.class, longResult);
+
+ Class floatResult = dppUtils.getClassFromDataType(DataTypes.FloatType);
+ Assert.assertEquals(Float.class, floatResult);
+
+ Class doubleResult = dppUtils.getClassFromDataType(DataTypes.DoubleType);
+ Assert.assertEquals(Double.class, doubleResult);
+
+ Class dateResult = dppUtils.getClassFromDataType(DataTypes.DateType);
+ Assert.assertEquals(Date.class, dateResult);
+ }
+
+ @Test
+ public void testGetClassFromColumn() {
+ DppUtils dppUtils = new DppUtils();
+
+ try {
+ EtlJobConfig.EtlColumn column = new EtlJobConfig.EtlColumn();
+ column.columnType = "CHAR";
+ Class charResult = dppUtils.getClassFromColumn(column);
+ Assert.assertEquals(String.class, charResult);
+
+ column.columnType = "HLL";
+ Class hllResult = dppUtils.getClassFromColumn(column);
+ Assert.assertEquals(String.class, hllResult);
+
+ column.columnType = "OBJECT";
+ Class objectResult = dppUtils.getClassFromColumn(column);
+ Assert.assertEquals(String.class, objectResult);
+
+ column.columnType = "BOOLEAN";
+ Class booleanResult = dppUtils.getClassFromColumn(column);
+ Assert.assertEquals(Boolean.class, booleanResult);
+
+ column.columnType = "TINYINT";
+ Class tinyResult = dppUtils.getClassFromColumn(column);
+ Assert.assertEquals(Short.class, tinyResult);
+
+ column.columnType = "SMALLINT";
+ Class smallResult = dppUtils.getClassFromColumn(column);
+ Assert.assertEquals(Short.class, smallResult);
+
+ column.columnType = "INT";
+ Class integerResult = dppUtils.getClassFromColumn(column);
+ Assert.assertEquals(Integer.class, integerResult);
+
+ column.columnType = "DATETIME";
+ Class datetimeResult = dppUtils.getClassFromColumn(column);
+ Assert.assertEquals(java.sql.Timestamp.class, datetimeResult);
+
+ column.columnType = "FLOAT";
+ Class floatResult = dppUtils.getClassFromColumn(column);
+ Assert.assertEquals(Float.class, floatResult);
+
+ column.columnType = "DOUBLE";
+ Class doubleResult = dppUtils.getClassFromColumn(column);
+ Assert.assertEquals(Double.class, doubleResult);
+
+ column.columnType = "DATE";
+ Class dateResult = dppUtils.getClassFromColumn(column);
+ Assert.assertEquals(Date.class, dateResult);
+
+ column.columnType = "DECIMALV2";
+ column.precision = 10;
+ column.scale = 2;
+ Class decimalResult = dppUtils.getClassFromColumn(column);
+ Assert.assertEquals(BigDecimal.class, decimalResult);
+ } catch (Exception e) {
+ Assert.assertTrue(false);
+ }
+
+ }
+
+ @Test
+ public void testGetDataTypeFromColumn() {
+ DppUtils dppUtils = new DppUtils();
+
+ try {
+ EtlJobConfig.EtlColumn column = new EtlJobConfig.EtlColumn();
+ column.columnType = "VARCHAR";
+ DataType stringResult = dppUtils.getDataTypeFromColumn(column, false);
+ Assert.assertEquals(DataTypes.StringType, stringResult);
+
+ column.columnType = "CHAR";
+ DataType charResult = dppUtils.getDataTypeFromColumn(column, false);
+ Assert.assertEquals(DataTypes.StringType, charResult);
+
+ column.columnType = "HLL";
+ DataType hllResult = dppUtils.getDataTypeFromColumn(column, false);
+ Assert.assertEquals(DataTypes.StringType, hllResult);
+
+ column.columnType = "OBJECT";
+ DataType objectResult = dppUtils.getDataTypeFromColumn(column, false);
+ Assert.assertEquals(DataTypes.StringType, objectResult);
+
+ column.columnType = "BOOLEAN";
+ DataType booleanResult = dppUtils.getDataTypeFromColumn(column, false);
+ Assert.assertEquals(DataTypes.StringType, booleanResult);
+
+ column.columnType = "TINYINT";
+ DataType tinyResult = dppUtils.getDataTypeFromColumn(column, false);
+ Assert.assertEquals(DataTypes.ByteType, tinyResult);
+
+ column.columnType = "SMALLINT";
+ DataType smallResult = dppUtils.getDataTypeFromColumn(column, false);
+ Assert.assertEquals(DataTypes.ShortType, smallResult);
+
+ column.columnType = "INT";
+ DataType integerResult = dppUtils.getDataTypeFromColumn(column, false);
+ Assert.assertEquals(DataTypes.IntegerType, integerResult);
+
+ column.columnType = "BIGINT";
+ DataType longResult = dppUtils.getDataTypeFromColumn(column, false);
+ Assert.assertEquals(DataTypes.LongType, longResult);
+
+ column.columnType = "DATETIME";
+ DataType datetimeResult = dppUtils.getDataTypeFromColumn(column, false);
+ Assert.assertEquals(DataTypes.TimestampType, datetimeResult);
+
+ column.columnType = "FLOAT";
+ DataType floatResult = dppUtils.getDataTypeFromColumn(column, false);
+ Assert.assertEquals(DataTypes.FloatType, floatResult);
+
+ column.columnType = "DOUBLE";
+ DataType doubleResult = dppUtils.getDataTypeFromColumn(column, false);
+ Assert.assertEquals(DataTypes.DoubleType, doubleResult);
+
+ column.columnType = "DATE";
+ DataType dateResult = dppUtils.getDataTypeFromColumn(column, false);
+ Assert.assertEquals(DataTypes.DateType, dateResult);
+ } catch (Exception e) {
+ Assert.assertTrue(false);
+ }
+ }
+
+ @Test
+ public void testCreateDstTableSchema() {
+ DppUtils dppUtils = new DppUtils();
+
+ EtlJobConfig.EtlColumn column1 = new EtlJobConfig.EtlColumn(
+ "column1", "INT",
+ true, true,
+ "NONE", "0",
+ 0, 0, 0);
+ EtlJobConfig.EtlColumn column2 = new EtlJobConfig.EtlColumn(
+ "column2", "SMALLINT",
+ true, true,
+ "NONE", "0",
+ 0, 0, 0);
+ List<EtlJobConfig.EtlColumn> columns = new ArrayList<>();
+ columns.add(column1);
+ columns.add(column2);
+
+ try {
+ StructType schema = dppUtils.createDstTableSchema(columns, false, false);
+ Assert.assertEquals(2, schema.fieldNames().length);
+ Assert.assertEquals("column1", schema.fieldNames()[0]);
+ Assert.assertEquals("column2", schema.fieldNames()[1]);
+
+ StructType schema2 = dppUtils.createDstTableSchema(columns, true, false);
+ Assert.assertEquals(3, schema2.fieldNames().length);
+ Assert.assertEquals("__bucketId__", schema2.fieldNames()[0]);
+ Assert.assertEquals("column1", schema2.fieldNames()[1]);
+ Assert.assertEquals("column2", schema2.fieldNames()[2]);
+ } catch (Exception e) {
+ Assert.assertTrue(false);
+ }
+ }
+
+ @Test
+ public void testParseColumnsFromPath() {
+ DppUtils dppUtils = new DppUtils();
+
+ String path = "/path/to/file/city=beijing/date=2020-04-10/data";
+ List<String> columnFromPaths = new ArrayList<>();
+ columnFromPaths.add("city");
+ columnFromPaths.add("date");
+ try {
+ List<String> columnFromPathValues = dppUtils.parseColumnsFromPath(path, columnFromPaths);
+ Assert.assertEquals(2, columnFromPathValues.size());
+ Assert.assertEquals("beijing", columnFromPathValues.get(0));
+ Assert.assertEquals("2020-04-10", columnFromPathValues.get(1));
+ } catch (Exception e) {
+ Assert.assertTrue(false);
+ }
+ }
+}
\ No newline at end of file
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilderTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilderTest.java
new file mode 100644
index 0000000..6e0e93e
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/dpp/MinimumCoverageRollupTreeBuilderTest.java
@@ -0,0 +1,111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.doris.load.loadv2.dpp.MinimumCoverageRollupTreeBuilder;
+import org.apache.doris.load.loadv2.dpp.RollupTreeNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MinimumCoverageRollupTreeBuilderTest {
+
+ @Test
+ public void testBuild() {
+ EtlJobConfig.EtlColumn column1 = new EtlJobConfig.EtlColumn(
+ "column1", "INT",
+ true, true,
+ "NONE", "0",
+ 0, 0, 0);
+ EtlJobConfig.EtlColumn column2 = new EtlJobConfig.EtlColumn(
+ "column2", "SMALLINT",
+ true, true,
+ "NONE", "0",
+ 0, 0, 0);
+ EtlJobConfig.EtlColumn column3 = new EtlJobConfig.EtlColumn(
+ "column3", "VARCHAR",
+ true, true,
+ "NONE", "",
+ 0, 0, 0);
+ EtlJobConfig.EtlColumn column4 = new EtlJobConfig.EtlColumn(
+ "column4", "INT",
+ true, false,
+ "SUM", "",
+ 0, 0, 0);
+ List<EtlJobConfig.EtlColumn> baseColumns = new ArrayList<>();
+ baseColumns.add(column1);
+ baseColumns.add(column2);
+ baseColumns.add(column3);
+ baseColumns.add(column4);
+ EtlJobConfig.EtlIndex baseIndex = new EtlJobConfig.EtlIndex(10000,
+ baseColumns, 12345, "DUPLICATE", true);
+ List<EtlJobConfig.EtlColumn> roll1Columns = new ArrayList<>();
+ roll1Columns.add(column1);
+ roll1Columns.add(column2);
+ roll1Columns.add(column4);
+ EtlJobConfig.EtlIndex roll1Index = new EtlJobConfig.EtlIndex(10001,
+ roll1Columns, 12346, "AGGREGATE", false);
+ List<EtlJobConfig.EtlColumn> roll2Columns = new ArrayList<>();
+ roll2Columns.add(column1);
+ roll2Columns.add(column4);
+ EtlJobConfig.EtlIndex roll2Index = new EtlJobConfig.EtlIndex(10002,
+ roll2Columns, 12347, "AGGREGATE", false);
+
+ List<EtlJobConfig.EtlColumn> roll3Columns = new ArrayList<>();
+ roll3Columns.add(column3);
+ roll3Columns.add(column4);
+ EtlJobConfig.EtlIndex roll3Index = new EtlJobConfig.EtlIndex(10003,
+ roll3Columns, 12348, "AGGREGATE", false);
+
+ List<EtlJobConfig.EtlIndex> indexes = new ArrayList<>();
+ indexes.add(baseIndex);
+ indexes.add(roll1Index);
+ indexes.add(roll2Index);
+ indexes.add(roll3Index);
+ EtlJobConfig.EtlTable table = new EtlJobConfig.EtlTable(indexes, null);
+
+ MinimumCoverageRollupTreeBuilder builder = new MinimumCoverageRollupTreeBuilder();
+ RollupTreeNode resultNode = builder.build(table);
+ Assert.assertEquals(resultNode.parent, null);
+ Assert.assertEquals(resultNode.indexId, 10000);
+ Assert.assertEquals(resultNode.level, 0);
+ Assert.assertEquals(resultNode.children.size(), 2);
+
+ RollupTreeNode index1Node = resultNode.children.get(0);
+ Assert.assertEquals(index1Node.parent.indexId, 10000);
+ Assert.assertEquals(index1Node.indexId, 10001);
+ Assert.assertEquals(index1Node.level, 1);
+ Assert.assertEquals(index1Node.children.size(), 1);
+
+ RollupTreeNode index3Node = resultNode.children.get(1);
+ Assert.assertEquals(index3Node.parent.indexId, 10000);
+ Assert.assertEquals(index3Node.indexId, 10003);
+ Assert.assertEquals(index3Node.level, 1);
+ Assert.assertEquals(index3Node.children, null);
+
+ RollupTreeNode index2Node = index1Node.children.get(0);
+ Assert.assertEquals(index2Node.parent.indexId, 10001);
+ Assert.assertEquals(index2Node.indexId, 10002);
+ Assert.assertEquals(index2Node.level, 2);
+ Assert.assertEquals(index2Node.children, null);
+ }
+}
\ No newline at end of file