You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/09 04:23:37 UTC
[2/3] TAJO-149: Eliminate QueryConf and its file write. (hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
index d1e92be..c4b6ae0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -18,28 +18,29 @@
package org.apache.tajo.master.querymaster;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.state.*;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryConf;
import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoProtos.QueryState;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableDescImpl;
-import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.MasterPlan;
-import org.apache.tajo.engine.planner.logical.NodeType;
-import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.master.ExecutionBlock;
import org.apache.tajo.master.ExecutionBlockCursor;
+import org.apache.tajo.master.QueryContext;
import org.apache.tajo.master.event.*;
import org.apache.tajo.storage.StorageManager;
+import org.apache.tajo.util.TUtil;
import java.io.IOException;
import java.util.ArrayList;
@@ -54,14 +55,14 @@ public class Query implements EventHandler<QueryEvent> {
private static final Log LOG = LogFactory.getLog(Query.class);
// Facilities for Query
- private final QueryConf conf;
+ private final TajoConf systemConf;
private final Clock clock;
private String queryStr;
private Map<ExecutionBlockId, SubQuery> subqueries;
private final EventHandler eventHandler;
private final MasterPlan plan;
private final StorageManager sm;
- QueryMasterTask.QueryContext context;
+ QueryMasterTask.QueryMasterTaskContext context;
private ExecutionBlockCursor cursor;
// Query Status
@@ -108,13 +109,13 @@ public class Query implements EventHandler<QueryEvent> {
.installTopology();
- public Query(final QueryMasterTask.QueryContext context, final QueryId id,
+ public Query(final QueryMasterTask.QueryMasterTaskContext context, final QueryId id,
final long appSubmitTime,
final String queryStr,
final EventHandler eventHandler,
final MasterPlan plan) {
this.context = context;
- this.conf = context.getConf();
+ this.systemConf = context.getConf();
this.id = id;
this.clock = context.getClock();
this.appSubmitTime = appSubmitTime;
@@ -308,31 +309,21 @@ public class Query implements EventHandler<QueryEvent> {
} else { // Finish a query
if (query.checkQueryForCompleted() == QueryState.QUERY_SUCCEEDED) {
- SubQuery subQuery = query.getSubQuery(castEvent.getExecutionBlockId());
- TableDesc outputTableDesc = new TableDescImpl(query.context.getQueryMeta().getOutputTable(),
- subQuery.getTableMeta(), query.context.getQueryMeta().getOutputPath());
- query.setResultDesc(outputTableDesc);
-
- if (!query.context.getQueryMeta().isFileOutput()) {
- try {
- query.writeStat(query.context.getQueryMeta().getOutputPath(), subQuery);
- } catch (IOException e) {
- e.printStackTrace();
+
+ Path finalOutputDir = commitOutputData(query);
+ TableDesc finalTableDesc = buildOrUpdateResultTableDesc(query, castEvent.getExecutionBlockId(), finalOutputDir);
+
+ QueryContext queryContext = query.context.getQueryContext();
+ CatalogService catalog = query.context.getQueryMasterContext().getWorkerContext().getCatalog();
+
+ if (queryContext.hasOutputTable()) { // TRUE only if a query command is 'CREATE TABLE' OR 'INSERT INTO'
+ if (queryContext.isOutputOverwrite()) { // TRUE only if a query is 'INSERT OVERWRITE INTO'
+ catalog.deleteTable(finalOutputDir.getName());
}
+ catalog.addTable(finalTableDesc);
}
+ query.setResultDesc(finalTableDesc);
query.eventHandler.handle(new QueryFinishEvent(query.getId()));
-
- StoreTableNode storeTableNode = (StoreTableNode) PlannerUtil.findTopNode(subQuery.getBlock().getPlan(),
- NodeType.STORE);
- if (storeTableNode.isCreatedTable()) {
- query.context.getQueryMasterContext().getWorkerContext().getCatalog().addTable(outputTableDesc);
- } else if (storeTableNode.isOverwrite() && !query.context.getQueryMeta().isFileOutput()) {
- CatalogService catalog = query.context.getQueryMasterContext().getWorkerContext().getCatalog();
- TableDesc updatingTable = catalog.getTableDesc(outputTableDesc.getName());
- updatingTable.getMeta().setStat(outputTableDesc.getMeta().getStat());
- catalog.deleteTable(outputTableDesc.getName());
- catalog.addTable(updatingTable);
- }
}
return query.finished(QueryState.QUERY_SUCCEEDED);
@@ -342,14 +333,58 @@ public class Query implements EventHandler<QueryEvent> {
return QueryState.QUERY_FAILED;
}
}
- }
- private static class DiagnosticsUpdateTransition implements
- SingleArcTransition<Query, QueryEvent> {
- @Override
- public void transition(Query query, QueryEvent event) {
- query.addDiagnostic(((QueryDiagnosticsUpdateEvent) event)
- .getDiagnosticUpdate());
+ /**
+ * It moves a result data stored in a staging output dir into a final output dir.
+ */
+ public Path commitOutputData(Query query) {
+ QueryContext queryContext = query.context.getQueryContext();
+ Path stagingResultDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
+ Path finalOutputDir;
+ if (queryContext.hasOutputPath()) {
+ finalOutputDir = queryContext.getOutputPath();
+ try {
+ FileSystem fs = stagingResultDir.getFileSystem(query.systemConf);
+ fs.rename(stagingResultDir, finalOutputDir);
+ LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ } else {
+ finalOutputDir = new Path(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME);
+ }
+
+ return finalOutputDir;
+ }
+
+ /**
+ * It builds a table desc and update the table desc if necessary.
+ */
+ public TableDesc buildOrUpdateResultTableDesc(Query query, ExecutionBlockId finalExecBlockId, Path finalOutputDir) {
+ // Determine the output table name
+ SubQuery subQuery = query.getSubQuery(finalExecBlockId);
+ QueryContext queryContext = query.context.getQueryContext();
+ String outputTableName;
+ if (queryContext.hasOutputTable()) { // CREATE TABLE or INSERT STATEMENT
+ outputTableName = queryContext.getOutputTable();
+ } else { // SELECT STATEMENT
+ outputTableName = query.getId().toString();
+ }
+
+ TableDesc outputTableDesc = new TableDescImpl(outputTableName, subQuery.getTableMeta(), finalOutputDir);
+ TableDesc finalTableDesc = outputTableDesc;
+
+ // If a query has a target table, a TableDesc is updated.
+ if (queryContext.hasOutputTable()) { // CREATE TABLE or INSERT STATEMENT
+ if (queryContext.isOutputOverwrite()) {
+ CatalogService catalog = query.context.getQueryMasterContext().getWorkerContext().getCatalog();
+ Preconditions.checkNotNull(catalog, "CatalogService is NULL");
+ TableDesc updatingTable = catalog.getTableDesc(outputTableDesc.getName());
+ updatingTable.getMeta().setStat(outputTableDesc.getMeta().getStat());
+ finalTableDesc = updatingTable;
+ }
+ }
+ return finalTableDesc;
}
}
@@ -415,9 +450,32 @@ public class Query implements EventHandler<QueryEvent> {
}
}
- private void writeStat(Path outputPath, SubQuery subQuery)
- throws IOException {
- ExecutionBlock execBlock = subQuery.getBlock();
- sm.writeTableMeta(outputPath, subQuery.getTableMeta());
+ public static interface QueryHook {
+ QueryState getTargetState();
+ void onEvent(Query query);
+ }
+
+ public static class QueryHookManager {
+ Map<QueryState, List<QueryHook>> hookList = TUtil.newHashMap();
+
+ public void addHook(QueryHook hook) {
+ if (hookList.containsKey(hook.getTargetState())) {
+ hookList.get(hook.getTargetState()).add(hook);
+ } else {
+ hookList.put(hook.getTargetState(), TUtil.newList(hook));
+ }
+ }
+
+ public void doHooks(Query query) {
+ QueryState finalState = query.checkQueryForCompleted();
+ List<QueryHook> list = hookList.get(finalState);
+ if (list != null) {
+ for (QueryHook hook : list) {
+ hook.onEvent(query);
+ }
+ } else {
+ LOG.error("QueryHookManager cannot deal with " + finalState + " event");
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
index f6cf02e..809dce2 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java
@@ -29,7 +29,7 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.QueryMeta;
+import org.apache.tajo.master.QueryContext;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResource;
@@ -46,7 +46,7 @@ public class QueryInProgress extends CompositeService {
private QueryId queryId;
- private QueryMeta queryMeta;
+ private QueryContext queryContext;
private TajoAsyncDispatcher dispatcher;
@@ -66,11 +66,11 @@ public class QueryInProgress extends CompositeService {
public QueryInProgress(
TajoMaster.MasterContext masterContext,
- QueryMeta queryMeta,
+ QueryContext queryContext,
QueryId queryId, String sql, LogicalRootNode plan) {
super(QueryInProgress.class.getName());
this.masterContext = masterContext;
- this.queryMeta = queryMeta;
+ this.queryContext = queryContext;
this.queryId = queryId;
this.plan = plan;
@@ -105,7 +105,7 @@ public class QueryInProgress extends CompositeService {
while(true) {
try {
if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) {
- LOG.info("====> " + queryId + " QueryMaster stopped");
+ LOG.info(queryId + " QueryMaster stopped");
queryMasterStopped = true;
break;
}
@@ -206,7 +206,7 @@ public class QueryInProgress extends CompositeService {
null,
TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder()
.setQueryId(queryId.getProto())
- .setQueryMeta(queryMeta.getProto())
+ .setQueryContext(queryContext.getProto())
.setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build())
.build(), NullCallback.get());
querySubmitted.set(true);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
index fb92616..424d5bf 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java
@@ -28,7 +28,7 @@ import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.ipc.TajoMasterProtocol;
-import org.apache.tajo.master.QueryMeta;
+import org.apache.tajo.master.QueryContext;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.rm.WorkerResource;
@@ -85,9 +85,9 @@ public class QueryJobManager extends CompositeService {
return dispatcher.getEventHandler();
}
- public QueryInfo createNewQueryJob(QueryMeta queryMeta, String sql, LogicalRootNode plan) throws Exception {
+ public QueryInfo createNewQueryJob(QueryContext queryContext, String sql, LogicalRootNode plan) throws Exception {
QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId());
- QueryInProgress queryInProgress = new QueryInProgress(masterContext,queryMeta, queryId, sql, plan);
+ QueryInProgress queryInProgress = new QueryInProgress(masterContext, queryContext, queryId, sql, plan);
synchronized(runningQueries) {
runningQueries.put(queryId, queryInProgress);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index 53b9c05..d45988c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -21,16 +21,15 @@ package org.apache.tajo.master.querymaster;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
-import org.apache.tajo.QueryConf;
import org.apache.tajo.QueryId;
import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.GlobalOptimizer;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.GlobalPlanner;
@@ -62,7 +61,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
private StorageManager storageManager;
- private QueryConf queryConf;
+ private TajoConf systemConf;
private Map<QueryId, QueryMasterTask> queryMasterTasks = new HashMap<QueryId, QueryMasterTask>();
@@ -84,20 +83,19 @@ public class QueryMaster extends CompositeService implements EventHandler {
public void init(Configuration conf) {
LOG.info("QueryMaster init");
try {
- queryConf = new QueryConf(conf);
- queryConf.addResource(new Path(QueryConf.QUERY_MASTER_FILENAME));
+ systemConf = (TajoConf)conf;
QUERY_SESSION_TIMEOUT = 60 * 1000;
- queryMasterContext = new QueryMasterContext(queryConf);
+ queryMasterContext = new QueryMasterContext(systemConf);
clock = new SystemClock();
this.dispatcher = new TajoAsyncDispatcher("querymaster_" + System.currentTimeMillis());
addIfService(dispatcher);
- this.storageManager = new StorageManager(queryConf);
+ this.storageManager = new StorageManager(systemConf);
- globalPlanner = new GlobalPlanner(queryConf, storageManager, dispatcher.getEventHandler());
+ globalPlanner = new GlobalPlanner(systemConf, storageManager, dispatcher.getEventHandler());
globalOptimizer = new GlobalOptimizer();
dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler());
@@ -188,13 +186,13 @@ public class QueryMaster extends CompositeService implements EventHandler {
}
public class QueryMasterContext {
- private QueryConf conf;
+ private TajoConf conf;
- public QueryMasterContext(QueryConf conf) {
+ public QueryMasterContext(TajoConf conf) {
this.conf = conf;
}
- public QueryConf getConf() {
+ public TajoConf getConf() {
return conf;
}
@@ -255,9 +253,9 @@ public class QueryMaster extends CompositeService implements EventHandler {
LOG.info("Start QueryStartEventHandler:" + event.getQueryId());
//To change body of implemented methods use File | Settings | File Templates.
QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext,
- event.getQueryId(), event.getQueryMeta(), event.getLogicalPlanJson());
+ event.getQueryId(), event.getQueryContext(), event.getLogicalPlanJson());
- queryMasterTask.init(queryConf);
+ queryMasterTask.init(systemConf);
queryMasterTask.start();
synchronized(queryMasterTasks) {
queryMasterTasks.put(event.getQueryId(), queryMasterTask);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
index 5a79464..ae1c1e5 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterRunner.java
@@ -25,8 +25,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.RackResolver;
-import org.apache.tajo.QueryConf;
import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.util.TajoIdUtils;
import java.io.PrintWriter;
@@ -34,9 +35,10 @@ import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
+@Deprecated
public class QueryMasterRunner extends AbstractService {
private static final Log LOG = LogFactory.getLog(QueryMasterRunner.class);
- private QueryConf queryConf;
+ private TajoConf systemConf;
private QueryMaster queryMaster;
private QueryId queryId;
private String queryMasterManagerAddress;
@@ -59,11 +61,9 @@ public class QueryMasterRunner extends AbstractService {
@Override
public void init(Configuration conf) {
- this.queryConf = (QueryConf)conf;
- RackResolver.init(queryConf);
-
+ this.systemConf = (TajoConf)conf;
+ RackResolver.init(systemConf);
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
-
super.init(conf);
}
@@ -72,7 +72,7 @@ public class QueryMasterRunner extends AbstractService {
//create QueryMaster
QueryMaster query = new QueryMaster(null);
- query.init(queryConf);
+ query.init(systemConf);
query.start();
}
@@ -83,8 +83,8 @@ public class QueryMasterRunner extends AbstractService {
public static void main(String[] args) throws Exception {
LOG.info("QueryMasterRunner started");
- final QueryConf conf = new QueryConf();
- conf.addResource(new Path(QueryConf.QUERY_MASTER_FILENAME));
+ final TajoConf conf = new TajoConf();
+ conf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
UserGroupInformation.setConfiguration(conf);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index f348e44..2212f82 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -39,13 +39,12 @@ import org.apache.tajo.engine.planner.logical.LogicalNode;
import org.apache.tajo.engine.planner.logical.LogicalRootNode;
import org.apache.tajo.engine.planner.logical.NodeType;
import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.master.QueryMeta;
+import org.apache.tajo.master.QueryContext;
import org.apache.tajo.master.TajoAsyncDispatcher;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.rm.TajoWorkerResourceManager;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.StorageUtil;
import org.apache.tajo.worker.AbstractResourceAllocator;
import org.apache.tajo.worker.TajoResourceAllocator;
import org.apache.tajo.worker.YarnResourceAllocator;
@@ -60,15 +59,15 @@ public class QueryMasterTask extends CompositeService {
private static final Log LOG = LogFactory.getLog(QueryMasterTask.class.getName());
// query submission directory is private!
- final public static FsPermission USER_DIR_PERMISSION =
+ final public static FsPermission STAGING_DIR_PERMISSION =
FsPermission.createImmutable((short) 0700); // rwx--------
private QueryId queryId;
- private QueryMeta queryMeta;
-
private QueryContext queryContext;
+ private QueryMasterTaskContext queryTaskContext;
+
private QueryMaster.QueryMasterContext queryMasterContext;
private Query query;
@@ -81,11 +80,9 @@ public class QueryMasterTask extends CompositeService {
private final long querySubmitTime;
- private Path outputPath;
-
private Map<String, TableDesc> tableDescMap = new HashMap<String, TableDesc>();
- private QueryConf queryConf;
+ private TajoConf systemConf;
private AtomicLong lastClientHeartbeat = new AtomicLong(-1);
@@ -94,28 +91,28 @@ public class QueryMasterTask extends CompositeService {
private AtomicBoolean stopped = new AtomicBoolean(false);
public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext,
- QueryId queryId, QueryMeta queryMeta, String logicalPlanJson) {
+ QueryId queryId, QueryContext queryContext, String logicalPlanJson) {
super(QueryMasterTask.class.getName());
this.queryMasterContext = queryMasterContext;
this.queryId = queryId;
- this.queryMeta = queryMeta;
+ this.queryContext = queryContext;
this.logicalPlanJson = logicalPlanJson;
this.querySubmitTime = System.currentTimeMillis();
}
@Override
public void init(Configuration conf) {
- queryConf = new QueryConf(conf);
- queryConf.addResource(new Path(QueryConf.QUERY_MASTER_FILENAME));
+ systemConf = (TajoConf)conf;
+
try {
- queryContext = new QueryContext();
+ queryTaskContext = new QueryMasterTaskContext();
String resourceManagerClassName = conf.get("tajo.resource.manager",
TajoWorkerResourceManager.class.getCanonicalName());
if(resourceManagerClassName.indexOf(TajoWorkerResourceManager.class.getName()) >= 0) {
- resourceAllocator = new TajoResourceAllocator(queryContext);
+ resourceAllocator = new TajoResourceAllocator(queryTaskContext);
} else {
- resourceAllocator = new YarnResourceAllocator(queryContext);
+ resourceAllocator = new YarnResourceAllocator(queryTaskContext);
}
addService(resourceAllocator);
@@ -130,7 +127,7 @@ public class QueryMasterTask extends CompositeService {
initStagingDir();
- super.init(queryConf);
+ super.init(systemConf);
} catch (IOException e) {
LOG.error(e.getMessage(), e);
}
@@ -242,14 +239,14 @@ public class QueryMasterTask extends CompositeService {
MasterPlan globalPlan = queryMasterContext.getGlobalPlanner().build(queryId, logicalNodeRoot);
this.masterPlan = queryMasterContext.getGlobalOptimizer().optimize(globalPlan);
- query = new Query(queryContext, queryId, querySubmitTime,
- "", queryContext.getEventHandler(), masterPlan);
+ query = new Query(queryTaskContext, queryId, querySubmitTime,
+ "", queryTaskContext.getEventHandler(), masterPlan);
dispatcher.register(QueryEventType.class, query);
- queryContext.getEventHandler().handle(new QueryEvent(queryId,
+ queryTaskContext.getEventHandler().handle(new QueryEvent(queryId,
QueryEventType.INIT));
- queryContext.getEventHandler().handle(new QueryEvent(queryId,
+ queryTaskContext.getEventHandler().handle(new QueryEvent(queryId,
QueryEventType.START));
} catch (Exception e) {
LOG.error(e.getMessage(), e);
@@ -264,88 +261,85 @@ public class QueryMasterTask extends CompositeService {
* them to variables.
*/
private void initStagingDir() throws IOException {
+
String realUser;
String currentUser;
UserGroupInformation ugi;
ugi = UserGroupInformation.getLoginUser();
realUser = ugi.getShortUserName();
currentUser = UserGroupInformation.getCurrentUser().getShortUserName();
+ FileSystem defaultFS = FileSystem.get(systemConf);
- String givenOutputTableName = queryMeta.getOutputTable();
- Path stagingDir;
+ Path stagingDir = null;
+ Path outputDir = null;
+ try {
+ ////////////////////////////////////////////
+ // Create Output Directory
+ ////////////////////////////////////////////
- // If final output directory is not given by an user,
- // we use the query id as a output directory.
- if (givenOutputTableName == null || givenOutputTableName.isEmpty()) {
- FileSystem defaultFS = FileSystem.get(queryConf);
+ stagingDir = new Path(TajoConf.getStagingRoot(systemConf), queryId.toString());
- Path homeDirectory = defaultFS.getHomeDirectory();
- if (!defaultFS.exists(homeDirectory)) {
- defaultFS.mkdirs(homeDirectory, new FsPermission(USER_DIR_PERMISSION));
+ if (defaultFS.exists(stagingDir)) {
+ throw new IOException("The staging directory '" + stagingDir + "' already exists");
}
-
- Path userQueryDir = new Path(homeDirectory, TajoConstants.USER_QUERYDIR_PREFIX);
-
- if (defaultFS.exists(userQueryDir)) {
- FileStatus fsStatus = defaultFS.getFileStatus(userQueryDir);
- String owner = fsStatus.getOwner();
-
- if (!(owner.equals(currentUser) || owner.equals(realUser))) {
- throw new IOException("The ownership on the user's query " +
- "directory " + userQueryDir + " is not as expected. " +
- "It is owned by " + owner + ". The directory must " +
- "be owned by the submitter " + currentUser + " or " +
- "by " + realUser);
- }
-
- if (!fsStatus.getPermission().equals(USER_DIR_PERMISSION)) {
- LOG.info("Permissions on staging directory " + userQueryDir + " are " +
- "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
- "to correct value " + USER_DIR_PERMISSION);
- defaultFS.setPermission(userQueryDir, new FsPermission(USER_DIR_PERMISSION));
- }
- } else {
- defaultFS.mkdirs(userQueryDir,
- new FsPermission(USER_DIR_PERMISSION));
+ defaultFS.mkdirs(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
+ FileStatus fsStatus = defaultFS.getFileStatus(stagingDir);
+ String owner = fsStatus.getOwner();
+
+ if (!(owner.equals(currentUser) || owner.equals(realUser))) {
+ throw new IOException("The ownership on the user's query " +
+ "directory " + stagingDir + " is not as expected. " +
+ "It is owned by " + owner + ". The directory must " +
+ "be owned by the submitter " + currentUser + " or " +
+ "by " + realUser);
}
- stagingDir = StorageUtil.concatPath(userQueryDir, queryId.toString());
-
- if (defaultFS.exists(stagingDir)) {
- throw new IOException("The staging directory " + stagingDir
- + "already exists. The directory must be unique to each query");
- } else {
- defaultFS.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
+ if (!fsStatus.getPermission().equals(STAGING_DIR_PERMISSION)) {
+ LOG.info("Permissions on staging directory " + stagingDir + " are " +
+ "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " +
+ "to correct value " + STAGING_DIR_PERMISSION);
+ defaultFS.setPermission(stagingDir, new FsPermission(STAGING_DIR_PERMISSION));
}
- // Set the query id to the output table name
- queryMeta.setOutputTable(queryId.toString());
-
- } else { // if a output table is given
-
- Path warehouseDir = new Path(queryConf.getVar(TajoConf.ConfVars.ROOT_DIR),
- TajoConstants.WAREHOUSE_DIR);
- FileSystem fs = warehouseDir.getFileSystem(queryConf);
-
- if (queryMeta.isFileOutput()) {
- stagingDir = queryMeta.getOutputPath();
- } else {
- stagingDir = new Path(warehouseDir, queryMeta.getOutputTable());
+ // Create a subdirectories
+ defaultFS.mkdirs(new Path(stagingDir, TajoConstants.RESULT_DIR_NAME));
+ LOG.info("The staging dir '" + outputDir + "' is created.");
+ queryContext.setStagingDir(stagingDir);
+
+ ////////////////////////////////////////////////////
+ // Check and Create An Output Directory If Necessary
+ ////////////////////////////////////////////////////
+ if (queryContext.hasOutputPath()) {
+ outputDir = queryContext.getOutputPath();
+ if (queryContext.isOutputOverwrite()) {
+ if (defaultFS.exists(outputDir.getParent())) {
+ if (defaultFS.exists(outputDir)) {
+ defaultFS.delete(outputDir, true);
+ LOG.info("The output directory '" + outputDir + "' is cleaned.");
+ }
+ } else {
+ defaultFS.mkdirs(outputDir.getParent());
+ LOG.info("The output directory's parent '" + outputDir.getParent() + "' is created.");
+ }
+ } else {
+ if (defaultFS.exists(outputDir)) {
+ throw new IOException("The output directory '" + outputDir + " already exists.");
+ }
+ }
+ }
+ } catch (IOException ioe) {
+ if (stagingDir != null && defaultFS.exists(stagingDir)) {
+ defaultFS.delete(stagingDir, true);
+ LOG.info("The staging directory '" + stagingDir + "' is deleted");
}
- if (!queryMeta.isOutputOverwrite()) {
- if (fs.exists(stagingDir)) {
- throw new IOException("The staging directory " + stagingDir
- + " already exists. The directory must be unique to each query");
- }
+ if (outputDir != null && defaultFS.exists(outputDir)) {
+ defaultFS.delete(outputDir, true);
+ LOG.info("The output directory '" + outputDir + "' is deleted");
}
- fs.mkdirs(stagingDir, new FsPermission(USER_DIR_PERMISSION));
+ throw ioe;
}
-
- queryMeta.setOutputPath(stagingDir);
- outputPath = stagingDir;
- LOG.info("Initialized Query Staging Dir: " + outputPath);
}
public Query getQuery() {
@@ -356,12 +350,12 @@ public class QueryMasterTask extends CompositeService {
stop();
}
- public QueryContext getQueryContext() {
- return queryContext;
+ public QueryMasterTaskContext getQueryTaskContext() {
+ return queryTaskContext;
}
public EventHandler getEventHandler() {
- return queryContext.getEventHandler();
+ return queryTaskContext.getEventHandler();
}
public void touchSessionTime() {
@@ -384,18 +378,18 @@ public class QueryMasterTask extends CompositeService {
}
}
- public class QueryContext {
+ public class QueryMasterTaskContext {
EventHandler eventHandler;
public QueryMaster.QueryMasterContext getQueryMasterContext() {
return queryMasterContext;
}
- public QueryMeta getQueryMeta() {
- return queryMeta;
+ public QueryContext getQueryContext() {
+ return queryContext;
}
- public QueryConf getConf() {
- return queryConf;
+ public TajoConf getConf() {
+ return systemConf;
}
public Clock getClock() {
@@ -414,8 +408,8 @@ public class QueryMasterTask extends CompositeService {
return queryMasterContext.getStorageManager();
}
- public Path getOutputPath() {
- return outputPath;
+ public Path getStagingDir() {
+ return queryContext.getStagingDir();
}
public synchronized EventHandler getEventHandler() {
@@ -445,7 +439,7 @@ public class QueryMasterTask extends CompositeService {
}
public AbstractResourceAllocator getResourceAllocator() {
- return (AbstractResourceAllocator)resourceAllocator;
+ return resourceAllocator;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 0cd5d64..43895d8 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -69,14 +69,14 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private static final Log LOG = LogFactory.getLog(SubQuery.class);
- private QueryMeta queryMeta;
+ private QueryContext queryContext;
private ExecutionBlock block;
private int priority;
private TableMeta meta;
private EventHandler eventHandler;
private final StorageManager sm;
private TaskSchedulerImpl taskScheduler;
- private QueryMasterTask.QueryContext context;
+ private QueryMasterTask.QueryMasterTaskContext context;
private long startTime;
private long finishTime;
@@ -136,7 +136,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
private int completedTaskCount = 0;
- public SubQuery(QueryMasterTask.QueryContext context, ExecutionBlock block, StorageManager sm) {
+ public SubQuery(QueryMasterTask.QueryMasterTaskContext context, ExecutionBlock block, StorageManager sm) {
this.context = context;
this.block = block;
this.sm = sm;
@@ -153,7 +153,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
state == SubQueryState.CONTAINER_ALLOCATED || state == SubQueryState.RUNNING;
}
- public QueryMasterTask.QueryContext getContext() {
+ public QueryMasterTask.QueryMasterTaskContext getContext() {
return context;
}
@@ -335,9 +335,10 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
} else {
stat = computeStatFromTasks();
}
- TableMeta meta = writeStat(this, stat);
+
+ StoreTableNode storeTableNode = getBlock().getStoreTableNode();
+ TableMeta meta = toTableMeta(storeTableNode);
meta.setStat(stat);
- setTableMeta(meta);
return meta;
}
@@ -356,7 +357,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
StoreTableNode storeTableNode = execBlock.getStoreTableNode();
TableMeta meta = toTableMeta(storeTableNode);
meta.setStat(stat);
- sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), meta);
+ //sm.writeTableMeta(sm.getTablePath(execBlock.getOutputName()), meta);
return meta;
}
@@ -597,7 +598,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
return maxTaskNum;
}
- public static long getInputVolume(QueryMasterTask.QueryContext context, ExecutionBlock execBlock) {
+ public static long getInputVolume(QueryMasterTask.QueryMasterTaskContext context, ExecutionBlock execBlock) {
Map<String, TableDesc> tableMap = context.getTableDescMap();
if (execBlock.isLeafBlock()) {
ScanNode outerScan = execBlock.getScanNodes()[0];
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 3701d58..eb710fa 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -211,15 +211,6 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
LOG.info("====> allocateWorkerResources: allocated:" + workerResources.size());
-// if(LOG.isDebugEnabled()) {
-// LOG.debug("====> allocateWorkerResources:" +
-// (new ExecutionBlockId(resourceRequest.request.getExecutionBlockId())) +
-// ", required:" + resourceRequest.request.getNumWorks() + ", allocated:" + workerResources.size());
-// } else {
-// LOG.info("====> allocateWorkerResources: required:" + resourceRequest.request.getNumWorks() +
-// ", allocated:" + workerResources.size() + ", queryMasterRequest=" + resourceRequest.queryMasterRequest);
-// }
-
if(workerResources.size() > 0) {
if(resourceRequest.queryMasterRequest) {
startQueryMaster(resourceRequest.queryId, workerResources.get(0));
@@ -386,7 +377,7 @@ public class TajoWorkerResourceManager implements WorkerResourceManager {
allWorkerResourceMap.put(workerResource.getId(), workerResource);
liveWorkerResources.add(hostAndPort);
- LOG.info("====> TajoWorker:" + workerResource + " added in live TajoWorker list");
+ LOG.info("TajoWorker:" + workerResource + " added in live TajoWorker list");
workerResourceLock.notifyAll();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
index 4392158..2202183 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
@@ -57,10 +57,10 @@ public class YarnRMContainerAllocator extends AMRMClientImpl
private static final Log LOG = LogFactory.getLog(YarnRMContainerAllocator.
class.getName());
- private QueryMasterTask.QueryContext context;
+ private QueryMasterTask.QueryMasterTaskContext context;
private final EventHandler eventHandler;
- public YarnRMContainerAllocator(QueryMasterTask.QueryContext context) {
+ public YarnRMContainerAllocator(QueryMasterTask.QueryMasterTaskContext context) {
super(ApplicationIdUtils.createApplicationAttemptId(context.getQueryId()));
this.context = context;
this.eventHandler = context.getDispatcher().getEventHandler();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
index fe31bc6..da7daf0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnTajoResourceManager.java
@@ -246,7 +246,6 @@ public class YarnTajoResourceManager implements WorkerResourceManager {
//vargs.add("-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005");
//}
// Set class name
- //vargs.add(QueryMasterRunner.class.getCanonicalName());
vargs.add(TajoWorker.class.getCanonicalName());
vargs.add("qm");
vargs.add(queryId.toString()); // queryId
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index c1afb80..fa2ff14 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -31,8 +31,6 @@ import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryConf;
-import org.apache.tajo.TajoConstants;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.ContainerProxy;
@@ -68,15 +66,15 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
static AtomicInteger containerIdSeq = new AtomicInteger(0);
private TajoConf tajoConf;
- private QueryMasterTask.QueryContext queryContext;
+ private QueryMasterTask.QueryMasterTaskContext queryTaskContext;
private final ExecutorService executorService;
private AtomicBoolean stopped = new AtomicBoolean(false);
- public TajoResourceAllocator(QueryMasterTask.QueryContext queryContext) {
- this.queryContext = queryContext;
+ public TajoResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) {
+ this.queryTaskContext = queryTaskContext;
executorService = Executors.newFixedThreadPool(
- queryContext.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
+ queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.AM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
}
@Override
@@ -102,9 +100,9 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
public void init(Configuration conf) {
tajoConf = (TajoConf)conf;
- queryContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, new TajoTaskRunnerLauncher());
+ queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, new TajoTaskRunnerLauncher());
//
- queryContext.getDispatcher().register(ContainerAllocatorEventType.class, new TajoWorkerAllocationHandler());
+ queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, new TajoWorkerAllocationHandler());
super.init(conf);
}
@@ -117,7 +115,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
stopped.set(true);
executorService.shutdownNow();
- Map<ContainerId, ContainerProxy> containers = queryContext.getResourceAllocator().getContainers();
+ Map<ContainerId, ContainerProxy> containers = queryTaskContext.getResourceAllocator().getContainers();
List<ContainerProxy> list = new ArrayList<ContainerProxy>(containers.values());
for(ContainerProxy eachProxy: list) {
try {
@@ -162,36 +160,10 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
}
private void launchTaskRunners(ExecutionBlockId executionBlockId, Collection<Container> containers) {
- FileSystem fs = null;
-
- QueryConf queryConf = queryContext.getConf();
- LOG.info("defaultFS: " + queryConf.get("fs.default.name"));
- LOG.info("defaultFS: " + queryConf.get("fs.defaultFS"));
- try {
- fs = FileSystem.get(queryConf);
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- }
-
- try {
- // TODO move to tajo temp
- Path warehousePath = new Path(queryConf.getVar(TajoConf.ConfVars.ROOT_DIR), TajoConstants.WAREHOUSE_DIR);
- Path queryConfPath = new Path(warehousePath, executionBlockId.getQueryId().toString());
- queryConfPath = new Path(queryConfPath, QueryConf.FILENAME);
-
- if(!fs.exists(queryConfPath)){
- LOG.info("Writing a QueryConf to HDFS and add to local environment, outputPath=" + queryConfPath);
- writeConf(queryConf, queryConfPath);
- } else {
- LOG.warn("QueryConf already exist. path: " + queryConfPath.toString());
- }
- } catch (IOException e) {
- LOG.error(e.getMessage(), e);
- }
- //Query in standby mode doesn't need launch Worker.
- //But, Assign ExecutionBlock to assigned tajo worker
+ // Query in standby mode doesn't need launch Worker.
+ // But, Assign ExecutionBlock to assigned tajo worker
for(Container eachContainer: containers) {
- TajoContainerProxy containerProxy = new TajoContainerProxy(queryContext, tajoConf,
+ TajoContainerProxy containerProxy = new TajoContainerProxy(queryTaskContext, tajoConf,
eachContainer, executionBlockId);
executorService.submit(new LaunchRunner(eachContainer.getId(), containerProxy));
}
@@ -213,7 +185,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
private void stopContainers(Collection<Container> containers) {
for (Container container : containers) {
- final ContainerProxy proxy = queryContext.getResourceAllocator().getContainer(container.getId());
+ final ContainerProxy proxy = queryTaskContext.getResourceAllocator().getContainer(container.getId());
executorService.submit(new StopContainerRunner(container.getId(), proxy));
}
}
@@ -248,7 +220,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
@Override
public void run() {
- LOG.info("======> Start TajoWorkerAllocationThread");
+ LOG.info("Start TajoWorkerAllocationThread");
CallFuture2<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
new CallFuture2<TajoMasterProtocol.WorkerResourceAllocationResponse>();
@@ -262,7 +234,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
.setExecutionBlockId(event.getExecutionBlockId().getProto())
.build();
- queryContext.getQueryMasterContext().getWorkerContext().
+ queryTaskContext.getQueryMasterContext().getWorkerContext().
getTajoMasterRpcClient().allocateWorkerResources(null, request, callBack);
TajoMasterProtocol.WorkerResourceAllocationResponse response = null;
@@ -315,14 +287,14 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
containers.add(container);
}
- SubQueryState state = queryContext.getSubQuery(executionBlockId).getState();
+ SubQueryState state = queryTaskContext.getSubQuery(executionBlockId).getState();
if (!SubQuery.isRunningState(state)) {
List<WorkerResource> workerResources = new ArrayList<WorkerResource>();
for(Container eachContainer: containers) {
workerResources.add(((TajoWorkerContainer)eachContainer).getWorkerResource());
}
try {
- TajoContainerProxy.releaseWorkerResource(queryContext, executionBlockId, workerResources);
+ TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, workerResources);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
@@ -333,7 +305,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
if(LOG.isDebugEnabled()) {
LOG.debug("SubQueryContainerAllocationEvent fire:" + executionBlockId);
}
- queryContext.getEventHandler().handle(new SubQueryContainerAllocationEvent(executionBlockId, containers));
+ queryTaskContext.getEventHandler().handle(new SubQueryContainerAllocationEvent(executionBlockId, containers));
}
numAllocatedWorkers += workerHosts.size();
@@ -345,7 +317,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
event.getRequiredNum() - numAllocatedWorkers,
event.isLeafQuery(), event.getProgress()
);
- queryContext.getEventHandler().handle(shortRequestEvent);
+ queryTaskContext.getEventHandler().handle(shortRequestEvent);
}
LOG.info("Stop TajoWorkerAllocationThread");
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
index 0fc896b..ff25fb3 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorker.java
@@ -21,13 +21,16 @@ package org.apache.tajo.worker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.catalog.CatalogClient;
+import org.apache.tajo.catalog.CatalogConstants;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoMasterProtocol;
@@ -148,9 +151,9 @@ public class TajoWorker extends CompositeService {
tajoWorkerManagerService = new TajoWorkerManagerService(workerContext, managerPort);
addService(tajoWorkerManagerService);
- LOG.info("====> Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", managerPort=" + managerPort);
+ LOG.info("Tajo worker started: mode=" + daemonMode + ", clientPort=" + clientPort + ", managerPort=" + managerPort);
} else {
- LOG.info("====> Tajo worker started: mode=" + daemonMode);
+ LOG.info("Tajo worker started: mode=" + daemonMode);
}
super.init(conf);
@@ -252,8 +255,7 @@ public class TajoWorker extends CompositeService {
}
private void setWorkerMode(String[] params) {
- if("qm".equals(daemonMode)) {
- //QueryMaster mode
+ if("qm".equals(daemonMode)) { //QueryMaster mode
String tajoMasterAddress = params[2];
connectToTajoMaster(tajoMasterAddress);
@@ -262,11 +264,9 @@ public class TajoWorker extends CompositeService {
QueryId queryId = TajoIdUtils.parseQueryId(params[1]);
tajoWorkerManagerService.getQueryMaster().reportQueryStatusToQueryMaster(
queryId, TajoProtos.QueryState.QUERY_MASTER_LAUNCHED);
- } else if("tr".equals(daemonMode)) {
- //TaskRunner mode
+ } else if("tr".equals(daemonMode)) { //TaskRunner mode
taskRunnerManager.startTask(params);
- } else {
- //Standby mode
+ } else { //Standby mode
connectToTajoMaster(tajoConf.get("tajo.master.manager.addr"));
connectToCatalog();
workerHeartbeatThread = new WorkerHeartbeatThread();
@@ -297,10 +297,10 @@ public class TajoWorker extends CompositeService {
private void connectToCatalog() {
// TODO: To be improved. it's a hack. It assumes that CatalogServer is embedded in TajoMaster.
- String hostName = this.tajoMasterAddress.getHostName();
- int port = Integer.parseInt(tajoConf.getVar(TajoConf.ConfVars.CATALOG_ADDRESS).split(":")[1]);
+ String catalogAddr = tajoConf.getVar(TajoConf.ConfVars.CATALOG_ADDRESS);
+ //int port = Integer.parseInt(tajoConf.getVar(TajoConf.ConfVars.CATALOG_ADDRESS).split(":")[1]);
try {
- catalogClient = new CatalogClient(hostName, port);
+ catalogClient = new CatalogClient(tajoConf);
} catch (IOException e) {
e.printStackTrace();
}
@@ -489,9 +489,12 @@ public class TajoWorker extends CompositeService {
String workerMode = args[0];
+ TajoConf tajoConf = new TajoConf();
+ tajoConf.addResource(new Path(TajoConstants.SYSTEM_CONF_FILENAME));
+
try {
TajoWorker tajoWorker = new TajoWorker(workerMode);
- tajoWorker.startWorker(new TajoConf(new YarnConfiguration()), args);
+ tajoWorker.startWorker(tajoConf, args);
} catch (Throwable t) {
LOG.fatal("Error starting TajoWorker", t);
System.exit(-1);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
index e1fd88a..a77ad2a 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java
@@ -175,8 +175,8 @@ public class TajoWorkerClientService extends AbstractService {
builder.setSubmitTime(query.getAppSubmitTime());
builder.setInitTime(query.getInitializationTime());
builder.setHasResult(
- !(queryMasterTask.getQueryContext().getQueryMeta().isCreateTable() ||
- queryMasterTask.getQueryContext().getQueryMeta().isInsert())
+ !(queryMasterTask.getQueryTaskContext().getQueryContext().isCreateTable() ||
+ queryMasterTask.getQueryTaskContext().getQueryContext().isInsert())
);
if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
builder.setFinishTime(query.getFinishTime());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
index 6fde6e4..28cc5f6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java
@@ -31,7 +31,7 @@ import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.TajoIdProtos;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.TajoWorkerProtocol;
-import org.apache.tajo.master.QueryMeta;
+import org.apache.tajo.master.QueryContext;
import org.apache.tajo.master.TaskSchedulerImpl;
import org.apache.tajo.master.event.*;
import org.apache.tajo.master.querymaster.QueryMaster;
@@ -125,13 +125,13 @@ public class TajoWorkerManagerService extends CompositeService
ExecutionBlockId ebId = new ExecutionBlockId(request.getExecutionBlockId());
QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(ebId.getQueryId());
ContainerId cid =
- queryMasterTask.getQueryContext().getResourceAllocator().makeContainerId(request.getContainerId());
+ queryMasterTask.getQueryTaskContext().getResourceAllocator().makeContainerId(request.getContainerId());
if(queryMasterTask == null || queryMasterTask.isStopped()) {
- LOG.info("====>getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
+ LOG.info("getTask:" + cid + ", ebId:" + ebId + ", but query is finished.");
done.run(TaskSchedulerImpl.stopTaskRunnerReq);
} else {
- LOG.info("====>getTask:" + cid + ", ebId:" + ebId);
+ LOG.info("getTask:" + cid + ", ebId:" + ebId);
queryMasterTask.handleTaskRequestEvent(new TaskRequestEvent(cid, ebId, done));
}
} catch (Exception e) {
@@ -202,7 +202,7 @@ public class TajoWorkerManagerService extends CompositeService
QueryId queryId = new QueryId(request.getQueryId());
LOG.info("Receive executeQuery request:" + queryId);
queryMaster.handle(new QueryStartEvent(queryId,
- new QueryMeta(request.getQueryMeta()), request.getLogicalPlanJson().getValue()));
+ new QueryContext(request.getQueryContext()), request.getLogicalPlanJson().getValue()));
done.run(TajoWorker.TRUE_PROTO);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
index aef5ead..e66751c 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/Task.java
@@ -30,13 +30,14 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tajo.QueryConf;
import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoProtos.TaskAttemptState;
import org.apache.tajo.TaskAttemptContext;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.exception.UnfinishedTaskException;
import org.apache.tajo.engine.json.CoreGsonHelper;
import org.apache.tajo.engine.planner.PlannerUtil;
@@ -48,7 +49,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.*;
import org.apache.tajo.ipc.TajoWorkerProtocol.TajoWorkerProtocolService.Interface;
import org.apache.tajo.ipc.protocolrecords.QueryUnitRequest;
import org.apache.tajo.master.ExecutionBlock.PartitionType;
-import org.apache.tajo.master.QueryMeta;
+import org.apache.tajo.master.QueryContext;
import org.apache.tajo.rpc.NullCallback;
import org.apache.tajo.storage.Fragment;
import org.apache.tajo.storage.StorageUtil;
@@ -69,8 +70,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class Task {
private static final Log LOG = LogFactory.getLog(Task.class);
- private final QueryConf conf;
- private final QueryMeta queryMeta;
+ private final TajoConf systemConf;
+ private final QueryContext queryContext;
private final FileSystem localFS;
private final TaskRunner.TaskRunnerContext taskRunnerContext;
private final Interface masterProxy;
@@ -137,8 +138,8 @@ public class Task {
this.reporter.startCommunicationThread();
this.taskId = request.getId();
- this.conf = worker.getQueryConf();
- this.queryMeta = request.getQueryMeta();
+ this.systemConf = worker.getConf();
+ this.queryContext = request.getQueryContext();
this.taskRunnerContext = worker;
this.masterProxy = masterProxy;
this.localFS = worker.getLocalFS();
@@ -146,7 +147,7 @@ public class Task {
this.taskDir = StorageUtil.concatPath(taskRunnerContext.getBaseDir(),
taskId.getQueryUnitId().getId() + "_" + taskId.getId());
- this.context = new TaskAttemptContext(conf, taskId,
+ this.context = new TaskAttemptContext(systemConf, taskId,
request.getFragments().toArray(new Fragment[request.getFragments().size()]),
taskDir);
plan = CoreGsonHelper.fromJson(request.getSerializedData(), LogicalNode.class);
@@ -163,7 +164,7 @@ public class Task {
} else {
// The final result of a task will be written in a file named part-ss-nnnnnnn,
// where ss is the subquery id associated with this task, and nnnnnn is the task id.
- Path outFilePath = new Path(queryMeta.getOutputPath(),
+ Path outFilePath = StorageUtil.concatPath(queryContext.getStagingDir(), TajoConstants.RESULT_DIR_NAME,
OUTPUT_FILE_PREFIX +
OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskId.getQueryUnitId().getExecutionBlockId().getId()) + "-" +
OUTPUT_FILE_FORMAT_TASK.get().format(taskId.getQueryUnitId().getId()));
@@ -201,7 +202,7 @@ public class Task {
if (request.getFetches().size() > 0) {
inputTableBaseDir = localFS.makeQualified(
lDirAllocator.getLocalPathForWrite(
- getTaskAttemptDir(context.getTaskId()).toString() + "/in", conf));
+ getTaskAttemptDir(context.getTaskId()).toString() + "/in", systemConf));
localFS.mkdirs(inputTableBaseDir);
Path tableDir;
for (String inputTable : context.getInputTables()) {
@@ -451,7 +452,7 @@ public class Task {
private Fragment[] localizeFetchedData(File file, String name, TableMeta meta)
throws IOException {
- Configuration c = new Configuration(conf);
+ Configuration c = new Configuration(systemConf);
c.set("fs.default.name", "file:///");
FileSystem fs = FileSystem.get(c);
Path tablePath = new Path(file.getAbsolutePath());
@@ -525,7 +526,7 @@ public class Task {
if (fetches.size() > 0) {
Path inputDir = lDirAllocator.
getLocalPathToRead(
- getTaskAttemptDir(ctx.getTaskId()).toString() + "/in", conf);
+ getTaskAttemptDir(ctx.getTaskId()).toString() + "/in", systemConf);
File storeDir;
int i = 0;
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 6128bb3..3056595 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -32,10 +32,10 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryConf;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -59,7 +59,7 @@ public class TaskRunner extends AbstractService {
/** class logger */
private static final Log LOG = LogFactory.getLog(TaskRunner.class);
- private QueryConf queryConf;
+ private TajoConf systemConf;
private volatile boolean stopped = false;
@@ -105,7 +105,7 @@ public class TaskRunner extends AbstractService {
private TaskRunnerManager taskRunnerManager;
- public TaskRunner(TaskRunnerManager taskRunnerManager, QueryConf conf, String[] args) {
+ public TaskRunner(TaskRunnerManager taskRunnerManager, TajoConf conf, String[] args) {
super(TaskRunner.class.getName());
this.taskRunnerManager = taskRunnerManager;
@@ -130,8 +130,7 @@ public class TaskRunner extends AbstractService {
LOG.info("QueryMaster Address:" + masterAddr);
// TODO - 'load credential' should be implemented
// Getting taskOwner
- UserGroupInformation taskOwner =
- UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.QUERY_USERNAME));
+ UserGroupInformation taskOwner = UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.TAJO_USERNAME));
//taskOwner.addToken(token);
// initialize MasterWorkerProtocol as an actual task owner.
@@ -161,16 +160,15 @@ public class TaskRunner extends AbstractService {
@Override
public void init(Configuration conf) {
- this.queryConf = (QueryConf)conf;
+ this.systemConf = (TajoConf)conf;
try {
// initialize DFS and LocalFileSystems
- defaultFS = FileSystem.get(URI.create(queryConf.getVar(ConfVars.ROOT_DIR)),conf);
+ defaultFS = TajoConf.getTajoRootPath(systemConf).getFileSystem(conf);
localFS = FileSystem.getLocal(conf);
// the base dir for an output dir
- baseDir = queryId.toString()
- + "/output" + "/" + executionBlockId.getId();
+ baseDir = queryId.toString() + "/output" + "/" + executionBlockId.getId();
// initialize LocalDirAllocator
lDirAllocator = new LocalDirAllocator(ConfVars.TASK_LOCAL_DIR.varname);
@@ -180,7 +178,7 @@ public class TaskRunner extends AbstractService {
// Setup QueryEngine according to the query plan
// Here, we can setup row-based query engine or columnar query engine.
- this.queryEngine = new TajoQueryEngine(queryConf);
+ this.queryEngine = new TajoQueryEngine(systemConf);
} catch (Throwable t) {
LOG.error(t);
}
@@ -222,8 +220,8 @@ public class TaskRunner extends AbstractService {
}
public class TaskRunnerContext {
- public QueryConf getQueryConf() {
- return queryConf;
+ public TajoConf getConf() {
+ return systemConf;
}
public String getNodeId() {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
index 5d8fa8d..f1ca567 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -22,7 +22,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.service.CompositeService;
-import org.apache.tajo.QueryConf;
import org.apache.tajo.conf.TajoConf;
import java.util.HashMap;
@@ -88,13 +87,13 @@ public class TaskRunnerManager extends CompositeService {
Thread t = new Thread() {
public void run() {
try {
- QueryConf queryConf = new QueryConf(tajoConf);
- TaskRunner taskRunner = new TaskRunner(TaskRunnerManager.this, queryConf, params);
+ TajoConf systemConf = new TajoConf(tajoConf);
+ TaskRunner taskRunner = new TaskRunner(TaskRunnerManager.this, systemConf, params);
LOG.info("Start TaskRunner:" + taskRunner.getId());
synchronized(taskRunnerMap) {
taskRunnerMap.put(taskRunner.getId(), taskRunner);
}
- taskRunner.init(queryConf);
+ taskRunner.init(systemConf);
taskRunner.start();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
index 2897b14..8fc3884 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.yarn.client.YarnClient;
import org.apache.hadoop.yarn.client.YarnClientImpl;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.YarnProtos;
-import org.apache.tajo.QueryConf;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.TaskRunnerGroupEvent;
import org.apache.tajo.master.TaskRunnerLauncher;
@@ -47,12 +46,12 @@ public class YarnResourceAllocator extends AbstractResourceAllocator {
private static final Log LOG = LogFactory.getLog(YarnResourceAllocator.class.getName());
- private QueryMasterTask.QueryContext queryContext;
+ private QueryMasterTask.QueryMasterTaskContext queryTaskContext;
- private QueryConf queryConf;
+ private TajoConf systemConf;
- public YarnResourceAllocator(QueryMasterTask.QueryContext queryContext) {
- this.queryContext = queryContext;
+ public YarnResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) {
+ this.queryTaskContext = queryTaskContext;
}
@Override
@@ -75,19 +74,19 @@ public class YarnResourceAllocator extends AbstractResourceAllocator {
@Override
public void init(Configuration conf) {
- queryConf = (QueryConf)conf;
+ systemConf = (TajoConf)conf;
- yarnRPC = YarnRPC.create(queryConf);
+ yarnRPC = YarnRPC.create(systemConf);
connectYarnClient();
- taskRunnerLauncher = new YarnTaskRunnerLauncherImpl(queryContext, yarnRPC);
+ taskRunnerLauncher = new YarnTaskRunnerLauncherImpl(queryTaskContext, yarnRPC);
addService((org.apache.hadoop.yarn.service.Service) taskRunnerLauncher);
- queryContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
+ queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
- rmAllocator = new YarnRMContainerAllocator(queryContext);
+ rmAllocator = new YarnRMContainerAllocator(queryTaskContext);
addService(rmAllocator);
- queryContext.getDispatcher().register(ContainerAllocatorEventType.class, rmAllocator);
+ queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, rmAllocator);
super.init(conf);
}
@@ -108,7 +107,7 @@ public class YarnResourceAllocator extends AbstractResourceAllocator {
private void connectYarnClient() {
this.yarnClient = new YarnClientImpl();
- this.yarnClient.init(queryConf);
+ this.yarnClient.init(systemConf);
this.yarnClient.start();
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
index 0694b4e..94bacdf 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoWorkerProtocol.proto
@@ -59,7 +59,7 @@ message QueryUnitRequestProto {
optional bool interQuery = 6 [default = false];
repeated Fetch fetches = 7;
optional bool shouldDie = 8;
- optional KeyValueSetProto queryMeta = 9;
+ optional KeyValueSetProto queryContext = 9;
}
message Fetch {
@@ -105,7 +105,7 @@ message Partition {
message QueryExecutionRequestProto {
required QueryIdProto queryId = 1;
- required KeyValueSetProto queryMeta = 2;
+ required KeyValueSetProto queryContext = 2;
required StringProto logicalPlanJson = 3;
}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
index 2025110..a4e39ff 100644
--- a/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/main/resources/tajo-default.xml
@@ -32,6 +32,11 @@
</property>
<property>
+ <name>tajo.staging.root.dir</name>
+ <value>/tmp/tajo-${user.name}/staging</value>
+ </property>
+
+ <property>
<name>tajo.task.localdir</name>
<value>/tmp/tajo-localdir</value>
</property>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
index e629623..5a85a07 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -21,11 +21,9 @@
*/
package org.apache.tajo;
-import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.tajo.algebra.Expr;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.common.TajoDataTypes.Type;
@@ -33,17 +31,10 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.planner.*;
-import org.apache.tajo.engine.planner.logical.LogicalNode;
-import org.apache.tajo.engine.planner.physical.PhysicalExec;
-import org.apache.tajo.engine.query.ResultSetImpl;
import org.apache.tajo.storage.*;
import org.apache.tajo.util.FileUtil;
-import org.apache.tajo.util.TUtil;
-import java.io.File;
import java.io.IOException;
-import java.sql.ResultSet;
-import java.util.List;
import java.util.UUID;
public class BackendTestingUtil {
@@ -102,36 +93,12 @@ public class BackendTestingUtil {
public BackendTestingUtil(TajoConf conf) throws IOException {
this.conf = conf;
- this.catalog = new LocalCatalog(conf);
+ this.catalog = new LocalCatalogWrapper(conf);
analyzer = new SQLAnalyzer();
planner = new LogicalPlanner(catalog);
optimizer = new LogicalOptimizer();
}
- public ResultSet run(String [] tableNames, File [] tables, Schema [] schemas, String query)
- throws IOException, PlanningException {
- Path workDir = createTmpTestDir();
- StorageManager sm = StorageManager.get(new TajoConf(), workDir);
- List<Fragment> frags = Lists.newArrayList();
- for (int i = 0; i < tableNames.length; i++) {
- Fragment [] splits = sm.split(tableNames[i], new Path(tables[i].getAbsolutePath()));
- for (Fragment f : splits) {
- frags.add(f);
- }
- }
-
- TaskAttemptContext ctx = new TaskAttemptContext(conf,
- TUtil.newQueryUnitAttemptId(),
- frags.toArray(new Fragment[frags.size()]), workDir);
- Expr EXPR = analyzer.parse(query);
- LogicalPlan plan = planner.createPlan(EXPR);
- LogicalNode rootNode = optimizer.optimize(plan);
- PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
- PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
-
- return new ResultSetImpl(null, null, conf, new Path(workDir, "out"));
- }
-
public static Path createTmpTestDir() throws IOException {
String randomStr = UUID.randomUUID().toString();
FileSystem fs = FileSystem.getLocal(new Configuration());
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
index 95dc212..3a70ac5 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
@@ -39,9 +39,11 @@ import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.pullserver.PullServerAuxService;
+import org.apache.tajo.util.NetUtils;
import java.io.File;
import java.io.IOException;
+import java.net.InetSocketAddress;
/**
* Configures and starts the Tajo-specific components in the YARN cluster.
@@ -64,6 +66,10 @@ public class MiniTajoYarnCluster extends MiniYARNCluster {
@Override
public void init(Configuration conf) {
+
+ conf.setSocketAddr(YarnConfiguration.RM_ADDRESS, new InetSocketAddress("127.0.0.1", 0));
+ conf.setSocketAddr(YarnConfiguration.RM_SCHEDULER_ADDRESS, new InetSocketAddress("127.0.0.1", 0));
+
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 0adf22a..88029ea 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.client.TajoClient;
@@ -71,8 +72,7 @@ public class TajoTestingCluster {
* System property key to get test directory value.
* Name is as it is because mini dfs has hard-codings to put test data here.
*/
- public static final String TEST_DIRECTORY_KEY =
- MiniDFSCluster.PROP_TEST_BUILD_DATA;
+ public static final String TEST_DIRECTORY_KEY = MiniDFSCluster.PROP_TEST_BUILD_DATA;
/**
* Default parent directory for test output.
@@ -222,7 +222,7 @@ public class TajoTestingCluster {
}
conf.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
- conf.set(CatalogConstants.JDBC_URI, "jdbc:derby:target/test-data/tcat/db");
+ conf.set(CatalogConstants.JDBC_URI, "jdbc:derby:" + clusterTestBuildDir.getAbsolutePath() + "/db");
LOG.info("Apache Derby repository is set to "+conf.get(CatalogConstants.JDBC_URI));
conf.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
@@ -255,7 +255,7 @@ public class TajoTestingCluster {
c.setVar(ConfVars.CATALOG_ADDRESS, "localhost:0");
c.set(CatalogConstants.STORE_CLASS, "org.apache.tajo.catalog.store.MemStore");
- c.set(CatalogConstants.JDBC_URI, "jdbc:derby:target/test-data/tcat/db");
+ c.set(CatalogConstants.JDBC_URI, "jdbc:derby:" + testBuildDir.getAbsolutePath() + "/db");
LOG.info("derby repository is set to "+conf.get(CatalogConstants.JDBC_URI));
@@ -300,7 +300,7 @@ public class TajoTestingCluster {
tajoWorker.startWorker(workerConf, new String[]{"standby"});
- LOG.info("=====> MiniTajoCluster Worker #" + (i + 1) + " started.");
+ LOG.info("MiniTajoCluster Worker #" + (i + 1) + " started.");
tajoWorkers.add(tajoWorker);
}
}
@@ -348,14 +348,10 @@ public class TajoTestingCluster {
*/
public void startMiniCluster(final int numSlaves)
throws Exception {
- String localHostName = InetAddress.getLocalHost().getHostName();
- startMiniCluster(numSlaves, new String[] {localHostName});
+ startMiniCluster(numSlaves, null);
}
- public void startMiniCluster(final int numSlaves,
- final String [] dataNodeHosts) throws Exception {
- // the conf is set to the distributed mode.
- this.conf.setBoolVar(ConfVars.CLUSTER_DISTRIBUTED, true);
+ public void startMiniCluster(final int numSlaves, final String [] dataNodeHosts) throws Exception {
int numDataNodes = numSlaves;
if(dataNodeHosts != null && dataNodeHosts.length != 0) {
@@ -406,12 +402,11 @@ public class TajoTestingCluster {
yarnCluster.init(conf);
yarnCluster.start();
- conf.set(YarnConfiguration.RM_ADDRESS,
- NetUtils.normalizeInetSocketAddress(yarnCluster.getResourceManager().
- getClientRMService().getBindAddress()));
- conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
- NetUtils.normalizeInetSocketAddress(yarnCluster.getResourceManager().
- getApplicationMasterService().getBindAddress()));
+ ResourceManager resourceManager = yarnCluster.getResourceManager();
+ InetSocketAddress rmAddr = resourceManager.getClientRMService().getBindAddress();
+ InetSocketAddress rmSchedulerAddr = resourceManager.getApplicationMasterService().getBindAddress();
+ conf.set(YarnConfiguration.RM_ADDRESS, NetUtils.normalizeInetSocketAddress(rmAddr));
+ conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, NetUtils.normalizeInetSocketAddress(rmSchedulerAddr));
URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
if (url == null) {
@@ -425,9 +420,6 @@ public class TajoTestingCluster {
}
public void startMiniClusterInLocal(final int numSlaves) throws Exception {
- // the conf is set to the distributed mode.
- this.conf.setBoolVar(ConfVars.CLUSTER_DISTRIBUTED, true);
-
// If we already put up a cluster, fail.
String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
isRunningCluster(testBuildPath);
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
index a16d0f3..ba7d36b 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java
@@ -22,15 +22,13 @@
package org.apache.tajo.engine.query;
import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.tajo.IntegrationTest;
import org.apache.tajo.TajoTestingCluster;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.TableStat;
import org.apache.tajo.common.TajoDataTypes.Type;
@@ -52,6 +50,7 @@ public class TestResultSetImpl {
private static TajoTestingCluster util;
private static TajoConf conf;
private static StorageManager sm;
+ private static TableDesc desc;
private static TableMeta scoreMeta;
@BeforeClass
@@ -90,7 +89,7 @@ public class TestResultSetImpl {
stat.setNumBlocks(1000);
stat.setNumPartitions(100);
scoreMeta.setStat(stat);
- sm.writeTableMeta(sm.getTablePath("score"), scoreMeta);
+ desc = new TableDescImpl("score", scoreMeta, p);
}
@AfterClass
@@ -100,7 +99,7 @@ public class TestResultSetImpl {
@Test
public void test() throws IOException, SQLException {
- ResultSetImpl rs = new ResultSetImpl(null, null, conf, sm.getTablePath("score"));
+ ResultSetImpl rs = new ResultSetImpl(null, null, conf, desc);
ResultSetMetaData meta = rs.getMetaData();
assertNotNull(meta);
Schema schema = scoreMeta.getSchema();
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml b/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
index aedaac3..7012cd9 100644
--- a/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
+++ b/tajo-core/tajo-core-backend/src/test/resources/tajo-default.xml
@@ -32,6 +32,11 @@
</property>
<property>
+ <name>tajo.staging.root.dir</name>
+ <value>/tmp/tajo-${user.name}/staging</value>
+ </property>
+
+ <property>
<name>tajo.task.localdir</name>
<value>/tmp/tajo-localdir</value>
</property>
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/27f76811/tajo-core/tajo-core-pullserver/src/main/resources/tajo-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-pullserver/src/main/resources/tajo-default.xml b/tajo-core/tajo-core-pullserver/src/main/resources/tajo-default.xml
deleted file mode 100644
index 72f0bb9..0000000
--- a/tajo-core/tajo-core-pullserver/src/main/resources/tajo-default.xml
+++ /dev/null
@@ -1,33 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-
-<configuration>
- <property>
- <name>tajo.cluster.distributed</name>
- <value>false</value>
- </property>
-
- <property>
- <name>tajo.rootdir</name>
- <value>file:///tmp/tajo-${user.name}</value>
- <description>A base for other temporary directories.</description>
- </property>
-</configuration>