You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/25 10:08:59 UTC
[07/15] tajo git commit: TAJO-1616: Implement TablespaceManager to
load Tablespaces.
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
index 5e3d0b6..d490001 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/CreateTableHook.java
@@ -19,13 +19,13 @@
package org.apache.tajo.master.exec.prehook;
import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.logical.CreateTableNode;
import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.plan.logical.NodeType;
-import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.Tablespace;
public class CreateTableHook implements DistributedQueryHook {
@@ -43,8 +43,10 @@ public class CreateTableHook implements DistributedQueryHook {
String databaseName = splitted[0];
String tableName = splitted[1];
queryContext.setOutputTable(tableName);
- queryContext.setOutputPath(
- StorageUtil.concatPath(TajoConf.getWarehouseDir(queryContext.getConf()), databaseName, tableName));
+
+ // set the final output table uri
+ queryContext.setOutputPath(createTableNode.getUri());
+
if(createTableNode.getPartitionMethod() != null) {
queryContext.setPartitionMethod(createTableNode.getPartitionMethod());
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java
index 3dba176..f403092 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/DistributedQueryHookManager.java
@@ -37,7 +37,7 @@ public class DistributedQueryHookManager {
try {
hook.hook(queryContext, plan);
} catch (Throwable t) {
- t.printStackTrace();
+ throw new RuntimeException(t);
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java
index 14c4d8d..c309f57 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/prehook/InsertIntoHook.java
@@ -39,23 +39,21 @@ public class InsertIntoHook implements DistributedQueryHook {
// Set QueryContext settings, such as output table name and output path.
// It also remove data files if overwrite is true.
- Path outputPath;
if (insertNode.hasTargetTable()) { // INSERT INTO [TB_NAME]
queryContext.setOutputTable(insertNode.getTableName());
- queryContext.setOutputPath(insertNode.getPath());
if (insertNode.hasPartition()) {
queryContext.setPartitionMethod(insertNode.getPartitionMethod());
}
} else { // INSERT INTO LOCATION ...
// When INSERT INTO LOCATION, must not set output table.
- outputPath = insertNode.getPath();
queryContext.setFileOutput();
- queryContext.setOutputPath(outputPath);
}
+ // Set the final output table uri
+ queryContext.setOutputPath(insertNode.getUri());
+
if (insertNode.isOverwrite()) {
queryContext.setOutputOverwrite();
}
-
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
index 23808b5..4fef02c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.state.*;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryVars;
import org.apache.tajo.SessionVars;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
@@ -48,6 +49,7 @@ import org.apache.tajo.master.event.*;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.util.history.QueryHistory;
import org.apache.tajo.util.history.StageHistory;
@@ -427,40 +429,59 @@ public class Query implements EventHandler<QueryEvent> {
} else {
finalState = QueryState.QUERY_ERROR;
}
+
+ // When a query is failed
if (finalState != QueryState.QUERY_SUCCEEDED) {
Stage lastStage = query.getStage(stageEvent.getExecutionBlockId());
- if (lastStage != null && lastStage.getTableMeta() != null) {
- String storeType = lastStage.getTableMeta().getStoreType();
- if (storeType != null) {
- LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
- try {
- TableSpaceManager.getStorageManager(query.systemConf, storeType).rollbackOutputCommit(rootNode.getChild());
- } catch (IOException e) {
- LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
- }
- }
- }
+ handleQueryFailure(query, lastStage);
}
+
query.eventHandler.handle(new QueryMasterQueryCompletedEvent(query.getId()));
query.setFinishTime();
return finalState;
}
+ // handle query failures
+ private void handleQueryFailure(Query query, Stage lastStage) {
+ QueryContext context = query.context.getQueryContext();
+
+ if (lastStage != null && context.hasOutputTableUri()) {
+ Tablespace space = TableSpaceManager.get(context.getOutputTableUri()).get();
+ try {
+ LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
+ space.rollbackTable(rootNode.getChild());
+ } catch (IOException e) {
+ LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
+ }
+ }
+ }
+
private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
Stage lastStage = query.getStage(event.getExecutionBlockId());
- String storeType = lastStage.getTableMeta().getStoreType();
+
try {
+
LogicalRootNode rootNode = lastStage.getMasterPlan().getLogicalPlan().getRootBlock().getRoot();
CatalogService catalog = lastStage.getContext().getQueryMasterContext().getWorkerContext().getCatalog();
TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
- Path finalOutputDir = TableSpaceManager.getStorageManager(query.systemConf, storeType)
- .commitOutputData(query.context.getQueryContext(),
- lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), lastStage.getSchema(), tableDesc);
+ QueryContext queryContext = query.context.getQueryContext();
+
+ // If there is not tabledesc, it is a select query without insert or ctas.
+ // In this case, we should use default tablespace.
+ Tablespace space = TableSpaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get();
+
+ Path finalOutputDir = space.commitTable(
+ query.context.getQueryContext(),
+ lastStage.getId(),
+ lastStage.getMasterPlan().getLogicalPlan(),
+ lastStage.getSchema(),
+ tableDesc);
QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);
+
} catch (Exception e) {
query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
return QueryState.QUERY_ERROR;
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
index 2809a70..84f2eac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java
@@ -18,6 +18,7 @@
package org.apache.tajo.querymaster;
+import com.google.common.base.Optional;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -51,14 +52,9 @@ import org.apache.tajo.plan.logical.LogicalNode;
import org.apache.tajo.plan.logical.LogicalRootNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule;
import org.apache.tajo.plan.util.PlannerUtil;
-import org.apache.tajo.plan.verifier.VerifyException;
import org.apache.tajo.session.Session;
-import org.apache.tajo.storage.Tablespace;
-import org.apache.tajo.storage.StorageProperty;
-import org.apache.tajo.storage.StorageUtil;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.*;
import org.apache.tajo.util.metrics.TajoMetrics;
import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
import org.apache.tajo.worker.AbstractResourceAllocator;
@@ -308,43 +304,28 @@ public class QueryMasterTask extends CompositeService {
}
public synchronized void startQuery() {
- Tablespace sm = null;
LogicalPlan plan = null;
+ Tablespace space = null;
try {
if (query != null) {
LOG.warn("Query already started");
return;
}
+
+
CatalogService catalog = getQueryTaskContext().getQueryMasterContext().getWorkerContext().getCatalog();
- LogicalPlanner planner = new LogicalPlanner(catalog);
+ LogicalPlanner planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
LogicalOptimizer optimizer = new LogicalOptimizer(systemConf);
Expr expr = JsonHelper.fromJson(jsonExpr, Expr.class);
jsonExpr = null; // remove the possible OOM
- plan = planner.createPlan(queryContext, expr);
-
- String storeType = PlannerUtil.getStoreType(plan);
- if (storeType != null) {
- sm = TableSpaceManager.getStorageManager(systemConf, storeType);
- StorageProperty storageProperty = sm.getStorageProperty();
- if (storageProperty.isSortedInsert()) {
- String tableName = PlannerUtil.getStoreTableName(plan);
- LogicalRootNode rootNode = plan.getRootBlock().getRoot();
- TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild());
- if (tableDesc == null) {
- throw new VerifyException("Can't get table meta data from catalog: " + tableName);
- }
- List<LogicalPlanRewriteRule> storageSpecifiedRewriteRules = sm.getRewriteRules(
- getQueryTaskContext().getQueryContext(), tableDesc);
- if (storageSpecifiedRewriteRules != null) {
- for (LogicalPlanRewriteRule eachRule: storageSpecifiedRewriteRules) {
- optimizer.addRuleAfterToJoinOpt(eachRule);
- }
- }
- }
- }
+ plan = planner.createPlan(queryContext, expr);
optimizer.optimize(queryContext, plan);
+ // when a given uri is null, TableSpaceManager.get will return the default tablespace.
+ space = TableSpaceManager.get(queryContext.get(QueryVars.OUTPUT_TABLE_URI, "")).get();
+ space.rewritePlan(queryContext, plan);
+
for (LogicalPlan.QueryBlock block : plan.getQueryBlocks()) {
LogicalNode[] scanNodes = PlannerUtil.findAllNodes(block.getRoot(), NodeType.SCAN);
if (scanNodes != null) {
@@ -374,10 +355,10 @@ public class QueryMasterTask extends CompositeService {
LOG.error(t.getMessage(), t);
initError = t;
- if (plan != null && sm != null) {
+ if (plan != null && space != null) {
LogicalRootNode rootNode = plan.getRootBlock().getRoot();
try {
- sm.rollbackOutputCommit(rootNode.getChild());
+ space.rollbackTable(rootNode.getChild());
} catch (IOException e) {
LOG.warn(query.getId() + ", failed processing cleanup storage when query failed:" + e.getMessage(), e);
}
@@ -422,16 +403,27 @@ public class QueryMasterTask extends CompositeService {
// Create Output Directory
////////////////////////////////////////////
- String outputPath = context.get(QueryVars.OUTPUT_TABLE_PATH, "");
- if (context.isCreateTable() || context.isInsert()) {
- if (outputPath == null || outputPath.isEmpty()) {
- // hbase
- stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
+ String outputPath = context.get(QueryVars.OUTPUT_TABLE_URI, "");
+
+ // The fact that there is no output means that this query is neither CTAS or INSERT (OVERWRITE) INTO
+ // So, this query results won't be materialized as a part of a table.
+ // The result will be temporarily written in the staging directory.
+ if (outputPath.isEmpty()) {
+ // for temporarily written in the storage directory
+ stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
+ } else {
+ Optional<Tablespace> spaceResult = TableSpaceManager.get(outputPath);
+ if (!spaceResult.isPresent()) {
+ throw new IOException("No registered Tablespace for " + outputPath);
+ }
+
+ Tablespace space = spaceResult.get();
+ if (space.getProperty().isMovable()) { // checking if this tablespace allows MOVE operation
+ // If this space allows move operation, the staging directory will be underneath the final output table uri.
+ stagingDir = StorageUtil.concatPath(context.getOutputTableUri().toString(), TMP_STAGING_DIR_PREFIX, queryId);
} else {
- stagingDir = StorageUtil.concatPath(context.getOutputPath(), TMP_STAGING_DIR_PREFIX, queryId);
+ stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
}
- } else {
- stagingDir = new Path(TajoConf.getDefaultRootStagingDir(conf), queryId);
}
// initializ
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 2c3e9e2..5b8f24a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -36,7 +36,6 @@ import org.apache.tajo.engine.planner.UniformRangePartition;
import org.apache.tajo.engine.planner.enforce.Enforcer;
import org.apache.tajo.engine.planner.global.DataChannel;
import org.apache.tajo.engine.planner.global.ExecutionBlock;
-import org.apache.tajo.engine.planner.global.GlobalPlanner;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.planner.global.rewriter.rules.GlobalPlanRewriteUtil;
import org.apache.tajo.engine.utils.TupleUtil;
@@ -83,19 +82,14 @@ public class Repartitioner {
QueryMasterTask.QueryMasterTaskContext masterContext = stage.getContext();
ScanNode[] scans = execBlock.getScanNodes();
-
- Path tablePath;
Fragment[] fragments = new Fragment[scans.length];
long[] stats = new long[scans.length];
// initialize variables from the child operators
for (int i = 0; i < scans.length; i++) {
TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName());
- if (tableDesc == null) { // if it is a real table stored on storage
- FileTablespace storageManager =
- (FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf());
- tablePath = storageManager.getTablePath(scans[i].getTableName());
+ if (tableDesc == null) { // if it is a real table stored on storage
if (execBlock.getUnionScanMap() != null && !execBlock.getUnionScanMap().isEmpty()) {
for (Map.Entry<ExecutionBlockId, ExecutionBlockId> unionScanEntry: execBlock.getUnionScanMap().entrySet()) {
ExecutionBlockId originScanEbId = unionScanEntry.getKey();
@@ -105,25 +99,29 @@ public class Repartitioner {
ExecutionBlockId scanEBId = TajoIdUtils.createExecutionBlockId(scans[i].getTableName());
stats[i] = masterContext.getStage(scanEBId).getResultStats().getNumBytes();
}
- fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
+
+ // TODO - We should remove dummy flagment usages
+ fragments[i] = new FileFragment(scans[i].getCanonicalName(), new Path("/dummy"), 0, 0,
+ new String[]{UNKNOWN_HOST});
+
} else {
+
try {
stats[i] = GlobalPlanRewriteUtil.computeDescendentVolume(scans[i]);
} catch (PlanningException e) {
throw new IOException(e);
}
- Tablespace tablespace =
- TableSpaceManager.getStorageManager(stage.getContext().getConf(), tableDesc.getMeta().getStoreType());
-
// if table has no data, tablespace will return empty FileFragment.
// So, we need to handle FileFragment by its size.
// If we don't check its size, it can cause IndexOutOfBoundsException.
- List<Fragment> fileFragments = tablespace.getSplits(scans[i].getCanonicalName(), tableDesc);
+ Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get();
+ List<Fragment> fileFragments = space.getSplits(scans[i].getCanonicalName(), tableDesc);
if (fileFragments.size() > 0) {
fragments[i] = fileFragments.get(0);
} else {
- fragments[i] = new FileFragment(scans[i].getCanonicalName(), new Path(tableDesc.getPath()), 0, 0, new String[]{UNKNOWN_HOST});
+ fragments[i] = new FileFragment(scans[i].getCanonicalName(),
+ new Path(tableDesc.getUri()), 0, 0, new String[]{UNKNOWN_HOST});
}
}
}
@@ -377,26 +375,29 @@ public class Repartitioner {
if (broadcastFragments != null) {
//In this phase a ScanNode has a single fragment.
//If there are more than one data files, that files should be added to fragments or partition path
+
for (ScanNode eachScan: broadcastScans) {
+
Path[] partitionScanPaths = null;
TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName());
+ Tablespace space = TableSpaceManager.get(tableDesc.getUri()).get();
+
if (eachScan.getType() == NodeType.PARTITIONS_SCAN) {
- FileTablespace storageManager =
- (FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf());
PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan;
partitionScanPaths = partitionScan.getInputPaths();
// set null to inputPaths in getFragmentsFromPartitionedTable()
- getFragmentsFromPartitionedTable(storageManager, eachScan, tableDesc);
+ getFragmentsFromPartitionedTable((FileTablespace) space, eachScan, tableDesc);
partitionScan.setInputPaths(partitionScanPaths);
+
} else {
- Tablespace tablespace = TableSpaceManager.getStorageManager(stage.getContext().getConf(),
- tableDesc.getMeta().getStoreType());
- Collection<Fragment> scanFragments = tablespace.getSplits(eachScan.getCanonicalName(),
+
+ Collection<Fragment> scanFragments = space.getSplits(eachScan.getCanonicalName(),
tableDesc, eachScan);
if (scanFragments != null) {
rightFragments.addAll(scanFragments);
}
+
}
}
}
@@ -505,18 +506,16 @@ public class Repartitioner {
Collection<Fragment> scanFragments;
Path[] partitionScanPaths = null;
+
+ FileTablespace space = (FileTablespace) TableSpaceManager.get(desc.getUri()).get();
+
if (scan.getType() == NodeType.PARTITIONS_SCAN) {
PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)scan;
partitionScanPaths = partitionScan.getInputPaths();
// set null to inputPaths in getFragmentsFromPartitionedTable()
- FileTablespace storageManager =
- (FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf());
- scanFragments = getFragmentsFromPartitionedTable(storageManager, scan, desc);
+ scanFragments = getFragmentsFromPartitionedTable(space, scan, desc);
} else {
- Tablespace tablespace =
- TableSpaceManager.getStorageManager(stage.getContext().getConf(), desc.getMeta().getStoreType());
-
- scanFragments = tablespace.getSplits(scan.getCanonicalName(), desc, scan);
+ scanFragments = space.getSplits(scan.getCanonicalName(), desc, scan);
}
if (scanFragments != null) {
@@ -618,9 +617,6 @@ public class Repartitioner {
throws IOException {
ExecutionBlock execBlock = stage.getBlock();
ScanNode scan = execBlock.getScanNodes()[0];
- Path tablePath;
- tablePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf()))
- .getTablePath(scan.getTableName());
ExecutionBlock sampleChildBlock = masterPlan.getChild(stage.getId(), 0);
SortNode sortNode = PlannerUtil.findTopNode(sampleChildBlock.getPlan(), NodeType.SORT);
@@ -648,10 +644,15 @@ public class Repartitioner {
throw new IOException("Can't get table meta data from catalog: " +
PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan()));
}
- ranges = TableSpaceManager.getStorageManager(stage.getContext().getConf(), storeType)
- .getInsertSortRanges(stage.getContext().getQueryContext(), tableDesc,
- sortNode.getInSchema(), sortSpecs,
- mergedRange);
+
+ Tablespace space = TableSpaceManager.getAnyByScheme(storeType).get();
+ ranges = space.getInsertSortRanges(
+ stage.getContext().getQueryContext(),
+ tableDesc,
+ sortNode.getInSchema(),
+ sortSpecs,
+ mergedRange);
+
determinedTaskNum = ranges.length;
} else {
RangePartitionAlgorithm partitioner = new UniformRangePartition(mergedRange, sortSpecs);
@@ -688,7 +689,9 @@ public class Repartitioner {
}
}
- FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
+ // TODO - We should remove dummy fragment.
+ FileFragment dummyFragment = new FileFragment(scan.getTableName(), new Path("/dummy"), 0, 0,
+ new String[]{UNKNOWN_HOST});
Stage.scheduleFragment(stage, dummyFragment);
List<FetchImpl> fetches = new ArrayList<FetchImpl>();
@@ -784,11 +787,9 @@ public class Repartitioner {
int maxNum) throws IOException {
ExecutionBlock execBlock = stage.getBlock();
ScanNode scan = execBlock.getScanNodes()[0];
- Path tablePath;
- tablePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf()))
- .getTablePath(scan.getTableName());
- Fragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST});
+ // TODO - We should remove dummy fragment usages
+ Fragment frag = new FileFragment(scan.getCanonicalName(), new Path("/dummy"), 0, 0, new String[]{UNKNOWN_HOST});
List<Fragment> fragments = new ArrayList<Fragment>();
fragments.add(frag);
Stage.scheduleFragments(stage, fragments);
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
index 5a0fc38..a7d605c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
@@ -60,8 +60,8 @@ import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.querymaster.Task.IntermediateEntry;
import org.apache.tajo.storage.FileTablespace;
-import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.Tablespace;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.KeyValueSet;
@@ -1084,18 +1084,18 @@ public class Stage implements EventHandler<StageEvent> {
Collection<Fragment> fragments;
TableMeta meta = table.getMeta();
+ Tablespace tablespace = TableSpaceManager.get(scan.getTableDesc().getUri()).get();
+
// Depending on scanner node's type, it creates fragments. If scan is for
// a partitioned table, It will creates lots fragments for all partitions.
// Otherwise, it creates at least one fragments for a table, which may
// span a number of blocks or possibly consists of a number of files.
+ //
+ // Also, we can ensure FileTableSpace if the type of ScanNode is PARTITIONS_SCAN.
if (scan.getType() == NodeType.PARTITIONS_SCAN) {
// After calling this method, partition paths are removed from the physical plan.
- FileTablespace storageManager =
- (FileTablespace) TableSpaceManager.getFileStorageManager(stage.getContext().getConf());
- fragments = Repartitioner.getFragmentsFromPartitionedTable(storageManager, scan, table);
+ fragments = Repartitioner.getFragmentsFromPartitionedTable((FileTablespace) tablespace, scan, table);
} else {
- Tablespace tablespace =
- TableSpaceManager.getStorageManager(stage.getContext().getConf(), meta.getStoreType());
fragments = tablespace.getSplits(scan.getCanonicalName(), table, scan);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
index f265e50..ae22d0d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
@@ -486,7 +486,7 @@ public class QueryExecutorServlet extends HttpServlet {
if (resultRows <= 0) {
resultRows = 1000;
}
- LOG.info("Tajo Query Result: " + desc.getPath() + "\n");
+ LOG.info("Tajo Query Result: " + desc.getUri() + "\n");
int numOfColumns = rsmd.getColumnCount();
for(int i = 0; i < numOfColumns; i++) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
index 0721ef1..0df5d4d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/LegacyTaskImpl.java
@@ -160,7 +160,7 @@ public class LegacyTaskImpl implements Task {
this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
}
} else {
- Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf))
+ Path outFilePath = ((FileTablespace) TableSpaceManager.get(queryContext.getStagingDir().toUri()).get())
.getAppenderFilePath(getId(), queryContext.getStagingDir());
LOG.info("Output File Path: " + outFilePath);
context.setOutputPath(outFilePath);
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
index fbd070e..66c8e4a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -40,7 +40,6 @@ import org.apache.tajo.function.FunctionSignature;
import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.rpc.RpcConstants;
import org.apache.tajo.service.ServiceTracker;
-import org.apache.tajo.service.ServiceTrackerException;
import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.service.TajoMasterInfo;
import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary;
@@ -56,7 +55,7 @@ import org.apache.tajo.rule.EvaluationFailedException;
import org.apache.tajo.rule.SelfDiagnosisRuleEngine;
import org.apache.tajo.rule.SelfDiagnosisRuleSession;
import org.apache.tajo.storage.HashShuffleAppenderManager;
-import org.apache.tajo.storage.TableSpaceManager;
+import org.apache.tajo.storage.OldStorageManager;
import org.apache.tajo.util.*;
import org.apache.tajo.util.history.HistoryReader;
import org.apache.tajo.util.history.HistoryWriter;
@@ -370,7 +369,7 @@ public class TajoWorker extends CompositeService {
}
try {
- TableSpaceManager.shutdown();
+ OldStorageManager.shutdown();
} catch (IOException ie) {
LOG.error(ie.getMessage(), ie);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
index be3960b..5974693 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java
@@ -154,7 +154,7 @@ public class TaskImpl implements Task {
this.sortComp = new BaseTupleComparator(finalSchema, sortNode.getSortKeys());
}
} else {
- Path outFilePath = ((FileTablespace) TableSpaceManager.getFileStorageManager(systemConf))
+ Path outFilePath = ((FileTablespace) TableSpaceManager.get(queryContext.getStagingDir().toUri()).get())
.getAppenderFilePath(getId(), queryContext.getStagingDir());
LOG.info("Output File Path: " + outFilePath);
context.setOutputPath(outFilePath);
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
index 43ec5ca..6771912 100644
--- a/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/catalogview.jsp
@@ -30,7 +30,6 @@
<%@ page import="java.util.Collection" %>
<%@ page import="java.util.List" %>
<%@ page import="java.util.Map" %>
-<%@ page import="org.apache.tajo.service.ServiceTracker" %>
<%@ page import="java.net.InetSocketAddress" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -186,7 +185,7 @@
<div style='margin-top:10px'>
<div style=''>Detail</div>
<table border="1" class='border_table'>
- <tr><td width='100'>Table path</td><td width='410'><%=tableDesc.getPath()%></td></tr>
+ <tr><td width='100'>Table path</td><td width='410'><%=tableDesc.getUri()%></td></tr>
<tr><td>Store type</td><td><%=tableDesc.getMeta().getStoreType()%></td></tr>
<tr><td># rows</td><td><%=(tableDesc.hasStats() ? ("" + tableDesc.getStats().getNumRows()) : "-")%></td></tr>
<tr><td>Volume</td><td><%=(tableDesc.hasStats() ? FileUtil.humanReadableByteCount(tableDesc.getStats().getNumBytes(),true) : "-")%></td></tr>
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/main/resources/webapps/admin/index.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp
index e0cf876..43bb6c1 100644
--- a/tajo-core/src/main/resources/webapps/admin/index.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/index.jsp
@@ -23,20 +23,19 @@
<%@ page import="org.apache.tajo.conf.TajoConf" %>
<%@ page import="org.apache.tajo.ipc.QueryCoordinatorProtocol" %>
<%@ page import="org.apache.tajo.master.TajoMaster" %>
-<%@ page import="org.apache.tajo.service.ServiceTracker" %>
-<%@ page import="org.apache.tajo.service.TajoMasterInfo" %>
-<%@ page import="org.apache.tajo.master.QueryInProgress" %>
<%@ page import="org.apache.tajo.master.rm.Worker" %>
<%@ page import="org.apache.tajo.master.rm.WorkerState" %>
+<%@ page import="org.apache.tajo.service.ServiceTracker" %>
+<%@ page import="org.apache.tajo.service.TajoMasterInfo" %>
+<%@ page import="org.apache.tajo.storage.TableSpaceManager" %>
+<%@ page import="org.apache.tajo.storage.Tablespace" %>
<%@ page import="org.apache.tajo.util.NetUtils" %>
<%@ page import="org.apache.tajo.util.TUtil" %>
<%@ page import="org.apache.tajo.webapp.StaticHttpServer" %>
-<%@ page import="java.util.List" %>
-<%@ page import="java.util.Collection" %>
+<%@ page import="java.net.InetSocketAddress" %>
<%@ page import="java.util.Date" %>
+<%@ page import="java.util.List" %>
<%@ page import="java.util.Map" %>
-<%@ page import="java.net.InetSocketAddress" %>
-<%@ page import="org.apache.tajo.service.ServiceTracker" %>
<%
TajoMaster master = (TajoMaster) StaticHttpServer.getInstance().getAttribute("tajo.info.server.object");
@@ -139,7 +138,15 @@
<tr><td width='150'>Threads:</td><td><a href='thread.jsp'>thread dump...</a></tr>
</table>
<hr/>
-
+ <h3>Tablespaces</h3>
+ <table width="100%" class="border_table" border="1">
+ <tr><th>Tablespace Name</th><th>URI</th><th>Handler</th></tr>
+ <% for (Tablespace space : TableSpaceManager.getAllTablespaces()) {
+ if (space.isVisible()) { %>
+ <tr><td><%=space.getName()%></td><td><%=space.getUri()%></td><td><%=space.getClass().getName()%></td></tr>
+ <% }}%>
+ </table>
+ <hr/>
<h3>Cluster Summary</h3>
<table width="100%" class="border_table" border="1">
<tr><th>Type</th><th>Total</th><th>Live</th><th>Dead</th><th>Running Master</th><th>Memory Resource<br/>(used/total)</th><th>Disk Resource<br/>(used/total)</th></tr>
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
index b5be9d0..ca2378b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -46,7 +46,7 @@ public class BackendTestingUtil {
public static void writeTmpTable(TajoConf conf, Path tablePath)
throws IOException {
- FileTablespace sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
+ FileTablespace sm = TableSpaceManager.getDefault();
Appender appender;
Path filePath = new Path(tablePath, "table.csv");
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index 9a92e90..57b1e18 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -945,7 +945,7 @@ public class QueryTestCaseBase {
return null;
}
- Path path = new Path(tableDesc.getPath());
+ Path path = new Path(tableDesc.getUri());
return getTableFileContents(path);
}
@@ -955,7 +955,7 @@ public class QueryTestCaseBase {
return null;
}
- Path path = new Path(tableDesc.getPath());
+ Path path = new Path(tableDesc.getUri());
FileSystem fs = path.getFileSystem(conf);
return listFiles(fs, path);
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 9b5980b..acdae85 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.client.TajoClientUtil;
@@ -48,16 +47,18 @@ import org.apache.tajo.querymaster.QueryMasterTask;
import org.apache.tajo.querymaster.Stage;
import org.apache.tajo.querymaster.StageState;
import org.apache.tajo.service.ServiceTrackerFactory;
+import org.apache.tajo.storage.FileTablespace;
+import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.util.NetUtils;
+import org.apache.tajo.util.Pair;
import org.apache.tajo.worker.TajoWorker;
import java.io.File;
import java.io.IOException;
import java.io.Writer;
import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
@@ -345,10 +346,18 @@ public class TajoTestingCluster {
LOG.info("derby repository is set to "+conf.get(CatalogConstants.CATALOG_URI));
if (!local) {
- c.setVar(ConfVars.ROOT_DIR,
- getMiniDFSCluster().getFileSystem().getUri() + "/tajo");
+ c.setVar(ConfVars.ROOT_DIR, getMiniDFSCluster().getFileSystem().getUri() + "/tajo");
} else {
- c.setVar(ConfVars.ROOT_DIR, testBuildDir.getAbsolutePath() + "/tajo");
+ c.setVar(ConfVars.ROOT_DIR, "file://" + testBuildDir.getAbsolutePath() + "/tajo");
+ }
+
+ // Do not need for local file system
+ if (!local) {
+ FileTablespace defaultTableSpace =
+ new FileTablespace(TableSpaceManager.DEFAULT_TABLESPACE_NAME, TajoConf.getWarehouseDir(c).toUri());
+ defaultTableSpace.init(conf);
+
+ TableSpaceManager.addTableSpaceForTest(defaultTableSpace);
}
setupCatalogForTesting(c, testBuildDir);
@@ -441,13 +450,6 @@ public class TajoTestingCluster {
}
}
- public void restartTajoCluster(int numSlaves) throws Exception {
- tajoMaster.stop();
- tajoMaster.start();
-
- LOG.info("Minicluster has been restarted");
- }
-
public TajoMaster getMaster() {
return this.tajoMaster;
}
@@ -653,7 +655,14 @@ public class TajoTestingCluster {
if (!fs.exists(rootDir)) {
fs.mkdirs(rootDir);
}
- Path tablePath = new Path(rootDir, tableName);
+ Path tablePath;
+ if (CatalogUtil.isFQTableName(tableName)) {
+ Pair<String, String> name = CatalogUtil.separateQualifierAndName(tableName);
+ tablePath = new Path(rootDir, new Path(name.getFirst(), name.getSecond()));
+ } else {
+ tablePath = new Path(rootDir, tableName);
+ }
+
fs.mkdirs(tablePath);
if (tableDatas.length > 0) {
int recordPerFile = tableDatas.length / numDataFiles;
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
index 54e50fc..ce951c6 100644
--- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
+++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java
@@ -32,6 +32,7 @@ import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.client.QueryStatus;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.util.FileUtil;
import org.junit.After;
import org.junit.Before;
@@ -214,10 +215,9 @@ public class TestTajoCli {
String consoleResult = new String(out.toByteArray());
- FileSystem fs = FileSystem.get(testBase.getTestingCluster().getConfiguration());
if (!cluster.isHiveCatalogStoreRunning()) {
assertOutputResult(resultFileName, consoleResult, new String[]{"${table.path}"},
- new String[]{fs.getUri() + "/tajo/warehouse/default/" + tableName});
+ new String[]{TableSpaceManager.getDefault().getTableUri("default", tableName).toString()});
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 765a084..73b97fa 100644
--- a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -324,7 +324,7 @@ public class TestTajoClient {
client.updateQuery(sql);
assertTrue(client.existTable(tableName));
- Path tablePath = new Path(client.getTableDesc(tableName).getPath());
+ Path tablePath = new Path(client.getTableDesc(tableName).getUri());
FileSystem hdfs = tablePath.getFileSystem(conf);
assertTrue(hdfs.exists(tablePath));
@@ -345,7 +345,7 @@ public class TestTajoClient {
client.updateQuery(sql);
assertTrue(client.existTable(tableName));
- Path tablePath = new Path(client.getTableDesc(tableName).getPath());
+ Path tablePath = new Path(client.getTableDesc(tableName).getUri());
FileSystem hdfs = tablePath.getFileSystem(conf);
assertTrue(hdfs.exists(tablePath));
@@ -422,7 +422,7 @@ public class TestTajoClient {
client.updateQuery(sql);
assertTrue(client.existTable(tableName));
- Path tablePath = new Path(client.getTableDesc(tableName).getPath());
+ Path tablePath = new Path(client.getTableDesc(tableName).getUri());
FileSystem hdfs = tablePath.getFileSystem(conf);
assertTrue(hdfs.exists(tablePath));
@@ -446,7 +446,7 @@ public class TestTajoClient {
client.updateQuery(sql);
assertTrue(client.existTable(tableName));
- Path tablePath = new Path(client.getTableDesc(tableName).getPath());
+ Path tablePath = new Path(client.getTableDesc(tableName).getUri());
FileSystem hdfs = tablePath.getFileSystem(conf);
assertTrue(hdfs.exists(tablePath));
@@ -470,7 +470,7 @@ public class TestTajoClient {
client.updateQuery(sql);
assertTrue(client.existTable(tableName));
- Path tablePath = new Path(client.getTableDesc(tableName).getPath());
+ Path tablePath = new Path(client.getTableDesc(tableName).getUri());
FileSystem hdfs = tablePath.getFileSystem(conf);
assertTrue(hdfs.exists(tablePath));
@@ -495,7 +495,7 @@ public class TestTajoClient {
client.updateQuery(sql);
assertTrue(client.existTable(tableName));
- Path tablePath = new Path(client.getTableDesc(tableName).getPath());
+ Path tablePath = new Path(client.getTableDesc(tableName).getUri());
FileSystem hdfs = tablePath.getFileSystem(conf);
assertTrue(hdfs.exists(tablePath));
@@ -521,7 +521,7 @@ public class TestTajoClient {
client.updateQuery(sql);
assertTrue(client.existTable(tableName));
- Path tablePath = new Path(client.getTableDesc(tableName).getPath());
+ Path tablePath = new Path(client.getTableDesc(tableName).getUri());
FileSystem hdfs = tablePath.getFileSystem(conf);
assertTrue(hdfs.exists(tablePath));
@@ -574,7 +574,7 @@ public class TestTajoClient {
client.updateQuery(sql);
assertTrue(client.existTable(tableName));
- Path tablePath = new Path(client.getTableDesc(tableName).getPath());
+ Path tablePath = new Path(client.getTableDesc(tableName).getUri());
FileSystem hdfs = tablePath.getFileSystem(conf);
assertTrue(hdfs.exists(tablePath));
@@ -704,7 +704,7 @@ public class TestTajoClient {
assertEquals(resultDesc.getMeta().getOption(StorageConstants.TEXT_NULL), "\\\\T");
- Path path = new Path(resultDesc.getPath());
+ Path path = new Path(resultDesc.getUri());
FileSystem fs = path.getFileSystem(tajoConf);
FileStatus[] files = fs.listStatus(path);
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
index 66aedc0..328f883 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/ExprTestBase.java
@@ -50,6 +50,7 @@ import org.apache.tajo.plan.verifier.LogicalPlanVerifier;
import org.apache.tajo.plan.verifier.PreLogicalPlanVerifier;
import org.apache.tajo.plan.verifier.VerificationState;
import org.apache.tajo.storage.LazyTuple;
+import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.util.BytesUtils;
@@ -103,7 +104,7 @@ public class ExprTestBase {
analyzer = new SQLAnalyzer();
preLogicalPlanVerifier = new PreLogicalPlanVerifier(cat);
- planner = new LogicalPlanner(cat);
+ planner = new LogicalPlanner(cat, TableSpaceManager.getInstance());
optimizer = new LogicalOptimizer(util.getConfiguration());
annotatedPlanVerifier = new LogicalPlanVerifier(util.getConfiguration(), cat);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
index 4bfe640..80f3459 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/eval/TestEvalTreeUtil.java
@@ -45,6 +45,7 @@ import org.apache.tajo.plan.function.GeneralFunction;
import org.apache.tajo.plan.logical.GroupbyNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.nameresolver.NameResolvingMode;
+import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.util.CommonTestingUtil;
import org.junit.AfterClass;
@@ -116,7 +117,7 @@ public class TestEvalTreeUtil {
catalog.createFunction(funcMeta);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
+ planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
String[] QUERIES = {
"select name, score, age from people where score > 30", // 0
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
index a408fd6..9aa7ddf 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalOptimizer.java
@@ -24,7 +24,6 @@ import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.engine.function.FunctionLoader;
import org.apache.tajo.engine.function.builtin.SumInt;
@@ -35,6 +34,7 @@ import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.KeyValueSet;
import org.junit.AfterClass;
@@ -103,7 +103,7 @@ public class TestLogicalOptimizer {
catalog.createFunction(funcDesc);
sqlAnalyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
+ planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
optimizer = new LogicalOptimizer(util.getConfiguration());
defaultContext = LocalTajoTestingUtility.createDummyContext(util.getConfiguration());
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
index cee1593..3cee816 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlan.java
@@ -19,6 +19,7 @@
package org.apache.tajo.engine.planner;
import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.util.graph.SimpleDirectedGraph;
import org.apache.tajo.plan.LogicalPlan;
import org.apache.tajo.plan.LogicalPlanner;
@@ -39,7 +40,7 @@ public class TestLogicalPlan {
public static void setup() throws Exception {
util = new TajoTestingCluster();
util.startCatalogCluster();
- planner = new LogicalPlanner(util.getMiniCatalogCluster().getCatalog());
+ planner = new LogicalPlanner(util.getMiniCatalogCluster().getCatalog(), TableSpaceManager.getInstance());
}
public static void tearDown() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
index 1feea4c..351a6af 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.QueryVars;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.algebra.Expr;
@@ -29,7 +30,6 @@ import org.apache.tajo.algebra.JoinType;
import org.apache.tajo.benchmark.TPCH;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.datum.TextDatum;
import org.apache.tajo.engine.function.FunctionLoader;
@@ -42,6 +42,7 @@ import org.apache.tajo.plan.*;
import org.apache.tajo.plan.expr.*;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.KeyValueSet;
@@ -130,7 +131,7 @@ public class TestLogicalPlanner {
catalog.createFunction(funcDesc);
sqlAnalyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
+ planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
}
@AfterClass
@@ -155,6 +156,13 @@ public class TestLogicalPlanner {
"select length(name), length(deptname), *, empid+10 from employee where empId > 500", // 13
};
+ private static QueryContext createQueryContext() {
+ QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ qc.put(QueryVars.DEFAULT_SPACE_URI, "file:/");
+ qc.put(QueryVars.DEFAULT_SPACE_ROOT_URI, "file:/");
+ return qc;
+ }
+
public static final void testCloneLogicalNode(LogicalNode n1) throws CloneNotSupportedException {
LogicalNode copy = (LogicalNode) n1.clone();
assertTrue(n1.deepEquals(copy));
@@ -162,7 +170,7 @@ public class TestLogicalPlanner {
@Test
public final void testSingleRelation() throws CloneNotSupportedException, PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(QUERIES[0]);
LogicalPlan planNode = planner.createPlan(qc, expr);
@@ -196,7 +204,7 @@ public class TestLogicalPlanner {
@Test
public final void testImplicityJoinPlan() throws CloneNotSupportedException, PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
// two relations
Expr expr = sqlAnalyzer.parse(QUERIES[1]);
@@ -285,7 +293,7 @@ public class TestLogicalPlanner {
@Test
public final void testNaturalJoinPlan() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
// two relations
Expr context = sqlAnalyzer.parse(JOINS[0]);
LogicalNode plan = planner.createPlan(qc, context).getRootBlock().getRoot();
@@ -317,7 +325,7 @@ public class TestLogicalPlanner {
@Test
public final void testInnerJoinPlan() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
// two relations
Expr expr = sqlAnalyzer.parse(JOINS[1]);
LogicalPlan plan = planner.createPlan(qc, expr);
@@ -350,7 +358,7 @@ public class TestLogicalPlanner {
@Test
public final void testOuterJoinPlan() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
// two relations
Expr expr = sqlAnalyzer.parse(JOINS[2]);
@@ -385,7 +393,7 @@ public class TestLogicalPlanner {
@Test
public final void testGroupby() throws CloneNotSupportedException, PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
// without 'having clause'
Expr context = sqlAnalyzer.parse(QUERIES[7]);
@@ -429,7 +437,7 @@ public class TestLogicalPlanner {
public final void testMultipleJoin() throws IOException, PlanningException {
Expr expr = sqlAnalyzer.parse(
FileUtil.readTextFile(new File("src/test/resources/queries/TestJoinQuery/testTPCHQ2Join.sql")));
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot();
testJsonSerDerObject(plan);
Schema expected = tpch.getOutSchema("q2");
@@ -488,7 +496,7 @@ public class TestLogicalPlanner {
Expr expr = sqlAnalyzer.parse(
FileUtil.readTextFile(new File
("src/test/resources/queries/TestJoinQuery/testJoinWithMultipleJoinQual1.sql")));
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
LogicalPlan plan = planner.createPlan(qc, expr);
LogicalNode node = plan.getRootBlock().getRoot();
@@ -518,7 +526,7 @@ public class TestLogicalPlanner {
}
for (Map.Entry<BinaryEval, Boolean> entry : qualMap.entrySet()) {
- if (!entry.getValue().booleanValue()) {
+ if (!entry.getValue()) {
Preconditions.checkArgument(false,
"JoinQual not found. -> required JoinQual:" + entry.getKey().toJson());
}
@@ -530,7 +538,7 @@ public class TestLogicalPlanner {
Expr expr = sqlAnalyzer.parse(
FileUtil.readTextFile(new File
("src/test/resources/queries/TestJoinQuery/testJoinWithMultipleJoinQual2.sql")));
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
LogicalPlan plan = planner.createPlan(qc,expr);
LogicalNode node = plan.getRootBlock().getRoot();
@@ -559,7 +567,7 @@ public class TestLogicalPlanner {
}
for (Map.Entry<BinaryEval, Boolean> entry : qualMap.entrySet()) {
- if (!entry.getValue().booleanValue()) {
+ if (!entry.getValue()) {
Preconditions.checkArgument(false,
"SelectionQual not found. -> required JoinQual:" + entry.getKey().toJson());
}
@@ -571,7 +579,7 @@ public class TestLogicalPlanner {
Expr expr = sqlAnalyzer.parse(
FileUtil.readTextFile(new File
("src/test/resources/queries/TestJoinQuery/testJoinWithMultipleJoinQual3.sql")));
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
LogicalPlan plan = planner.createPlan(qc, expr);
LogicalNode node = plan.getRootBlock().getRoot();
@@ -605,7 +613,7 @@ public class TestLogicalPlanner {
}
for (Map.Entry<BinaryEval, Boolean> entry : qualMap.entrySet()) {
- if (!entry.getValue().booleanValue()) {
+ if (!entry.getValue()) {
Preconditions.checkArgument(false,
"ScanQual not found. -> required JoinQual:" + entry.getKey().toJson());
}
@@ -618,7 +626,7 @@ public class TestLogicalPlanner {
Expr expr = sqlAnalyzer.parse(
FileUtil.readTextFile(new File
("src/test/resources/queries/TestJoinQuery/testJoinWithMultipleJoinQual4.sql")));
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
LogicalPlan plan = planner.createPlan(qc, expr);
LogicalNode node = plan.getRootBlock().getRoot();
@@ -675,14 +683,14 @@ public class TestLogicalPlanner {
for (Map.Entry<BinaryEval, Boolean> entry : joinQualMap.entrySet()) {
- if (!entry.getValue().booleanValue()) {
+ if (!entry.getValue()) {
Preconditions.checkArgument(false,
"JoinQual not found. -> required JoinQual:" + entry.getKey().toJson());
}
}
for (Map.Entry<BinaryEval, Boolean> entry : scanMap.entrySet()) {
- if (!entry.getValue().booleanValue()) {
+ if (!entry.getValue()) {
Preconditions.checkArgument(false,
"ScanQual not found. -> required JoinQual:" + entry.getKey().toJson());
}
@@ -709,7 +717,7 @@ public class TestLogicalPlanner {
@Test
public final void testStoreTable() throws CloneNotSupportedException, PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr context = sqlAnalyzer.parse(QUERIES[8]);
@@ -727,7 +735,7 @@ public class TestLogicalPlanner {
@Test
public final void testOrderBy() throws CloneNotSupportedException, PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(QUERIES[4]);
@@ -757,7 +765,7 @@ public class TestLogicalPlanner {
@Test
public final void testLimit() throws CloneNotSupportedException, PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(QUERIES[12]);
@@ -779,7 +787,7 @@ public class TestLogicalPlanner {
@Test
public final void testSPJPush() throws CloneNotSupportedException, PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(QUERIES[5]);
LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot();
@@ -801,7 +809,7 @@ public class TestLogicalPlanner {
@Test
public final void testSPJ() throws CloneNotSupportedException, PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(QUERIES[6]);
LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot();
@@ -811,7 +819,7 @@ public class TestLogicalPlanner {
@Test
public final void testJson() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(QUERIES[9]);
LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot();
@@ -833,7 +841,7 @@ public class TestLogicalPlanner {
@Test
public final void testVisitor() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
// two relations
Expr expr = sqlAnalyzer.parse(QUERIES[1]);
@@ -860,7 +868,7 @@ public class TestLogicalPlanner {
@Test
public final void testExprNode() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(QUERIES[10]);
LogicalPlan rootNode = planner.createPlan(qc, expr);
@@ -882,7 +890,7 @@ public class TestLogicalPlanner {
@Test
public final void testAsterisk() throws CloneNotSupportedException, PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(QUERIES[13]);
LogicalPlan planNode = planner.createPlan(qc, expr);
@@ -912,7 +920,7 @@ public class TestLogicalPlanner {
@Test
public final void testAlias1() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(ALIAS[0]);
LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot();
@@ -940,7 +948,7 @@ public class TestLogicalPlanner {
@Test
public final void testAlias2() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(ALIAS[1]);
LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot();
@@ -961,7 +969,7 @@ public class TestLogicalPlanner {
@Test
public final void testCreateTableDef() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(CREATE_TABLE[0]);
LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot();
@@ -980,7 +988,7 @@ public class TestLogicalPlanner {
assertEquals("score", def.getColumn(3).getSimpleName());
assertEquals(Type.FLOAT4, def.getColumn(3).getDataType().getType());
assertTrue("CSV".equalsIgnoreCase(createTable.getStorageType()));
- assertEquals("/tmp/data", createTable.getPath().toString());
+ assertEquals("/tmp/data", createTable.getUri().toString());
assertTrue(createTable.hasOptions());
assertEquals("|", createTable.getOptions().get("csv.delimiter"));
}
@@ -1047,7 +1055,7 @@ public class TestLogicalPlanner {
@Test
public final void testSetPlan() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(setStatements[0]);
LogicalNode plan = planner.createPlan(qc, expr).getRootBlock().getRoot();
@@ -1068,7 +1076,7 @@ public class TestLogicalPlanner {
@Test
public void testSetQualifier() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr context = sqlAnalyzer.parse(setQualifiers[0]);
LogicalNode plan = planner.createPlan(qc, context).getRootBlock().getRoot();
@@ -1121,7 +1129,7 @@ public class TestLogicalPlanner {
@Test
public final void testInsertInto0() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(insertStatements[0]);
LogicalPlan plan = planner.createPlan(qc, expr);
@@ -1134,7 +1142,7 @@ public class TestLogicalPlanner {
@Test
public final void testInsertInto1() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(insertStatements[1]);
LogicalPlan plan = planner.createPlan(qc, expr);
@@ -1146,7 +1154,7 @@ public class TestLogicalPlanner {
@Test
public final void testInsertInto2() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(insertStatements[2]);
LogicalPlan plan = planner.createPlan(qc, expr);
@@ -1161,19 +1169,19 @@ public class TestLogicalPlanner {
@Test
public final void testInsertInto3() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(insertStatements[3]);
LogicalPlan plan = planner.createPlan(qc, expr);
assertEquals(1, plan.getQueryBlocks().size());
InsertNode insertNode = getInsertNode(plan);
assertFalse(insertNode.isOverwrite());
- assertTrue(insertNode.hasPath());
+ assertTrue(insertNode.hasUri());
}
@Test
public final void testInsertInto4() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(insertStatements[4]);
LogicalPlan plan = planner.createPlan(qc, expr);
@@ -1189,19 +1197,19 @@ public class TestLogicalPlanner {
@Test
public final void testInsertInto5() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(insertStatements[5]);
LogicalPlan plan = planner.createPlan(qc, expr);
assertEquals(1, plan.getQueryBlocks().size());
InsertNode insertNode = getInsertNode(plan);
assertTrue(insertNode.isOverwrite());
- assertTrue(insertNode.hasPath());
+ assertTrue(insertNode.hasUri());
}
@Test
public final void testInsertInto6() throws PlanningException {
- QueryContext qc = new QueryContext(util.getConfiguration(), session);
+ QueryContext qc = createQueryContext();
Expr expr = sqlAnalyzer.parse(insertStatements[6]);
LogicalPlan plan = planner.createPlan(qc, expr);
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index 0082800..d62eed2 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -39,6 +39,7 @@ import org.apache.tajo.plan.PlanningException;
import org.apache.tajo.plan.expr.*;
import org.apache.tajo.plan.logical.*;
import org.apache.tajo.plan.util.PlannerUtil;
+import org.apache.tajo.storage.TableSpaceManager;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
@@ -108,7 +109,7 @@ public class TestPlannerUtil {
catalog.createFunction(funcDesc);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
+ planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
}
@AfterClass
@@ -337,7 +338,7 @@ public class TestPlannerUtil {
TableDesc tableDesc = new TableDesc();
tableDesc.setName("Test");
- tableDesc.setPath(path.toUri());
+ tableDesc.setUri(path.toUri());
FileSystem fs = path.getFileSystem(util.getConfiguration());
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index e7e4f7d..2464fb1 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -86,7 +86,7 @@ public class TestBNLJoinExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
+ Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
.getAppender(employeeMeta, schema, employeePath);
appender.init();
VTuple tuple = new VTuple(schema.size());
@@ -108,8 +108,7 @@ public class TestBNLJoinExec {
peopleSchema.addColumn("age", Type.INT4);
TableMeta peopleMeta = CatalogUtil.newTableMeta("CSV");
Path peoplePath = new Path(testDir, "people.csv");
- appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
- .getAppender(peopleMeta, peopleSchema, peoplePath);
+ appender = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(peopleMeta, peopleSchema, peoplePath);
appender.init();
tuple = new VTuple(peopleSchema.size());
for (int i = 1; i < INNER_TUPLE_NUM; i += 2) {
@@ -125,7 +124,7 @@ public class TestBNLJoinExec {
people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
catalog.createTable(people);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
+ planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
}
@After
@@ -150,10 +149,10 @@ public class TestBNLJoinExec {
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(),
- new Path(employee.getPath()),
+ new Path(employee.getUri()),
Integer.MAX_VALUE);
FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(),
- new Path(people.getPath()),
+ new Path(people.getUri()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testBNLCrossJoin");
@@ -183,9 +182,9 @@ public class TestBNLJoinExec {
context).getRootBlock().getRoot();
FileFragment[] empFrags = FileTablespace.splitNG(conf, "default.e", employee.getMeta(),
- new Path(employee.getPath()), Integer.MAX_VALUE);
+ new Path(employee.getUri()), Integer.MAX_VALUE);
FileFragment[] peopleFrags = FileTablespace.splitNG(conf, "default.p", people.getMeta(),
- new Path(people.getPath()), Integer.MAX_VALUE);
+ new Path(people.getUri()), Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index 5a7ba6a..96a1f36 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -90,7 +90,7 @@ public class TestBSTIndexExec {
Path workDir = CommonTestingUtil.getTestDir();
catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString());
catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
- sm = (FileTablespace) TableSpaceManager.getFileStorageManager(conf);
+ sm = TableSpaceManager.getLocalFs();
idxPath = new Path(workDir, "test.idx");
@@ -144,11 +144,11 @@ public class TestBSTIndexExec {
TableDesc desc = new TableDesc(
CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, meta,
- sm.getTablePath("employee").toUri());
+ sm.getTableUri(TajoConstants.DEFAULT_DATABASE_NAME, "employee"));
catalog.createTable(desc);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
+ planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
optimizer = new LogicalOptimizer(conf);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index 8e2f234..d94d3f6 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -82,7 +82,7 @@ public class TestExternalSortExec {
TableMeta employeeMeta = CatalogUtil.newTableMeta("CSV");
Path employeePath = new Path(testDir, "employee.csv");
- Appender appender = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
+ Appender appender = ((FileTablespace) TableSpaceManager.getLocalFs())
.getAppender(employeeMeta, schema, employeePath);
appender.enableStats();
appender.init();
@@ -104,7 +104,7 @@ public class TestExternalSortExec {
employee = new TableDesc("default.employee", schema, employeeMeta, employeePath.toUri());
catalog.createTable(employee);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
+ planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
}
@After
@@ -120,7 +120,7 @@ public class TestExternalSortExec {
@Test
public final void testNext() throws IOException, PlanningException {
FileFragment[] frags = FileTablespace.splitNG(conf, "default.employee", employee.getMeta(),
- new Path(employee.getPath()), Integer.MAX_VALUE);
+ new Path(employee.getUri()), Integer.MAX_VALUE);
Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
LocalTajoTestingUtility.newTaskAttemptId(), new FileFragment[] { frags[0] }, workDir);
http://git-wip-us.apache.org/repos/asf/tajo/blob/d0f37012/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
index fb6fd02..21a101a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -104,8 +104,7 @@ public class TestFullOuterHashJoinExec {
TableMeta dep3Meta = CatalogUtil.newTableMeta("CSV");
Path dep3Path = new Path(testDir, "dep3.csv");
- Appender appender1 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
- .getAppender(dep3Meta, dep3Schema, dep3Path);
+ Appender appender1 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(dep3Meta, dep3Schema, dep3Path);
appender1.init();
VTuple tuple = new VTuple(dep3Schema.size());
for (int i = 0; i < 10; i++) {
@@ -134,8 +133,7 @@ public class TestFullOuterHashJoinExec {
TableMeta job3Meta = CatalogUtil.newTableMeta("CSV");
Path job3Path = new Path(testDir, "job3.csv");
- Appender appender2 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
- .getAppender(job3Meta, job3Schema, job3Path);
+ Appender appender2 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(job3Meta, job3Schema, job3Path);
appender2.init();
VTuple tuple2 = new VTuple(job3Schema.size());
for (int i = 1; i < 4; i++) {
@@ -174,8 +172,7 @@ public class TestFullOuterHashJoinExec {
TableMeta emp3Meta = CatalogUtil.newTableMeta("CSV");
Path emp3Path = new Path(testDir, "emp3.csv");
- Appender appender3 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
- .getAppender(emp3Meta, emp3Schema, emp3Path);
+ Appender appender3 = ((FileTablespace) TableSpaceManager.getLocalFs()).getAppender(emp3Meta, emp3Schema, emp3Path);
appender3.init();
VTuple tuple3 = new VTuple(emp3Schema.size());
@@ -227,7 +224,7 @@ public class TestFullOuterHashJoinExec {
TableMeta phone3Meta = CatalogUtil.newTableMeta("CSV");
Path phone3Path = new Path(testDir, "phone3.csv");
- Appender appender5 = ((FileTablespace) TableSpaceManager.getFileStorageManager(conf))
+ Appender appender5 = ((FileTablespace) TableSpaceManager.getLocalFs())
.getAppender(phone3Meta, phone3Schema, phone3Path);
appender5.init();
@@ -237,7 +234,7 @@ public class TestFullOuterHashJoinExec {
catalog.createTable(phone3);
analyzer = new SQLAnalyzer();
- planner = new LogicalPlanner(catalog);
+ planner = new LogicalPlanner(catalog, TableSpaceManager.getInstance());
defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
}
@@ -266,9 +263,9 @@ public class TestFullOuterHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- FileFragment[] dep3Frags = FileTablespace.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
+ FileFragment[] dep3Frags = FileTablespace.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getUri()),
Integer.MAX_VALUE);
- FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
@@ -305,9 +302,9 @@ public class TestFullOuterHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+ FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getUri()),
Integer.MAX_VALUE);
- FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
@@ -343,9 +340,9 @@ public class TestFullOuterHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()),
Integer.MAX_VALUE);
- FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+ FileFragment[] job3Frags = FileTablespace.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getUri()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
@@ -382,9 +379,9 @@ public class TestFullOuterHashJoinExec {
Enforcer enforcer = new Enforcer();
enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
- FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+ FileFragment[] emp3Frags = FileTablespace.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getUri()),
Integer.MAX_VALUE);
- FileFragment[] phone3Frags = FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+ FileFragment[] phone3Frags = FileTablespace.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getUri()),
Integer.MAX_VALUE);
FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);