You are viewing a plain-text version of this content. (The link to the canonical version is not preserved in this plain-text copy.)
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/08/26 06:46:20 UTC

git commit: TAJO-84: Task scheduling with considering disk load balance. (jinho)

Updated Branches:
  refs/heads/master df09fd22c -> 8b8b6683d


TAJO-84: Task scheduling with considering disk load balance. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/8b8b6683
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/8b8b6683
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/8b8b6683

Branch: refs/heads/master
Commit: 8b8b6683d86ca7d3362ad2c16644f6656d4e129d
Parents: df09fd2
Author: jinossy <ji...@gmail.com>
Authored: Mon Aug 26 13:45:26 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Mon Aug 26 13:45:26 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../tajo/engine/json/FragmentDeserializer.java  |   2 +-
 .../engine/planner/physical/SeqScanExec.java    |   2 +-
 .../apache/tajo/engine/query/ResultSetImpl.java |   6 +-
 .../apache/tajo/master/TaskSchedulerImpl.java   | 131 +++++++++++--
 .../tajo/master/event/TaskScheduleEvent.java    |  16 +-
 .../tajo/master/querymaster/QueryUnit.java      |  56 +++++-
 .../master/querymaster/QueryUnitAttempt.java    |   4 +-
 .../tajo/master/querymaster/Repartitioner.java  |   9 +-
 .../main/java/org/apache/tajo/worker/Task.java  |   2 +-
 .../planner/physical/TestPhysicalPlanner.java   |   4 +-
 .../org/apache/tajo/storage/TestFragment.java   |  13 +-
 .../org/apache/tajo/storage/TestRowFile.java    |   4 +-
 .../java/org/apache/tajo/storage/CSVFile.java   |   5 +
 .../java/org/apache/tajo/storage/Fragment.java  |  78 ++++++--
 .../org/apache/tajo/storage/MergeScanner.java   |   5 +
 .../java/org/apache/tajo/storage/RawFile.java   |   5 +
 .../java/org/apache/tajo/storage/RowFile.java   |   5 +
 .../java/org/apache/tajo/storage/Scanner.java   |   7 +
 .../org/apache/tajo/storage/StorageManager.java | 189 ++++++++++++++-----
 .../tajo/storage/json/FragmentDeserializer.java |   2 +-
 .../tajo/storage/rcfile/RCFileWrapper.java      |   5 +
 .../tajo/storage/trevni/TrevniScanner.java      |   5 +
 .../apache/tajo/storage/TestMergeScanner.java   |   6 +-
 .../org/apache/tajo/storage/TestStorages.java   |  10 +-
 .../apache/tajo/storage/index/TestBSTIndex.java |  10 +-
 .../index/TestSingleCSVFileBSTIndex.java        |   4 +-
 .../java/org/apache/tajo/util/NetUtils.java     |  38 +++-
 28 files changed, 478 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f41d352..de7d746 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,8 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-84: Task scheduling with considering disk load balance. (jinho)
+
     TAJO-123: Clean up the logical plan's json format. (hyunsik)
 
     TAJO-129: Enable the constructor of NettyServerBase to take a service

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java
index 6231c03..63dd1fa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java
@@ -47,7 +47,7 @@ public class FragmentDeserializer implements JsonDeserializer<Fragment> {
 				gson.fromJson(fragObj.get("path"), Path.class), 
 				meta, 
 				fragObj.get("startOffset").getAsLong(), 
-				fragObj.get("length").getAsLong(), null);
+				fragObj.get("length").getAsLong());
 		return fragment;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index dc42a71..474415e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -101,7 +101,7 @@ public class SeqScanExec extends PhysicalExec {
   @Override
   public Tuple next() throws IOException {
     Tuple tuple;
-    Tuple outTuple = new VTuple(outSchema.getColumnNum());
+    Tuple outTuple = new VTuple(outColumnNum);
 
     if (!plan.hasQual()) {
       if ((tuple = scanner.next()) != null) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
index e0db8e1..808b910 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
@@ -133,8 +133,7 @@ public class ResultSetImpl implements ResultSet {
       if (files[i].getLen() == 0) {
         continue;
       }
-      fraglist.add(new Fragment(tbname + "_" + i, files[i].getPath(), meta, 0l,
-          files[i].getLen(), null));
+      fraglist.add(new Fragment(tbname + "_" + i, files[i].getPath(), meta, 0l, files[i].getLen()));
     }
     return fraglist;
   }
@@ -150,8 +149,7 @@ public class ResultSetImpl implements ResultSet {
       if (files[i].getLen() == 0) {
         continue;
       }
-      fraglist.add(new Fragment(tbname + "_" + i, files[i].getPath(), meta, 0l,
-          files[i].getLen(), null));
+      fraglist.add(new Fragment(tbname + "_" + i, files[i].getPath(), meta, 0l, files[i].getLen()));
     }
     return fraglist.toArray(new Fragment[fraglist.size()]);
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index fc776e3..c9702b4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -22,7 +22,7 @@ import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -34,15 +34,16 @@ import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
 import org.apache.tajo.ipc.QueryMasterProtocol;
 import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
-import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
 import org.apache.tajo.master.event.TaskAttemptAssignedEvent;
 import org.apache.tajo.master.event.TaskRequestEvent;
 import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
 import org.apache.tajo.master.event.TaskScheduleEvent;
 import org.apache.tajo.master.event.TaskSchedulerEvent;
 import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
 import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.TajoIdUtils;
 
 import java.net.URI;
@@ -248,28 +249,111 @@ public class TaskSchedulerImpl extends AbstractService
     }
   }
 
