You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/14 10:49:01 UTC
git commit: TAJO-181: Improvement of disk scheduler policy for straggler disk. (hyoungjunkim via hyunsik)
Updated Branches:
refs/heads/master f664e0ccf -> 3915d85e9
TAJO-181: Improvement of disk scheduler policy for straggler disk. (hyoungjunkim via hyunsik)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/3915d85e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/3915d85e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/3915d85e
Branch: refs/heads/master
Commit: 3915d85e98f00c629e064af26dd3551f29c52437
Parents: f664e0c
Author: Hyunsik Choi <hy...@apache.org>
Authored: Sat Sep 14 17:46:58 2013 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Sat Sep 14 17:46:58 2013 +0900
----------------------------------------------------------------------
.../apache/tajo/storage/v2/ScanScheduler.java | 47 ++++++++++++--------
1 file changed, 28 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3915d85e/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
index eca590f..12ca6e3 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
@@ -18,27 +18,22 @@
package org.apache.tajo.storage.v2;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.storage.v2.StorageManagerV2.StorgaeManagerContext;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
public class ScanScheduler extends Thread {
private static final Log LOG = LogFactory.getLog(ScanScheduler.class);
- private Object scanQueueLock;
+ private final Object scanQueueLock;
private StorgaeManagerContext context;
private Map<String, FileScannerV2> requestMap = new HashMap<String, FileScannerV2>();
- private Map<Integer, DiskFileScanScheduler> diskFileScannerMap = new HashMap<Integer, DiskFileScanScheduler>();
+ private final Map<Integer, DiskFileScanScheduler> diskFileScannerMap = new HashMap<Integer, DiskFileScanScheduler>();
private Map<Integer, DiskDeviceInfo> diskDeviceInfoMap = new HashMap<Integer, DiskDeviceInfo>();
@@ -80,19 +75,21 @@ public class ScanScheduler extends Thread {
} else {
int diskId = fileScannerV2.getDiskId();
- //LOG.info("Scan Scheduled:" + diskId + "," + fileScannerV2.toString());
-
- if(diskId < 0 || diskId >= diskDeviceInfoMap.size()) {
- diskId = findDiskPartitionPath(fileScannerV2.getPath().toString());
- if(diskId < 0) {
-
- diskId = findMinQueueDisk();
+ int emptyDiskId = findEmptyDisk();
+ if(emptyDiskId < 0) {
+ if(diskId < 0 || diskId >= diskDeviceInfoMap.size()) {
+ diskId = findDiskPartitionPath(fileScannerV2.getPath().toString());
if(diskId < 0) {
- diskId = rand.nextInt(diskDeviceInfoMap.size());
+
+ diskId = findMinQueueDisk();
+ if(diskId < 0) {
+ diskId = rand.nextInt(diskDeviceInfoMap.size());
+ }
}
}
+ } else {
+ diskId = emptyDiskId;
}
-
synchronized(diskFileScannerMap) {
requestMap.put(fileScannerV2.getId(), fileScannerV2);
DiskFileScanScheduler diskScheduler = diskFileScannerMap.get(diskId);
@@ -103,6 +100,18 @@ public class ScanScheduler extends Thread {
}
}
+ private int findEmptyDisk() {
+ synchronized(diskFileScannerMap) {
+ for(DiskFileScanScheduler eachDiskScanner: diskFileScannerMap.values()) {
+ int queueSize = eachDiskScanner.getTotalQueueSize();
+ if(queueSize == 0) {
+ return eachDiskScanner.getDiskId();
+ }
+ }
+ return -1;
+ }
+ }
+
private int findMinQueueDisk() {
int minValue = Integer.MAX_VALUE;
int minId = -1;