You are viewing a plain text version of this content. The canonical link for it (a hyperlink in the original page) is not preserved in this plain-text version.
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/08/26 06:46:20 UTC
git commit: TAJO-84: Task scheduling with considering disk load balance. (jinho)
Updated Branches:
refs/heads/master df09fd22c -> 8b8b6683d
TAJO-84: Task scheduling with considering disk load balance. (jinho)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/8b8b6683
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/8b8b6683
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/8b8b6683
Branch: refs/heads/master
Commit: 8b8b6683d86ca7d3362ad2c16644f6656d4e129d
Parents: df09fd2
Author: jinossy <ji...@gmail.com>
Authored: Mon Aug 26 13:45:26 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Mon Aug 26 13:45:26 2013 +0900
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../tajo/engine/json/FragmentDeserializer.java | 2 +-
.../engine/planner/physical/SeqScanExec.java | 2 +-
.../apache/tajo/engine/query/ResultSetImpl.java | 6 +-
.../apache/tajo/master/TaskSchedulerImpl.java | 131 +++++++++++--
.../tajo/master/event/TaskScheduleEvent.java | 16 +-
.../tajo/master/querymaster/QueryUnit.java | 56 +++++-
.../master/querymaster/QueryUnitAttempt.java | 4 +-
.../tajo/master/querymaster/Repartitioner.java | 9 +-
.../main/java/org/apache/tajo/worker/Task.java | 2 +-
.../planner/physical/TestPhysicalPlanner.java | 4 +-
.../org/apache/tajo/storage/TestFragment.java | 13 +-
.../org/apache/tajo/storage/TestRowFile.java | 4 +-
.../java/org/apache/tajo/storage/CSVFile.java | 5 +
.../java/org/apache/tajo/storage/Fragment.java | 78 ++++++--
.../org/apache/tajo/storage/MergeScanner.java | 5 +
.../java/org/apache/tajo/storage/RawFile.java | 5 +
.../java/org/apache/tajo/storage/RowFile.java | 5 +
.../java/org/apache/tajo/storage/Scanner.java | 7 +
.../org/apache/tajo/storage/StorageManager.java | 189 ++++++++++++++-----
.../tajo/storage/json/FragmentDeserializer.java | 2 +-
.../tajo/storage/rcfile/RCFileWrapper.java | 5 +
.../tajo/storage/trevni/TrevniScanner.java | 5 +
.../apache/tajo/storage/TestMergeScanner.java | 6 +-
.../org/apache/tajo/storage/TestStorages.java | 10 +-
.../apache/tajo/storage/index/TestBSTIndex.java | 10 +-
.../index/TestSingleCSVFileBSTIndex.java | 4 +-
.../java/org/apache/tajo/util/NetUtils.java | 38 +++-
28 files changed, 478 insertions(+), 147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f41d352..de7d746 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,8 @@ Release 0.2.0 - unreleased
IMPROVEMENTS
+ TAJO-84: Task scheduling with considering disk load balance. (jinho)
+
TAJO-123: Clean up the logical plan's json format. (hyunsik)
TAJO-129: Enable the constructor of NettyServerBase to take a service
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java
index 6231c03..63dd1fa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/json/FragmentDeserializer.java
@@ -47,7 +47,7 @@ public class FragmentDeserializer implements JsonDeserializer<Fragment> {
gson.fromJson(fragObj.get("path"), Path.class),
meta,
fragObj.get("startOffset").getAsLong(),
- fragObj.get("length").getAsLong(), null);
+ fragObj.get("length").getAsLong());
return fragment;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index dc42a71..474415e 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -101,7 +101,7 @@ public class SeqScanExec extends PhysicalExec {
@Override
public Tuple next() throws IOException {
Tuple tuple;
- Tuple outTuple = new VTuple(outSchema.getColumnNum());
+ Tuple outTuple = new VTuple(outColumnNum);
if (!plan.hasQual()) {
if ((tuple = scanner.next()) != null) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
index e0db8e1..808b910 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
@@ -133,8 +133,7 @@ public class ResultSetImpl implements ResultSet {
if (files[i].getLen() == 0) {
continue;
}
- fraglist.add(new Fragment(tbname + "_" + i, files[i].getPath(), meta, 0l,
- files[i].getLen(), null));
+ fraglist.add(new Fragment(tbname + "_" + i, files[i].getPath(), meta, 0l, files[i].getLen()));
}
return fraglist;
}
@@ -150,8 +149,7 @@ public class ResultSetImpl implements ResultSet {
if (files[i].getLen() == 0) {
continue;
}
- fraglist.add(new Fragment(tbname + "_" + i, files[i].getPath(), meta, 0l,
- files[i].getLen(), null));
+ fraglist.add(new Fragment(tbname + "_" + i, files[i].getPath(), meta, 0l, files[i].getLen()));
}
return fraglist.toArray(new Fragment[fraglist.size()]);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index fc776e3..c9702b4 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -22,7 +22,7 @@ import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.AbstractService;
@@ -34,15 +34,16 @@ import org.apache.tajo.engine.planner.logical.ScanNode;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
import org.apache.tajo.ipc.QueryMasterProtocol;
import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
-import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
import org.apache.tajo.master.event.TaskAttemptAssignedEvent;
import org.apache.tajo.master.event.TaskRequestEvent;
import org.apache.tajo.master.event.TaskRequestEvent.TaskRequestEventType;
import org.apache.tajo.master.event.TaskScheduleEvent;
import org.apache.tajo.master.event.TaskSchedulerEvent;
import org.apache.tajo.master.event.TaskSchedulerEvent.EventType;
+import org.apache.tajo.master.querymaster.QueryMaster.QueryContext;
import org.apache.tajo.master.querymaster.QueryUnit;
import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.TajoIdUtils;
import java.net.URI;
@@ -248,28 +249,111 @@ public class TaskSchedulerImpl extends AbstractService
}
}
+ public static class TaskBlockLocation {
+ private HashMap<Integer, LinkedList<QueryUnitAttemptId>> unAssignedTaskMap =
+ new HashMap<Integer, LinkedList<QueryUnitAttemptId>>();
+ private HashMap<ContainerId, Integer> assignedContainerMap = new HashMap<ContainerId, Integer>();
+ private TreeMap<Integer, Integer> volumeUsageMap = new TreeMap<Integer, Integer>();
+ private String host;
+
+ public TaskBlockLocation(String host){
+ this.host = host;
+ }
+
+ public void addQueryUnitAttemptId(Integer volumeId, QueryUnitAttemptId attemptId){
+ LinkedList<QueryUnitAttemptId> list = unAssignedTaskMap.get(volumeId);
+ if (list == null) {
+ list = new LinkedList<QueryUnitAttemptId>();
+ unAssignedTaskMap.put(volumeId, list);
+ }
+ list.add(attemptId);
+
+ if(!volumeUsageMap.containsKey(volumeId)) volumeUsageMap.put(volumeId, 0);
+ }
+
+ public LinkedList<QueryUnitAttemptId> getQueryUnitAttemptIdList(ContainerId containerId){
+ Integer volumeId;
+
+ if (!assignedContainerMap.containsKey(containerId)) {
+ volumeId = assignVolumeId();
+ assignedContainerMap.put(containerId, volumeId);
+ } else {
+ volumeId = assignedContainerMap.get(containerId);
+ }
+
+ LinkedList<QueryUnitAttemptId> list = null;
+ if (unAssignedTaskMap.size() > 0) {
+ int retry = unAssignedTaskMap.size();
+ do {
+ list = unAssignedTaskMap.get(volumeId);
+ if (list == null || list.size() == 0) {
+ //clean and reassign remaining volume
+ unAssignedTaskMap.remove(volumeId);
+ volumeUsageMap.remove(volumeId);
+ if (volumeId < 0) break; // processed all block on disk
+
+ volumeId = assignVolumeId();
+ assignedContainerMap.put(containerId, volumeId);
+ retry--;
+ } else {
+ break;
+ }
+ } while (retry > 0);
+ }
+ return list;
+ }
+
+ public Integer assignVolumeId(){
+ Map.Entry<Integer, Integer> volumeEntry = null;
+
+ for (Map.Entry<Integer, Integer> entry : volumeUsageMap.entrySet()) {
+ if(volumeEntry == null) volumeEntry = entry;
+
+ if (volumeEntry.getValue() >= entry.getValue()) {
+ volumeEntry = entry;
+ }
+ }
+
+ if(volumeEntry != null){
+ volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
+ LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey() + ", concurrency : "
+ + volumeUsageMap.get(volumeEntry.getKey()));
+ return volumeEntry.getKey();
+ } else {
+ return -1; // processed all block on disk
+ }
+ }
+
+ public String getHost() {
+ return host;
+ }
+ }
+
private class ScheduledRequests {
private final HashSet<QueryUnitAttemptId> leafTasks = new HashSet<QueryUnitAttemptId>();
private final HashSet<QueryUnitAttemptId> nonLeafTasks = new HashSet<QueryUnitAttemptId>();
- private final Map<String, LinkedList<QueryUnitAttemptId>> leafTasksHostMapping =
- new HashMap<String, LinkedList<QueryUnitAttemptId>>();
+ private Map<String, TaskBlockLocation> leafTaskHostMapping = new HashMap<String, TaskBlockLocation>();
private final Map<String, LinkedList<QueryUnitAttemptId>> leafTasksRackMapping =
new HashMap<String, LinkedList<QueryUnitAttemptId>>();
public void addLeafTask(TaskScheduleEvent event) {
- for (String host : event.getHosts()) {
- String hostName = NetUtils.normalizeHostName(host);
- LinkedList<QueryUnitAttemptId> list = leafTasksHostMapping.get(hostName);
- if (list == null) {
- list = new LinkedList<QueryUnitAttemptId>();
- leafTasksHostMapping.put(hostName, list);
+ List<QueryUnit.DataLocation> locations = event.getDataLocations();
+
+ for (QueryUnit.DataLocation location : locations) {
+ String host = location.getHost();
+
+ TaskBlockLocation taskBlockLocation = leafTaskHostMapping.get(host);
+ if (taskBlockLocation == null) {
+ taskBlockLocation = new TaskBlockLocation(host);
+ leafTaskHostMapping.put(host, taskBlockLocation);
}
- list.add(event.getAttemptId());
+ taskBlockLocation.addQueryUnitAttemptId(location.getVolumeId(), event.getAttemptId());
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Added attempt req to host " + hostName);
+ LOG.debug("Added attempt req to host " + host);
}
}
- for (String rack: event.getRacks()) {
+ for (String rack : event.getRacks()) {
LinkedList<QueryUnitAttemptId> list = leafTasksRackMapping.get(rack);
if (list == null) {
list = new LinkedList<QueryUnitAttemptId>();
@@ -297,7 +381,6 @@ public class TaskSchedulerImpl extends AbstractService
}
public Set<QueryUnitAttemptId> AssignedRequest = new HashSet<QueryUnitAttemptId>();
-
public void assignToLeafTasks(List<TaskRequestEvent> taskRequests) {
Iterator<TaskRequestEvent> it = taskRequests.iterator();
LOG.info("Got task requests " + taskRequests.size());
@@ -306,14 +389,22 @@ public class TaskSchedulerImpl extends AbstractService
while (it.hasNext() && leafTasks.size() > 0) {
taskRequest = it.next();
ContainerProxy container = context.getContainer(taskRequest.getContainerId());
- String hostName = NetUtils.normalizeHostName(container.getTaskHostName());
+ String host = container.getTaskHostName();
QueryUnitAttemptId attemptId = null;
+ LinkedList<QueryUnitAttemptId> list = null;
- // local allocation
- LinkedList<QueryUnitAttemptId> list = leafTasksHostMapping.get(hostName);
- while(list != null && list.size() > 0) {
+ // local disk allocation
+ if(!leafTaskHostMapping.containsKey(host)){
+ host = NetUtils.normalizeHost(host);
+ }
+
+ TaskBlockLocation taskBlockLocation = leafTaskHostMapping.get(host);
+ if (taskBlockLocation != null) {
+ list = taskBlockLocation.getQueryUnitAttemptIdList(taskRequest.getContainerId());
+ }
+ while (list != null && list.size() > 0) {
QueryUnitAttemptId tId = list.removeFirst();
if (leafTasks.contains(tId)) {
@@ -327,7 +418,7 @@ public class TaskSchedulerImpl extends AbstractService
// rack allocation
if (attemptId == null) {
- String rack = RackResolver.resolve(hostName).getNetworkLocation();
+ String rack = RackResolver.resolve(host).getNetworkLocation();
list = leafTasksRackMapping.get(rack);
while(list != null && list.size() > 0) {
@@ -365,7 +456,7 @@ public class TaskSchedulerImpl extends AbstractService
context.getEventHandler().handle(new TaskAttemptAssignedEvent(attemptId,
taskRequest.getContainerId(),
- container.getTaskHostName(), container.getTaskPort()));
+ host, container.getTaskPort()));
AssignedRequest.add(attemptId);
totalAssigned++;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
index 03b2aba..1f87356 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/event/TaskScheduleEvent.java
@@ -19,23 +19,25 @@
package org.apache.tajo.master.event;
import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.master.querymaster.QueryUnit;
import java.util.Arrays;
+import java.util.List;
public class TaskScheduleEvent extends TaskSchedulerEvent {
private final QueryUnitAttemptId attemptId;
private final boolean isLeafQuery;
- private final String[] hosts;
+ private final List<QueryUnit.DataLocation> dataLocations;
private final String[] racks;
public TaskScheduleEvent(final QueryUnitAttemptId attemptId,
final EventType eventType, boolean isLeafQuery,
- final String[] hosts,
+ final List<QueryUnit.DataLocation> dataLocations,
final String[] racks) {
super(eventType, attemptId.getSubQueryId());
this.attemptId = attemptId;
this.isLeafQuery = isLeafQuery;
- this.hosts = hosts;
+ this.dataLocations = dataLocations;
this.racks = racks;
}
@@ -47,11 +49,11 @@ public class TaskScheduleEvent extends TaskSchedulerEvent {
return this.isLeafQuery;
}
- public String [] getHosts() {
- return this.hosts;
+ public List<QueryUnit.DataLocation> getDataLocations() {
+ return this.dataLocations;
}
- public String [] getRacks() {
+ public String[] getRacks() {
return this.racks;
}
@@ -60,7 +62,7 @@ public class TaskScheduleEvent extends TaskSchedulerEvent {
return "TaskScheduleEvent{" +
"attemptId=" + attemptId +
", isLeafQuery=" + isLeafQuery +
- ", hosts=" + (hosts == null ? null : Arrays.asList(hosts)) +
+ ", hosts=" + (dataLocations == null ? null : dataLocations) +
", racks=" + (racks == null ? null : Arrays.asList(racks)) +
'}';
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index cf32e16..69143c9 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -59,7 +59,7 @@ public class QueryUnit implements EventHandler<TaskEvent> {
private List<Partition> partitions;
private TableStat stats;
- private String [] dataLocations;
+ private List<DataLocation> dataLocations;
private final boolean isLeafTask;
private List<IntermediateEntry> intermediateData;
@@ -127,11 +127,18 @@ public class QueryUnit implements EventHandler<TaskEvent> {
return this.isLeafTask;
}
- public void setDataLocations(String [] dataLocations) {
- this.dataLocations = dataLocations;
+ public void setDataLocations(Fragment fragment) {
+ String[] hosts = fragment.getHosts();
+ int[] blockCount = fragment.getHostsBlockCount();
+ int[] volumeIds = fragment.getDiskIds();
+ this.dataLocations = new ArrayList<DataLocation>(hosts.length);
+
+ for (int i = 0; i < hosts.length; i++) {
+ this.dataLocations.add(new DataLocation(hosts[i], blockCount[i], volumeIds[i]));
+ }
}
- public String [] getDataLocations() {
+ public List<DataLocation> getDataLocations() {
return this.dataLocations;
}
@@ -171,16 +178,12 @@ public class QueryUnit implements EventHandler<TaskEvent> {
@Deprecated
public void setFragment(String tableId, Fragment fragment) {
this.fragMap.put(tableId, fragment);
- if (fragment.hasDataLocations()) {
- setDataLocations(fragment.getDataLocations());
- }
+ setDataLocations(fragment);
}
public void setFragment2(Fragment fragment) {
this.fragMap.put(fragment.getName(), fragment);
- if (fragment.hasDataLocations()) {
- setDataLocations(fragment.getDataLocations());
- }
+ setDataLocations(fragment);
}
public void addFetch(String tableId, String uri) throws URISyntaxException {
@@ -496,4 +499,37 @@ public class QueryUnit implements EventHandler<TaskEvent> {
return pullHost + ":" + port;
}
}
+
+ public static class DataLocation {
+ private String host;
+ private int blockCount; // for Non-Splittable
+ private int volumeId;
+
+ public DataLocation(String host, int blockCount, int volumeId) {
+ this.host = host;
+ this.blockCount = blockCount;
+ this.volumeId = volumeId;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getBlockCount() {
+ return blockCount;
+ }
+
+ public int getVolumeId() {
+ return volumeId;
+ }
+
+ @Override
+ public String toString() {
+ return "DataLocation{" +
+ "host=" + host +
+ ", blocks=" + blockCount +
+ ", volumeId=" + volumeId +
+ '}';
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
index 5443858..add42aa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryUnitAttempt.java
@@ -201,8 +201,8 @@ public class QueryUnitAttempt implements EventHandler<TaskAttemptEvent> {
if (taskAttempt.isLeafTask()
&& taskAttempt.getQueryUnit().getScanNodes().length == 1) {
Set<String> racks = new HashSet<String>();
- for (String host : taskAttempt.getQueryUnit().getDataLocations()) {
- racks.add(RackResolver.resolve(host).getNetworkLocation());
+ for (QueryUnit.DataLocation location : taskAttempt.getQueryUnit().getDataLocations()) {
+ racks.add(RackResolver.resolve(location.getHost()).getNetworkLocation());
}
taskAttempt.eventHandler.handle(new TaskScheduleEvent(
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index ff590f3..cccfcba 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -78,8 +78,7 @@ public class Repartitioner {
if (scans[i].isLocal()) { // it only requires a dummy fragment.
fragments[i] = new Fragment(scans[i].getTableId(), tablePath,
- CatalogUtil.newTableMeta(scans[i].getInSchema(), StoreType.CSV),
- 0, 0, null);
+ CatalogUtil.newTableMeta(scans[i].getInSchema(), StoreType.CSV), 0, 0);
} else {
fragments[i] = subQuery.getStorageManager().getSplits(scans[i].getTableId(),
tableDesc.getMeta(),
@@ -293,8 +292,7 @@ public class Repartitioner {
TupleRange [] ranges = partitioner.partition(determinedTaskNum);
Fragment dummyFragment = new Fragment(scan.getTableId(), tablePath,
- CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),
- 0, 0, null);
+ CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),0, 0);
List<String> basicFetchURIs = new ArrayList<String>();
@@ -391,8 +389,7 @@ public class Repartitioner {
}
Fragment frag = new Fragment(scan.getTableId(), tablePath,
- CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV),
- 0, 0, null);
+ CatalogUtil.newTableMeta(scan.getInSchema(), StoreType.CSV), 0, 0);
Map<Integer, List<IntermediateEntry>> hashed = hashByKey(partitions);
Map<String, List<IntermediateEntry>> hashedByHost;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index 559f29b..94f1662 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -460,7 +460,7 @@ public class Task {
if (f.getLen() == 0) {
continue;
}
- tablet = new Fragment(name, f.getPath(), meta, 0l, f.getLen(), null);
+ tablet = new Fragment(name, f.getPath(), meta, 0l, f.getLen());
listTablets.add(tablet);
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 271148f..dbb3862 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -469,7 +469,7 @@ public class TestPhysicalPlanner {
Fragment [] fragments = new Fragment[list.length];
int i = 0;
for (FileStatus status : list) {
- fragments[i++] = new Fragment("partition", status.getPath(), outputMeta, 0, status.getLen(), null);
+ fragments[i++] = new Fragment("partition", status.getPath(), outputMeta, 0, status.getLen());
}
Scanner scanner = new MergeScanner(conf, outputMeta,TUtil.newList(fragments));
scanner.init();
@@ -527,7 +527,7 @@ public class TestPhysicalPlanner {
Fragment [] fragments = new Fragment[list.length];
int i = 0;
for (FileStatus status : list) {
- fragments[i++] = new Fragment("partition", status.getPath(), outputMeta, 0, status.getLen(), null);
+ fragments[i++] = new Fragment("partition", status.getPath(), outputMeta, 0, status.getLen());
}
Scanner scanner = new MergeScanner(conf, outputMeta,TUtil.newList(fragments));
scanner.init();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java
index 1382db3..9245b56 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java
@@ -48,8 +48,7 @@ public class TestFragment {
@Test
public final void testGetAndSetFields() {
- Fragment fragment1 = new Fragment("table1_1", new Path("/table0"),
- meta1, 0, 500, null);
+ Fragment fragment1 = new Fragment("table1_1", new Path("/table0"), meta1, 0, 500);
fragment1.setDistCached();
assertEquals("table1_1", fragment1.getName());
@@ -70,7 +69,7 @@ public class TestFragment {
@Test
public final void testTabletTabletProto() {
- Fragment fragment0 = new Fragment("table1_1", new Path("/table0"), meta1, 0, 500, null);
+ Fragment fragment0 = new Fragment("table1_1", new Path("/table0"), meta1, 0, 500);
Fragment fragment1 = new Fragment(fragment0.getProto());
assertEquals("table1_1", fragment1.getName());
@@ -93,9 +92,7 @@ public class TestFragment {
final int num = 10;
Fragment [] tablets = new Fragment[num];
for (int i = num - 1; i >= 0; i--) {
- tablets[i]
- = new Fragment("tablet1_"+i, new Path("tablet0"), meta1, i * 500,
- (i+1) * 500, null);
+ tablets[i] = new Fragment("tablet1_"+i, new Path("tablet0"), meta1, i * 500, (i+1) * 500);
}
Arrays.sort(tablets);
@@ -110,9 +107,7 @@ public class TestFragment {
final int num = 1860;
Fragment [] tablets = new Fragment[num];
for (int i = num - 1; i >= 0; i--) {
- tablets[i]
- = new Fragment("tablet1_"+i, new Path("tablet0"), meta1, (long)i * 6553500,
- (long)(i+1) * 6553500, null);
+ tablets[i] = new Fragment("tablet1_"+i, new Path("tablet0"), meta1, (long)i * 6553500, (long)(i+1) * 6553500);
}
SortedSet sortedSet = Sets.newTreeSet();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
index 2d67690..0b85fdf 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java
@@ -113,7 +113,7 @@ public class TestRowFile {
TableProto proto = (TableProto) FileUtil.loadProto(
util.getDefaultFileSystem(), metaPath, TableProto.getDefaultInstance());
meta = new TableMetaImpl(proto);
- Fragment fragment = new Fragment("test.tbl", dataPath, meta, 0, file.getLen(), null);
+ Fragment fragment = new Fragment("test.tbl", dataPath, meta, 0, file.getLen());
int tupleCnt = 0;
start = System.currentTimeMillis();
@@ -136,7 +136,7 @@ public class TestRowFile {
for (int i = 0; i < 13; i++) {
System.out.println("range: " + fileStart + ", " + fileLen);
- fragment = new Fragment("test.tbl", dataPath, meta, fileStart, fileLen, null);
+ fragment = new Fragment("test.tbl", dataPath, meta, fileStart, fileLen);
scanner = new RowFile.RowFileScanner(conf, meta, fragment);
scanner.init();
while ((tuple=scanner.next()) != null) {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index f0d9335..0f6ed41 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -460,5 +460,10 @@ public class CSVFile {
}
return this.tupleOffsets[currentIdx];
}
+
+ @Override
+ public boolean isSplittable(){
+ return true;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
index e070726..4f6dde1 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
@@ -21,6 +21,7 @@ package org.apache.tajo.storage;
import com.google.common.base.Objects;
import com.google.gson.Gson;
import com.google.gson.annotations.Expose;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.json.GsonObject;
import org.apache.tajo.catalog.*;
@@ -29,6 +30,9 @@ import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
import org.apache.tajo.storage.json.StorageGsonHelper;
import org.apache.tajo.util.TUtil;
+import java.io.IOException;
+import java.util.Arrays;
+
public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject, GsonObject {
protected FragmentProto.Builder builder = null;
@@ -39,21 +43,44 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
@Expose private Long length; // required
@Expose private boolean distCached = false; // optional
- private String [] dataLocations;
+ private String[] hosts; // Datanode hostnames
+ private int[] hostsBlockCount; // number of this fragment's blocks stored on each entry of hosts
+ private int[] diskIds; // disk id for each entry of hosts; -1 means unknown
+ /** Creates an empty fragment backed by a new FragmentProto builder. */
public Fragment() {
builder = FragmentProto.newBuilder();
}
- public Fragment(String tableName, Path uri, TableMeta meta, long start,
- long length, String [] dataLocations) {
+ /**
+ * Creates a fragment covering exactly one block. Its offset, length and hosts are taken
+ * from {@code blockLocation}, and the schema is qualified with {@code tableName}.
+ *
+ * @param diskIds disk id of each host of {@code blockLocation}, or null when unknown
+ * (getDiskIds() then reports -1 for every host)
+ * @throws IOException propagated from {@link BlockLocation#getHosts()}
+ */
+ public Fragment(String tableName, Path uri, TableMeta meta, BlockLocation blockLocation, int[] diskIds) throws IOException {
+ this();
+ TableMeta newMeta = new TableMetaImpl(meta.getProto());
+ SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(tableName, meta
+ .getSchema().getProto());
+ newMeta.setSchema(new Schema(newSchemaProto));
+ this.set(tableName, uri, newMeta, blockLocation.getOffset(), blockLocation.getLength());
+ this.hosts = blockLocation.getHosts();
+ this.diskIds = diskIds;
+ }
+
+ /**
+ * Creates a fragment for a non-splittable file (e.g. one built by StorageManager#makeNonSplit).
+ * {@code hostsBlockCount[i]} is the number of the file's blocks stored on {@code hosts[i]}.
+ */
+ public Fragment(String tableName, Path uri, TableMeta meta, long start, long length, String[] hosts, int[] hostsBlockCount) {
this();
TableMeta newMeta = new TableMetaImpl(meta.getProto());
SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(tableName, meta
.getSchema().getProto());
newMeta.setSchema(new Schema(newSchemaProto));
this.set(tableName, uri, newMeta, start, length);
- this.dataLocations = dataLocations;
+ this.hosts = hosts;
+ this.hostsBlockCount = hostsBlockCount;
+ }
+
+ /**
+ * Creates a fragment without host information; {@code fragmentId} is used as the table
+ * name and to qualify the schema. getHosts() returns an empty array for such a fragment.
+ */
+ public Fragment(String fragmentId, Path path, TableMeta meta, long start, long length) {
+ this();
+ TableMeta newMeta = new TableMetaImpl(meta.getProto());
+ SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(fragmentId, meta
+ .getSchema().getProto());
+ newMeta.setSchema(new Schema(newSchemaProto));
+ this.set(fragmentId, path, newMeta, start, length);
}
public Fragment(FragmentProto proto) {
@@ -66,14 +93,6 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
}
}
- public boolean hasDataLocations() {
- return this.dataLocations != null;
- }
-
- public String [] getDataLocations() {
- return this.dataLocations;
- }
-
private void set(String tableName, Path path, TableMeta meta, long start,
long length) {
this.tableName = tableName;
@@ -83,6 +102,41 @@ public class Fragment implements TableDesc, Comparable<Fragment>, SchemaObject,
this.length = length;
}
+
+ /**
+ * Returns the hostnames of the nodes holding this fragment's data.
+ * When no hosts were given, an empty array is cached and returned.
+ */
+ public String[] getHosts() {
+ if (hosts != null) {
+ return hosts;
+ }
+ hosts = new String[0];
+ return hosts;
+ }
+
+ /**
+ * Returns, for each entry of {@link #getHosts()}, how many of this fragment's blocks that
+ * host stores (e.g. {3, 1} when host0 stores 3 blocks and host1 stores 1).
+ * Defaults to 1 for every host when no counts were given.
+ */
+ public int[] getHostsBlockCount() {
+ if (hostsBlockCount == null) {
+ hostsBlockCount = filledArray(getHosts().length, 1);
+ }
+ return hostsBlockCount;
+ }
+
+ /**
+ * Returns the disk id of each entry of {@link #getHosts()}: -1 for an unknown disk,
+ * otherwise 0 to N. Defaults to -1 for every host when no disk ids were given.
+ */
+ public int[] getDiskIds() {
+ if (diskIds == null) {
+ diskIds = filledArray(getHosts().length, -1);
+ }
+ return diskIds;
+ }
+
+ // Allocates an int array of the given length with every slot set to value.
+ private static int[] filledArray(int length, int value) {
+ int[] array = new int[length];
+ Arrays.fill(array, value);
+ return array;
+ }
+
public String getName() {
return this.tableName;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
index 1afd9d5..8b7ec67 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -107,4 +107,9 @@ public class MergeScanner implements Scanner {
public Schema getSchema() {
return meta.getSchema();
}
+
+ /** The input of a merge scanner is never split. */
+ @Override
+ public boolean isSplittable() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index b3f158b..c8127b1 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -245,6 +245,11 @@ public class RawFile {
public boolean isSelectable() {
return false;
}
+
+ /** Raw files are not splittable; each file is read as a single fragment. */
+ @Override
+ public boolean isSplittable() {
+ return false;
+ }
}
public static class RawFileAppender extends FileAppender {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
index cd6dc00..6a3bcf1 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -311,6 +311,11 @@ public class RowFile {
public boolean isSelectable() {
return false;
}
+
+ /** Row files can be split into multiple fragments. */
+ @Override
+ public boolean isSplittable() {
+ return true;
+ }
}
public static class RowFileAppender extends FileAppender {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Scanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Scanner.java
index 4f5e7b1..6dca3f2 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Scanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Scanner.java
@@ -84,4 +84,11 @@ public interface Scanner extends SchemaObject, Closeable {
* TODO - to be changed Object type
*/
void setSearchCondition(Object expr);
+
+ /**
+ * Returns whether a file read by this scanner may be divided into multiple fragments.
+ *
+ * @return true if this scanner can split a file.
+ */
+ boolean isSplittable();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
index 657ba0f..e7479fd 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -20,8 +20,11 @@ package org.apache.tajo.storage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.util.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
@@ -29,14 +32,13 @@ import org.apache.tajo.catalog.TableMetaImpl;
import org.apache.tajo.catalog.proto.CatalogProtos.TableProto;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.util.Bytes;
import org.apache.tajo.util.FileUtil;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -49,6 +51,7 @@ public class StorageManager {
private final FileSystem fs;
private final Path baseDir;
private final Path tableBaseDir;
+ private final boolean blocksMetadataEnabled;
/**
* Cache of scanner handlers for each storage type.
@@ -75,6 +78,10 @@ public class StorageManager {
this.baseDir = new Path(conf.getVar(ConfVars.ROOT_DIR));
this.tableBaseDir = new Path(this.baseDir, TajoConstants.WAREHOUSE_DIR);
this.fs = baseDir.getFileSystem(conf);
+ this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+ DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+ if (!this.blocksMetadataEnabled)
+ LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
}
public static StorageManager get(TajoConf conf) throws IOException {
@@ -118,7 +125,7 @@ public class StorageManager {
throws IOException {
FileSystem fs = path.getFileSystem(conf);
FileStatus status = fs.getFileStatus(path);
- Fragment fragment = new Fragment(path.getName(), path, meta, 0, status.getLen(), null);
+ Fragment fragment = new Fragment(path.getName(), path, meta, 0, status.getLen());
return getScanner(conf, meta, fragment);
}
@@ -218,8 +225,7 @@ public class StorageManager {
FileStatus[] fileLists = fs.listStatus(new Path(tablePath, "data"));
for (FileStatus file : fileLists) {
- tablet = new Fragment(tablePath.getName(), file.getPath(), meta, 0,
- file.getLen(), null);
+ tablet = new Fragment(tablePath.getName(), file.getPath(), meta, 0, file.getLen());
listTablets.add(tablet);
}
@@ -253,17 +259,14 @@ public class StorageManager {
long start = 0;
if (remainFileSize > defaultBlockSize) {
while (remainFileSize > defaultBlockSize) {
- tablet = new Fragment(tableName, file.getPath(), meta, start,
- defaultBlockSize, null);
+ tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize);
listTablets.add(tablet);
start += defaultBlockSize;
remainFileSize -= defaultBlockSize;
}
- listTablets.add(new Fragment(tableName, file.getPath(), meta, start,
- remainFileSize, null));
+ listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize));
} else {
- listTablets.add(new Fragment(tableName, file.getPath(), meta, 0,
- remainFileSize, null));
+ listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize));
}
}
@@ -288,17 +291,14 @@ public class StorageManager {
long start = 0;
if (remainFileSize > defaultBlockSize) {
while (remainFileSize > defaultBlockSize) {
- tablet = new Fragment(tableName, file.getPath(), meta, start,
- defaultBlockSize, null);
+ tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize);
listTablets.add(tablet);
start += defaultBlockSize;
remainFileSize -= defaultBlockSize;
}
- listTablets.add(new Fragment(tableName, file.getPath(), meta, start,
- remainFileSize, null));
+ listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize));
} else {
- listTablets.add(new Fragment(tableName, file.getPath(), meta, 0,
- remainFileSize, null));
+ listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize));
}
}
@@ -442,19 +442,23 @@ public class StorageManager {
* so that Mappers process entire files.
*
* @param filename the file name to check
- * @return is this file splitable?
+ * @param meta the table meta used to create the scanner for the file
+ * @return true if the scanner created for this file reports that it is splittable
+ * @throws IOException propagated from getScanner
*/
- protected boolean isSplitable(Path filename) {
- return true;
+ protected boolean isSplittable(TableMeta meta, Path filename) throws IOException {
+ // the scanner is only asked about splittability; it is never initialized here.
+ // NOTE(review): it is not closed either - confirm getScanner acquires no resources.
+ return getScanner(conf, meta, filename).isSplittable();
}
+ /**
+ * @deprecated getSplits now makes one fragment per block and no longer computes a split size.
+ */
+ @Deprecated
protected long computeSplitSize(long blockSize, long minSize,
long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));
}
+ // Deprecated with computeSplitSize: the block-per-fragment getSplits no longer uses a slop factor.
+ @Deprecated
private static final double SPLIT_SLOP = 1.1; // 10% slop
+ @Deprecated
protected int getBlockIndex(BlockLocation[] blkLocations,
long offset) {
for (int i = 0; i < blkLocations.length; i++) {
@@ -475,9 +479,48 @@ public class StorageManager {
* A factory that makes the split for this class. It can be overridden
* by sub-classes to make sub-types
*/
- protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
- String[] hosts) {
- return new Fragment(fragmentId, file, meta, start, length, hosts);
+ protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file,
+ long start, long length) {
+ return new Fragment(fragmentId, file, meta, start, length);
+ }
+
+ /**
+ * Makes a fragment covering exactly one block. {@code diskIds} may be null when the disk
+ * of each replica is unknown.
+ */
+ protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file,
+ BlockLocation blockLocation, int[] diskIds) throws IOException {
+ return new Fragment(fragmentId, file, meta, blockLocation, diskIds);
+ }
+
+ /**
+ * Makes a single fragment that covers a whole non-splittable file (e.g. a gzip-compressed
+ * text file).
+ *
+ * Hosts are ranked by how many of the file's blocks they store. At most as many hosts as the
+ * first block has replicas are kept, each paired with its block count. A file without block
+ * locations gets a fragment with no hosts instead of failing.
+ *
+ * @param blkLocations block locations of the file; may be empty
+ * @throws IOException propagated from {@link BlockLocation#getHosts()}
+ */
+ protected Fragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
+ BlockLocation[] blkLocations) throws IOException {
+
+ // count, for every host, how many of the file's blocks it stores
+ Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
+ for (BlockLocation blockLocation : blkLocations) {
+ for (String host : blockLocation.getHosts()) {
+ Integer count = hostsBlockMap.get(host);
+ hostsBlockMap.put(host, count == null ? 1 : count + 1);
+ }
+ }
+
+ // ascending by block count; the busiest hosts are read from the end of the list below
+ List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
+ Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
+
+ @Override
+ public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
+ return v1.getValue().compareTo(v2.getValue());
+ }
+ });
+
+ // keep as many hosts as the first block has replicas, but never more than were found;
+ // an empty blkLocations array used to fail with ArrayIndexOutOfBoundsException here
+ int numHosts = blkLocations.length == 0 ? 0 : Math.min(blkLocations[0].getHosts().length, entries.size());
+ String[] hosts = new String[numHosts];
+ int[] hostsBlockCount = new int[numHosts];
+
+ for (int i = 0; i < numHosts; i++) {
+ Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
+ hosts[i] = entry.getKey();
+ hostsBlockCount[i] = entry.getValue();
+ }
+ return new Fragment(fragmentId, file, meta, start, length, hosts, hostsBlockCount);
+ }
/**
@@ -485,6 +528,7 @@ public class StorageManager {
*
* @return the maximum number of bytes a split can include
*/
+ @Deprecated
public static long getMaxSplitSize() {
// TODO - to be configurable
return 536870912L;
@@ -495,56 +539,103 @@ public class StorageManager {
*
* @return the minimum number of bytes that can be in a split
*/
+ // Deprecated: getSplits no longer applies a minimum split size (one fragment per block).
+ @Deprecated
public static long getMinSplitSize() {
// TODO - to be configurable
return 67108864L;
}
/**
+ * Converts the HDFS volume ids of a block's replicas into integer disk ids, one per entry.
+ * A null or invalid volume id, or one whose Base64-decoded form is neither 4 bytes nor
+ * 1 byte long, becomes -1 (unknown disk).
+ */
+ private int[] getDiskIds(VolumeId[] volumeIds) {
+ int[] diskIds = new int[volumeIds.length];
+ for (int i = 0; i < volumeIds.length; i++) {
+ int diskId = -1;
+ if (volumeIds[i] != null && volumeIds[i].isValid()) {
+ // NOTE(review): relies on VolumeId#toString() being the Base64 form of the id bytes
+ String volumeIdString = volumeIds[i].toString();
+ byte[] volumeIdBytes = Base64.decodeBase64(volumeIdString);
+
+ if (volumeIdBytes.length == 4) {
+ diskId = Bytes.toInt(volumeIdBytes);
+ } else if (volumeIdBytes.length == 1) {
+ // support hadoop-2.0.2 (single-byte id); read it unsigned so ids 128-255 do not turn negative
+ diskId = volumeIdBytes[0] & 0xFF;
+ }
+ }
+ diskIds[i] = diskId;
+ }
+ return diskIds;
+ }
+
+ /**
+ * Builds a map from every host of the given fragments to the set of known disk ids that
+ * those fragments use on that host. A host whose disks are all unknown (-1) maps to an
+ * empty set.
+ */
+ private Map<String, Set<Integer>> getVolumeMap(List<Fragment> frags) {
+ Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
+ for (Fragment frag : frags) {
+ String[] hosts = frag.getHosts();
+ int[] diskIds = frag.getDiskIds();
+ for (int i = 0; i < hosts.length; i++) {
+ Set<Integer> volumeList = volumeMap.get(hosts[i]);
+ if (volumeList == null) {
+ volumeList = new HashSet<Integer>();
+ volumeMap.put(hosts[i], volumeList);
+ }
+
+ // bound by diskIds.length: disk ids handed to a Fragment are not length-checked
+ // against its hosts, so a fragment may carry fewer disk ids than hosts
+ if (i < diskIds.length && diskIds[i] > -1) {
+ volumeList.add(diskIds[i]);
+ }
+ }
+ }
+
+ return volumeMap;
+ }
+ /**
* Generate the list of files and make them into FileSplits.
+ *
+ * For every non-empty file under {@code inputPath}, a splittable file yields one fragment per
+ * block and a non-splittable file yields a single fragment built by makeNonSplit. When HDFS
+ * block metadata is enabled, fragments also carry the disk id of each replica. An empty file
+ * yields one zero-length fragment.
*
* @throws IOException
*/
public List<Fragment> getSplits(String tableName, TableMeta meta, Path inputPath) throws IOException {
- long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize());
- long maxSize = getMaxSplitSize();
+ // generate splits
- // generate splits
List<Fragment> splits = new ArrayList<Fragment>();
List<FileStatus> files = listStatus(inputPath);
+ // resolved once from inputPath (shadows the fs field);
+ // NOTE(review): assumes every listed file lives on this file system
+ FileSystem fs = inputPath.getFileSystem(conf);
for (FileStatus file : files) {
Path path = file.getPath();
long length = file.getLen();
- if (length != 0) {
- FileSystem fs = path.getFileSystem(conf);
+ if (length > 0) {
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
- if (isSplitable(path)) {
- long blockSize = file.getBlockSize();
- long splitSize = computeSplitSize(blockSize, minSize, maxSize);
-
- long bytesRemaining = length;
- while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
- int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
- splits.add(makeSplit(tableName, meta, path, length - bytesRemaining, splitSize,
- blkLocations[blkIndex].getHosts()));
- bytesRemaining -= splitSize;
+ // asks the scanner created for this table's meta whether the file can be split
+ boolean splittable = isSplittable(meta, path);
+ if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
+ // HDFS with block metadata enabled: also learn which disk (volume) holds each replica
+ BlockStorageLocation[] blockStorageLocations = ((DistributedFileSystem) fs)
+ .getFileBlockStorageLocations(Arrays.asList(blkLocations));
+ if (splittable) {
+ // one fragment per block, annotated with per-replica disk ids
+ for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
+ splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
+ .getVolumeIds())));
+ }
+ } else { // Non splittable
+ splits.add(makeNonSplit(tableName, meta, path, 0, length, blockStorageLocations));
}
- if (bytesRemaining != 0) {
- int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
- splits.add(makeSplit(tableName, meta, path, length - bytesRemaining, bytesRemaining,
- blkLocations[blkIndex].getHosts()));
+ } else { // no block metadata: disk ids stay unknown, so Fragment#getDiskIds reports -1
+ if (splittable) {
+ for (BlockLocation blockLocation : blkLocations) {
+ splits.add(makeSplit(tableName, meta, path, blockLocation, null));
+ }
+ } else { // Non splittable
+ splits.add(makeNonSplit(tableName, meta, path, 0, length, blkLocations));
}
- } else { // not splitable
- splits.add(makeSplit(tableName, meta, path, 0, length, blkLocations[0].getHosts()));
}
} else {
- //Create empty hosts array for zero length files
- splits.add(makeSplit(tableName, meta, path, 0, length, new String[0]));
+ // zero-length files get a single empty fragment without host information
+ splits.add(makeSplit(tableName, meta, path, 0, length));
}
}
- // Save the number of input files for metrics/loadgen
- //job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
+
LOG.debug("Total # of splits: " + splits.size());
return splits;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/json/FragmentDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/json/FragmentDeserializer.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/json/FragmentDeserializer.java
index b74fcdf..a20d45f 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/json/FragmentDeserializer.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/json/FragmentDeserializer.java
@@ -47,7 +47,7 @@ public class FragmentDeserializer implements JsonDeserializer<Fragment> {
gson.fromJson(fragObj.get("path"), Path.class),
meta,
fragObj.get("startOffset").getAsLong(),
- fragObj.get("length").getAsLong(), null);
+ fragObj.get("length").getAsLong());
return fragment;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFileWrapper.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFileWrapper.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFileWrapper.java
index 402c354..92d9acc 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFileWrapper.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFileWrapper.java
@@ -333,5 +333,10 @@ public class RCFileWrapper {
public boolean isSelectable() {
return false;
}
+
+ /** RCFiles can be split into multiple fragments. */
+ @Override
+ public boolean isSplittable() {
+ return true;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
index cf021db..ef43671 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
@@ -171,4 +171,9 @@ public class TrevniScanner extends FileScanner {
public boolean isSelectable() {
return false;
}
+
+ /** Trevni files are not splittable; each file is read as a single fragment. */
+ @Override
+ public boolean isSplittable() {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index f93e392..7c40d3d 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -137,10 +137,8 @@ public class TestMergeScanner {
FileStatus status1 = fs.getFileStatus(table1Path);
FileStatus status2 = fs.getFileStatus(table2Path);
Fragment[] tablets = new Fragment[2];
- tablets[0] = new Fragment("tablet1", table1Path, meta, 0,
- status1.getLen(), null);
- tablets[1] = new Fragment("tablet1", table2Path, meta, 0,
- status2.getLen(), null);
+ tablets[0] = new Fragment("tablet1", table1Path, meta, 0, status1.getLen());
+ tablets[1] = new Fragment("tablet1", table2Path, meta, 0, status2.getLen());
Scanner scanner = new MergeScanner(conf, meta, TUtil.newList(tablets));
scanner.init();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index b8c7021..0bc2a80 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -111,10 +111,8 @@ public class TestStorages {
long randomNum = (long) (Math.random() * fileLen) + 1;
Fragment[] tablets = new Fragment[2];
- tablets[0] = new Fragment("Splitable", tablePath, meta,
- 0, randomNum, null);
- tablets[1] = new Fragment("Splitable", tablePath, meta,
- randomNum, (fileLen - randomNum), null);
+ tablets[0] = new Fragment("Splitable", tablePath, meta, 0, randomNum);
+ tablets[1] = new Fragment("Splitable", tablePath, meta, randomNum, (fileLen - randomNum));
Scanner scanner = StorageManager.getScanner(conf, meta, tablets[0], schema);
scanner.init();
@@ -160,7 +158,7 @@ public class TestStorages {
appender.close();
FileStatus status = fs.getFileStatus(tablePath);
- Fragment fragment = new Fragment("testReadAndWrite", tablePath, meta, 0, status.getLen(), null);
+ Fragment fragment = new Fragment("testReadAndWrite", tablePath, meta, 0, status.getLen());
Schema target = new Schema();
target.addColumn("age", Type.INT8);
@@ -224,7 +222,7 @@ public class TestStorages {
appender.close();
FileStatus status = fs.getFileStatus(tablePath);
- Fragment fragment = new Fragment("table", tablePath, meta, 0, status.getLen(), null);
+ Fragment fragment = new Fragment("table", tablePath, meta, 0, status.getLen());
Scanner scanner = StorageManager.getScanner(conf, meta, fragment);
scanner.init();
Tuple retrieved = scanner.next();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index 63f2a2d..436c50e 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -89,7 +89,7 @@ public class TestBSTIndex {
FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
- Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+ Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen);
SortSpec [] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
@@ -199,7 +199,7 @@ public class TestBSTIndex {
FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
- Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+ Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen);
tuple = new VTuple(keySchema.getColumnNum());
BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "BuildIndexWithAppender.idx"),
@@ -247,7 +247,7 @@ public class TestBSTIndex {
appender.close();
FileStatus status = fs.getFileStatus(tablePath);
- Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, status.getLen(), null);
+ Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, status.getLen());
SortSpec [] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
@@ -315,7 +315,7 @@ public class TestBSTIndex {
FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
- Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+ Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen);
SortSpec [] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
@@ -401,7 +401,7 @@ public class TestBSTIndex {
FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
- Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+ Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen);
SortSpec [] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index 2708403..bfaf24e 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -93,7 +93,7 @@ public class TestSingleCSVFileBSTIndex {
FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
- Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+ Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen);
SortSpec[] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
@@ -182,7 +182,7 @@ public class TestSingleCSVFileBSTIndex {
FileStatus status = fs.getFileStatus(tablePath);
long fileLen = status.getLen();
- Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen, null);
+ Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen);
SortSpec [] sortKeys = new SortSpec[2];
sortKeys[0] = new SortSpec(schema.getColumn("int"), true, false);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/8b8b6683/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
----------------------------------------------------------------------
diff --git a/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java b/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
index f64fa59..6dff443 100644
--- a/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
+++ b/tajo-rpc/src/main/java/org/apache/tajo/util/NetUtils.java
@@ -18,9 +18,7 @@
package org.apache.tajo.util;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
+import java.net.*;
public class NetUtils {
public static String normalizeInetSocketAddress(InetSocketAddress addr) {
@@ -67,4 +65,38 @@ public class NetUtils {
}
return addr;
}
+
+ /**
+ * Tells whether {@code addr} belongs to this node. This holds for the wildcard and loopback
+ * addresses, and for any address bound to one of the node's network interfaces.
+ *
+ * @param addr the address to test
+ * @return true if the address corresponds to the local node; false otherwise, including
+ * when the network interfaces cannot be queried
+ */
+ public static boolean isLocalAddress(InetAddress addr) {
+ if (addr.isAnyLocalAddress() || addr.isLoopbackAddress()) {
+ return true;
+ }
+ try {
+ return NetworkInterface.getByInetAddress(addr) != null;
+ } catch (SocketException e) {
+ // an I/O error while looking up the interfaces is treated as "not local"
+ return false;
+ }
+ }
+
+ /**
+ * Resolves {@code host} to a textual IP address. A local address (wildcard, loopback, or
+ * bound to a local interface, see isLocalAddress) is replaced by the address of
+ * {@link InetAddress#getLocalHost()}.
+ *
+ * @param host a hostname or textual IP address
+ * @return the resolved IP address, or {@code host} unchanged if resolution fails
+ */
+ public static String normalizeHost(String host) {
+ try {
+ InetAddress address = InetAddress.getByName(host);
+ if (isLocalAddress(address)) {
+ return InetAddress.getLocalHost().getHostAddress();
+ } else {
+ return address.getHostAddress();
+ }
+ } catch (UnknownHostException ignored) {
+ // best effort: if the name (or the local host name) cannot be resolved, keep it as given
+ return host;
+ }
+ }
}