Posted to gitbox@hive.apache.org by "aturoczy (via GitHub)" <gi...@apache.org> on 2023/05/18 13:30:23 UTC

[GitHub] [hive] aturoczy commented on a diff in pull request #4336: HIVE-27357: Fix CustomPartitionVertex.getBucketSplitMapForPath()

aturoczy commented on code in PR #4336:
URL: https://github.com/apache/hive/pull/4336#discussion_r1197819053


##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java:
##########
@@ -533,83 +536,87 @@ private FileSplit getFileSplitFromEvent(InputDataInformationEvent event) throws
   private Multimap<Integer, InputSplit> getBucketSplitMapForPath(String inputName,
       Map<String, Set<FileSplit>> pathFileSplitsMap) {
 
+    boolean isSMBJoin = numInputsAffectingRootInputSpecUpdate != 1;
+    boolean isMainWork = (mainWorkName.isEmpty()) || (inputName.compareTo(mainWorkName) == 0);
+    Preconditions.checkState(
+        (isMainWork || (isSMBJoin && inputToBucketMap != null &&  inputToBucketMap.containsKey(inputName))),
+        "CustomPartitionVertex.inputToBucketMap is not defined for " + inputName);
+    int inputBucketSize = isMainWork ? numBuckets : inputToBucketMap.get(inputName);
 
-    Multimap<Integer, InputSplit> bucketToInitialSplitMap =
-        ArrayListMultimap.create();
+    Multimap<Integer, InputSplit> bucketToSplitMap = ArrayListMultimap.create();
 
     boolean fallback = false;
-    Map<Integer, Integer> bucketIds = new HashMap<>();
     for (Map.Entry<String, Set<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
       // Extract the buckedID from pathFilesMap, this is more accurate method,
       // however. it may not work in certain cases where buckets are named
       // after files used while loading data. In such case, fallback to old
       // potential inaccurate method.
       // The accepted file names are such as 000000_0, 000001_0_copy_1.
-      String bucketIdStr =
-              Utilities.getBucketFileNameFromPathSubString(entry.getKey());
+      String bucketIdStr = Utilities.getBucketFileNameFromPathSubString(entry.getKey());
       int bucketId = Utilities.getBucketIdFromFile(bucketIdStr);
       if (bucketId == -1) {
         fallback = true;
-        LOG.info("Fallback to using older sort based logic to assign " +
-                "buckets to splits.");
-        bucketIds.clear();
+        LOG.info("Fallback to using older sort based logic to assign buckets to splits.");
+        bucketToSplitMap.clear();
         break;
       }
-      // Make sure the bucketId is at max the numBuckets
-      bucketId = bucketId % numBuckets;
-      bucketIds.put(bucketId, bucketId);
+
+      // Utilities.getBucketIdFromFile() returns negative value only if it fails to retrieve bucketID.
+      Preconditions.checkState(bucketId >= 0);
+
+      if (bucketId >= inputBucketSize) {
+        int newBucketId = bucketId % inputBucketSize;
+        LOG.info("The bucketID" + bucketId + " for file " + entry.getKey() + " is not acceptable. " +
+            "The bucket size of input " + inputName + " is " + inputBucketSize + ". " +
+            "Use " + newBucketId + " instead.");
+        bucketId = newBucketId;
+      }
+
       for (FileSplit fsplit : entry.getValue()) {
-        bucketToInitialSplitMap.put(bucketId, fsplit);
+        bucketToSplitMap.put(bucketId, fsplit);
       }
     }
 
-    int bucketNum = 0;
     if (fallback) {
       // This is the old logic which assumes that the filenames are sorted in
       // alphanumeric order and mapped to appropriate bucket number.
+      int bucketNum = 0;
       for (Map.Entry<String, Set<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
-        int bucketId = bucketNum % numBuckets;
         for (FileSplit fsplit : entry.getValue()) {
-          bucketToInitialSplitMap.put(bucketId, fsplit);
+          bucketToSplitMap.put(bucketNum, fsplit);
+        }
+        bucketNum = bucketNum + 1;

Review Comment:
   Optional: bucketNum++; would be more elegant here, but this is just a style point.



##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java:
##########
@@ -533,83 +536,87 @@ private FileSplit getFileSplitFromEvent(InputDataInformationEvent event) throws
   private Multimap<Integer, InputSplit> getBucketSplitMapForPath(String inputName,
       Map<String, Set<FileSplit>> pathFileSplitsMap) {
 
+    boolean isSMBJoin = numInputsAffectingRootInputSpecUpdate != 1;
+    boolean isMainWork = (mainWorkName.isEmpty()) || (inputName.compareTo(mainWorkName) == 0);
+    Preconditions.checkState(
+        (isMainWork || (isSMBJoin && inputToBucketMap != null &&  inputToBucketMap.containsKey(inputName))),
+        "CustomPartitionVertex.inputToBucketMap is not defined for " + inputName);
+    int inputBucketSize = isMainWork ? numBuckets : inputToBucketMap.get(inputName);
 
-    Multimap<Integer, InputSplit> bucketToInitialSplitMap =
-        ArrayListMultimap.create();
+    Multimap<Integer, InputSplit> bucketToSplitMap = ArrayListMultimap.create();
 
     boolean fallback = false;
-    Map<Integer, Integer> bucketIds = new HashMap<>();
     for (Map.Entry<String, Set<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
       // Extract the buckedID from pathFilesMap, this is more accurate method,
       // however. it may not work in certain cases where buckets are named
       // after files used while loading data. In such case, fallback to old
       // potential inaccurate method.
       // The accepted file names are such as 000000_0, 000001_0_copy_1.
-      String bucketIdStr =
-              Utilities.getBucketFileNameFromPathSubString(entry.getKey());
+      String bucketIdStr = Utilities.getBucketFileNameFromPathSubString(entry.getKey());
       int bucketId = Utilities.getBucketIdFromFile(bucketIdStr);
       if (bucketId == -1) {
         fallback = true;
-        LOG.info("Fallback to using older sort based logic to assign " +
-                "buckets to splits.");
-        bucketIds.clear();
+        LOG.info("Fallback to using older sort based logic to assign buckets to splits.");
+        bucketToSplitMap.clear();
         break;
       }
-      // Make sure the bucketId is at max the numBuckets
-      bucketId = bucketId % numBuckets;
-      bucketIds.put(bucketId, bucketId);
+
+      // Utilities.getBucketIdFromFile() returns negative value only if it fails to retrieve bucketID.
+      Preconditions.checkState(bucketId >= 0);
+
+      if (bucketId >= inputBucketSize) {
+        int newBucketId = bucketId % inputBucketSize;
+        LOG.info("The bucketID" + bucketId + " for file " + entry.getKey() + " is not acceptable. " +

Review Comment:
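   A space is needed after "bucketID" in this log message; as written, the ID is printed directly against the word (e.g. "The bucketID12").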
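
   To illustrate the spacing point, here is a minimal standalone sketch. The class BucketLogMessage and the helper remapMessage are hypothetical and not part of Hive; they only rebuild the message from the hunk above with the space added, using the same modulo remapping. The path and input name in main are made-up sample values.

```java
final class BucketLogMessage {
  // Hypothetical helper (not Hive code): rebuilds the log message from the
  // diff with a space after "bucketID", using the same modulo remapping.
  static String remapMessage(int bucketId, String path, String inputName, int inputBucketSize) {
    int newBucketId = bucketId % inputBucketSize;
    return "The bucketID " + bucketId + " for file " + path + " is not acceptable. "
        + "The bucket size of input " + inputName + " is " + inputBucketSize + ". "
        + "Use " + newBucketId + " instead.";
  }

  public static void main(String[] args) {
    // Sample values only; 12 % 10 remaps the bucket ID to 2.
    System.out.println(remapMessage(12, "/warehouse/t/000012_0", "t", 10));
  }
}
```

   With the sample values, the message starts with "The bucketID 12 for file" rather than "The bucketID12 for file".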



##########
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java:
##########
@@ -533,83 +536,87 @@ private FileSplit getFileSplitFromEvent(InputDataInformationEvent event) throws
   private Multimap<Integer, InputSplit> getBucketSplitMapForPath(String inputName,
       Map<String, Set<FileSplit>> pathFileSplitsMap) {
 
+    boolean isSMBJoin = numInputsAffectingRootInputSpecUpdate != 1;
+    boolean isMainWork = (mainWorkName.isEmpty()) || (inputName.compareTo(mainWorkName) == 0);
+    Preconditions.checkState(
+        (isMainWork || (isSMBJoin && inputToBucketMap != null &&  inputToBucketMap.containsKey(inputName))),
+        "CustomPartitionVertex.inputToBucketMap is not defined for " + inputName);
+    int inputBucketSize = isMainWork ? numBuckets : inputToBucketMap.get(inputName);
 
-    Multimap<Integer, InputSplit> bucketToInitialSplitMap =
-        ArrayListMultimap.create();
+    Multimap<Integer, InputSplit> bucketToSplitMap = ArrayListMultimap.create();
 
     boolean fallback = false;
-    Map<Integer, Integer> bucketIds = new HashMap<>();
     for (Map.Entry<String, Set<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
       // Extract the buckedID from pathFilesMap, this is more accurate method,
       // however. it may not work in certain cases where buckets are named
       // after files used while loading data. In such case, fallback to old
       // potential inaccurate method.
       // The accepted file names are such as 000000_0, 000001_0_copy_1.
-      String bucketIdStr =
-              Utilities.getBucketFileNameFromPathSubString(entry.getKey());
+      String bucketIdStr = Utilities.getBucketFileNameFromPathSubString(entry.getKey());
       int bucketId = Utilities.getBucketIdFromFile(bucketIdStr);
       if (bucketId == -1) {
         fallback = true;
-        LOG.info("Fallback to using older sort based logic to assign " +
-                "buckets to splits.");
-        bucketIds.clear();
+        LOG.info("Fallback to using older sort based logic to assign buckets to splits.");
+        bucketToSplitMap.clear();
         break;
       }
-      // Make sure the bucketId is at max the numBuckets
-      bucketId = bucketId % numBuckets;
-      bucketIds.put(bucketId, bucketId);
+
+      // Utilities.getBucketIdFromFile() returns negative value only if it fails to retrieve bucketID.
+      Preconditions.checkState(bucketId >= 0);
+
+      if (bucketId >= inputBucketSize) {
+        int newBucketId = bucketId % inputBucketSize;
+        LOG.info("The bucketID" + bucketId + " for file " + entry.getKey() + " is not acceptable. " +
+            "The bucket size of input " + inputName + " is " + inputBucketSize + ". " +
+            "Use " + newBucketId + " instead.");
+        bucketId = newBucketId;
+      }
+
       for (FileSplit fsplit : entry.getValue()) {
-        bucketToInitialSplitMap.put(bucketId, fsplit);
+        bucketToSplitMap.put(bucketId, fsplit);
       }
     }
 
-    int bucketNum = 0;
     if (fallback) {
       // This is the old logic which assumes that the filenames are sorted in
       // alphanumeric order and mapped to appropriate bucket number.
+      int bucketNum = 0;
       for (Map.Entry<String, Set<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
-        int bucketId = bucketNum % numBuckets;
         for (FileSplit fsplit : entry.getValue()) {
-          bucketToInitialSplitMap.put(bucketId, fsplit);
+          bucketToSplitMap.put(bucketNum, fsplit);
+        }
+        bucketNum = bucketNum + 1;
+
+        if (bucketNum == inputBucketSize) {

Review Comment:
   So if bucketNum and inputBucketSize are both 10, the bucket number will be 0. Is this right?
   Maybe I missed it, but I haven't seen a test for this case.
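
   To make the case concrete, here is a rough standalone sketch of the fallback assignment. It rests on assumptions: the body of the if statement is cut off in the hunk above, so the reset to 0 is inferred from my reading of it, not confirmed. The class FallbackBucketAssignment and the method assign are hypothetical; plain strings stand in for FileSplit, and a Map of lists stands in for the Guava Multimap.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FallbackBucketAssignment {
  // Hypothetical stand-in for the fallback branch: each path, in iteration
  // order, is given the next bucket number, wrapping around to 0.
  static Map<Integer, List<String>> assign(List<String> sortedPaths, int inputBucketSize) {
    if (inputBucketSize <= 0) {
      throw new IllegalArgumentException("inputBucketSize must be positive");
    }
    Map<Integer, List<String>> bucketToPaths = new LinkedHashMap<>();
    int bucketNum = 0;
    for (String path : sortedPaths) {
      bucketToPaths.computeIfAbsent(bucketNum, k -> new ArrayList<>()).add(path);
      bucketNum++;
      // Assumption: the truncated if-body resets the counter to 0.
      if (bucketNum == inputBucketSize) {
        bucketNum = 0;
      }
    }
    return bucketToPaths;
  }

  public static void main(String[] args) {
    List<String> paths = new ArrayList<>();
    for (int i = 0; i < 11; i++) {
      paths.add("file_" + i);
    }
    // With 11 paths and 10 buckets, the eleventh path lands in bucket 0.
    System.out.println(assign(paths, 10));
  }
}
```

   Under that assumption, buckets 0 through 9 are each used once before the counter wraps, so bucket number 10 is never assigned. A unit test along these lines (eleven paths, ten buckets) would cover the case I am asking about.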



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

