Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/01/28 09:45:22 UTC

[GitHub] [incubator-doris] morningman opened a new pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

morningman opened a new pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824
 
 
   First, in a broker load, users can specify multiple data descriptions. Each data description
   describes a file (or a set of files), including the file path, delimiter, target table and
   partitions, and other information.
   
   When the user specifies multiple data descriptions, Doris currently aggregates the data
   descriptions that belong to the same table and generates a single load task for them.
   
   The problem is that although different data descriptions may point to the same table,
   they may specify different partitions. Therefore, data descriptions should be aggregated
   by both table and partitions, not by table alone.
   
   Examples are as follows:
   
   data description 1 is: 
   ```
   DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
   INTO TABLE `tbl1`
   PARTITION (p1, p2)
   ```
   
   data description 2 is:
   ```
   DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
   INTO TABLE `tbl1`
   PARTITION (p3, p4)
   ```
   What the user expects is to load file1 into partitions p1 and p2 of tbl1, and file2 into partitions
   p3 and p4 of the same table. But currently the two descriptions are aggregated together, which results
   in loading both file1 and file2 into all of p1, p2, p3 and p4.
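To make the difference concrete, here is a minimal Java sketch of aggregating by table plus partition set rather than by table alone. It is a simplified model, not the Doris implementation: `DataDesc`, `AggKey` and `FileGroupAggSketch` are hypothetical names, partitions are identified by name rather than id, and an empty partition set stands for "not specified". Following the approach of the `BrokerFileGroupAggInfo` class quoted later in this thread, a description that shares a partition with a differently keyed description of the same table is rejected.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical, simplified data description: one file loaded into one table,
// optionally restricted to some partitions (an empty set means "not specified").
final class DataDesc {
    final String file;
    final String table;
    final Set<String> partitions;

    DataDesc(String file, String table, Set<String> partitions) {
        this.file = file;
        this.table = table;
        this.partitions = partitions;
    }

    static DataDesc of(String file, String table, String... partitions) {
        return new DataDesc(file, table, new HashSet<>(Arrays.asList(partitions)));
    }
}

// Aggregation key: the table plus the exact set of specified partitions.
final class AggKey {
    final String table;
    final Set<String> partitions;

    AggKey(String table, Set<String> partitions) {
        this.table = table;
        this.partitions = new HashSet<>(partitions);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof AggKey)) {
            return false;
        }
        AggKey other = (AggKey) obj;
        return table.equals(other.table) && partitions.equals(other.partitions);
    }

    @Override
    public int hashCode() {
        return Objects.hash(table, partitions);
    }
}

final class FileGroupAggSketch {
    // Groups files by (table, partitions). A description with a new key is rejected if it
    // shares any partition with an earlier description of the same table.
    static Map<AggKey, List<String>> aggregate(List<DataDesc> descs) {
        Map<AggKey, List<String>> groups = new LinkedHashMap<>();
        Map<String, Set<String>> usedPartitions = new HashMap<>();
        for (DataDesc desc : descs) {
            AggKey key = new AggKey(desc.table, desc.partitions);
            if (!groups.containsKey(key)) {
                Set<String> used = usedPartitions.getOrDefault(desc.table, new HashSet<>());
                for (String p : desc.partitions) {
                    if (used.contains(p)) {
                        throw new IllegalArgumentException("partition " + p + " of table "
                                + desc.table + " appears in more than one data description");
                    }
                }
                groups.put(key, new ArrayList<>());
            }
            groups.get(key).add(desc.file);
            usedPartitions.computeIfAbsent(desc.table, t -> new HashSet<>()).addAll(desc.partitions);
        }
        return groups;
    }
}
```

With the two descriptions above (file1 into p1, p2 and file2 into p3, p4 of tbl1), `aggregate` returns two groups; keying by table alone would instead produce the single group that loads both files into p1 through p4.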
   
   Second, there is a problem in the code implementation. The constructor of
   `OlapTableSink.java` takes a comma-separated string of partition names. At the `OlapTableSink`
   level, we should be able to pass in a list of partition ids directly instead of names.
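With ids, name resolution (and the error for an unknown partition) can happen once, before the sink is built, and the sink only deals with ids. A minimal sketch of that resolution step, assuming the caller has a simple name-to-id map for the target table; `PartitionIdResolver` and `resolve` are hypothetical names, not part of Doris.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not a Doris API: resolves a comma-separated list of
// partition names (e.g. "p1, p2") to partition ids using a name -> id lookup.
final class PartitionIdResolver {
    static List<Long> resolve(String commaSeparatedNames, Map<String, Long> nameToId) {
        List<Long> ids = new ArrayList<>();
        for (String raw : commaSeparatedNames.split(",")) {
            String name = raw.trim();
            if (name.isEmpty()) {
                continue; // tolerate stray commas such as "p1,,p2"
            }
            Long id = nameToId.get(name);
            if (id == null) {
                throw new IllegalArgumentException("unknown partition: " + name);
            }
            ids.add(id);
        }
        return ids;
    }
}
```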

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#discussion_r373987967
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/BrokerFileGroupAggInfo.java
 ##########
 @@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.io.Writable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/*
+ * This class is mainly used to aggregate information of multiple DataDescriptors.
+ * When the table name and specified partitions in the two DataDescriptors are the same,
+ * the BrokerFileGroup information corresponding to the two DataDescriptors will be aggregated together.
+ * eg1:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ *
+ *  will be aggregated together, because they have the same table name and specified partitions
+ *  =>
+ *  FileGroupAggKey(tbl1, [p1, p2]) => List(file1, file2);
+ * 
+ * eg2:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2)
+ * 
+ *  will NOT be aggregated together, because they have the same table name but different specified partitions
+ *  FileGroupAggKey(tbl1, [p1]) => List(file1);
+ *  FileGroupAggKey(tbl1, [p2]) => List(file2);
+ * 
+ * eg3:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2, p3)
+ * 
+ *  will throw an exception, because the two data descriptions overlap on partition p2, which is
+ *  currently not allowed. You can rewrite the data descriptions like this:
+ *  
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p3) 
+ *  
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2) 
+ *  
+ *  and
+ *  
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2)
+ *  
+ *  they will be aggregated like:
+ *  FileGroupAggKey(tbl1, [p1]) => List(file1);
+ *  FileGroupAggKey(tbl1, [p3]) => List(file2);
+ *  FileGroupAggKey(tbl1, [p2]) => List(file1, file2);
+ *  
+ *  Although the system could do this transformation automatically, it would change the result of the "max_filter_ratio" check.
 
 Review comment:
   No, it's not. For example,
   assume that file1 should be loaded into p1 and p2,
   and file2 should be loaded into p2 and p3.
   
   Both file1 and file2 contain half of their rows for p1 and half for p3, and each file has 10 lines.
   Loaded as written, file1 loads its 5 p1 lines and filters its 5 p3 lines, and file2 loads its 5 p3 lines
   and filters its 5 p1 lines, so 10 of 20 lines are filtered. If we set max_filter_ratio to 0.5, this load
   should succeed.
   
   In this case, if we do the aggregation, the result would be:
   
   file1 to p1
   file2 to p3
   file1+file2 to p2
   
   and with max_filter_ratio still at 0.5, the load will fail:
   [file1 to p1] results in 5 lines ok, 5 lines error.
   [file2 to p3] results in 5 lines ok, 5 lines error.
   [file1+file2 to p2] results in 0 lines ok, 20 lines error.
   
   So there are 30 error lines out of 40 lines in total, and 30/40 = 0.75 > 0.5.
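The arithmetic in this comment can be checked with a small model. This sketch assumes each row is either loaded (its partition is among the task's target partitions) or filtered as an error row, and that the job-level ratio is total error rows over total rows across all tasks; it ignores every other reason a row can be filtered. `FilterRatioSketch` and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class FilterRatioSketch {
    // Runs one simplified load task: rows whose partition is in `targets` are loaded,
    // every other row is filtered and counted as an error row. Returns {okRows, errorRows}.
    static long[] runTask(List<Map<String, Integer>> files, Set<String> targets) {
        long ok = 0;
        long err = 0;
        for (Map<String, Integer> file : files) {
            for (Map.Entry<String, Integer> e : file.entrySet()) {
                if (targets.contains(e.getKey())) {
                    ok += e.getValue();
                } else {
                    err += e.getValue();
                }
            }
        }
        return new long[] {ok, err};
    }

    // Error rows divided by all rows, summed over every task of the job.
    static double errorRatio(long[]... tasks) {
        long ok = 0;
        long err = 0;
        for (long[] task : tasks) {
            ok += task[0];
            err += task[1];
        }
        return (double) err / (ok + err);
    }

    static Set<String> parts(String... names) {
        return new HashSet<>(Arrays.asList(names));
    }

    // Each file: 5 rows belonging to p1 and 5 rows belonging to p3 (10 rows total).
    static Map<String, Integer> sampleFile() {
        Map<String, Integer> rows = new HashMap<>();
        rows.put("p1", 5);
        rows.put("p3", 5);
        return rows;
    }

    // As written by the user: file1 -> (p1, p2), file2 -> (p2, p3).
    static double asWritten() {
        Map<String, Integer> file1 = sampleFile();
        Map<String, Integer> file2 = sampleFile();
        return errorRatio(
                runTask(List.of(file1), parts("p1", "p2")),
                runTask(List.of(file2), parts("p2", "p3")));
    }

    // After splitting out the shared partition: file1 -> p1, file2 -> p3, file1+file2 -> p2.
    static double rewritten() {
        Map<String, Integer> file1 = sampleFile();
        Map<String, Integer> file2 = sampleFile();
        return errorRatio(
                runTask(List.of(file1), parts("p1")),
                runTask(List.of(file2), parts("p3")),
                runTask(List.of(file1, file2), parts("p2")));
    }

    public static void main(String[] args) {
        System.out.println("as written: " + asWritten()); // 0.5
        System.out.println("rewritten:  " + rewritten()); // 0.75
    }
}
```

Following the comment's reading that a load fails only when the ratio exceeds max_filter_ratio, 0.5 passes and 0.75 fails, even though the same rows end up in the same partitions.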
   
   
   
   
   
   
   


[GitHub] [incubator-doris] morningman commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#discussion_r373994489
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/PullLoadSourceInfo.java
 ##########
 @@ -1,119 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
 
 Review comment:
   It's OK. I checked it: it isn't used at all now, and the old load jobs were already gone before Doris version 0.11.


[GitHub] [incubator-doris] morningman commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#discussion_r373994889
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/BrokerFileGroupAggInfo.java
 ##########
 @@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.io.Writable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/*
+ * This class is mainly used to aggregate information of multiple DataDescriptors.
+ * When the table name and specified partitions in the two DataDescriptors are the same,
+ * the BrokerFileGroup information corresponding to the two DataDescriptors will be aggregated together.
+ * eg1:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ *
+ *  will be aggregated together, because they have the same table name and specified partitions
+ *  =>
+ *  FileGroupAggKey(tbl1, [p1, p2]) => List(file1, file2);
+ * 
+ * eg2:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2)
+ * 
+ *  will NOT be aggregated together, because they have the same table name but different specified partitions
+ *  FileGroupAggKey(tbl1, [p1]) => List(file1);
+ *  FileGroupAggKey(tbl1, [p2]) => List(file2);
+ * 
+ * eg3:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2, p3)
+ * 
+ *  will throw an exception, because the two data descriptions overlap on partition p2, which is
+ *  currently not allowed. You can rewrite the data descriptions like this:
+ *  
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p3) 
+ *  
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2) 
+ *  
+ *  and
+ *  
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2)
+ *  
+ *  they will be aggregated like:
+ *  FileGroupAggKey(tbl1, [p1]) => List(file1);
+ *  FileGroupAggKey(tbl1, [p3]) => List(file2);
+ *  FileGroupAggKey(tbl1, [p2]) => List(file1, file2);
+ *  
+ *  Although the system could do this transformation automatically, it would change the result of the "max_filter_ratio" check.
+ *  So we have to let the user decide what to do.
+ */
+public class BrokerFileGroupAggInfo implements Writable {
+    private static final Logger LOG = LogManager.getLogger(BrokerFileGroupAggInfo.class);
+
+    private Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
+    // auxiliary structure, tbl id -> set of partition ids.
+    // used to examine overlapping partitions of the same table.
+    private Map<Long, Set<Long>> tableIdToPartitioIds = Maps.newHashMap();
+
+    // This inner class is used to distinguish different combinations of table and partitions
+    public static class FileGroupAggKey {
+        private long tableId;
+        private Set<Long> partitionIds; // empty means partition is not specified
+
+        public FileGroupAggKey(long tableId, List<Long> partitionIds) {
+            this.tableId = tableId;
+            if (partitionIds != null) {
+                this.partitionIds = Sets.newHashSet(partitionIds);
+            } else {
+                this.partitionIds = Sets.newHashSet();
+            }
+        }
+
+        public long getTableId() {
+            return tableId;
+        }
+
+        public Set<Long> getPartitionIds() {
+            return partitionIds;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof FileGroupAggKey)) {
+                return false;
+            }
+
+            FileGroupAggKey other = (FileGroupAggKey) obj;
+            return other.tableId == this.tableId && other.partitionIds.equals(this.partitionIds);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tableId, partitionIds);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("[").append(tableId).append(": ").append(partitionIds).append("]");
+            return sb.toString();
+        }
+    }
+
+    public BrokerFileGroupAggInfo() {
+
+    }
+
+    public void addFileGroup(BrokerFileGroup fileGroup) throws DdlException {
+        FileGroupAggKey fileGroupAggKey = new FileGroupAggKey(fileGroup.getTableId(), fileGroup.getPartitionIds());
+        List<BrokerFileGroup> fileGroupList = aggKeyToFileGroups.get(fileGroupAggKey);
+        if (fileGroupList == null) {
+            // check if there are overlapping partitions of same table
+            if (tableIdToPartitioIds.containsKey(fileGroup.getTableId()) 
+                    && tableIdToPartitioIds.get(fileGroup.getTableId()).stream().anyMatch(id -> fileGroup.getPartitionIds().contains(id))) {
+                throw new DdlException("There are overlapping partitions of the same table in data descriptions of load job stmt");
+            }
+            
+            fileGroupList = Lists.newArrayList();
+            aggKeyToFileGroups.put(fileGroupAggKey, fileGroupList);
+        }
+        // add the file group to the list of its key (new or existing), aggregating same-key groups
+        fileGroupList.add(fileGroup);
+
+        // update tableIdToPartitioIds
+        Set<Long> partitionIds = tableIdToPartitioIds.get(fileGroup.getTableId());
+        if (partitionIds == null) {
+            partitionIds = Sets.newHashSet();
+            tableIdToPartitioIds.put(fileGroup.getTableId(), partitionIds);
+        }
+        if (fileGroup.getPartitionIds() != null) {
+            partitionIds.addAll(fileGroup.getPartitionIds());
+        }
+    }
+
+    public Set<Long> getAllTableIds() {
+        return aggKeyToFileGroups.keySet().stream().map(k -> k.tableId).collect(Collectors.toSet());
+    }
+
+    public Map<FileGroupAggKey, List<BrokerFileGroup>> getAggKeyToFileGroups() {
+        return aggKeyToFileGroups;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(aggKeyToFileGroups);
+        return sb.toString();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        // The file group aggregation info doesn't need to be persisted.
+        // It will be recreated by origin stmt in prepare of load job.
+        // write 0 just for compatibility
+        out.writeInt(0);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+        in.readInt();
 
 Review comment:
   It won't; we already solved this in version 0.11.
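The quoted `write()`/`readFields()` pair persists nothing but one placeholder `int` and consumes exactly that `int` on read, so a field serialized after it is still read from the right offset. A minimal round-trip sketch of that property; `PlaceholderInfo` and `PlaceholderRoundTrip` are hypothetical classes, not Doris code, and the trailing `long` stands in for whatever an enclosing object writes next.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in mirroring the quoted write()/readFields(): nothing real is
// persisted, only a placeholder int, so the serialized layout stays the same.
final class PlaceholderInfo {
    void write(DataOutput out) throws IOException {
        out.writeInt(0);
    }

    void readFields(DataInput in) throws IOException {
        in.readInt(); // consume and discard the placeholder
    }
}

final class PlaceholderRoundTrip {
    // Serializes the placeholder followed by the next field of an enclosing object,
    // then deserializes both and returns the value read for that next field.
    static long roundTrip(long nextField) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                new PlaceholderInfo().write(out);
                out.writeLong(nextField);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                new PlaceholderInfo().readFields(in);
                return in.readLong();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```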


[GitHub] [incubator-doris] morningman commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#discussion_r373994069
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/BrokerFileGroupAggInfo.java
 ##########
 @@ -0,0 +1,239 @@
+    public BrokerFileGroupAggInfo() {
 
 Review comment:
   OK


[GitHub] [incubator-doris] EmmyMiao87 commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#discussion_r373959142
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/BrokerFileGroupAggInfo.java
 ##########
 @@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load;
+
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.io.Writable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/*
+ * This class is mainly used to aggregate information of multiple DataDescriptors.
+ * When the table name and specified partitions in the two DataDescriptors are the same,
+ * the BrokerFileGroup information corresponding to the two DataDescriptors will be aggregated together.
+ * eg1:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ *
+ *  will be aggregated together, because they have the same table name and specified partitions
+ *  =>
+ *  FileGroupAggKey(tbl1, [p1, p2]) => List(file1, file2);
+ * 
+ * eg2:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2)
+ * 
+ *  will NOT be aggregated together, because they have the same table name but different specified partitions
+ *  FileGroupAggKey(tbl1, [p1]) => List(file1);
+ *  FileGroupAggKey(tbl1, [p2]) => List(file2);
+ * 
+ * eg3:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2, p3)
+ * 
+ *  will throw an Exception, because the two data descriptions overlap on partition p2, which we
+ *  currently do not allow. You can rewrite the data descriptions like this:
+ *  
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p3) 
+ *  
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2) 
+ *  
+ *  and
+ *  
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2)
+ *  
+ *  they will be aggregated like this:
+ *  FileGroupAggKey(tbl1, [p1]) => List(file1);
+ *  FileGroupAggKey(tbl1, [p3]) => List(file2);
+ *  FileGroupAggKey(tbl1, [p2]) => List(file1, file2);
+ *  
+ *  Although the system could do this transformation automatically, it would change the "max_filter_ratio".
+ *  So we have to let the user decide what to do.
+ */
+public class BrokerFileGroupAggInfo implements Writable {
+    private static final Logger LOG = LogManager.getLogger(BrokerFileGroupAggInfo.class);
+
+    private Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
+    // auxiliary structure, tbl id -> set of partition ids.
+    // used to examine overlapping partitions of the same table.
+    private Map<Long, Set<Long>> tableIdToPartitioIds = Maps.newHashMap();
+
+    // This inner class is used to distinguish different combinations of table and partitions
+    public static class FileGroupAggKey {
+        private long tableId;
+        private Set<Long> partitionIds; // empty means partition is not specified
+
+        public FileGroupAggKey(long tableId, List<Long> partitionIds) {
+            this.tableId = tableId;
+            if (partitionIds != null) {
+                this.partitionIds = Sets.newHashSet(partitionIds);
+            } else {
+                this.partitionIds = Sets.newHashSet();
+            }
+        }
+
+        public long getTableId() {
+            return tableId;
+        }
+
+        public Set<Long> getPartitionIds() {
+            return partitionIds;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof FileGroupAggKey)) {
+                return false;
+            }
+
+            FileGroupAggKey other = (FileGroupAggKey) obj;
+            return other.tableId == this.tableId && other.partitionIds.equals(this.partitionIds);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tableId, partitionIds);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("[").append(tableId).append(": ").append(partitionIds).append("]");
+            return sb.toString();
+        }
+    }
+
+    public BrokerFileGroupAggInfo() {
 
 Review comment:
   It can be omitted.

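The three cases in the class comment (eg1 merge, eg2 separate, eg3 reject) can be sketched under stated assumptions. A nested `Map<Long, Map<Set<Long>, List<String>>>` stands in for both `aggKeyToFileGroups` and `tableIdToPartitioIds`, file names stand in for `BrokerFileGroup`, and `IllegalArgumentException` stands in for `DdlException`. Treating an empty partition set as "all partitions" (and therefore overlapping any other set) is this sketch's own assumption; the diff does not confirm it. `FileGroupAggSketch`, `add`, `groupCount` and `filesOf` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FileGroupAggSketch {
    // tableId -> (partition set -> files); a simplified stand-in for aggKeyToFileGroups.
    private final Map<Long, Map<Set<Long>, List<String>>> groups = new HashMap<>();

    // Adds one data description; rejects partition sets that overlap, but do not equal, an existing group.
    public void add(long tableId, List<Long> partitionIds, String file) {
        Set<Long> parts = new HashSet<>(partitionIds);
        Map<Set<Long>, List<String>> byParts = groups.computeIfAbsent(tableId, k -> new HashMap<>());
        List<String> files = byParts.get(parts);
        if (files == null) {
            for (Set<Long> existing : byParts.keySet()) {
                // Assumption: an empty set means "all partitions", so it overlaps any other set.
                boolean overlap = existing.isEmpty() || parts.isEmpty()
                        || existing.stream().anyMatch(parts::contains);
                if (overlap) {
                    throw new IllegalArgumentException("overlapping partitions " + existing
                            + " and " + parts + " in table " + tableId);
                }
            }
            files = new ArrayList<>();
            byParts.put(parts, files);
        }
        files.add(file);
    }

    // Total number of (table, partition set) groups, i.e. aggregated load units.
    public int groupCount() {
        int count = 0;
        for (Map<Set<Long>, List<String>> byParts : groups.values()) {
            count += byParts.size();
        }
        return count;
    }

    public List<String> filesOf(long tableId, List<Long> partitionIds) {
        Map<Set<Long>, List<String>> byParts = groups.getOrDefault(tableId, new HashMap<>());
        return byParts.getOrDefault(new HashSet<>(partitionIds), new ArrayList<>());
    }

    public static void main(String[] args) {
        FileGroupAggSketch agg = new FileGroupAggSketch();
        agg.add(10001L, Arrays.asList(1L, 2L), "file1");
        agg.add(10001L, Arrays.asList(2L, 1L), "file2"); // eg1: same partitions, same group
        agg.add(10001L, Arrays.asList(3L), "file3");     // eg2: disjoint partitions, new group
        System.out.println(agg.groupCount());            // prints 2
        try {
            agg.add(10001L, Arrays.asList(3L, 4L), "file4"); // eg3: overlaps on partition 3
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Equal partition sets merge, disjoint ones get separate groups, and a partial overlap such as `[3]` against `[3, 4]` is rejected. The rewritten eg3 input (file1 to p1, file2 to p3, then file1 and file2 to p2) yields three groups.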

[GitHub] [incubator-doris] EmmyMiao87 commented on issue #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on issue #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#issuecomment-581294649
 
 
   The `PullLoadSourceInfo` has been persisted in some old versions. I am afraid of compatibility problems. The same applies to `PullLoadPendingTask`.


[GitHub] [incubator-doris] EmmyMiao87 edited a comment on issue #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 edited a comment on issue #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#issuecomment-581294649
 
 
   The `PullLoadSourceInfo` has been persisted in some old versions. I am afraid of compatibility and persistence problems. The same applies to `PullLoadPendingTask`.


[GitHub] [incubator-doris] EmmyMiao87 commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#discussion_r374049811
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/BrokerFileGroupAggInfo.java
 ##########
 @@ -224,7 +224,15 @@ public void write(DataOutput out) throws IOException {
     }
 
     public void readFields(DataInput in) throws IOException {
-        in.readInt();
+        int mapSize = in.readInt();
+        // just for compatibility, the following read objects are useless
 
 Review comment:
   A warning log would be better.

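One way to follow this suggestion: consume the legacy entries so the stream stays aligned for the fields that follow, and log a warning when any are found. This is a sketch only. The entry layout (a `long` table id followed by a UTF string) is a hypothetical stand-in for whatever the old format actually wrote, `skipLegacyEntries` is a hypothetical name, and `java.util.logging` replaces the class's log4j `LOG` so the example runs on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.logging.Logger;

public class LegacyReadSketch {
    // Stand-in for the log4j LOG used in the real class.
    private static final Logger LOG = Logger.getLogger(LegacyReadSketch.class.getName());

    // Hypothetical legacy layout: int count, then count entries of (long tableId, UTF string).
    // Reads and discards every entry so later fields decode correctly; returns the count.
    static int skipLegacyEntries(DataInput in) throws IOException {
        int mapSize = in.readInt();
        if (mapSize > 0) {
            LOG.warning("skipping " + mapSize + " legacy entries kept only for compatibility");
        }
        for (int i = 0; i < mapSize; i++) {
            in.readLong(); // tableId, unused
            in.readUTF();  // payload, unused
        }
        return mapSize;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(2);
        out.writeLong(10001L);
        out.writeUTF("a");
        out.writeLong(10002L);
        out.writeUTF("b");
        out.writeInt(42); // a field written after the legacy block
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(skipLegacyEntries(in)); // prints 2
        System.out.println(in.readInt());          // prints 42
    }
}
```

`main` prints `2` and then `42` (plus the warning on stderr), showing that the reader is positioned correctly after the skipped block.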

[GitHub] [incubator-doris] EmmyMiao87 commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#discussion_r373913480
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
 ##########
 @@ -77,8 +77,8 @@
     // filter the data which has been conformed
     private Expr whereExpr;
 
-    // Used for recovery from edit log
-    private BrokerFileGroup() {
 
 Review comment:
   A private constructor can also be mocked.

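For context, a `private` constructor does not block code that instantiates classes reflectively, which is a route mocking and deserialization tools can use. A minimal sketch with the standard `java.lang.reflect` API; `FileGroup` is a hypothetical stand-in for `BrokerFileGroup`, not the real class, and `newViaReflection` is a hypothetical helper.

```java
import java.lang.reflect.Constructor;

public class PrivateCtorSketch {
    // Hypothetical stand-in for a class whose only no-arg constructor is private,
    // e.g. one kept for recovery from a persisted log.
    static final class FileGroup {
        private String path = "unset";

        private FileGroup() {
        }

        String getPath() {
            return path;
        }
    }

    // Instantiates FileGroup through reflection despite the private modifier.
    static FileGroup newViaReflection() throws ReflectiveOperationException {
        Constructor<FileGroup> ctor = FileGroup.class.getDeclaredConstructor();
        ctor.setAccessible(true); // suppress the access check for this Constructor object
        return ctor.newInstance();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        System.out.println(newViaReflection().getPath()); // prints unset
    }
}
```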

[GitHub] [incubator-doris] EmmyMiao87 commented on issue #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on issue #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#issuecomment-581293682
 
 
   If the user has two different partition specifications for the same table, such as file1->(p1, p3) and file2->(p1, p2), the user has to submit the load job as file1->p1, file1->p3, file2->p1, file2->p2.
   In this situation, there are four loading tasks instead of two.
   Why don't you use the old structure, such as Map<tableId, List<LoadInfo>>?

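The reviewer's example can be worked through mechanically. The sketch below performs the kind of split that the class comment says the system could do automatically but deliberately leaves to the user: it groups each partition with the exact set of files that target it, so the resulting partition sets are pairwise disjoint. It assumes a single table, uses plain strings for partitions and files, and `split` is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class SplitSketch {
    // Input: file -> partitions it should load into (one table).
    // Output: partition set -> files, where the partition sets are pairwise disjoint.
    static Map<Set<String>, Set<String>> split(Map<String, Set<String>> fileToPartitions) {
        // Step 1: partition -> every file that targets it.
        Map<String, Set<String>> partitionToFiles = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : fileToPartitions.entrySet()) {
            for (String partition : e.getValue()) {
                partitionToFiles.computeIfAbsent(partition, k -> new TreeSet<>()).add(e.getKey());
            }
        }
        // Step 2: partitions targeted by exactly the same file set share one group.
        Map<Set<String>, Set<String>> filesToPartitions = new HashMap<>();
        for (Map.Entry<String, Set<String>> e : partitionToFiles.entrySet()) {
            filesToPartitions.computeIfAbsent(e.getValue(), k -> new TreeSet<>()).add(e.getKey());
        }
        // Step 3: key by partition set, mirroring the (table, partitions) aggregation key.
        Map<Set<String>, Set<String>> result = new HashMap<>();
        filesToPartitions.forEach((files, partitions) -> result.put(partitions, files));
        return result;
    }

    private static Set<String> setOf(String... items) {
        return new TreeSet<>(Arrays.asList(items));
    }

    public static void main(String[] args) {
        Map<String, Set<String>> input = new LinkedHashMap<>();
        input.put("file1", setOf("p1", "p3"));
        input.put("file2", setOf("p1", "p2"));
        Map<Set<String>, Set<String>> groups = split(input);
        System.out.println(groups.size());           // prints 3
        System.out.println(groups.get(setOf("p1"))); // prints [file1, file2]
    }
}
```

For file1->(p1, p3) and file2->(p1, p2) it yields three groups: `[p1] -> [file1, file2]`, `[p2] -> [file2]` and `[p3] -> [file1]`. That is how the four single-partition descriptions would aggregate under the rule in the class comment, since the two `p1` descriptions share a key.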

[GitHub] [incubator-doris] morningman commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#discussion_r373987967
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/BrokerFileGroupAggInfo.java
 ##########
 @@ -0,0 +1,239 @@
+/*
+ * This class is mainly used to aggregate information of multiple DataDescriptors.
+ * When the table name and specified partitions in the two DataDescriptors are the same,
+ * the BrokerFileGroup information corresponding to the two DataDescriptors will be aggregated together.
+ * eg1:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ *
+ *  will be aggregated together, because they have the same table name and specified partitions
+ *  =>
+ *  FileGroupAggKey(tbl1, [p1, p2]) => List(file1, file2);
+ * 
+ * eg2:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2)
+ * 
+ *  will NOT be aggregated together, because they have the same table name but different specified partitions
+ *  FileGroupAggKey(tbl1, [p1]) => List(file1);
+ *  FileGroupAggKey(tbl1, [p2]) => List(file2);
+ * 
+ * eg3:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2, p3)
+ * 
+ *  will throw an Exception, because the two data descriptions overlap on partition p2, which we
+ *  currently do not allow. You can rewrite the data descriptions like this:
+ *  
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p3) 
+ *  
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2) 
+ *  
+ *  and
+ *  
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2)
+ *  
+ *  they will be aggregated like this:
+ *  FileGroupAggKey(tbl1, [p1]) => List(file1);
+ *  FileGroupAggKey(tbl1, [p3]) => List(file2);
+ *  FileGroupAggKey(tbl1, [p2]) => List(file1, file2);
+ *  
+ *  Although the system could do this transformation automatically, it would change the "max_filter_ratio".
 
 Review comment:
   No, that's not the case. For example,
   assume that file1 should be loaded into p1 and p2,
   and file2 should be loaded into p2 and p3.


[GitHub] [incubator-doris] EmmyMiao87 commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#discussion_r373964722
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/PullLoadSourceInfo.java
 ##########
 @@ -1,119 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
 
 Review comment:
   This class has been persisted in old versions. Maybe you should keep it.


[GitHub] [incubator-doris] EmmyMiao87 commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#discussion_r373967334
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/BrokerFileGroupAggInfo.java
 ##########
 @@ -0,0 +1,239 @@
+public class BrokerFileGroupAggInfo implements Writable {
+    private static final Logger LOG = LogManager.getLogger(BrokerFileGroupAggInfo.class);
+
+    private Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
 
 Review comment:
   Maybe a Map<tableId, List<BrokerFileGroup>> would be useful.

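If a per-table view is needed, it can be derived from `aggKeyToFileGroups` without giving up the partition-level keys. A minimal sketch under these assumptions: `AbstractMap.SimpleImmutableEntry<Long, Set<Long>>` stands in for `FileGroupAggKey` (it has value-based `equals`/`hashCode`), file names stand in for `BrokerFileGroup`, and `key`/`byTable` are hypothetical helpers.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

public class TableViewSketch {
    // Stand-in for FileGroupAggKey: (tableId, partitionIds) with value-based equals/hashCode.
    static Map.Entry<Long, Set<Long>> key(long tableId, Long... partitionIds) {
        return new AbstractMap.SimpleImmutableEntry<>(tableId, new HashSet<>(Arrays.asList(partitionIds)));
    }

    // Collapses the partition-level map into tableId -> all file groups of that table.
    static Map<Long, List<String>> byTable(Map<Map.Entry<Long, Set<Long>>, List<String>> aggKeyToFileGroups) {
        Map<Long, List<String>> result = new TreeMap<>(); // sorted by table id for stable output
        for (Map.Entry<Map.Entry<Long, Set<Long>>, List<String>> e : aggKeyToFileGroups.entrySet()) {
            result.computeIfAbsent(e.getKey().getKey(), k -> new ArrayList<>()).addAll(e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Map.Entry<Long, Set<Long>>, List<String>> agg = new LinkedHashMap<>();
        agg.put(key(10001L, 1L), Arrays.asList("file1"));
        agg.put(key(10001L, 2L), Arrays.asList("file2"));
        agg.put(key(10002L), Arrays.asList("file3")); // no PARTITION clause
        System.out.println(byTable(agg)); // prints {10001=[file1, file2], 10002=[file3]}
    }
}
```

Keeping the finer-grained map as the source of truth and deriving the coarser view on demand avoids storing two structures that could drift apart.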

[GitHub] [incubator-doris] EmmyMiao87 commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#discussion_r373915552
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/BrokerFileGroupAggInfo.java
 ##########
 @@ -0,0 +1,239 @@
+/*
+ * This class is mainly used to aggregate information of multiple DataDescriptors.
+ * When the table name and specified partitions in the two DataDescriptors are the same,
+ * the BrokerFileGroup information corresponding to the two DataDescriptors will be aggregated together.
+ * eg1:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ *
+ *  will be aggregated together, because they have the same table name and specified partitions
+ *  =>
+ *  FileGroupAggKey(tbl1, [p1, p2]) => List(file1, file2);
+ * 
+ * eg2:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2)
+ * 
+ *  will NOT be aggregated together, because they have the same table name but different specified partitions
+ *  FileGroupAggKey(tbl1, [p1]) => List(file1);
+ *  FileGroupAggKey(tbl1, [p2]) => List(file2);
+ * 
+ * eg3:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2, p3)
+ * 
+ *  will throw an Exception, because the two data descriptions overlap on partition p2, which we
+ *  currently do not allow. You can rewrite the data descriptions like this:
+ *  
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p3) 
+ *  
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2) 
+ *  
+ *  and
+ *  
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2)
+ *  
+ *  they will be aggregated like this:
+ *  FileGroupAggKey(tbl1, [p1]) => List(file1);
+ *  FileGroupAggKey(tbl1, [p3]) => List(file2);
+ *  FileGroupAggKey(tbl1, [p2]) => List(file1, file2);
+ *  
+ *  Although the system could do this transformation automatically, it would change the "max_filter_ratio".
 
 Review comment:
   The max filter ratio applies to the whole broker load job rather than to a single data description. So it does not change the "max_filter_ratio" when the system aggregates data descriptions automatically. Right?

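The reviewer's argument rests on the ratio being computed once per job. A minimal sketch, assuming the ratio is filtered rows divided by total (loaded plus filtered) rows, summed over every task of the job before comparing with `max_filter_ratio`; `TaskCounters` and `exceedsMaxFilterRatio` are hypothetical names, not Doris APIs.

```java
public class FilterRatioSketch {
    // Hypothetical per-task counters: rows written and rows filtered out as invalid.
    static final class TaskCounters {
        final long loadedRows;
        final long filteredRows;

        TaskCounters(long loadedRows, long filteredRows) {
            this.loadedRows = loadedRows;
            this.filteredRows = filteredRows;
        }
    }

    // Job-level check: sums the counters of all tasks before computing the ratio.
    static boolean exceedsMaxFilterRatio(TaskCounters[] tasks, double maxFilterRatio) {
        long loaded = 0;
        long filtered = 0;
        for (TaskCounters t : tasks) {
            loaded += t.loadedRows;
            filtered += t.filteredRows;
        }
        long total = loaded + filtered;
        return total > 0 && (double) filtered / total > maxFilterRatio;
    }

    public static void main(String[] args) {
        TaskCounters[] tasks = {new TaskCounters(85, 15), new TaskCounters(100, 0)};
        System.out.println(exceedsMaxFilterRatio(tasks, 0.1)); // prints false (15 / 200 = 0.075)
        System.out.println(exceedsMaxFilterRatio(new TaskCounters[] {tasks[0]}, 0.1)); // prints true (15 / 100 = 0.15)
    }
}
```

With tasks of (85 loaded, 15 filtered) and (100, 0), the job ratio is 15 / 200 = 0.075, under a limit of 0.1, even though the first task alone would be at 0.15. Regrouping the same rows into different tasks leaves the sums unchanged; they only change if a different split changes which rows are read or how they are counted.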

[GitHub] [incubator-doris] morningman commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
morningman commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#discussion_r373978674
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java
 ##########
 @@ -77,8 +77,8 @@
     // filter the data which has been conformed
     private Expr whereExpr;
 
-    // Used for recovery from edit log
-    private BrokerFileGroup() {
 
 Review comment:
   OK


[GitHub] [incubator-doris] EmmyMiao87 commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#discussion_r373956247
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/BrokerFileGroupAggInfo.java
 ##########
 @@ -0,0 +1,239 @@
+/*
+ * This class is mainly used to aggregate information of multiple DataDescriptors.
+ * When the table name and specified partitions in the two DataDescriptors are the same,
+ * the BrokerFileGroup information corresponding to the two DataDescriptors will be aggregated together.
+ * eg1:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ *
+ *  will be aggregated together, because they have the same table name and the same specified partitions
+ *  =>
+ *  FileGroupAggKey(tbl1, [p1, p2]) => List(file1, file2);
+ * 
+ * eg2:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2)
+ * 
+ *  will NOT be aggregated together, because they have the same table name but different specified partitions
+ *  FileGroupAggKey(tbl1, [p1]) => List(file1);
+ *  FileGroupAggKey(tbl1, [p2]) => List(file2);
+ * 
+ * eg3:
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1, p2)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2, p3)
+ * 
+ *  will throw an exception, because partition p2 is specified in both data descriptions, and overlapping
+ *  partitions are currently not allowed. You can rewrite the data descriptions like this:
+ *  
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p1)
+ * 
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p3) 
+ *  
+ *  and
+ * 
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file1")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2) 
+ *  
+ *  and
+ *  
+ *  DATA INFILE("hdfs://hdfs_host:hdfs_port/input/file2")
+ *  INTO TABLE `tbl1`
+ *  PARTITION (p2)
+ *  
+ *  they will be aggregated as:
+ *  FileGroupAggKey(tbl1, [p1]) => List(file1);
+ *  FileGroupAggKey(tbl1, [p3]) => List(file2);
+ *  FileGroupAggKey(tbl1, [p2]) => List(file1, file2);
+ *  
+ *  Although the system could do this transformation automatically, it would change the "max_filter_ratio",
+ *  so we let the user decide what to do.
+ */
+public class BrokerFileGroupAggInfo implements Writable {
+    private static final Logger LOG = LogManager.getLogger(BrokerFileGroupAggInfo.class);
+
+    private Map<FileGroupAggKey, List<BrokerFileGroup>> aggKeyToFileGroups = Maps.newHashMap();
+    // auxiliary structure, tbl id -> set of partition ids.
+    // used to examine overlapping partitions of the same table.
+    private Map<Long, Set<Long>> tableIdToPartitioIds = Maps.newHashMap();
+
+    // This inner class is used to distinguish different combinations of table and partitions
 
 Review comment:
   Maybe a public class is better.
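The aggregation rule described in the Javadoc above (eg1 to eg3) can be sketched as a standalone Java class. This is a minimal sketch, not the Doris implementation: `FileGroupAggSketch`, its `AggKey`, and the use of plain file-path strings in place of `BrokerFileGroup` objects are hypothetical, table `tbl1` is modeled as table id 1, partitions p1, p2, p3 as ids 1, 2, 3, and `IllegalArgumentException` stands in for `DdlException`. One deliberate difference: the quoted `addFileGroup` null-checks `getPartitionIds()` when building the key and when updating `tableIdToPartitioIds`, but not inside the overlap check, so the sketch normalizes a null partition list to an empty set up front.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical, simplified model of BrokerFileGroupAggInfo: each file group is reduced
// to a file path, a table id and an optional list of partition ids.
public class FileGroupAggSketch {

    // Aggregation key: (table id, set of partition ids). An empty set means that no
    // PARTITION clause was given. Set equality makes (p1, p2) and (p2, p1) equal.
    static final class AggKey {
        private final long tableId;
        private final Set<Long> partitionIds;

        AggKey(long tableId, Set<Long> partitionIds) {
            this.tableId = tableId;
            this.partitionIds = partitionIds;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof AggKey)) {
                return false;
            }
            AggKey other = (AggKey) obj;
            return tableId == other.tableId && partitionIds.equals(other.partitionIds);
        }

        @Override
        public int hashCode() {
            return Objects.hash(tableId, partitionIds);
        }
    }

    private final Map<AggKey, List<String>> aggKeyToFiles = new HashMap<>();
    // Every partition id already claimed by some key, per table (used for the overlap check).
    private final Map<Long, Set<Long>> tableIdToPartitionIds = new HashMap<>();

    public void addFileGroup(String file, long tableId, List<Long> partitionIds) {
        // Normalize null to an empty set before building the key or checking overlaps.
        Set<Long> parts = partitionIds == null ? new HashSet<>() : new HashSet<>(partitionIds);
        AggKey key = new AggKey(tableId, parts);
        List<String> files = aggKeyToFiles.get(key);
        if (files == null) {
            // A new key for this table must not share any partition with earlier keys (eg3).
            Set<Long> claimed = tableIdToPartitionIds.getOrDefault(tableId, Collections.emptySet());
            for (Long id : parts) {
                if (claimed.contains(id)) {
                    throw new IllegalArgumentException("overlapping partition " + id + " in table " + tableId);
                }
            }
            files = new ArrayList<>();
            aggKeyToFiles.put(key, files);
        }
        // Same table and same partition set: aggregate into one list (eg1).
        files.add(file);
        tableIdToPartitionIds.computeIfAbsent(tableId, k -> new HashSet<>()).addAll(parts);
    }

    public Map<AggKey, List<String>> getAggKeyToFiles() {
        return aggKeyToFiles;
    }

    public static void main(String[] args) {
        // eg1 from the Javadoc: partition order does not matter, both files share one key.
        FileGroupAggSketch agg = new FileGroupAggSketch();
        agg.addFileGroup("file1", 1L, Arrays.asList(1L, 2L));
        agg.addFileGroup("file2", 1L, Arrays.asList(2L, 1L));
        System.out.println(agg.getAggKeyToFiles().values()); // [[file1, file2]]
    }
}
```

Because the key holds a `Set`, `PARTITION (p1, p2)` and `PARTITION (p2, p1)` map to the same key (eg1), disjoint partition sets produce separate keys (eg2), and a new key that reuses an already claimed partition is rejected (eg3).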


[GitHub] [incubator-doris] morningman merged pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
morningman merged pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824
 
 
   


[GitHub] [incubator-doris] EmmyMiao87 commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job

Posted by GitBox <gi...@apache.org>.
EmmyMiao87 commented on a change in pull request #2824: [Load] Fix bug of wrong file group aggregation when handling broker load job
URL: https://github.com/apache/incubator-doris/pull/2824#discussion_r373966478
 
 

 ##########
 File path: fe/src/main/java/org/apache/doris/load/BrokerFileGroupAggInfo.java
 ##########
 @@ -0,0 +1,239 @@
+    public static class FileGroupAggKey {
+        private long tableId;
+        private Set<Long> partitionIds; // empty means partition is not specified
+
+        public FileGroupAggKey(long tableId, List<Long> partitionIds) {
+            this.tableId = tableId;
+            if (partitionIds != null) {
+                this.partitionIds = Sets.newHashSet(partitionIds);
+            } else {
+                this.partitionIds = Sets.newHashSet();
+            }
+        }
+
+        public long getTableId() {
+            return tableId;
+        }
+
+        public Set<Long> getPartitionIds() {
+            return partitionIds;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof FileGroupAggKey)) {
+                return false;
+            }
+
+            FileGroupAggKey other = (FileGroupAggKey) obj;
+            return other.tableId == this.tableId && other.partitionIds.equals(this.partitionIds);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tableId, partitionIds);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("[").append(tableId).append(": ").append(partitionIds).append("]");
+            return sb.toString();
+        }
+    }
+
+    public BrokerFileGroupAggInfo() {
+
+    }
+
+    public void addFileGroup(BrokerFileGroup fileGroup) throws DdlException {
+        FileGroupAggKey fileGroupAggKey = new FileGroupAggKey(fileGroup.getTableId(), fileGroup.getPartitionIds());
+        List<BrokerFileGroup> fileGroupList = aggKeyToFileGroups.get(fileGroupAggKey);
+        if (fileGroupList == null) {
+            // check if there are overlapping partitions of same table
+            if (tableIdToPartitioIds.containsKey(fileGroup.getTableId()) 
+                    && tableIdToPartitioIds.get(fileGroup.getTableId()).stream().anyMatch(id -> fileGroup.getPartitionIds().contains(id))) {
+                throw new DdlException("There are overlapping partitions of same table in data description of load job stmt");
+            }
+            
+            fileGroupList = Lists.newArrayList();
+            aggKeyToFileGroups.put(fileGroupAggKey, fileGroupList);
+        }
+        // add the file group to the list for this key (existing or just created)
+        fileGroupList.add(fileGroup);
+
+        // update tableIdToPartitioIds
+        Set<Long> partitionIds = tableIdToPartitioIds.get(fileGroup.getTableId());
+        if (partitionIds == null) {
+            partitionIds = Sets.newHashSet();
+            tableIdToPartitioIds.put(fileGroup.getTableId(), partitionIds);
+        }
+        if (fileGroup.getPartitionIds() != null) {
+            partitionIds.addAll(fileGroup.getPartitionIds());
+        }
+    }
+
+    public Set<Long> getAllTableIds() {
+        return aggKeyToFileGroups.keySet().stream().map(k -> k.tableId).collect(Collectors.toSet());
+    }
+
+    public Map<FileGroupAggKey, List<BrokerFileGroup>> getAggKeyToFileGroups() {
+        return aggKeyToFileGroups;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(aggKeyToFileGroups);
+        return sb.toString();
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        // The pull load source info doesn't need to be persisted.
+        // It will be recreated from the original stmt when the load job is prepared.
+        // Write 0 just for compatibility.
+        out.writeInt(0);
+    }
+
+    public void readFields(DataInput in) throws IOException {
+        in.readInt();
 
 Review comment:
   Maybe `in` is a PullLoadSourceInfo?
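The `write`/`readFields` pair quoted above persists nothing but a placeholder `int`. Below is a minimal round-trip sketch of that pattern using only `java.io`; `PlaceholderWritableSketch` and the trailing `long` field are hypothetical. It shows that reading back exactly one `int` keeps the stream aligned for whatever field follows.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for a Writable whose state is rebuilt from the original
// load statement instead of being persisted.
public class PlaceholderWritableSketch {

    // Write a single int 0 so the serialized layout keeps a slot for this field.
    public void write(DataOutput out) throws IOException {
        out.writeInt(0);
    }

    // Consume (and discard) the placeholder so the next field is read from the right offset.
    public void readFields(DataInput in) throws IOException {
        in.readInt();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        new PlaceholderWritableSketch().write(out);
        out.writeLong(42L); // the next field of a hypothetical enclosing object
        out.flush();

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        new PlaceholderWritableSketch().readFields(in);
        System.out.println(in.readLong()); // 42
    }
}
```

This stays aligned only if every image written at this position holds exactly one `int`. If older metadata serialized a full PullLoadSourceInfo here, which appears to be the concern raised in the review comment, `readFields` would have to consume that older layout instead; the sketch does not model it.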
