You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/12/12 07:00:05 UTC
[31/32] tajo git commit: TAJO-1233: Merge hbase_storage branch to the
master branch. (Hyoungjun Kim via hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
index e4b98d4..406550d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultFragmentScheduleAlgorithm.java
@@ -21,6 +21,7 @@ package org.apache.tajo.master;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.util.RackResolver;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.NetUtils;
import java.util.*;
@@ -81,9 +82,12 @@ public class DefaultFragmentScheduleAlgorithm implements FragmentScheduleAlgorit
@Override
public void addFragment(FragmentPair fragmentPair) {
String[] hosts = fragmentPair.getLeftFragment().getHosts();
- int[] diskIds = fragmentPair.getLeftFragment().getDiskIds();
+ int[] diskIds = null;
+ if (fragmentPair.getLeftFragment() instanceof FileFragment) {
+ diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
+ }
for (int i = 0; i < hosts.length; i++) {
- addFragment(hosts[i], diskIds[i], fragmentPair);
+ addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair);
}
fragmentNum++;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
index 77e3257..01137aa 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java
@@ -44,6 +44,7 @@ import org.apache.tajo.master.querymaster.SubQuery;
import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.storage.DataLocation;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.worker.FetchImpl;
@@ -150,8 +151,8 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler {
super.stop();
}
- private FileFragment[] fragmentsForNonLeafTask;
- private FileFragment[] broadcastFragmentsForNonLeafTask;
+ private Fragment[] fragmentsForNonLeafTask;
+ private Fragment[] broadcastFragmentsForNonLeafTask;
LinkedList<TaskRequestEvent> taskRequestEvents = new LinkedList<TaskRequestEvent>();
public void schedule() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java b/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
index 598b1c5..827386b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/FragmentPair.java
@@ -19,7 +19,7 @@
package org.apache.tajo.master;
import com.google.common.base.Objects;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
/**
* FragmentPair consists of two fragments, a left fragment and a right fragment.
@@ -29,23 +29,23 @@ import org.apache.tajo.storage.fragment.FileFragment;
* For other queries, it is assumed to have only a left fragment.
*/
public class FragmentPair {
- private FileFragment leftFragment;
- private FileFragment rightFragment;
+ private Fragment leftFragment;
+ private Fragment rightFragment;
- public FragmentPair(FileFragment left) {
+ public FragmentPair(Fragment left) {
this.leftFragment = left;
}
- public FragmentPair(FileFragment left, FileFragment right) {
+ public FragmentPair(Fragment left, Fragment right) {
this.leftFragment = left;
this.rightFragment = right;
}
- public FileFragment getLeftFragment() {
+ public Fragment getLeftFragment() {
return leftFragment;
}
- public FileFragment getRightFragment() {
+ public Fragment getRightFragment() {
return rightFragment;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 399644c..a9624f8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -40,6 +40,7 @@ import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.exception.*;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf;
@@ -336,6 +337,15 @@ public class GlobalEngine extends AbstractService {
responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
}
} else { // it requires distributed execution. So, the query is forwarded to a query master.
+ StoreType storeType = PlannerUtil.getStoreType(plan);
+ if (storeType != null) {
+ StorageManager sm = StorageManager.getStorageManager(context.getConf(), storeType);
+ StorageProperty storageProperty = sm.getStorageProperty();
+ if (!storageProperty.isSupportsInsertInto()) {
+ throw new VerifyException("Inserting into non-file storage is not supported.");
+ }
+ sm.beforeInsertOrCATS(rootNode.getChild());
+ }
context.getSystemMetrics().counter("Query", "numDMLQuery").inc();
hookManager.doHooks(queryContext, plan);
@@ -348,6 +358,7 @@ public class GlobalEngine extends AbstractService {
responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
responseBuilder.setResultCode(ClientProtos.ResultCode.ERROR);
responseBuilder.setErrorMessage("Fail starting QueryMaster.");
+ LOG.error("Fail starting QueryMaster: " + sql);
} else {
responseBuilder.setIsForwarded(true);
responseBuilder.setQueryId(queryInfo.getQueryId().getProto());
@@ -356,7 +367,8 @@ public class GlobalEngine extends AbstractService {
responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost());
}
responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort());
- LOG.info("Query is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
+ LOG.info("Query " + queryInfo.getQueryId().toString() + "," + queryInfo.getSql() + "," +
+ " is forwarded to " + queryInfo.getQueryMasterHost() + ":" + queryInfo.getQueryMasterPort());
}
}
@@ -556,6 +568,7 @@ public class GlobalEngine extends AbstractService {
LOG.info("=============================================");
annotatedPlanVerifier.verify(queryContext, state, plan);
+ verifyInsertTableSchema(queryContext, state, plan);
if (!state.verified()) {
StringBuilder sb = new StringBuilder();
@@ -568,6 +581,25 @@ public class GlobalEngine extends AbstractService {
return plan;
}
+ private void verifyInsertTableSchema(QueryContext queryContext, VerificationState state, LogicalPlan plan) {
+ StoreType storeType = PlannerUtil.getStoreType(plan);
+ if (storeType != null) {
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+ if (rootNode.getChild().getType() == NodeType.INSERT) {
+ try {
+ TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
+ InsertNode iNode = rootNode.getChild();
+ Schema outSchema = iNode.getChild().getOutSchema();
+
+ StorageManager.getStorageManager(queryContext.getConf(), storeType)
+ .verifyInsertTableSchema(tableDesc, outSchema);
+ } catch (Throwable t) {
+ state.addVerification(t.getMessage());
+ }
+ }
+ }
+ }
+
/**
* Alter a given table
*/
@@ -730,32 +762,18 @@ public class GlobalEngine extends AbstractService {
meta = CatalogUtil.newTableMeta(createTable.getStorageType());
}
- if(createTable.isExternal()){
+ if(PlannerUtil.isFileStorageType(createTable.getStorageType()) && createTable.isExternal()){
Preconditions.checkState(createTable.hasPath(), "ERROR: LOCATION must be given.");
- } else {
- String databaseName;
- String tableName;
- if (CatalogUtil.isFQTableName(createTable.getTableName())) {
- databaseName = CatalogUtil.extractQualifier(createTable.getTableName());
- tableName = CatalogUtil.extractSimpleName(createTable.getTableName());
- } else {
- databaseName = queryContext.getCurrentDatabase();
- tableName = createTable.getTableName();
- }
-
- // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
- Path tablePath = StorageUtil.concatPath(sm.getWarehouseDir(), databaseName, tableName);
- createTable.setPath(tablePath);
}
- return createTableOnPath(queryContext, createTable.getTableName(), createTable.getTableSchema(),
- meta, createTable.getPath(), createTable.isExternal(), createTable.getPartitionMethod(), ifNotExists);
+ return createTable(queryContext, createTable.getTableName(), createTable.getStorageType(),
+ createTable.getTableSchema(), meta, createTable.getPath(), createTable.isExternal(),
+ createTable.getPartitionMethod(), ifNotExists);
}
- public TableDesc createTableOnPath(QueryContext queryContext, String tableName, Schema schema, TableMeta meta,
- Path path, boolean isExternal, PartitionMethodDesc partitionDesc,
- boolean ifNotExists)
- throws IOException {
+ public TableDesc createTable(QueryContext queryContext, String tableName, StoreType storeType,
+ Schema schema, TableMeta meta, Path path, boolean isExternal,
+ PartitionMethodDesc partitionDesc, boolean ifNotExists) throws IOException {
String databaseName;
String simpleTableName;
if (CatalogUtil.isFQTableName(tableName)) {
@@ -779,39 +797,15 @@ public class GlobalEngine extends AbstractService {
}
}
- FileSystem fs = path.getFileSystem(context.getConf());
-
- if (isExternal) {
- if(!fs.exists(path)) {
- LOG.error("ERROR: " + path.toUri() + " does not exist");
- throw new IOException("ERROR: " + path.toUri() + " does not exist");
- }
- } else {
- fs.mkdirs(path);
- }
-
- long totalSize = 0;
-
- try {
- totalSize = sm.calculateSize(path);
- } catch (IOException e) {
- LOG.warn("Cannot calculate the size of the relation", e);
- }
-
- TableStats stats = new TableStats();
- stats.setNumBytes(totalSize);
-
- if (isExternal) { // if it is an external table, there is no way to know the exact row number without processing.
- stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
- }
-
TableDesc desc = new TableDesc(CatalogUtil.buildFQName(databaseName, simpleTableName),
- schema, meta, path.toUri(), isExternal);
- desc.setStats(stats);
+ schema, meta, (path != null ? path.toUri(): null), isExternal);
+
if (partitionDesc != null) {
desc.setPartitionMethod(partitionDesc);
}
+ StorageManager.getStorageManager(queryContext.getConf(), storeType).createTable(desc, ifNotExists);
+
if (catalog.createTable(desc)) {
LOG.info("Table " + desc.getName() + " is created (" + desc.getStats().getNumBytes() + ")");
return desc;
@@ -905,13 +899,13 @@ public class GlobalEngine extends AbstractService {
}
}
- Path path = new Path(catalog.getTableDesc(qualifiedName).getPath());
+ TableDesc tableDesc = catalog.getTableDesc(qualifiedName);
catalog.dropTable(qualifiedName);
if (purge) {
try {
- FileSystem fs = path.getFileSystem(context.getConf());
- fs.delete(path, true);
+ StorageManager.getStorageManager(queryContext.getConf(),
+ tableDesc.getMeta().getStoreType()).purgeTable(tableDesc);
} catch (IOException e) {
throw new InternalError(e.getMessage());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
index 82fd6fc..56cf8e5 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GreedyFragmentScheduleAlgorithm.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tajo.master.DefaultFragmentScheduleAlgorithm.FragmentsPerDisk;
+import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.TUtil;
@@ -101,9 +102,12 @@ public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorith
@Override
public void addFragment(FragmentPair fragmentPair) {
String[] hosts = fragmentPair.getLeftFragment().getHosts();
- int[] diskIds = fragmentPair.getLeftFragment().getDiskIds();
+ int[] diskIds = null;
+ if (fragmentPair.getLeftFragment() instanceof FileFragment) {
+ diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
+ }
for (int i = 0; i < hosts.length; i++) {
- addFragment(hosts[i], diskIds[i], fragmentPair);
+ addFragment(hosts[i], diskIds != null ? diskIds[i] : -1, fragmentPair);
}
totalFragmentNum++;
}
@@ -276,23 +280,27 @@ public class GreedyFragmentScheduleAlgorithm implements FragmentScheduleAlgorith
public void removeFragment(FragmentPair fragmentPair) {
String [] hosts = fragmentPair.getLeftFragment().getHosts();
- int[] diskIds = fragmentPair.getLeftFragment().getDiskIds();
+ int[] diskIds = null;
+ if (fragmentPair.getLeftFragment() instanceof FileFragment) {
+ diskIds = ((FileFragment)fragmentPair.getLeftFragment()).getDiskIds();
+ }
for (int i = 0; i < hosts.length; i++) {
+ int diskId = diskIds == null ? -1 : diskIds[i];
String normalizedHost = NetUtils.normalizeHost(hosts[i]);
Map<Integer, FragmentsPerDisk> diskFragmentMap = fragmentHostMapping.get(normalizedHost);
if (diskFragmentMap != null) {
- FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskIds[i]);
+ FragmentsPerDisk fragmentsPerDisk = diskFragmentMap.get(diskId);
if (fragmentsPerDisk != null) {
boolean isRemoved = fragmentsPerDisk.removeFragmentPair(fragmentPair);
if (isRemoved) {
if (fragmentsPerDisk.size() == 0) {
- diskFragmentMap.remove(diskIds[i]);
+ diskFragmentMap.remove(diskId);
if (diskFragmentMap.size() == 0) {
fragmentHostMapping.remove(normalizedHost);
}
}
- HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskIds[i]);
+ HostAndDisk hostAndDisk = new HostAndDisk(normalizedHost, diskId);
if (totalHostPriority.containsKey(hostAndDisk)) {
PrioritizedHost prioritizedHost = totalHostPriority.get(hostAndDisk);
updateHostPriority(prioritizedHost.hostAndDisk, prioritizedHost.priority-1);
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
index b2883cc..cc99453 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.MasterPlan;
@@ -38,7 +39,9 @@ import org.apache.tajo.master.querymaster.QueryUnit;
import org.apache.tajo.master.querymaster.QueryUnitAttempt;
import org.apache.tajo.master.querymaster.SubQuery;
import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.worker.FetchImpl;
@@ -197,15 +200,17 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
if (event.getType() == EventType.T_SCHEDULE) {
if (event instanceof FragmentScheduleEvent) {
FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event;
- Collection<FileFragment> rightFragments = castEvent.getRightFragments();
+ Collection<Fragment> rightFragments = castEvent.getRightFragments();
if (rightFragments == null || rightFragments.isEmpty()) {
scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), null));
} else {
- for (FileFragment eachFragment: rightFragments) {
+ for (Fragment eachFragment: rightFragments) {
scheduledFragments.addFragment(new FragmentPair(castEvent.getLeftFragment(), eachFragment));
}
}
- initDiskBalancer(castEvent.getLeftFragment().getHosts(), castEvent.getLeftFragment().getDiskIds());
+ if (castEvent.getLeftFragment() instanceof FileFragment) {
+ initDiskBalancer(castEvent.getLeftFragment().getHosts(), ((FileFragment)castEvent.getLeftFragment()).getDiskIds());
+ }
} else if (event instanceof FetchScheduleEvent) {
FetchScheduleEvent castEvent = (FetchScheduleEvent) event;
scheduledFetches.addFetch(castEvent.getFetches());
@@ -366,6 +371,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
long taskSize = adjustTaskSize();
LOG.info("Adjusted task size: " + taskSize);
+ TajoConf conf = subQuery.getContext().getConf();
// host local, disk local
String normalized = NetUtils.normalizeHost(host);
Integer diskId = hostDiskBalancerMap.get(normalized).getDiskId(container.containerID);
@@ -376,13 +382,14 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
break;
}
- if (assignedFragmentSize + fragmentPair.getLeftFragment().getEndKey() > taskSize) {
+ if (assignedFragmentSize +
+ StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) {
break;
} else {
fragmentPairs.add(fragmentPair);
- assignedFragmentSize += fragmentPair.getLeftFragment().getEndKey();
+ assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment());
if (fragmentPair.getRightFragment() != null) {
- assignedFragmentSize += fragmentPair.getRightFragment().getEndKey();
+ assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment());
}
}
scheduledFragments.removeFragment(fragmentPair);
@@ -398,13 +405,14 @@ public class LazyTaskScheduler extends AbstractTaskScheduler {
break;
}
- if (assignedFragmentSize + fragmentPair.getLeftFragment().getEndKey() > taskSize) {
+ if (assignedFragmentSize +
+ StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment()) > taskSize) {
break;
} else {
fragmentPairs.add(fragmentPair);
- assignedFragmentSize += fragmentPair.getLeftFragment().getEndKey();
+ assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getLeftFragment());
if (fragmentPair.getRightFragment() != null) {
- assignedFragmentSize += fragmentPair.getRightFragment().getEndKey();
+ assignedFragmentSize += StorageManager.getFragmentLength(conf, fragmentPair.getRightFragment());
}
}
scheduledFragments.removeFragment(fragmentPair);
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
index 768528d..64081f3 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
@@ -26,7 +26,6 @@ import org.apache.tajo.QueryUnitId;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.physical.PhysicalPlanUtil;
import org.apache.tajo.plan.logical.ScanNode;
import org.apache.tajo.engine.planner.physical.SeqScanExec;
import org.apache.tajo.engine.query.QueryContext;
@@ -34,6 +33,8 @@ import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
@@ -41,7 +42,7 @@ import java.util.ArrayList;
import java.util.List;
public class NonForwardQueryResultScanner {
- private static final int MAX_FILE_NUM_PER_SCAN = 100;
+ private static final int MAX_FRAGMENT_NUM_PER_SCAN = 100;
private QueryId queryId;
private String sessionId;
@@ -54,7 +55,7 @@ public class NonForwardQueryResultScanner {
private TajoConf tajoConf;
private ScanNode scanNode;
- private int currentFileIndex = 0;
+ private int currentFragmentIndex = 0;
public NonForwardQueryResultScanner(TajoConf tajoConf, String sessionId,
QueryId queryId,
@@ -76,23 +77,24 @@ public class NonForwardQueryResultScanner {
}
private void initSeqScanExec() throws IOException {
- FragmentProto[] fragments = PhysicalPlanUtil.getNonZeroLengthDataFiles(tajoConf, tableDesc,
- currentFileIndex, MAX_FILE_NUM_PER_SCAN);
- if (fragments != null && fragments.length > 0) {
+ List<Fragment> fragments = StorageManager.getStorageManager(tajoConf, tableDesc.getMeta().getStoreType())
+ .getNonForwardSplit(tableDesc, currentFragmentIndex, MAX_FRAGMENT_NUM_PER_SCAN);
+
+ if (fragments != null && !fragments.isEmpty()) {
+ FragmentProto[] fragmentProtos = FragmentConvertor.toFragmentProtoArray(fragments.toArray(new Fragment[]{}));
this.taskContext = new TaskAttemptContext(
new QueryContext(tajoConf), null,
new QueryUnitAttemptId(new QueryUnitId(new ExecutionBlockId(queryId, 1), 0), 0),
- fragments, null);
+ fragmentProtos, null);
try {
// scanNode must be clone cause SeqScanExec change target in the case of a partitioned table.
- scanExec = new SeqScanExec(taskContext,
- StorageManager.getStorageManager(tajoConf), (ScanNode)scanNode.clone(), fragments);
+ scanExec = new SeqScanExec(taskContext, (ScanNode)scanNode.clone(), fragmentProtos);
} catch (CloneNotSupportedException e) {
throw new IOException(e.getMessage(), e);
}
scanExec.init();
- currentFileIndex += fragments.length;
+ currentFragmentIndex += fragments.size();
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index 604cfe0..f307127 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -53,6 +53,7 @@ import org.apache.tajo.rule.EvaluationContext;
import org.apache.tajo.rule.EvaluationFailedException;
import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
import org.apache.tajo.rule.SelfDiagnosisRuleSession;
+import org.apache.tajo.storage.FileStorageManager;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.util.*;
import org.apache.tajo.util.history.HistoryReader;
@@ -110,7 +111,7 @@ public class TajoMaster extends CompositeService {
private CatalogServer catalogServer;
private CatalogService catalog;
- private StorageManager storeManager;
+ private FileStorageManager storeManager;
private GlobalEngine globalEngine;
private AsyncDispatcher dispatcher;
private TajoMasterClientService tajoMasterClientService;
@@ -171,7 +172,7 @@ public class TajoMaster extends CompositeService {
// check the system directory and create if they are not created.
checkAndInitializeSystemDirectories();
diagnoseTajoMaster();
- this.storeManager = StorageManager.getStorageManager(systemConf);
+ this.storeManager = (FileStorageManager)StorageManager.getFileStorageManager(systemConf, null);
catalogServer = new CatalogServer(FunctionLoader.load());
addIfService(catalogServer);
@@ -422,7 +423,7 @@ public class TajoMaster extends CompositeService {
return this.catalogServer;
}
- public StorageManager getStorageManager() {
+ public FileStorageManager getStorageManager() {
return this.storeManager;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index b420a65..7014034 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -814,7 +814,8 @@ public class TajoMasterClientService extends AbstractService {
TableDesc desc;
try {
- desc = context.getGlobalEngine().createTableOnPath(queryContext, request.getName(), schema,
+ desc = context.getGlobalEngine().createTable(queryContext, request.getName(),
+ meta.getStoreType(), schema,
meta, path, true, partitionDesc, false);
} catch (Exception e) {
return TableResponse.newBuilder()
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java
index 8cc17cb..9a7cc76 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/event/FragmentScheduleEvent.java
@@ -19,23 +19,23 @@
package org.apache.tajo.master.event;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import java.util.Collection;
public class FragmentScheduleEvent extends TaskSchedulerEvent {
- private final FileFragment leftFragment;
- private final Collection<FileFragment> rightFragments;
+ private final Fragment leftFragment;
+ private final Collection<Fragment> rightFragments;
public FragmentScheduleEvent(final EventType eventType, final ExecutionBlockId blockId,
- final FileFragment fragment) {
+ final Fragment fragment) {
this(eventType, blockId, fragment, null);
}
public FragmentScheduleEvent(final EventType eventType,
final ExecutionBlockId blockId,
- final FileFragment leftFragment,
- final Collection<FileFragment> rightFragments) {
+ final Fragment leftFragment,
+ final Collection<Fragment> rightFragments) {
super(eventType, blockId);
this.leftFragment = leftFragment;
this.rightFragments = rightFragments;
@@ -45,11 +45,11 @@ public class FragmentScheduleEvent extends TaskSchedulerEvent {
return this.rightFragments != null && !this.rightFragments.isEmpty();
}
- public FileFragment getLeftFragment() {
+ public Fragment getLeftFragment() {
return leftFragment;
}
- public Collection<FileFragment> getRightFragments() { return rightFragments; }
+ public Collection<Fragment> getRightFragments() { return rightFragments; }
@Override
public String toString() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index f92001f..a048780 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -23,7 +23,6 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -32,33 +31,28 @@ import org.apache.hadoop.yarn.util.Clock;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
import org.apache.tajo.SessionVars;
-import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
import org.apache.tajo.engine.planner.global.ExecutionBlockCursor;
import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.plan.logical.CreateTableNode;
-import org.apache.tajo.plan.logical.InsertNode;
-import org.apache.tajo.plan.logical.NodeType;
+import org.apache.tajo.plan.logical.*;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.master.event.*;
+import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.history.QueryHistory;
import org.apache.tajo.util.history.SubQueryHistory;
import java.io.IOException;
-import java.text.NumberFormat;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
@@ -74,7 +68,6 @@ public class Query implements EventHandler<QueryEvent> {
private Map<ExecutionBlockId, SubQuery> subqueries;
private final EventHandler eventHandler;
private final MasterPlan plan;
- private final StorageManager sm;
QueryMasterTask.QueryMasterTaskContext context;
private ExecutionBlockCursor cursor;
@@ -216,7 +209,6 @@ public class Query implements EventHandler<QueryEvent> {
subqueries = Maps.newHashMap();
this.eventHandler = eventHandler;
this.plan = plan;
- this.sm = context.getStorageManager();
this.cursor = new ExecutionBlockCursor(plan, true);
StringBuilder sb = new StringBuilder("\n=======================================================");
@@ -398,7 +390,7 @@ public class Query implements EventHandler<QueryEvent> {
query.setStartTime();
SubQuery subQuery = new SubQuery(query.context, query.getPlan(),
- query.getExecutionBlockCursor().nextBlock(), query.sm);
+ query.getExecutionBlockCursor().nextBlock());
subQuery.setPriority(query.priority--);
query.addSubQuery(subQuery);
@@ -423,6 +415,20 @@ public class Query implements EventHandler<QueryEvent> {
} else {
finalState = QueryState.QUERY_ERROR;
}
+ if (finalState != QueryState.QUERY_SUCCEEDED) {
+ SubQuery lastStage = query.getSubQuery(subQueryEvent.getExecutionBlockId());
+ if (lastStage != null && lastStage.getTableMeta() != null) {
+ StoreType storeType = lastStage.getTableMeta().getStoreType();
+ if (storeType != null) {
+ LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
+ try {
+ StorageManager.getStorageManager(query.systemConf, storeType).rollbackOutputCommit(rootNode.getChild());
+ } catch (IOException e) {
+ LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
+ }
+ }
+ }
+ }
query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
query.setFinishTime();
@@ -430,356 +436,27 @@ public class Query implements EventHandler<QueryEvent> {
}
private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
- MasterPlan masterPlan = query.getPlan();
+ SubQuery lastStage = query.getSubQuery(event.getExecutionBlockId());
+ StoreType storeType = lastStage.getTableMeta().getStoreType();
+ try {
+ LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
+ CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
+ TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
- ExecutionBlock terminal = query.getPlan().getTerminalBlock();
- DataChannel finalChannel = masterPlan.getChannel(event.getExecutionBlockId(), terminal.getId());
+ Path finalOutputDir = StorageManager.getStorageManager(query.systemConf, storeType)
+ .commitOutputData(query.context.getQueryContext(),
+ lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc);
- QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
- try {
- Path finalOutputDir = commitOutputData(query);
+ QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);
- } catch (Throwable t) {
- query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(t)));
+ } catch (Exception e) {
+ query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
return QueryState.QUERY_ERROR;
}
return QueryState.QUERY_SUCCEEDED;
}
- /**
- * It moves a result data stored in a staging output dir into a final output dir.
- */
- public Path commitOutputData(Query query) throws IOException {
- QueryContext queryContext = query.context.getQueryContext();
- Path stagingResultDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
- Path finalOutputDir;
- if (queryContext.hasOutputPath()) {
- finalOutputDir = queryContext.getOutputPath();
- try {
- FileSystem fs = stagingResultDir.getFileSystem(query.systemConf);
-
- if (queryContext.isOutputOverwrite()) { // INSERT OVERWRITE INTO
-
- // It moves the original table into the temporary location.
- // Then it moves the new result table into the original table location.
- // Upon failed, it recovers the original table if possible.
- boolean movedToOldTable = false;
- boolean committed = false;
- Path oldTableDir = new Path(queryContext.getStagingDir(), TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME);
- ContentSummary summary = fs.getContentSummary(stagingResultDir);
-
- if (queryContext.hasPartition() && summary.getFileCount() > 0L) {
- // This is a map for existing non-leaf directory to rename. A key is current directory and a value is
- // renaming directory.
- Map<Path, Path> renameDirs = TUtil.newHashMap();
- // This is a map for recovering existing partition directory. A key is current directory and a value is
- // temporary directory to back up.
- Map<Path, Path> recoveryDirs = TUtil.newHashMap();
-
- try {
- if (!fs.exists(finalOutputDir)) {
- fs.mkdirs(finalOutputDir);
- }
-
- visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(),
- renameDirs, oldTableDir);
-
- // Rename target partition directories
- for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
- // Backup existing data files for recovering
- if (fs.exists(entry.getValue())) {
- String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(),
- oldTableDir.toString());
- Path recoveryPath = new Path(recoveryPathString);
- fs.rename(entry.getValue(), recoveryPath);
- fs.exists(recoveryPath);
- recoveryDirs.put(entry.getValue(), recoveryPath);
- }
- // Delete existing directory
- fs.delete(entry.getValue(), true);
- // Rename staging directory to final output directory
- fs.rename(entry.getKey(), entry.getValue());
- }
-
- } catch (IOException ioe) {
- // Remove created dirs
- for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) {
- fs.delete(entry.getValue(), true);
- }
-
- // Recovery renamed dirs
- for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) {
- fs.delete(entry.getValue(), true);
- fs.rename(entry.getValue(), entry.getKey());
- }
-
- throw new IOException(ioe.getMessage());
- }
- } else { // no partition
- try {
-
- // if the final output dir exists, move all contents to the temporary table dir.
- // Otherwise, just make the final output dir. As a result, the final output dir will be empty.
- if (fs.exists(finalOutputDir)) {
- fs.mkdirs(oldTableDir);
-
- for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) {
- fs.rename(status.getPath(), oldTableDir);
- }
-
- movedToOldTable = fs.exists(oldTableDir);
- } else { // if the parent does not exist, make its parent directory.
- fs.mkdirs(finalOutputDir);
- }
-
- // Move the results to the final output dir.
- for (FileStatus status : fs.listStatus(stagingResultDir)) {
- fs.rename(status.getPath(), finalOutputDir);
- }
-
- // Check the final output dir
- committed = fs.exists(finalOutputDir);
-
- } catch (IOException ioe) {
- // recover the old table
- if (movedToOldTable && !committed) {
-
- // if commit is failed, recover the old data
- for (FileStatus status : fs.listStatus(finalOutputDir, StorageManager.hiddenFileFilter)) {
- fs.delete(status.getPath(), true);
- }
-
- for (FileStatus status : fs.listStatus(oldTableDir)) {
- fs.rename(status.getPath(), finalOutputDir);
- }
- }
-
- throw new IOException(ioe.getMessage());
- }
- }
- } else {
- NodeType queryType = queryContext.getCommandType();
-
- if (queryType == NodeType.INSERT) { // INSERT INTO an existing table
-
- NumberFormat fmt = NumberFormat.getInstance();
- fmt.setGroupingUsed(false);
- fmt.setMinimumIntegerDigits(3);
-
- if (queryContext.hasPartition()) {
- for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
- if (eachFile.isFile()) {
- LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath());
- continue;
- }
- moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1);
- }
- } else {
- int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1;
- for(FileStatus eachFile: fs.listStatus(stagingResultDir)) {
- moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++);
- }
- }
- // checking all file moved and remove empty dir
- verifyAllFileMoved(fs, stagingResultDir);
- FileStatus[] files = fs.listStatus(stagingResultDir);
- if (files != null && files.length != 0) {
- for (FileStatus eachFile: files) {
- LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
- }
- }
- } else { // CREATE TABLE AS SELECT (CTAS)
- if (fs.exists(finalOutputDir)) {
- for (FileStatus status : fs.listStatus(stagingResultDir)) {
- fs.rename(status.getPath(), finalOutputDir);
- }
- } else {
- fs.rename(stagingResultDir, finalOutputDir);
- }
- LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
- }
- }
-
- // remove the staging directory if the final output dir is given.
- Path stagingDirRoot = queryContext.getStagingDir().getParent();
- fs.delete(stagingDirRoot, true);
-
- } catch (Throwable t) {
- LOG.error(t);
- throw new IOException(t);
- }
- } else {
- finalOutputDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
- }
-
- return finalOutputDir;
- }
-
- /**
- * This method sets a rename map which includes renamed staging directory to final output directory recursively.
- * If there exists some data files, this delete it for duplicate data.
- *
- *
- * @param fs
- * @param stagingPath
- * @param outputPath
- * @param stagingParentPathString
- * @throws IOException
- */
- private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath,
- String stagingParentPathString,
- Map<Path, Path> renameDirs, Path oldTableDir) throws IOException {
- FileStatus[] files = fs.listStatus(stagingPath);
-
- for(FileStatus eachFile : files) {
- if (eachFile.isDirectory()) {
- Path oldPath = eachFile.getPath();
-
- // Make recover directory.
- String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString,
- oldTableDir.toString());
- Path recoveryPath = new Path(recoverPathString);
- if (!fs.exists(recoveryPath)) {
- fs.mkdirs(recoveryPath);
- }
-
- visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString,
- renameDirs, oldTableDir);
- // Find last order partition for renaming
- String newPathString = oldPath.toString().replaceAll(stagingParentPathString,
- outputPath.toString());
- Path newPath = new Path(newPathString);
- if (!isLeafDirectory(fs, eachFile.getPath())) {
- renameDirs.put(eachFile.getPath(), newPath);
- } else {
- if (!fs.exists(newPath)) {
- fs.mkdirs(newPath);
- }
- }
- }
- }
- }
-
- private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException {
- boolean retValue = false;
-
- FileStatus[] files = fs.listStatus(path);
- for (FileStatus file : files) {
- if (fs.isDirectory(file.getPath())) {
- retValue = true;
- break;
- }
- }
-
- return retValue;
- }
-
- private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException {
- FileStatus[] files = fs.listStatus(stagingPath);
- if (files != null && files.length != 0) {
- for (FileStatus eachFile: files) {
- if (eachFile.isFile()) {
- LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath());
- return false;
- } else {
- if (verifyAllFileMoved(fs, eachFile.getPath())) {
- fs.delete(eachFile.getPath(), false);
- } else {
- return false;
- }
- }
- }
- }
-
- return true;
- }
-
- /**
- * Attach the sequence number to a path.
- *
- * @param path Path
- * @param seq sequence number
- * @param nf Number format
- * @return New path attached with sequence number
- * @throws IOException
- */
- private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException {
- String[] tokens = path.getName().split("-");
- if (tokens.length != 4) {
- throw new IOException("Wrong result file name:" + path);
- }
- return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq);
- }
-
- /**
- * Attach the sequence number to the output file name and than move the file into the final result path.
- *
- * @param fs FileSystem
- * @param stagingResultDir The staging result dir
- * @param fileStatus The file status
- * @param finalOutputPath Final output path
- * @param nf Number format
- * @param fileSeq The sequence number
- * @throws IOException
- */
- private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir,
- FileStatus fileStatus, Path finalOutputPath,
- NumberFormat nf,
- int fileSeq) throws IOException {
- if (fileStatus.isDirectory()) {
- String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
- if (subPath != null) {
- Path finalSubPath = new Path(finalOutputPath, subPath);
- if (!fs.exists(finalSubPath)) {
- fs.mkdirs(finalSubPath);
- }
- int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false);
- for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) {
- moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq);
- }
- } else {
- throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath());
- }
- } else {
- String subPath = extractSubPath(stagingResultDir, fileStatus.getPath());
- if (subPath != null) {
- Path finalSubPath = new Path(finalOutputPath, subPath);
- finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf));
- if (!fs.exists(finalSubPath.getParent())) {
- fs.mkdirs(finalSubPath.getParent());
- }
- if (fs.exists(finalSubPath)) {
- throw new IOException("Already exists data file:" + finalSubPath);
- }
- boolean success = fs.rename(fileStatus.getPath(), finalSubPath);
- if (success) {
- LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " +
- "to final output[" + finalSubPath + "]");
- } else {
- LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " +
- "to final output[" + finalSubPath + "]");
- }
- }
- }
- }
-
- private String extractSubPath(Path parentPath, Path childPath) {
- String parentPathStr = parentPath.toUri().getPath();
- String childPathStr = childPath.toUri().getPath();
-
- if (parentPathStr.length() > childPathStr.length()) {
- return null;
- }
-
- int index = childPathStr.indexOf(parentPathStr);
- if (index != 0) {
- return null;
- }
-
- return childPathStr.substring(parentPathStr.length() + 1);
- }
-
private static interface QueryHook {
boolean isEligible(QueryContext queryContext, Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir);
void execute(QueryMaster.QueryMasterContext context, QueryContext queryContext, Query query,
@@ -947,7 +624,7 @@ public class Query implements EventHandler<QueryEvent> {
private void executeNextBlock(Query query) {
ExecutionBlockCursor cursor = query.getExecutionBlockCursor();
ExecutionBlock nextBlock = cursor.nextBlock();
- SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock, query.sm);
+ SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock);
nextSubQuery.setPriority(query.priority--);
query.addSubQuery(nextSubQuery);
nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT));
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index f2d8b3a..42fac3a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -41,7 +41,6 @@ import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.util.HAServiceUtil;
import org.apache.tajo.util.NetUtils;
import org.apache.tajo.util.history.QueryHistory;
@@ -71,8 +70,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
private GlobalPlanner globalPlanner;
- private StorageManager storageManager;
-
private TajoConf systemConf;
private Map<QueryId, QueryMasterTask> queryMasterTasks = Maps.newConcurrentMap();
@@ -116,8 +113,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
this.dispatcher = new TajoAsyncDispatcher("querymaster_" + System.currentTimeMillis());
addIfService(dispatcher);
- this.storageManager = StorageManager.getStorageManager(systemConf);
-
globalPlanner = new GlobalPlanner(systemConf, workerContext);
dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
@@ -373,10 +368,6 @@ public class QueryMaster extends CompositeService implements EventHandler {
return clock;
}
- public StorageManager getStorageManager() {
- return storageManager;
- }
-
public QueryMaster getQueryMaster() {
return QueryMaster.this;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 75d8ab6..1eaef0f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -35,10 +35,13 @@ import org.apache.tajo.algebra.Expr;
import org.apache.tajo.algebra.JsonHelper;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.plan.LogicalOptimizer;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
+import org.apache.tajo.plan.logical.LogicalRootNode;
+import org.apache.tajo.plan.rewrite.RewriteRule;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.plan.logical.LogicalNode;
@@ -54,10 +57,12 @@ import org.apache.tajo.master.TajoContainerProxy;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.master.session.Session;
+import org.apache.tajo.plan.verifier.VerifyException;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.RpcConnectionPool;
import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.StorageProperty;
import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.util.HAServiceUtil;
import org.apache.tajo.util.metrics.TajoMetrics;
@@ -348,6 +353,8 @@ public class QueryMasterTask extends CompositeService {
}
public synchronized void startQuery() {
+ StorageManager sm = null;
+ LogicalPlan plan = null;
try {
if (query != null) {
LOG.warn("Query already started");
@@ -358,7 +365,29 @@ public class QueryMasterTask extends CompositeService {
LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
jsonExpr = null; // remove the possible OOM
- LogicalPlan plan = planner.createPlan(queryContext, expr);
+ plan = planner.createPlan(queryContext, expr);
+
+ StoreType storeType = PlannerUtil.getStoreType(plan);
+ if (storeType != null) {
+ sm = StorageManager.getStorageManager(systemConf, storeType);
+ StorageProperty storageProperty = sm.getStorageProperty();
+ if (storageProperty.isSortedInsert()) {
+ String tableName = PlannerUtil.getStoreTableName(plan);
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+ TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
+ if (tableDesc == null) {
+ throw new VerifyException("Can't get table meta data from catalog: " + tableName);
+ }
+ List<RewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
+ getQueryTaskContext().getQueryContext(), tableDesc);
+ if (storageSpecifiedRewriteRules != null) {
+ for (RewriteRule eachRule: storageSpecifiedRewriteRules) {
+ optimizer.addRuleAfterToJoinOpt(eachRule);
+ }
+ }
+ }
+ }
+
optimizer.optimize(queryContext, plan);
GlobalEngine.DistributedQueryHookManager hookManager = new GlobalEngine.DistributedQueryHookManager();
@@ -393,6 +422,15 @@ public class QueryMasterTask extends CompositeService {
} catch (Throwable t) {
LOG.error(t.getMessage(), t);
initError = t;
+
+ if (plan != null && sm != null) {
+ LogicalRootNode rootNode = plan.getRootBlock().getRoot();
+ try {
+ sm.rollbackOutputCommit(rootNode.getChild());
+ } catch (IOException e) {
+ LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
+ }
+ }
}
}
@@ -441,8 +479,14 @@ public class QueryMasterTask extends CompositeService {
// Create Output Directory
////////////////////////////////////////////
+ String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, "");
if (context.isCreateTable() || context.isInsert()) {
- stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId);
+ if (outputPath == null || outputPath.isEmpty()) {
+ // hbase
+ stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
+ } else {
+ stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId);
+ }
} else {
stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
}
@@ -570,10 +614,6 @@ public class QueryMasterTask extends CompositeService {
return queryId;
}
- public StorageManager getStorageManager() {
- return queryMasterContext.getStorageManager();
- }
-
public Path getStagingDir() {
return queryContext.getStagingDir();
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
index 0f275e9..75402c2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryUnit.java
@@ -43,6 +43,7 @@ import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttem
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.storage.DataLocation;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.fragment.FragmentConvertor;
import org.apache.tajo.util.Pair;
import org.apache.tajo.util.TajoIdUtils;
@@ -265,8 +266,13 @@ public class QueryUnit implements EventHandler<TaskEvent> {
List<String> fragmentList = new ArrayList<String>();
for (FragmentProto eachFragment : getAllFragments()) {
- FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment);
- fragmentList.add(fileFragment.toString());
+ try {
+ Fragment fragment = FragmentConvertor.convert(systemConf, eachFragment);
+ fragmentList.add(fragment.toString());
+ } catch (Exception e) {
+ LOG.error(e.getMessage());
+ fragmentList.add("ERROR: " + eachFragment.getStoreType() + "," + eachFragment.getId() + ": " + e.getMessage());
+ }
}
queryUnitHistory.setFragments(fragmentList.toArray(new String[]{}));
@@ -313,15 +319,18 @@ public class QueryUnit implements EventHandler<TaskEvent> {
}
}
- private void addDataLocation(FileFragment fragment) {
+ private void addDataLocation(Fragment fragment) {
String[] hosts = fragment.getHosts();
- int[] diskIds = fragment.getDiskIds();
+ int[] diskIds = null;
+ if (fragment instanceof FileFragment) {
+ diskIds = ((FileFragment)fragment).getDiskIds();
+ }
for (int i = 0; i < hosts.length; i++) {
- dataLocations.add(new DataLocation(hosts[i], diskIds[i]));
+ dataLocations.add(new DataLocation(hosts[i], diskIds == null ? -1 : diskIds[i]));
}
}
- public void addFragment(FileFragment fragment, boolean useDataLocation) {
+ public void addFragment(Fragment fragment, boolean useDataLocation) {
Set<FragmentProto> fragmentProtos;
if (fragMap.containsKey(fragment.getTableName())) {
fragmentProtos = fragMap.get(fragment.getTableName());
@@ -336,8 +345,8 @@ public class QueryUnit implements EventHandler<TaskEvent> {
totalFragmentNum++;
}
- public void addFragments(Collection<FileFragment> fragments) {
- for (FileFragment eachFragment: fragments) {
+ public void addFragments(Collection<Fragment> fragments) {
+ for (Fragment eachFragment: fragments) {
addFragment(eachFragment, false);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 55b1895..a240ace 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -45,15 +45,18 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAg
import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
import org.apache.tajo.master.TaskSchedulerContext;
import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
+import org.apache.tajo.plan.logical.SortNode.SortPurpose;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.logical.*;
-import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.FileStorageManager;
import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.storage.RowStoreUtil;
import org.apache.tajo.storage.TupleRange;
import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.Pair;
+import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.worker.FetchImpl;
@@ -83,18 +86,20 @@ public class Repartitioner {
MasterPlan masterPlan = subQuery.getMasterPlan();
ExecutionBlock execBlock = subQuery.getBlock();
QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext();
- StorageManager storageManager = subQuery.getStorageManager();
ScanNode[] scans = execBlock.getScanNodes();
Path tablePath;
- FileFragment[] fragments = new FileFragment[scans.length];
+ Fragment[] fragments = new Fragment[scans.length];
long[] stats = new long[scans.length];
// initialize variables from the child operators
for (int i = 0; i < scans.length; i++) {
TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());
if (tableDesc == null) { // if it is a real table stored on storage
+ FileStorageManager storageManager =
+ (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf());
+
tablePath = storageManager.getTablePath(scans[i].getTableName());
if (execBlock.getUnionScanMap() != null && !execBlock.getUnionScanMap().isEmpty()) {
for (Map.Entry<ExecutionBlockId, ExecutionBlockId> unionScanEntry: execBlock.getUnionScanMap().entrySet()) {
@@ -107,21 +112,23 @@ public class Repartitioner {
}
fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
} else {
- tablePath = new Path(tableDesc.getPath());
try {
stats[i] = GlobalPlanner.computeDescendentVolume(scans[i]);
} catch (PlanningException e) {
throw new IOException(e);
}
+ StorageManager storageManager =
+ StorageManager.getStorageManager(subQuery.getContext().getConf(), tableDesc.getMeta().getStoreType());
+
// if table has no data, storageManager will return empty FileFragment.
// So, we need to handle FileFragment by its size.
// If we don't check its size, it can cause IndexOutOfBoundsException.
- List<FileFragment> fileFragments = storageManager.getSplits(scans[i].getCanonicalName(), tableDesc.getMeta(), tableDesc.getSchema(), tablePath);
+ List<Fragment> fileFragments = storageManager.getSplits(scans[i].getCanonicalName(), tableDesc);
if (fileFragments.size() > 0) {
fragments[i] = fileFragments.get(0);
} else {
- fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
+ fragments[i] = new FileFragment(scans[i].getCanonicalName(), new Path(tableDesc.getPath()), 0, 0, new String[]{UNKNOWN_HOST});
}
}
}
@@ -268,14 +275,14 @@ public class Repartitioner {
//select intermediate scan and stats
ScanNode[] intermediateScans = new ScanNode[largeScanIndexList.size()];
long[] intermediateScanStats = new long[largeScanIndexList.size()];
- FileFragment[] intermediateFragments = new FileFragment[largeScanIndexList.size()];
+ Fragment[] intermediateFragments = new Fragment[largeScanIndexList.size()];
int index = 0;
for (Integer eachIdx : largeScanIndexList) {
intermediateScans[index] = scans[eachIdx];
intermediateScanStats[index] = stats[eachIdx];
intermediateFragments[index++] = fragments[eachIdx];
}
- FileFragment[] broadcastFragments = new FileFragment[broadcastIndexList.size()];
+ Fragment[] broadcastFragments = new Fragment[broadcastIndexList.size()];
ScanNode[] broadcastScans = new ScanNode[broadcastIndexList.size()];
index = 0;
for (Integer eachIdx : broadcastIndexList) {
@@ -309,9 +316,9 @@ public class Repartitioner {
SubQuery subQuery,
ScanNode[] scans,
long[] stats,
- FileFragment[] fragments,
+ Fragment[] fragments,
ScanNode[] broadcastScans,
- FileFragment[] broadcastFragments) throws IOException {
+ Fragment[] broadcastFragments) throws IOException {
MasterPlan masterPlan = subQuery.getMasterPlan();
ExecutionBlock execBlock = subQuery.getBlock();
// The hash map is modeling as follows:
@@ -394,7 +401,7 @@ public class Repartitioner {
int joinTaskNum = Math.min(maxTaskNum, hashEntries.size());
LOG.info("The determined number of join tasks is " + joinTaskNum);
- List<FileFragment> rightFragments = new ArrayList<FileFragment>();
+ List<Fragment> rightFragments = new ArrayList<Fragment>();
rightFragments.add(fragments[1]);
if (broadcastFragments != null) {
@@ -404,14 +411,19 @@ public class Repartitioner {
Path[] partitionScanPaths = null;
TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName());
if (eachScan.getType() == NodeType.PARTITIONS_SCAN) {
+ FileStorageManager storageManager =
+ (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf());
+
PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan;
partitionScanPaths = partitionScan.getInputPaths();
// set null to inputPaths in getFragmentsFromPartitionedTable()
- getFragmentsFromPartitionedTable(subQuery.getStorageManager(), eachScan, tableDesc);
+ getFragmentsFromPartitionedTable(storageManager, eachScan, tableDesc);
partitionScan.setInputPaths(partitionScanPaths);
} else {
- Collection<FileFragment> scanFragments = subQuery.getStorageManager().getSplits(eachScan.getCanonicalName(),
- tableDesc.getMeta(), tableDesc.getSchema(), new Path(tableDesc.getPath()));
+ StorageManager storageManager = StorageManager.getStorageManager(subQuery.getContext().getConf(),
+ tableDesc.getMeta().getStoreType());
+ Collection<Fragment> scanFragments = storageManager.getSplits(eachScan.getCanonicalName(),
+ tableDesc, eachScan);
if (scanFragments != null) {
rightFragments.addAll(scanFragments);
}
@@ -480,10 +492,10 @@ public class Repartitioner {
/**
* It creates a number of fragments for all partitions.
*/
- public static List<FileFragment> getFragmentsFromPartitionedTable(StorageManager sm,
+ public static List<Fragment> getFragmentsFromPartitionedTable(FileStorageManager sm,
ScanNode scan,
TableDesc table) throws IOException {
- List<FileFragment> fragments = Lists.newArrayList();
+ List<Fragment> fragments = Lists.newArrayList();
PartitionedTableScanNode partitionsScan = (PartitionedTableScanNode) scan;
fragments.addAll(sm.getSplits(
scan.getCanonicalName(), table.getMeta(), table.getSchema(), partitionsScan.getInputPaths()));
@@ -492,7 +504,7 @@ public class Repartitioner {
}
private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, SubQuery subQuery,
- int baseScanId, FileFragment[] fragments) throws IOException {
+ int baseScanId, Fragment[] fragments) throws IOException {
ExecutionBlock execBlock = subQuery.getBlock();
ScanNode[] scans = execBlock.getScanNodes();
@@ -511,23 +523,27 @@ public class Repartitioner {
// . add all partition paths to node's inputPaths variable
// -> SCAN
// . add all fragments to broadcastFragments
- Collection<FileFragment> baseFragments = null;
- List<FileFragment> broadcastFragments = new ArrayList<FileFragment>();
+ Collection<Fragment> baseFragments = null;
+ List<Fragment> broadcastFragments = new ArrayList<Fragment>();
for (int i = 0; i < scans.length; i++) {
ScanNode scan = scans[i];
TableDesc desc = subQuery.getContext().getTableDescMap().get(scan.getCanonicalName());
TableMeta meta = desc.getMeta();
- Collection<FileFragment> scanFragments;
+ Collection<Fragment> scanFragments;
Path[] partitionScanPaths = null;
if (scan.getType() == NodeType.PARTITIONS_SCAN) {
PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan;
partitionScanPaths = partitionScan.getInputPaths();
// set null to inputPaths in getFragmentsFromPartitionedTable()
- scanFragments = getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, desc);
+ FileStorageManager storageManager =
+ (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf());
+ scanFragments = getFragmentsFromPartitionedTable(storageManager, scan, desc);
} else {
- scanFragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, desc.getSchema(),
- new Path(desc.getPath()));
+ StorageManager storageManager =
+ StorageManager.getStorageManager(subQuery.getContext().getConf(), desc.getMeta().getStoreType());
+
+ scanFragments = storageManager.getSplits(scan.getCanonicalName(), desc, scan);
}
if (scanFragments != null) {
@@ -630,46 +646,66 @@ public class Repartitioner {
ExecutionBlock execBlock = subQuery.getBlock();
ScanNode scan = execBlock.getScanNodes()[0];
Path tablePath;
- tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableName());
+ tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()))
+ .getTablePath(scan.getTableName());
ExecutionBlock sampleChildBlock = masterPlan.getChild(subQuery.getId(), 0);
SortNode sortNode = PlannerUtil.findTopNode(sampleChildBlock.getPlan(), NodeType.SORT);
SortSpec [] sortSpecs = sortNode.getSortKeys();
Schema sortSchema = new Schema(channel.getShuffleKeys());
+ TupleRange[] ranges;
+ int determinedTaskNum;
+
// calculate the number of maximum query ranges
TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId());
// If there is an empty table in inner join, it should return zero rows.
- if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0 ) {
+ if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0) {
return;
}
TupleRange mergedRange = TupleUtil.columnStatToRange(sortSpecs, sortSchema, totalStat.getColumnStats(), false);
- RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
- BigInteger card = partitioner.getTotalCardinality();
- // if the number of the range cardinality is less than the desired number of tasks,
- // we set the the number of tasks to the number of range cardinality.
- int determinedTaskNum;
- if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) {
- LOG.info(subQuery.getId() + ", The range cardinality (" + card
- + ") is less then the desired number of tasks (" + maxNum + ")");
- determinedTaskNum = card.intValue();
+ if (sortNode.getSortPurpose() == SortPurpose.STORAGE_SPECIFIED) {
+ StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan());
+ CatalogService catalog = subQuery.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
+ LogicalRootNode rootNode = masterPlan.getLogicalPlan().getRootBlock().getRoot();
+ TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
+ if (tableDesc == null) {
+ throw new IOException("Can't get table meta data from catalog: " +
+ PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan()));
+ }
+ ranges = StorageManager.getStorageManager(subQuery.getContext().getConf(), storeType)
+ .getInsertSortRanges(subQuery.getContext().getQueryContext(), tableDesc,
+ sortNode.getInSchema(), sortSpecs,
+ mergedRange);
+ determinedTaskNum = ranges.length;
} else {
- determinedTaskNum = maxNum;
- }
+ RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
+ BigInteger card = partitioner.getTotalCardinality();
+
+ // if the number of the range cardinality is less than the desired number of tasks,
+ // we set the number of tasks to the number of range cardinality.
+ if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) {
+ LOG.info(subQuery.getId() + ", The range cardinality (" + card
+ + ") is less then the desired number of tasks (" + maxNum + ")");
+ determinedTaskNum = card.intValue();
+ } else {
+ determinedTaskNum = maxNum;
+ }
- LOG.info(subQuery.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum +
- " sub ranges (total units: " + determinedTaskNum + ")");
- TupleRange [] ranges = partitioner.partition(determinedTaskNum);
- if (ranges == null || ranges.length == 0) {
- LOG.warn(subQuery.getId() + " no range infos.");
- }
- TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges);
- if (LOG.isDebugEnabled()) {
- if (ranges != null) {
- for (TupleRange eachRange : ranges) {
- LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
+ LOG.info(subQuery.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum +
+ " sub ranges (total units: " + determinedTaskNum + ")");
+ ranges = partitioner.partition(determinedTaskNum);
+ if (ranges == null || ranges.length == 0) {
+ LOG.warn(subQuery.getId() + " no range infos.");
+ }
+ TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges);
+ if (LOG.isDebugEnabled()) {
+ if (ranges != null) {
+ for (TupleRange eachRange : ranges) {
+ LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd());
+ }
}
}
}
@@ -772,14 +808,15 @@ public class Repartitioner {
public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan,
SubQuery subQuery, DataChannel channel,
- int maxNum) {
+ int maxNum) throws IOException {
ExecutionBlock execBlock = subQuery.getBlock();
ScanNode scan = execBlock.getScanNodes()[0];
Path tablePath;
- tablePath = subQuery.getContext().getStorageManager().getTablePath(scan.getTableName());
+ tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()))
+ .getTablePath(scan.getTableName());
- FileFragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
- List<FileFragment> fragments = new ArrayList<FileFragment>();
+ Fragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
+ List<Fragment> fragments = new ArrayList<Fragment>();
fragments.add(frag);
SubQuery.scheduleFragments(subQuery, fragments);
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 39bb7ed..7f05fa4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -36,7 +36,7 @@ import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.catalog.statistics.StatisticsUtil;
import org.apache.tajo.catalog.statistics.TableStats;
@@ -59,10 +59,11 @@ import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttem
import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
import org.apache.tajo.master.container.TajoContainer;
import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.storage.FileStorageManager;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.history.QueryUnitHistory;
@@ -96,7 +97,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private TableStats resultStatistics;
private TableStats inputStatistics;
private EventHandler<Event> eventHandler;
- private final StorageManager sm;
private AbstractTaskScheduler taskScheduler;
private QueryMasterTask.QueryMasterTaskContext context;
private final List<String> diagnostics = new ArrayList<String>();
@@ -286,12 +286,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private AtomicInteger completeReportReceived = new AtomicInteger(0);
private SubQueryHistory finalSubQueryHistory;
- public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan,
- ExecutionBlock block, StorageManager sm) {
+ public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block) {
this.context = context;
this.masterPlan = masterPlan;
this.block = block;
- this.sm = sm;
this.eventHandler = context.getEventHandler();
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
@@ -509,10 +507,6 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
return this.priority;
}
- public StorageManager getStorageManager() {
- return sm;
- }
-
public ExecutionBlockId getId() {
return block.getId();
}
@@ -677,14 +671,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
}
DataChannel channel = masterPlan.getOutgoingChannels(getId()).get(0);
- // get default or store type
- CatalogProtos.StoreType storeType = CatalogProtos.StoreType.CSV; // default setting
// if store plan (i.e., CREATE or INSERT OVERWRITE)
- StoreTableNode storeTableNode = PlannerUtil.findTopNode(getBlock().getPlan(), NodeType.STORE);
- if (storeTableNode != null) {
- storeType = storeTableNode.getStorageType();
+ StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan());
+ if (storeType == null) {
+ // get default or store type
+ storeType = StoreType.CSV;
}
+
schema = channel.getSchema();
meta = CatalogUtil.newTableMeta(storeType, new KeyValueSet());
inputStatistics = statsArray[0];
@@ -1043,7 +1037,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
ScanNode scan = scans[0];
TableDesc table = subQuery.context.getTableDescMap().get(scan.getCanonicalName());
- Collection<FileFragment> fragments;
+ Collection<Fragment> fragments;
TableMeta meta = table.getMeta();
// Depending on scanner node's type, it creates fragments. If scan is for
@@ -1052,10 +1046,13 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
// span a number of blocks or possibly consists of a number of files.
if (scan.getType() == NodeType.PARTITIONS_SCAN) {
// After calling this method, partition paths are removed from the physical plan.
- fragments = Repartitioner.getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, table);
+ FileStorageManager storageManager =
+ (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf());
+ fragments = Repartitioner.getFragmentsFromPartitionedTable(storageManager, scan, table);
} else {
- Path inputPath = new Path(table.getPath());
- fragments = subQuery.getStorageManager().getSplits(scan.getCanonicalName(), meta, table.getSchema(), inputPath);
+ StorageManager storageManager =
+ StorageManager.getStorageManager(subQuery.getContext().getConf(), meta.getStoreType());
+ fragments = storageManager.getSplits(scan.getCanonicalName(), table, scan);
}
SubQuery.scheduleFragments(subQuery, fragments);
@@ -1073,27 +1070,27 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
}
}
- public static void scheduleFragment(SubQuery subQuery, FileFragment fragment) {
+ public static void scheduleFragment(SubQuery subQuery, Fragment fragment) {
subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
subQuery.getId(), fragment));
}
- public static void scheduleFragments(SubQuery subQuery, Collection<FileFragment> fragments) {
- for (FileFragment eachFragment : fragments) {
+ public static void scheduleFragments(SubQuery subQuery, Collection<Fragment> fragments) {
+ for (Fragment eachFragment : fragments) {
scheduleFragment(subQuery, eachFragment);
}
}
- public static void scheduleFragments(SubQuery subQuery, Collection<FileFragment> leftFragments,
- Collection<FileFragment> broadcastFragments) {
- for (FileFragment eachLeafFragment : leftFragments) {
+ public static void scheduleFragments(SubQuery subQuery, Collection<Fragment> leftFragments,
+ Collection<Fragment> broadcastFragments) {
+ for (Fragment eachLeafFragment : leftFragments) {
scheduleFragment(subQuery, eachLeafFragment, broadcastFragments);
}
}
public static void scheduleFragment(SubQuery subQuery,
- FileFragment leftFragment, Collection<FileFragment> rightFragments) {
+ Fragment leftFragment, Collection<Fragment> rightFragments) {
subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE,
subQuery.getId(), leftFragment, rightFragments));
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 0cc87fc..f1a9224 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -39,6 +39,7 @@ import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.querymaster.QueryInProgress;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.util.ApplicationIdUtils;
+import org.apache.tajo.util.StringUtils;
import java.io.IOException;
import java.util.*;
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
index 0de1b2b..3147bb6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/IndexUtil.java
@@ -38,7 +38,7 @@ public class IndexUtil {
public static String getIndexNameOfFrag(FileFragment fragment, SortSpec[] keys) {
StringBuilder builder = new StringBuilder();
builder.append(fragment.getPath().getName() + "_");
- builder.append(fragment.getStartKey() + "_" + fragment.getEndKey() + "_");
+ builder.append(fragment.getStartKey() + "_" + fragment.getLength() + "_");
for(int i = 0 ; i < keys.length ; i ++) {
builder.append(keys[i].getSortKey().getSimpleName()+"_");
}