+  public static class TaskBlockLocation {
+    private HashMap<Integer, LinkedList<QueryUnitAttemptId>> unAssignedTaskMap =
+        new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
+    private HashMap<ContainerId, Integer> assignedContainerMap = new HashMap<ContainerId, Integer>();
+    private TreeMap<Integer, Integer> volumeUsageMap = new TreeMap<Integer, Integer>();
+    private String host;
+
+    public TaskBlockLocation(String host){
+      this.host = host;
+    }
+
+    public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId attemptId){
+      LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
+      if (list == null) {
+        list = new LinkedList<QueryUnitAttemptId>();
+        unAssignedTaskMap.put(volumeId, list);
+      }
+      list.add(attemptId);
+
+      if(!volumeUsageMap.containsKey(volumeId)) volumeUsageMap.put(volumeId, 0);
+    }
+
+    public LinkedList<QueryUnitAttemptId> getQueryUnitAttemptIdList(ContainerId containerId){
+      Integer volumeId;
+
+      if (!assignedContainerMap.containsKey(containerId)) {
+        volumeId = assignVolumeId();
+        assignedContainerMap.put(containerId, volumeId);
+      } else {
+        volumeId = assignedContainerMap.get(containerId);
+      }
+
+      LinkedList<QueryUnitAttemptId> list = null;
+      if (unAssignedTaskMap.size() >  0) {
+        int retry = unAssignedTaskMap.size();
+        do {
+          list = unAssignedTaskMap.get(volumeId);
+          if (list == null || list.size() == 0) {
+            //clean and reassign remaining volume
+            unAssignedTaskMap.remove(volumeId);
+            volumeUsageMap.remove(volumeId);
+            if (volumeId < 0) break; //  processed all block on disk
+
+            volumeId = assignVolumeId();
+            assignedContainerMap.put(containerId, volumeId);
+            retry--;
+          } else {
+            break;
+          }
+        } while (retry > 0);
+      }
+      return list;
+    }
+
+    public Integer assignVolumeId(){
+      Map.Entry<Integer, Integer> volumeEntry = null;
+
+      for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
+        if(volumeEntry == null) volumeEntry = entry;
+
+        if (volumeEntry.getValue() >= entry.getValue()) {
+          volumeEntry = entry;
+        }
+      }
+
+      if(volumeEntry != null){
+        volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
+        LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey() + ", concurrency : "
+            + volumeUsageMap.get(volumeEntry.getKey()));
+        return volumeEntry.getKey();
+      } else {
+         return -1;  // processed all block on disk
+      }
+    }
+
+    public String getHost() {
+      return host;
+    }
+  }
+
   private class ScheduledRequests {
     private final HashSet<QueryUnitAttemptId> leafTasks = new HashSet<QueryUnitAttemptId>();
     private final HashSet<QueryUnitAttemptId> nonLeafTasks = new HashSet<QueryUnitAttemptId>();
-    private final Map<String, LinkedList<QueryUnitAttemptId>> leafTasksHostMapping =
-        new HashMap<String, LinkedList<QueryUnitAttemptId>>();
+    private Map<String, TaskBlockLocation> leafTaskHostMapping = new HashMap<String, TaskBlockLocation>();
     private final Map<String, LinkedList<QueryUnitAttemptId>> leafTasksRackMapping =
         new HashMap<String, LinkedList<QueryUnitAttemptId>>();
 
     public void addLeafTask(TaskScheduleEvent event) {
-      for (String host : event.getHosts()) {
-        String hostName = NetUtils.normalizeHostName(host);
-        LinkedList<QueryUnitAttemptId> list = leafTasksHostMapping.get(hostName);
-        if (list == null) {
-          list = new LinkedList<QueryUnitAttemptId>();
-          leafTasksHostMapping.put(hostName, list);
+      List<QueryUnit.DataLocation> locations = event.getDataLocations();
+
+      for (QueryUnit.DataLocation location : locations) {
+        String host = location.getHost();
+
+        TaskBlockLocation taskBlockLocation = leafTaskHostMapping.get(host);
+        if (taskBlockLocation == null) {
+          taskBlockLocation = new TaskBlockLocation(host);
+          leafTaskHostMapping.put(host, taskBlockLocation);
         }
-        list.add(event.getAttemptId());
+        taskBlockLocation.addQueryUnitAttemptId(location.getVolumeId(), event.getAttemptId());
+
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Added attempt req to host " + hostName);
+          LOG.debug("Added attempt req to host " + host);
         }
       }
-      for (String rack: event.getRacks()) {
+      for (String rack : event.getRacks()) {
         LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack);
         if (list == null) {
           list = new LinkedList<QueryUnitAttemptId>();
@@ -297,7 +381,6 @@ public class TaskSchedulerImpl extends AbstractService
     }
 
     public Set<QueryUnitAttemptId> AssignedRequest = new HashSet<QueryUnitAttemptId>();
-
     public void assignToLeafTasks(List<TaskRequestEvent> taskRequests) {
       Iterator<TaskRequestEvent> it = taskRequests.iterator();
       LOG.info("Got task requests " + taskRequests.size());
@@ -306,14 +389,22 @@ public class TaskSchedulerImpl extends AbstractService
       while (it.hasNext() && leafTasks.size() > 0) {
         taskRequest = it.next();
         ContainerProxy container = context.getContainer(taskRequest.getContainerId());
-        String hostName = NetUtils.normalizeHostName(container.getTaskHostName());
+        String host = container.getTaskHostName();
 
         QueryUnitAttemptId attemptId = null;
+        LinkedList<QueryUnitAttemptId> list = null;
 
-        // local allocation
-        LinkedList<QueryUnitAttemptId> list = leafTasksHostMapping.get(hostName);
-        while(list != null && list.size() > 0) {
+        // local disk allocation
+        if(!leafTaskHostMapping.containsKey(host)){
+          host = NetUtils.normalizeHost(host);
+        }
+
+        TaskBlockLocation taskBlockLocation = leafTaskHostMapping.get(host);
+        if (taskBlockLocation != null) {
+          list = taskBlockLocation.getQueryUnitAttemptIdList(taskRequest.getContainerId());
+        }
 
+        while (list != null && list.size() > 0) {
           QueryUnitAttemptId tId = list.removeFirst();
 
           if (leafTasks.contains(tId)) {
@@ -327,7 +418,7 @@ public class TaskSchedulerImpl extends AbstractService
 
         // rack allocation
         if (attemptId == null) {
-          String rack = RackResolver.resolve(hostName).getNetworkLocation();
+          String rack = RackResolver.resolve(host).getNetworkLocation();
           list = leafTasksRackMapping.get(rack);
           while(list != null && list.size() > 0) {
 
@@ -365,7 +456,7 @@ public class TaskSchedulerImpl extends AbstractService
 
           context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
               taskRequest.getContainerId(),
-              container.getTaskHostName(), container.getTaskPort()));
+              host, container.getTaskPort()));
           AssignedRequest.add(attemptId);
 
           totalAssigned++;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
index 03b2aba..1f87356 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
@@ -19,23 +19,25 @@
 package org.apache.tajo.master.event;
 
 import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.master.querymaster.QueryUnit;
 
 import java.util.Arrays;
+import java.util.List;
 
 public class TaskScheduleEvent extends TaskSchedulerEvent {
   private final QueryUnitAttemptId attemptId;
   private final boolean isLeafQuery;
-  private final String[] hosts;
+  private final List<QueryUnit.DataLocation> dataLocations;
   private final String[] racks;
 
   public TaskScheduleEvent(final QueryUnitAttemptId attemptId,
                            final EventType eventType, boolean isLeafQuery,
-                           final String[] hosts,
+                           final List<QueryUnit.DataLocation> dataLocations,
                            final String[] racks) {
     super(eventType, attemptId.getSubQueryId());
     this.attemptId = attemptId;
     this.isLeafQuery = isLeafQuery;
-    this.hosts = hosts;
+    this.dataLocations = dataLocations;
     this.racks = racks;
   }
 
@@ -47,11 +49,11 @@ public class TaskScheduleEvent extends TaskSchedulerEvent {
     return this.isLeafQuery;
   }
 
-  public String [] getHosts() {
-    return this.hosts;
+  public List<QueryUnit.DataLocation> getDataLocations() {
+    return this.dataLocations;
   }
 
-  public String [] getRacks() {
+  public String[] getRacks() {
     return this.racks;
   }
 
@@ -60,7 +62,7 @@ public class TaskScheduleEvent extends TaskSchedulerEvent {
     return "TaskScheduleEvent{" +
         "attemptId=" + attemptId +
         ", isLeafQuery=" + isLeafQuery +
-        ", hosts=" + (hosts == null ? null : Arrays.asList(hosts)) +
+        ", hosts=" + (dataLocations == null ? null : dataLocations) +
         ", racks=" + (racks == null ? null : Arrays.asList(racks)) +
         '}';
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index cf32e16..69143c9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -59,7 +59,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
 	
   private List<Partition> partitions;
 	private TableStat stats;
-  private String [] dataLocations;
+  private List<DataLocation> dataLocations;
   private final boolean isLeafTask;
   private List<IntermediateEntry> intermediateData;
 
@@ -127,11 +127,18 @@ public class QueryUnit implements EventHandler<TaskEvent> {
     return this.isLeafTask;
   }
 
-  public void setDataLocations(String [] dataLocations) {
-    this.dataLocations = dataLocations;
+  public void setDataLocations(Fragment fragment) {
+    String[] hosts = fragment.getHosts();
+    int[] blockCount = fragment.getHostsBlockCount();
+    int[] volumeIds = fragment.getDiskIds();
+    this.dataLocations = new ArrayList<DataLocation>(hosts.length);
+
+    for (int i = 0; i < hosts.length; i++) {
+      this.dataLocations.add(new DataLocation(hosts[i], blockCount[i], volumeIds[i]));
+    }
   }
 
-  public String [] getDataLocations() {
+  public List<DataLocation> getDataLocations() {
     return this.dataLocations;
   }
 
@@ -171,16 +178,12 @@ public class QueryUnit implements EventHandler<TaskEvent> {
   @Deprecated
   public void setFragment(String tableId, Fragment fragment) {
     this.fragMap.put(tableId, fragment);
-    if (fragment.hasDataLocations()) {
-      setDataLocations(fragment.getDataLocations());
-    }
+    setDataLocations(fragment);
   }
 
   public void setFragment2(Fragment fragment) {
     this.fragMap.put(fragment.getName(), fragment);
-    if (fragment.hasDataLocations()) {
-      setDataLocations(fragment.getDataLocations());
-    }
+    setDataLocations(fragment);
   }
 	
 	public void addFetch(String tableId, String uri) throws URISyntaxException {
@@ -496,4 +499,37 @@ public class QueryUnit implements EventHandler<TaskEvent> {
       return pullHost + ":" + port;
     }
   }
+
+  public static class DataLocation {
+    private String host;
+    private int blockCount; // for Non-Splittable
+    private int volumeId;
+
+    public DataLocation(String host, int blockCount, int volumeId) {
+      this.host = host;
+      this.blockCount = blockCount;
+      this.volumeId = volumeId;
+    }
+
+    public String getHost() {
+      return host;
+    }
+
+    public int getBlockCount() {
+      return blockCount;
+    }
+
+    public int getVolumeId() {
+      return volumeId;
+    }
+
+    @Override
+    public String toString() {
+      return "DataLocation{" +
+          "host=" + host +
+          ", blocks=" + blockCount +
+          ", volumeId=" + volumeId +
+          '}';
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index 5443858..add42aa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -201,8 +201,8 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
       if (taskAttempt.isLeafTask()
           && taskAttempt.getQueryUnit().getScanNodes().length == 1) {
         Set<String> racks = new HashSet<String>();
-        for (String host : taskAttempt.getQueryUnit().getDataLocations()) {
-          racks.add(RackResolver.resolve(host).getNetworkLocation());
+        for (QueryUnit.DataLocation location : taskAttempt.getQueryUnit().getDataLocations()) {
+          racks.add(RackResolver.resolve(location.getHost()).getNetworkLocation());
         }
 
         taskAttempt.eventHandler.handle(new TaskScheduleEvent(

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index ff590f3..cccfcba 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -78,8 +78,7 @@ public class Repartitioner {
 
       if (scans[i].isLocal()) { // it only requires a dummy fragment.
         fragments[i] = new Fragment(scans[i].getTableId(), tablePath,
-            CatalogUtil.newTableMeta(scans[i].getInSchema(), StoreType.CSV),
-            0, 0, null);
+            CatalogUtil.newTableMeta(scans[i].getInSchema(), StoreType.CSV), 0, 0);
       } else {
         fragments[i] = subQuery.getStorageManager().getSplits(scans[i].getTableId(),
                 tableDesc.getMeta(),
@@ -293,8 +292,7 @@ public class Repartitioner {
     TupleRange [] ranges = partitioner.partition(determinedTaskNum);
 
     Fragment dummyFragment = new Fragment(scan.getTableId(), tablePath,
-        CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),
-        0, 0, null);
+        CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),0, 0);
 
     List<String> basicFetchURIs = new ArrayList<String>();
 
@@ -391,8 +389,7 @@ public class Repartitioner {
     }
 
     Fragment frag = new Fragment(scan.getTableId(), tablePath,
-        CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),
-        0, 0, null);
+        CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV), 0, 0);
 
     Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions);
     Map<String, List<IntermediateEntry>> hashedByHost;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 559f29b..94f1662 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -460,7 +460,7 @@ public class Task {
       if (f.getLen() == 0) {
         continue;
       }
-      tablet = new Fragment(name, f.getPath(), meta, 0l, f.getLen(), null);
+      tablet = new Fragment(name, f.getPath(), meta, 0l, f.getLen());
       listTablets.add(tablet);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 271148f..dbb3862 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -469,7 +469,7 @@ public class TestPhysicalPlanner {
     Fragment [] fragments = new Fragment[list.length];
     int i = 0;
     for (FileStatus status : list) {
-      fragments[i++] = new Fragment("partition", status.getPath(), outputMeta, 0, status.getLen(), null);
+      fragments[i++] = new Fragment("partition", status.getPath(), outputMeta, 0, status.getLen());
     }
     Scanner scanner = new MergeScanner(conf, outputMeta,TUtil.newList(fragments));
     scanner.init();
@@ -527,7 +527,7 @@ public class TestPhysicalPlanner {
     Fragment [] fragments = new Fragment[list.length];
     int i = 0;
     for (FileStatus status : list) {
-      fragments[i++] = new Fragment("partition", status.getPath(), outputMeta, 0, status.getLen(), null);
+      fragments[i++] = new Fragment("partition", status.getPath(), outputMeta, 0, status.getLen());
     }
     Scanner scanner = new MergeScanner(conf, outputMeta,TUtil.newList(fragments));
     scanner.init();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java
index 1382db3..9245b56 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java
@@ -48,8 +48,7 @@ public class TestFragment {
 
   @Test
   public final void testGetAndSetFields() {    
-    Fragment fragment1 = new Fragment("table1_1", new Path("/table0"),
-        meta1, 0, 500, null);
+    Fragment fragment1 = new Fragment("table1_1", new Path("/table0"), meta1, 0, 500);
     fragment1.setDistCached();
 
     assertEquals("table1_1", fragment1.getName());
@@ -70,7 +69,7 @@ public class TestFragment {
 
   @Test
   public final void testTabletTabletProto() {
-    Fragment fragment0 = new Fragment("table1_1", new Path("/table0"), meta1, 0, 500, null);
+    Fragment fragment0 = new Fragment("table1_1", new Path("/table0"), meta1, 0, 500);
     
     Fragment fragment1 = new Fragment(fragment0.getProto());
     assertEquals("table1_1", fragment1.getName());
@@ -93,9 +92,7 @@ public class TestFragment {
     final int num = 10;
     Fragment [] tablets = new Fragment[num];
     for (int i = num - 1; i >= 0; i--) {
-      tablets[i]
-          = new Fragment("tablet1_"+i, new Path("tablet0"), meta1, i * 500, 
-              (i+1) * 500, null);
+      tablets[i] = new Fragment("tablet1_"+i, new Path("tablet0"), meta1, i * 500, (i+1) * 500);
     }
     
     Arrays.sort(tablets);
@@ -110,9 +107,7 @@ public class TestFragment {
     final int num = 1860;
     Fragment [] tablets = new Fragment[num];
     for (int i = num - 1; i >= 0; i--) {
-      tablets[i]
-          = new Fragment("tablet1_"+i, new Path("tablet0"), meta1, (long)i * 6553500,
-          (long)(i+1) * 6553500, null);
+      tablets[i] = new Fragment("tablet1_"+i, new Path("tablet0"), meta1, (long)i * 6553500, (long)(i+1) * 6553500);
     }
 
     SortedSet sortedSet = Sets.newTreeSet();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
index 2d67690..0b85fdf 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -113,7 +113,7 @@ public class TestRowFile {
     TableProto proto = (TableProto) FileUtil.loadProto(
         util.getDefaultFileSystem(), metaPath, TableProto.getDefaultInstance());
     meta = new TableMetaImpl(proto);
-    Fragment fragment = new Fragment("test.tbl", dataPath, meta, 0, file.getLen(), null);
+    Fragment fragment = new Fragment("test.tbl", dataPath, meta, 0, file.getLen());
 
     int tupleCnt = 0;
     start = System.currentTimeMillis();
@@ -136,7 +136,7 @@ public class TestRowFile {
 
     for (int i = 0; i < 13; i++) {
       System.out.println("range: " + fileStart + ", " + fileLen);
-      fragment = new Fragment("test.tbl", dataPath, meta, fileStart, fileLen, null);
+      fragment = new Fragment("test.tbl", dataPath, meta, fileStart, fileLen);
       scanner = new RowFile.RowFileScanner(conf, meta, fragment);
       scanner.init();
       while ((tuple=scanner.next()) != null) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index f0d9335..0f6ed41 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -460,5 +460,10 @@ public class CSVFile {
       }
       return this.tupleOffsets[currentIdx];
     }
+
+    @Override
+    public boolean isSplittable(){
+      return true;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
index e070726..4f6dde1 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
@@ -21,6 +21,7 @@ package org.apache.tajo.storage;
 import com.google.common.base.Objects;
 import com.google.gson.Gson;
 import com.google.gson.annotations.Expose;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.catalog.*;
@@ -29,6 +30,9 @@ import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
 import org.apache.tajo.storage.json.StorageGsonHelper;
 import org.apache.tajo.util.TUtil;
 
+import java.io.IOException;
+import java.util.Arrays;
+
 public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject, GsonObject {
   protected FragmentProto.Builder builder = null;
 
@@ -39,21 +43,44 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
   @Expose private Long length; // required
   @Expose private boolean distCached = false; // optional
 
-  private String [] dataLocations;
+  private String[] hosts; // Datanode hostnames
+  private int[] hostsBlockCount; // list of block count of hosts
+  private int[] diskIds;
 
   public Fragment() {
     builder = FragmentProto.newBuilder();
   }
 
-  public Fragment(String tableName, Path uri, TableMeta meta, long start,
-      long length, String [] dataLocations) {
+  public Fragment(String tableName, Path uri, TableMeta meta, BlockLocation blockLocation, int[] diskIds) throws IOException {
+    this();
+    TableMeta newMeta = new TableMetaImpl(meta.getProto());
+    SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(tableName, meta
+        .getSchema().getProto());
+    newMeta.setSchema(new Schema(newSchemaProto));
+    this.set(tableName, uri, newMeta, blockLocation.getOffset(), blockLocation.getLength());
+    this.hosts = blockLocation.getHosts();
+    this.diskIds = diskIds;
+  }
+
+  // Non splittable
+  public Fragment(String tableName, Path uri, TableMeta meta, long start, long length, String[] hosts, int[] hostsBlockCount) {
     this();
     TableMeta newMeta = new TableMetaImpl(meta.getProto());
     SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(tableName, meta
         .getSchema().getProto());
     newMeta.setSchema(new Schema(newSchemaProto));
     this.set(tableName, uri, newMeta, start, length);
-    this.dataLocations = dataLocations;
+    this.hosts = hosts;
+    this.hostsBlockCount = hostsBlockCount;
+  }
+
+  public Fragment(String fragmentId, Path path, TableMeta meta, long start, long length) {
+    this();
+    TableMeta newMeta = new TableMetaImpl(meta.getProto());
+    SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(fragmentId, meta
+        .getSchema().getProto());
+    newMeta.setSchema(new Schema(newSchemaProto));
+    this.set(fragmentId, path, newMeta, start, length);
   }
 
   public Fragment(FragmentProto proto) {
@@ -66,14 +93,6 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
     }
   }
 
-  public boolean hasDataLocations() {
-    return this.dataLocations != null;
-  }
-
-  public String [] getDataLocations() {
-    return this.dataLocations;
-  }
-
   private void set(String tableName, Path path, TableMeta meta, long start,
       long length) {
     this.tableName = tableName;
@@ -83,6 +102,41 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
     this.length = length;
   }
 
+
+  /**
+   * Get the list of hosts (hostname) hosting this block
+   */
+  public String[] getHosts() {
+    if (hosts == null) {
+      this.hosts = new String[0];
+    }
+    return hosts;
+  }
+
+  /**
+   * Get the list of hosts block count
+   * if a fragment given multiple block, it returned 'host0:3, host1:1 ...'
+   */
+  public int[] getHostsBlockCount() {
+    if (hostsBlockCount == null) {
+      this.hostsBlockCount = new int[getHosts().length];
+      Arrays.fill(this.hostsBlockCount, 1);
+    }
+    return hostsBlockCount;
+  }
+
+  /**
+   * Get the list of Disk Ids
+   * Unknown disk is -1. Others 0 ~ N
+   */
+  public int[] getDiskIds() {
+    if (diskIds == null) {
+      this.diskIds = new int[getHosts().length];
+      Arrays.fill(this.diskIds, -1);
+    }
+    return diskIds;
+  }
+
   public String getName() {
     return this.tableName;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
index 1afd9d5..8b7ec67 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -107,4 +107,9 @@ public class MergeScanner implements Scanner {
   public Schema getSchema() {
     return meta.getSchema();
   }
+
+  @Override
+  public boolean isSplittable(){
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index b3f158b..c8127b1 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -245,6 +245,11 @@ public class RawFile {
     public boolean isSelectable() {
       return false;
     }
+
+    @Override
+    public boolean isSplittable(){
+      return false;
+    }
   }
 
   public static class RawFileAppender extends FileAppender {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
index cd6dc00..6a3bcf1 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -311,6 +311,11 @@ public class RowFile {
     public boolean isSelectable() {
       return false;
     }
+
+    @Override
+    public boolean isSplittable(){
+      return true;
+    }
   }
 
   public static class RowFileAppender extends FileAppender {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Scanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Scanner.java
index 4f5e7b1..6dca3f2 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Scanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Scanner.java
@@ -84,4 +84,11 @@ public interface Scanner extends SchemaObject, Closeable {
    * TODO - to be changed Object type
    */
   void setSearchCondition(Object expr);
+
+  /**
+   * It returns if the file is splittable.
+   *
+   * @return true if this scanner can split the a file.
+   */
+  boolean isSplittable();
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
index 657ba0f..e7479fd 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -20,8 +20,11 @@ package org.apache.tajo.storage;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.util.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
@@ -29,14 +32,13 @@ import org.apache.tajo.catalog.TableMetaImpl;
 import org.apache.tajo.catalog.proto.CatalogProtos.TableProto;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.util.Bytes;
 import org.apache.tajo.util.FileUtil;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -49,6 +51,7 @@ public class StorageManager {
   private final FileSystem fs;
   private final Path baseDir;
   private final Path tableBaseDir;
+  private final boolean blocksMetadataEnabled;
 
   /**
    * Cache of scanner handlers for each storage type.
@@ -75,6 +78,10 @@ public class StorageManager {
     this.baseDir = new Path(conf.getVar(ConfVars.ROOT_DIR));
     this.tableBaseDir = new Path(this.baseDir, TajoConstants.WAREHOUSE_DIR);
     this.fs = baseDir.getFileSystem(conf);
+    this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+    if (!this.blocksMetadataEnabled)
+      LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
   }
 
   public static StorageManager get(TajoConf conf) throws IOException {
@@ -118,7 +125,7 @@ public class StorageManager {
       throws IOException {
     FileSystem fs = path.getFileSystem(conf);
     FileStatus status = fs.getFileStatus(path);
-    Fragment fragment = new Fragment(path.getName(), path, meta, 0, status.getLen(), null);
+    Fragment fragment = new Fragment(path.getName(), path, meta, 0, status.getLen());
     return getScanner(conf, meta, fragment);
   }
 
@@ -218,8 +225,7 @@ public class StorageManager {
 
     FileStatus[] fileLists = fs.listStatus(new Path(tablePath, "data"));
     for (FileStatus file : fileLists) {
-      tablet = new Fragment(tablePath.getName(), file.getPath(), meta, 0,
-          file.getLen(), null);
+      tablet = new Fragment(tablePath.getName(), file.getPath(), meta, 0, file.getLen());
       listTablets.add(tablet);
     }
 
@@ -253,17 +259,14 @@ public class StorageManager {
       long start = 0;
       if (remainFileSize > defaultBlockSize) {
         while (remainFileSize > defaultBlockSize) {
-          tablet = new Fragment(tableName, file.getPath(), meta, start,
-              defaultBlockSize, null);
+          tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize);
           listTablets.add(tablet);
           start += defaultBlockSize;
           remainFileSize -= defaultBlockSize;
         }
-        listTablets.add(new Fragment(tableName, file.getPath(), meta, start,
-            remainFileSize, null));
+        listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize));
       } else {
-        listTablets.add(new Fragment(tableName, file.getPath(), meta, 0,
-            remainFileSize, null));
+        listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize));
       }
     }
 
@@ -288,17 +291,14 @@ public class StorageManager {
       long start = 0;
       if (remainFileSize > defaultBlockSize) {
         while (remainFileSize > defaultBlockSize) {
-          tablet = new Fragment(tableName, file.getPath(), meta, start,
-              defaultBlockSize, null);
+          tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize);
           listTablets.add(tablet);
           start += defaultBlockSize;
           remainFileSize -= defaultBlockSize;
         }
-        listTablets.add(new Fragment(tableName, file.getPath(), meta, start,
-            remainFileSize, null));
+        listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize));
       } else {
-        listTablets.add(new Fragment(tableName, file.getPath(), meta, 0,
-            remainFileSize, null));
+        listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize));
       }
     }
 
@@ -442,19 +442,23 @@ public class StorageManager {
    * so that Mappers process entire files.
    *
    * @param filename the file name to check
-   * @return is this file splitable?
+   * @return is this file isSplittable?
    */
-  protected boolean isSplitable(Path filename) {
-    return true;
+  protected boolean isSplittable(TableMeta meta, Path filename) throws IOException {
+    Scanner scanner = getScanner(conf, meta, filename);
+    return scanner.isSplittable();
   }
 
+  @Deprecated
   protected long computeSplitSize(long blockSize, long minSize,
                                   long maxSize) {
     return Math.max(minSize, Math.min(maxSize, blockSize));
   }
 
+  @Deprecated
   private static final double SPLIT_SLOP = 1.1;   // 10% slop
 
+  @Deprecated
   protected int getBlockIndex(BlockLocation[] blkLocations,
                               long offset) {
     for (int i = 0; i < blkLocations.length; i++) {
@@ -475,9 +479,48 @@ public class StorageManager {
    * A factory that makes the split for this class. It can be overridden
    * by sub-classes to make sub-types
    */
-  protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
-                               String[] hosts) {
-    return new Fragment(fragmentId, file, meta, start, length, hosts);
+  protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) {
+    return new Fragment(fragmentId, file, meta, start, length);
+  }
+
+  protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
+                               int[] diskIds) throws IOException {
+    return new Fragment(fragmentId, file, meta, blockLocation, diskIds);
+  }
+
+  // for Non Splittable. eg, compressed gzip TextFile
+  protected Fragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
+                                  BlockLocation[] blkLocations) throws IOException {
+
+    Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
+    for (BlockLocation blockLocation : blkLocations) {
+      for (String host : blockLocation.getHosts()) {
+        if (hostsBlockMap.containsKey(host)) {
+          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
+        } else {
+          hostsBlockMap.put(host, 1);
+        }
+      }
+    }
+
+    List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
+    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
+
+      @Override
+      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
+        return v1.getValue().compareTo(v2.getValue());
+      }
+    });
+
+    String[] hosts = new String[blkLocations[0].getHosts().length];
+    int[] hostsBlockCount = new int[blkLocations[0].getHosts().length];
+
+    for (int i = 0; i < hosts.length; i++) {
+      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
+      hosts[i] = entry.getKey();
+      hostsBlockCount[i] = entry.getValue();
+    }
+    return new Fragment(fragmentId, file, meta, start, length, hosts, hostsBlockCount);
   }
 
   /**
@@ -485,6 +528,7 @@ public class StorageManager {
    *
    * @return the maximum number of bytes a split can include
    */
+  @Deprecated
   public static long getMaxSplitSize() {
     // TODO - to be configurable
     return 536870912L;
@@ -495,56 +539,103 @@ public class StorageManager {
    *
    * @return the minimum number of bytes that can be in a split
    */
+  @Deprecated
   public static long getMinSplitSize() {
     // TODO - to be configurable
     return 67108864L;
   }
 
   /**
+   * Get Disk Ids by Volume Bytes
+   */
+  private int[] getDiskIds(VolumeId[] volumeIds) {
+    int[] diskIds = new int[volumeIds.length];
+    for (int i = 0; i < volumeIds.length; i++) {
+      int diskId = -1;
+      if (volumeIds[i] != null && volumeIds[i].isValid()) {
+        String volumeIdString = volumeIds[i].toString();
+        byte[] volumeIdBytes = Base64.decodeBase64(volumeIdString);
+
+        if (volumeIdBytes.length == 4) {
+          diskId = Bytes.toInt(volumeIdBytes);
+        } else if (volumeIdBytes.length == 1) {
+          diskId = (int) volumeIdBytes[0];  // support hadoop-2.0.2
+        }
+      }
+      diskIds[i] = diskId;
+    }
+    return diskIds;
+  }
+
+  /**
+   * Generate the map of host and make them into Volume Ids.
+   *
+   */
+  private Map<String, Set<Integer>> getVolumeMap(List<Fragment> frags) {
+    Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
+    for (Fragment frag : frags) {
+      String[] hosts = frag.getHosts();
+      int[] diskIds = frag.getDiskIds();
+      for (int i = 0; i < hosts.length; i++) {
+        Set<Integer> volumeList = volumeMap.get(hosts[i]);
+        if (volumeList == null) {
+          volumeList = new HashSet<Integer>();
+          volumeMap.put(hosts[i], volumeList);
+        }
+
+        if (diskIds.length > 0 && diskIds[i] > -1) {
+          volumeList.add(diskIds[i]);
+        }
+      }
+    }
+
+    return volumeMap;
+  }
+  /**
    * Generate the list of files and make them into FileSplits.
    *
    * @throws IOException
    */
   public List<Fragment> getSplits(String tableName, TableMeta meta, Path inputPath) throws IOException {
-    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize());
-    long maxSize = getMaxSplitSize();
+    // generate splits'
 
-    // generate splits
     List<Fragment> splits = new ArrayList<Fragment>();
     List<FileStatus> files = listStatus(inputPath);
+    FileSystem fs = inputPath.getFileSystem(conf);
     for (FileStatus file : files) {
       Path path = file.getPath();
       long length = file.getLen();
-      if (length != 0) {
-        FileSystem fs = path.getFileSystem(conf);
+      if (length > 0) {
         BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
-        if (isSplitable(path)) {
-          long blockSize = file.getBlockSize();
-          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
-
-          long bytesRemaining = length;
-          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
-            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-            splits.add(makeSplit(tableName, meta, path, length - bytesRemaining, splitSize,
-                blkLocations[blkIndex].getHosts()));
-            bytesRemaining -= splitSize;
+        boolean splittable = isSplittable(meta, path);
+        if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
+          // supported disk volume
+          BlockStorageLocation[] blockStorageLocations = ((DistributedFileSystem) fs)
+              .getFileBlockStorageLocations(Arrays.asList(blkLocations));
+          if (splittable) {
+            for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
+              splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
+                  .getVolumeIds())));
+            }
+          } else { // Non splittable
+            splits.add(makeNonSplit(tableName, meta, path, 0, length, blockStorageLocations));
           }
 
-          if (bytesRemaining != 0) {
-            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-            splits.add(makeSplit(tableName, meta, path, length - bytesRemaining, bytesRemaining,
-                blkLocations[blkIndex].getHosts()));
+        } else {
+          if (splittable) {
+            for (BlockLocation blockLocation : blkLocations) {
+              splits.add(makeSplit(tableName, meta, path, blockLocation, null));
+            }
+          } else { // Non splittable
+            splits.add(makeNonSplit(tableName, meta, path, 0, length, blkLocations));
           }
-        } else { // not splitable
-          splits.add(makeSplit(tableName, meta, path, 0, length, blkLocations[0].getHosts()));
         }
       } else {
-        //Create empty hosts array for zero length files
-        splits.add(makeSplit(tableName, meta, path, 0, length, new String[0]));
+        //for zero length files
+        splits.add(makeSplit(tableName, meta, path, 0, length));
       }
     }
-    // Save the number of input files for metrics/loadgen
-    //job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
+
     LOG.debug("Total # of splits: " + splits.size());
     return splits;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/json/FragmentDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/json/FragmentDeserializer.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/json/FragmentDeserializer.java
index b74fcdf..a20d45f 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/json/FragmentDeserializer.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/json/FragmentDeserializer.java
@@ -47,7 +47,7 @@ public class FragmentDeserializer implements JsonDeserializer<Fragment> {
 				gson.fromJson(fragObj.get("path"), Path.class), 
 				meta, 
 				fragObj.get("startOffset").getAsLong(), 
-				fragObj.get("length").getAsLong(), null);
+				fragObj.get("length").getAsLong());
 		return fragment;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFileWrapper.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFileWrapper.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFileWrapper.java
index 402c354..92d9acc 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFileWrapper.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFileWrapper.java
@@ -333,5 +333,10 @@ public class RCFileWrapper {
     public boolean isSelectable() {
       return false;
     }
+
+    @Override
+    public boolean isSplittable(){
+      return true;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
index cf021db..ef43671 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
@@ -171,4 +171,9 @@ public class TrevniScanner extends FileScanner {
   public boolean isSelectable() {
     return false;
   }
+
+  @Override
+  public boolean isSplittable(){
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index f93e392..7c40d3d 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -137,10 +137,8 @@ public class TestMergeScanner {
     FileStatus status1 = fs.getFileStatus(table1Path);
     FileStatus status2 = fs.getFileStatus(table2Path);
     Fragment[] tablets = new Fragment[2];
-    tablets[0] = new Fragment("tablet1", table1Path, meta, 0,
-        status1.getLen(), null);
-    tablets[1] = new Fragment("tablet1", table2Path, meta, 0,
-        status2.getLen(), null);
+    tablets[0] = new Fragment("tablet1", table1Path, meta, 0, status1.getLen());
+    tablets[1] = new Fragment("tablet1", table2Path, meta, 0, status2.getLen());
     
     Scanner scanner = new MergeScanner(conf, meta, TUtil.newList(tablets));
     scanner.init();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index b8c7021..0bc2a80 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -111,10 +111,8 @@ public class TestStorages {
       long randomNum = (long) (Math.random() * fileLen) + 1;
 
       Fragment[] tablets = new Fragment[2];
-      tablets[0] = new Fragment("Splitable", tablePath, meta,
-          0, randomNum, null);
-      tablets[1] = new Fragment("Splitable", tablePath, meta,
-          randomNum, (fileLen - randomNum), null);
+      tablets[0] = new Fragment("Splitable", tablePath, meta, 0, randomNum);
+      tablets[1] = new Fragment("Splitable", tablePath, meta, randomNum, (fileLen - randomNum));
 
       Scanner scanner = StorageManager.getScanner(conf, meta, tablets[0], schema);
       scanner.init();
@@ -160,7 +158,7 @@ public class TestStorages {
     appender.close();
 
     FileStatus status = fs.getFileStatus(tablePath);
-    Fragment fragment = new Fragment("testReadAndWrite", tablePath, meta, 0, status.getLen(), null);
+    Fragment fragment = new Fragment("testReadAndWrite", tablePath, meta, 0, status.getLen());
 
     Schema target = new Schema();
     target.addColumn("age", Type.INT8);
@@ -224,7 +222,7 @@ public class TestStorages {
     appender.close();
 
     FileStatus status = fs.getFileStatus(tablePath);
-    Fragment fragment = new Fragment("table", tablePath, meta, 0, status.getLen(), null);
+    Fragment fragment = new Fragment("table", tablePath, meta, 0, status.getLen());
     Scanner scanner =  StorageManager.getScanner(conf, meta, fragment);
     scanner.init();
     Tuple retrieved = scanner.next();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index 63f2a2d..436c50e 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -89,7 +89,7 @@ public class TestBSTIndex {
 
     FileStatus status = fs.getFileStatus(tablePath);
     long fileLen = status.getLen();
-    Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+    Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen);
     
     SortSpec [] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
@@ -199,7 +199,7 @@ public class TestBSTIndex {
 
     FileStatus status = fs.getFileStatus(tablePath);
     long fileLen = status.getLen();
-    Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+    Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen);
 
     tuple = new VTuple(keySchema.getColumnNum());
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "BuildIndexWithAppender.idx"),
@@ -247,7 +247,7 @@ public class TestBSTIndex {
     appender.close();
 
     FileStatus status = fs.getFileStatus(tablePath);
-    Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, status.getLen(), null);
+    Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, status.getLen());
     
     SortSpec [] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
@@ -315,7 +315,7 @@ public class TestBSTIndex {
 
     FileStatus status = fs.getFileStatus(tablePath);
     long fileLen = status.getLen();
-    Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+    Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen);
     
     SortSpec [] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
@@ -401,7 +401,7 @@ public class TestBSTIndex {
 
     FileStatus status = fs.getFileStatus(tablePath);
     long fileLen = status.getLen();
-    Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+    Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen);
     
     SortSpec [] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index 2708403..bfaf24e 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -93,7 +93,7 @@ public class TestSingleCSVFileBSTIndex {
 
     FileStatus status = fs.getFileStatus(tablePath);
     long fileLen = status.getLen();
-    Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+    Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen);
 
     SortSpec[] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
@@ -182,7 +182,7 @@ public class TestSingleCSVFileBSTIndex {
 
     FileStatus status = fs.getFileStatus(tablePath);
     long fileLen = status.getLen();
-    Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+    Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen);
     
     SortSpec [] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java b/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
index f64fa59..6dff443 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
@@ -18,9 +18,7 @@
 
 package org.apache.tajo.util;
 
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
+import java.net.*;
 
 public class NetUtils {
   public static String normalizeInetSocketAddress(InetSocketAddress addr) {
@@ -67,4 +65,58 @@ public class NetUtils {
     }
     return addr;
   }
+
+  /**
+   * Checks whether the given address belongs to this node.
+   *
+   * <p>The wildcard ("any local") address and loopback addresses are always
+   * considered local; any other address is local only if it is assigned to one
+   * of this node's network interfaces.
+   *
+   * @param addr address to check; must not be {@code null}
+   * @return {@code true} if the address corresponds to the local node, {@code false}
+   *         otherwise, including when the network interfaces cannot be queried
+   */
+  public static boolean isLocalAddress(InetAddress addr) {
+    // Wildcard and loopback addresses always refer to this node.
+    if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()) {
+      return true;
+    }
+
+    // Otherwise the address is local only if an interface on this node is bound to it.
+    try {
+      return NetworkInterface.getByInetAddress(addr) != null;
+    } catch (SocketException e) {
+      // Interfaces could not be inspected; conservatively treat the address as remote.
+      return false;
+    }
+  }
+
+  /**
+   * Resolves a host to a textual IP address, mapping every address of this node
+   * to a single canonical local address.
+   *
+   * <p>If {@code host} resolves to a local address (see
+   * {@link #isLocalAddress(InetAddress)}), the address of
+   * {@link InetAddress#getLocalHost()} is returned; otherwise the resolved address
+   * of {@code host} is returned. A {@code null} or empty host is resolved by
+   * {@link InetAddress#getByName(String)} to the loopback address and therefore
+   * also normalizes to the local address. This may perform a blocking name lookup.
+   *
+   * @param host host name or textual IP address
+   * @return the normalized IP address, or {@code host} unchanged if it (or the local
+   *         host name) cannot be resolved
+   */
+  public static String normalizeHost(String host) {
+    try {
+      InetAddress address = InetAddress.getByName(host);
+      if (isLocalAddress(address)) {
+        return InetAddress.getLocalHost().getHostAddress();
+      }
+      return address.getHostAddress();
+    } catch (UnknownHostException ignored) {
+      // Best effort: fall back to the caller-supplied value instead of failing.
+      return host;
+    }
+  }
 }