You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/14 10:49:01 UTC

git commit: TAJO-181: Improvement of disk scheduler policy for straggler disk. (hyoungjunkim via hyunsik)

Updated Branches:
  refs/heads/master f664e0ccf -> 3915d85e9


TAJO-181: Improvement of disk scheduler policy for straggler disk. (hyoungjunkim via hyunsik)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/3915d85e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/3915d85e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/3915d85e

Branch: refs/heads/master
Commit: 3915d85e98f00c629e064af26dd3551f29c52437
Parents: f664e0c
Author: Hyunsik Choi <hy...@apache.org>
Authored: Sat Sep 14 17:46:58 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Sat Sep 14 17:46:58 2013 +0900

----------------------------------------------------------------------
 .../apache/tajo/storage/v2/ScanScheduler.java   | 47 ++++++++++++--------
 1 file changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3915d85e/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
index eca590f..12ca6e3 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
@@ -18,27 +18,22 @@
 
 package org.apache.tajo.storage.v2;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.storage.v2.StorageManagerV2.StorgaeManagerContext;
 
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 public class ScanScheduler extends Thread {
   private static final Log LOG = LogFactory.getLog(ScanScheduler.class);
 
-  private Object scanQueueLock;
+  private final Object scanQueueLock;
   private StorgaeManagerContext context;
 
   private Map<String, FileScannerV2> requestMap = new HashMap<String, FileScannerV2>();
 
-  private Map<Integer, DiskFileScanScheduler> diskFileScannerMap = new HashMap<Integer, DiskFileScanScheduler>();
+  private final Map<Integer, DiskFileScanScheduler> diskFileScannerMap = new HashMap<Integer, DiskFileScanScheduler>();
 
   private Map<Integer, DiskDeviceInfo> diskDeviceInfoMap = new HashMap<Integer, DiskDeviceInfo>();
 
@@ -80,19 +75,21 @@ public class ScanScheduler extends Thread {
         } else {
           int diskId = fileScannerV2.getDiskId();
 
-          //LOG.info("Scan Scheduled:" + diskId + "," + fileScannerV2.toString());
-
-          if(diskId < 0 || diskId >= diskDeviceInfoMap.size()) {
-            diskId = findDiskPartitionPath(fileScannerV2.getPath().toString());
-            if(diskId < 0) {
-
-              diskId = findMinQueueDisk();
+          int emptyDiskId = findEmptyDisk();
+          if(emptyDiskId < 0) {
+            if(diskId < 0 || diskId >= diskDeviceInfoMap.size()) {
+              diskId = findDiskPartitionPath(fileScannerV2.getPath().toString());
               if(diskId < 0) {
-                diskId = rand.nextInt(diskDeviceInfoMap.size());
+
+                diskId = findMinQueueDisk();
+                if(diskId < 0) {
+                  diskId = rand.nextInt(diskDeviceInfoMap.size());
+                }
               }
             }
+          } else {
+            diskId = emptyDiskId;
           }
-
           synchronized(diskFileScannerMap) {
             requestMap.put(fileScannerV2.getId(), fileScannerV2);
             DiskFileScanScheduler diskScheduler = diskFileScannerMap.get(diskId);
@@ -103,6 +100,18 @@ public class ScanScheduler extends Thread {
     }
   }
 
+  private int findEmptyDisk() {
+    synchronized(diskFileScannerMap) {
+      for(DiskFileScanScheduler eachDiskScanner: diskFileScannerMap.values()) {
+        int queueSize = eachDiskScanner.getTotalQueueSize();
+        if(queueSize == 0) {
+          return eachDiskScanner.getDiskId();
+        }
+      }
+      return -1;
+    }
+  }
+  
   private int findMinQueueDisk() {
     int minValue = Integer.MAX_VALUE;
     int minId = -1;