You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/04/18 11:19:39 UTC
[17/51] [partial] TAJO-752: Escalate sub modules in tajo-core into
the top-level modules. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
new file mode 100644
index 0000000..0973aa7
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskHistory.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.util.FileUtil;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Mutable record describing one task: start/finish times, status, output and
+ * working paths, progress, input/output table statistics and an optional map
+ * of per-URI fetcher histories.
+ */
+public class TaskHistory {
+  private long startTime;
+  private long finishTime;
+
+  private String status;
+  private String outputPath;
+  private String workingPath;
+  private float progress;
+
+  private TableStats inputStats;
+  private TableStats outputStats;
+
+  // Fetcher histories keyed by URI; remains null until setFetchers() is called.
+  Map<URI, FetcherHistory> fetchers;
+
+  /**
+   * Mutable record describing one fetcher: start/finish times, status, URI,
+   * file length and the number of received messages.
+   */
+  public static class FetcherHistory {
+    private long startTime;
+    private long finishTime;
+
+    private String status;
+    private String uri;
+    private long fileLen;
+    private int messageReceiveCount;
+
+    public long getStartTime() {
+      return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+      this.startTime = startTime;
+    }
+
+    public long getFinishTime() {
+      return finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+      this.finishTime = finishTime;
+    }
+
+    public String getStatus() {
+      return status;
+    }
+
+    public void setStatus(String status) {
+      this.status = status;
+    }
+
+    public String getUri() {
+      return uri;
+    }
+
+    public void setUri(String uri) {
+      this.uri = uri;
+    }
+
+    public long getFileLen() {
+      return fileLen;
+    }
+
+    public void setFileLen(long fileLen) {
+      this.fileLen = fileLen;
+    }
+
+    public int getMessageReceiveCount() {
+      return messageReceiveCount;
+    }
+
+    public void setMessageReceiveCount(int messageReceiveCount) {
+      this.messageReceiveCount = messageReceiveCount;
+    }
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  public void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  public String getStatus() {
+    return status;
+  }
+
+  public void setStatus(String status) {
+    this.status = status;
+  }
+
+  public String getOutputPath() {
+    return outputPath;
+  }
+
+  public void setOutputPath(String outputPath) {
+    this.outputPath = outputPath;
+  }
+
+  public String getWorkingPath() {
+    return workingPath;
+  }
+
+  public void setWorkingPath(String workingPath) {
+    this.workingPath = workingPath;
+  }
+
+  /**
+   * Returns the fetcher histories.
+   *
+   * @return the values of the fetcher map, or an empty collection when no
+   *         fetcher map has been set (consistent with {@link #hasFetcher()},
+   *         which also treats a missing map as "no fetchers")
+   */
+  public Collection<FetcherHistory> getFetchers() {
+    if (fetchers == null) {
+      // Fully qualified so that no additional import is required.
+      return java.util.Collections.<FetcherHistory>emptyList();
+    }
+    return fetchers.values();
+  }
+
+  public void setFetchers(Map<URI, FetcherHistory> fetchers) {
+    this.fetchers = fetchers;
+  }
+
+  public float getProgress() {
+    return progress;
+  }
+
+  public void setProgress(float progress) {
+    this.progress = progress;
+  }
+
+  /**
+   * @return true if a fetcher map has been set and it is not empty
+   */
+  public boolean hasFetcher() {
+    return fetchers != null && !fetchers.isEmpty();
+  }
+
+  public TableStats getInputStats() {
+    return inputStats;
+  }
+
+  public void setInputStats(TableStats inputStats) {
+    this.inputStats = inputStats;
+  }
+
+  public TableStats getOutputStats() {
+    return outputStats;
+  }
+
+  public void setOutputStats(TableStats outputStats) {
+    this.outputStats = outputStats;
+  }
+
+  /**
+   * Renders input statistics as a single line of text.
+   *
+   * @param tableStats statistics to render; may be null
+   * @return "No input statistics" for null, otherwise total bytes, read bytes
+   *         and read rows ("-" is printed when the row count is 0)
+   */
+  public static String toInputStatsString(TableStats tableStats) {
+    if (tableStats == null) {
+      return "No input statistics";
+    }
+
+    StringBuilder text = new StringBuilder();
+    text.append("TotalBytes: ")
+        .append(FileUtil.humanReadableByteCount(tableStats.getNumBytes(), false))
+        .append(" (").append(tableStats.getNumBytes()).append(" B)");
+    text.append(", ReadBytes: ")
+        .append(FileUtil.humanReadableByteCount(tableStats.getReadBytes(), false))
+        .append(" (").append(tableStats.getReadBytes()).append(" B)");
+    text.append(", ReadRows: ");
+    if (tableStats.getNumRows() == 0) {
+      text.append("-");
+    } else {
+      text.append(tableStats.getNumRows());
+    }
+
+    return text.toString();
+  }
+
+  /**
+   * Renders output statistics as text.
+   *
+   * @param tableStats statistics to render; may be null
+   * @return "No output statistics" for null, otherwise {@code tableStats.toJson()}
+   */
+  public static String toOutputStatsString(TableStats tableStats) {
+    if (tableStats == null) {
+      return "No output statistics";
+    }
+
+    return tableStats.toJson();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
new file mode 100644
index 0000000..9e904cd
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -0,0 +1,431 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoProtos.TaskAttemptState;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.engine.planner.physical.SeqScanExec;
+import org.apache.tajo.engine.query.QueryUnitRequestImpl;
+import org.apache.tajo.engine.utils.TupleCache;
+import org.apache.tajo.ipc.QueryMasterProtocol;
+import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.NullCallback;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.util.TajoIdUtils;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+
+/**
+ * The driver class for Tajo QueryUnit processing.
+ *
+ * <p>{@link #run()} starts a launcher thread that requests tasks from the
+ * QueryMaster at the address given in the launch arguments, then initializes
+ * and runs each received task until this runner is stopped.</p>
+ */
+public class TaskRunner extends AbstractService {
+  /** class logger */
+  private static final Log LOG = LogFactory.getLog(TaskRunner.class);
+
+  private TajoConf systemConf;
+
+  // Checked by the launcher loop; set to true by stop().
+  private volatile boolean stopped = false;
+
+  private ExecutionBlockId executionBlockId;
+  private QueryId queryId;
+  private NodeId nodeId;
+  private ContainerId containerId;
+
+  // for temporal or intermediate files
+  private FileSystem localFS;
+  // for input files
+  private FileSystem defaultFS;
+
+  private TajoQueryEngine queryEngine;
+
+  // for Fetcher
+  private final ExecutorService fetchLauncher;
+  // It keeps all of the query unit attempts while a TaskRunner is running.
+  private final Map<QueryUnitAttemptId, Task> tasks = new ConcurrentHashMap<QueryUnitAttemptId, Task>();
+
+  private final Map<QueryUnitAttemptId, TaskHistory> taskHistories =
+      new ConcurrentHashMap<QueryUnitAttemptId, TaskHistory>();
+
+  private LocalDirAllocator lDirAllocator;
+
+  // A thread to receive each assigned query unit and execute the query unit
+  private Thread taskLauncher;
+
+  // Contains the object references related for TaskRunner
+  private TaskRunnerContext taskRunnerContext;
+  // for the doAs block
+  private UserGroupInformation taskOwner;
+
+  // for the local temporal dir
+  private String baseDir;
+  private Path baseDirPath;
+
+  private TaskRunnerManager taskRunnerManager;
+
+  private long finishTime;
+
+  private RpcConnectionPool connPool;
+
+  private InetSocketAddress qmMasterAddr;
+
+  /**
+   * Builds a runner from its launch arguments.
+   *
+   * @param taskRunnerManager manager that is notified when this runner is told to die
+   * @param conf configuration used for the RPC connection pool, the fetcher
+   *             pool size and the task owner's user name
+   * @param args {@code args[1]} execution block id, {@code args[2]} node id
+   *             (hostname:port), {@code args[3]} container id,
+   *             {@code args[4]} / {@code args[5]} QueryMaster host and port
+   */
+  public TaskRunner(TaskRunnerManager taskRunnerManager, TajoConf conf, String[] args) {
+    super(TaskRunner.class.getName());
+
+    this.taskRunnerManager = taskRunnerManager;
+    this.connPool = RpcConnectionPool.getPool(conf);
+    this.fetchLauncher = Executors.newFixedThreadPool(
+        conf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM));
+    try {
+      final ExecutionBlockId parsedBlockId = TajoIdUtils.createExecutionBlockId(args[1]);
+
+      LOG.info("Tajo Root Dir: " + conf.getVar(ConfVars.ROOT_DIR));
+      LOG.info("Worker Local Dir: " + conf.getVar(ConfVars.WORKER_TEMPORAL_DIR));
+
+      UserGroupInformation.setConfiguration(conf);
+
+      // NodeId has a form of hostname:port.
+      NodeId parsedNodeId = ConverterUtils.toNodeId(args[2]);
+      this.containerId = ConverterUtils.toContainerId(args[3]);
+
+      // QueryMaster's address
+      String host = args[4];
+      int port = Integer.parseInt(args[5]);
+      this.qmMasterAddr = NetUtils.createSocketAddrForHost(host, port);
+
+      LOG.info("QueryMaster Address:" + qmMasterAddr);
+      // TODO - 'load credential' should be implemented
+      // Getting taskOwner
+      UserGroupInformation owner = UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.USERNAME));
+
+      this.executionBlockId = parsedBlockId;
+      this.queryId = parsedBlockId.getQueryId();
+      this.nodeId = parsedNodeId;
+      this.taskOwner = owner;
+
+      this.taskRunnerContext = new TaskRunnerContext();
+    } catch (Exception e) {
+      // NOTE(review): a failure here is only logged, so the runner is left with
+      // null ids/context - confirm whether callers expect a thrown exception.
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * @return the execution block id and the container id joined by a comma
+   */
+  public String getId() {
+    return executionBlockId + "," + containerId;
+  }
+
+  /**
+   * Prepares file systems, the local base directory and the query engine, then
+   * delegates to {@code super.init(conf)}. Failures during preparation are
+   * logged and do not prevent the call to {@code super.init(conf)}.
+   *
+   * @param conf configuration; it is cast to {@link TajoConf}
+   */
+  @Override
+  public void init(Configuration conf) {
+    this.systemConf = (TajoConf)conf;
+
+    try {
+      // initialize DFS and LocalFileSystems
+      defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(conf);
+      localFS = FileSystem.getLocal(conf);
+
+      // the base dir for an output dir
+      baseDir = queryId.toString() + "/output" + "/" + executionBlockId.getId();
+
+      // initialize LocalDirAllocator
+      lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+
+      baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(baseDir, conf));
+      LOG.info("TaskRunner basedir is created (" + baseDir +")");
+
+      // Setup QueryEngine according to the query plan
+      // Here, we can setup row-based query engine or columnar query engine.
+      this.queryEngine = new TajoQueryEngine(systemConf);
+    } catch (Throwable t) {
+      // Route the failure, including its stack trace, through the class logger
+      // rather than stderr.
+      LOG.error(t.getMessage(), t);
+    }
+
+    super.init(conf);
+  }
+
+  /** Starts the service and then launches the task-receiving thread. */
+  @Override
+  public void start() {
+    super.start();
+    run();
+  }
+
+  /**
+   * Stops this runner: records the finish time, raises the stop flag, marks
+   * pending or running tasks as failed, clears the task map, shuts the fetcher
+   * pool down, removes the broadcast cache of this execution block and wakes
+   * up threads waiting on this object. Does nothing if already stopped.
+   */
+  @Override
+  public void stop() {
+    // NOTE(review): super.stop() is not invoked here - confirm this is intended.
+    if(isStopped()) {
+      return;
+    }
+    finishTime = System.currentTimeMillis();
+    // If this flag becomes true, taskLauncher will be terminated.
+    this.stopped = true;
+
+    // If TaskRunner is stopped, all running or pending tasks will be marked as failed.
+    for (Task task : tasks.values()) {
+      if (task.getStatus() == TaskAttemptState.TA_PENDING ||
+          task.getStatus() == TaskAttemptState.TA_RUNNING) {
+        task.setState(TaskAttemptState.TA_FAILED);
+      }
+    }
+
+    tasks.clear();
+    fetchLauncher.shutdown();
+    this.queryEngine = null;
+
+    TupleCache.getInstance().removeBroadcastCache(executionBlockId);
+
+    LOG.info("Stop TaskRunner: " + executionBlockId);
+    synchronized (this) {
+      notifyAll();
+    }
+  }
+
+  /**
+   * @return the time recorded by {@link #stop()}, or 0 if it has not run yet
+   */
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  /** Read access to the enclosing runner's state for tasks and other collaborators. */
+  public class TaskRunnerContext {
+    public TajoConf getConf() {
+      return systemConf;
+    }
+
+    public String getNodeId() {
+      return nodeId.toString();
+    }
+
+    public FileSystem getLocalFS() {
+      return localFS;
+    }
+
+    public FileSystem getDefaultFS() {
+      return defaultFS;
+    }
+
+    public LocalDirAllocator getLocalDirAllocator() {
+      return lDirAllocator;
+    }
+
+    public TajoQueryEngine getTQueryEngine() {
+      return queryEngine;
+    }
+
+    public Map<QueryUnitAttemptId, Task> getTasks() {
+      return tasks;
+    }
+
+    public Task getTask(QueryUnitAttemptId taskId) {
+      return tasks.get(taskId);
+    }
+
+    public ExecutorService getFetchLauncher() {
+      return fetchLauncher;
+    }
+
+    public Path getBaseDir() {
+      return baseDirPath;
+    }
+
+    public ExecutionBlockId getExecutionBlockId() {
+      return executionBlockId;
+    }
+
+    public void addTaskHistory(QueryUnitAttemptId quAttemptId, TaskHistory taskHistory) {
+      taskHistories.put(quAttemptId, taskHistory);
+    }
+
+    public TaskHistory getTaskHistory(QueryUnitAttemptId quAttemptId) {
+      return taskHistories.get(quAttemptId);
+    }
+
+    public Map<QueryUnitAttemptId, TaskHistory> getTaskHistories() {
+      return taskHistories;
+    }
+  }
+
+  public TaskRunnerContext getContext() {
+    return taskRunnerContext;
+  }
+
+  /**
+   * Sends a fatal error report for a task attempt to the QueryMaster.
+   *
+   * @param qmClientService stub used to send the report
+   * @param taskAttemptId attempt the report refers to
+   * @param message error text; "No error message" is sent when null
+   */
+  static void fatalError(QueryMasterProtocolService.Interface qmClientService,
+                         QueryUnitAttemptId taskAttemptId, String message) {
+    if (message == null) {
+      message = "No error message";
+    }
+    TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
+        .setId(taskAttemptId.getProto())
+        .setErrorMessage(message);
+
+    qmClientService.fatalError(null, builder.build(), NullCallback.get());
+  }
+
+  /**
+   * Creates and starts the launcher thread. The thread loops until the stop
+   * flag is set: it sends a GetTask request, waits up to 3 seconds at a time
+   * for the answer, and then either stops this runner (should-die request) or
+   * initializes and runs the received task.
+   */
+  public void run() {
+    LOG.info("TaskRunner startup");
+
+    try {
+
+      taskLauncher = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          int receivedNum = 0;
+          CallFuture<QueryUnitRequestProto> callFuture = null;
+          QueryUnitRequestProto taskRequest = null;
+
+          while(!stopped) {
+            NettyClientBase qmClient = null;
+            QueryMasterProtocolService.Interface qmClientService = null;
+            try {
+              qmClient = connPool.getConnection(qmMasterAddr, QueryMasterProtocol.class, true);
+              qmClientService = qmClient.getStub();
+
+              // A new request is sent only when no earlier one is outstanding.
+              if (callFuture == null) {
+                callFuture = new CallFuture<QueryUnitRequestProto>();
+                LOG.info("Request GetTask: " + getId());
+                GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
+                    .setExecutionBlockId(executionBlockId.getProto())
+                    .setContainerId(((ContainerIdPBImpl) containerId).getProto())
+                    .build();
+
+                qmClientService.getTask(null, request, callFuture);
+              }
+              try {
+                // wait for an assigning task for 3 seconds
+                taskRequest = callFuture.get(3, TimeUnit.SECONDS);
+              } catch (InterruptedException e) {
+                if(stopped) {
+                  break;
+                }
+              } catch (TimeoutException te) {
+                if(stopped) {
+                  break;
+                }
+                // if there has been no assigning task for a given period,
+                // TaskRunner will retry to request an assigning task.
+                LOG.info("Retry assigning task:" + getId());
+                continue;
+              }
+
+              if (taskRequest != null) {
+                // QueryMaster can send the terminal signal to TaskRunner.
+                // If TaskRunner receives the terminal signal, TaskRunner will be terminated
+                // immediately.
+                if (taskRequest.getShouldDie()) {
+                  LOG.info("Received ShouldDie flag:" + getId());
+                  stop();
+                  if(taskRunnerManager != null) {
+                    //notify to TaskRunnerManager
+                    taskRunnerManager.stopTask(getId());
+                  }
+                } else {
+                  // Same null guard as the should-die branch: an exception here
+                  // would leave the completed future in place and make the loop
+                  // re-process the same request.
+                  if (taskRunnerManager != null) {
+                    taskRunnerManager.getWorkerContext().getWorkerSystemMetrics().counter("query", "task").inc();
+                  }
+                  LOG.info("Accumulated Received Task: " + (++receivedNum));
+
+                  QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
+                  if (tasks.containsKey(taskAttemptId)) {
+                    LOG.error("Duplicate Task Attempt: " + taskAttemptId);
+                    fatalError(qmClientService, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
+                    // Drop the consumed request; otherwise the completed future
+                    // hands back the same duplicate on every iteration and the
+                    // loop spins, reporting the error again and again.
+                    callFuture = null;
+                    taskRequest = null;
+                    continue;
+                  }
+
+                  LOG.info("Initializing: " + taskAttemptId);
+                  Task task;
+                  try {
+                    task = new Task(taskAttemptId, taskRunnerContext, qmClientService,
+                        new QueryUnitRequestImpl(taskRequest));
+                    tasks.put(taskAttemptId, task);
+
+                    task.init();
+                    if (task.hasFetchPhase()) {
+                      task.fetch(); // The fetch is performed in an asynchronous way.
+                    }
+                    // task.run() is a blocking call.
+                    task.run();
+                  } catch (Throwable t) {
+                    LOG.error(t.getMessage(), t);
+                    fatalError(qmClientService, taskAttemptId, t.getMessage());
+                  } finally {
+                    // The request has been handled; ask for a new one next time.
+                    callFuture = null;
+                    taskRequest = null;
+                  }
+                }
+              }
+            } catch (Throwable t) {
+              LOG.error(t.getMessage(), t);
+            } finally {
+              connPool.releaseConnection(qmClient);
+            }
+          }
+        }
+      });
+      taskLauncher.start();
+    } catch (Throwable t) {
+      LOG.fatal("Unhandled exception. Starting shutdown.", t);
+    } finally {
+      // NOTE(review): this runs right after the launcher thread is started, not
+      // when it ends - confirm that aborting here is the intended behavior.
+      for (Task t : tasks.values()) {
+        if (t.getStatus() != TaskAttemptState.TA_SUCCEEDED) {
+          t.abort();
+        }
+      }
+    }
+  }
+
+  /**
+   * @return true if a stop has been requested.
+   */
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  public ExecutionBlockId getExecutionBlockId() {
+    return this.executionBlockId;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
new file mode 100644
index 0000000..da434e4
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.conf.TajoConf;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Keeps track of the {@link TaskRunner}s of a worker. Running runners live in
+ * one map, runners passed to {@link #stopTask(String)} are moved to a second
+ * map, and a background thread periodically drops old entries from that
+ * second map.
+ */
+public class TaskRunnerManager extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(TaskRunnerManager.class);
+
+  // Runners keyed by TaskRunner#getId(); access is guarded by synchronized (taskRunnerMap).
+  private final Map<String, TaskRunner> taskRunnerMap = new HashMap<String, TaskRunner>();
+  // Runners moved here by stopTask(); access is guarded by synchronized (finishedTaskRunnerMap).
+  private final Map<String, TaskRunner> finishedTaskRunnerMap = new HashMap<String, TaskRunner>();
+  private TajoWorker.WorkerContext workerContext;
+  private TajoConf tajoConf;
+  private AtomicBoolean stop = new AtomicBoolean(false);
+  private FinishedTaskCleanThread finishedTaskCleanThread;
+
+  public TaskRunnerManager(TajoWorker.WorkerContext workerContext) {
+    super(TaskRunnerManager.class.getName());
+
+    this.workerContext = workerContext;
+  }
+
+  public TajoWorker.WorkerContext getWorkerContext() {
+    return workerContext;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    tajoConf = (TajoConf)conf;
+    super.init(tajoConf);
+  }
+
+  /** Starts the cleaner thread for finished runners, then the service itself. */
+  @Override
+  public void start() {
+    finishedTaskCleanThread = new FinishedTaskCleanThread();
+    finishedTaskCleanThread.start();
+    super.start();
+  }
+
+  /**
+   * Stops every running task runner, interrupts the cleaner thread and stops
+   * the service; in YARN container mode the worker is stopped as well. Only
+   * the first call has an effect.
+   */
+  @Override
+  public void stop() {
+    // compareAndSet makes the "already stopped" check and the flag update atomic.
+    if (!stop.compareAndSet(false, true)) {
+      return;
+    }
+    synchronized(taskRunnerMap) {
+      for(TaskRunner eachTaskRunner: taskRunnerMap.values()) {
+        if(!eachTaskRunner.isStopped()) {
+          eachTaskRunner.stop();
+        }
+      }
+    }
+
+    if(finishedTaskCleanThread != null) {
+      // interrupt() wakes the cleaner out of its sleep. The static
+      // Thread.interrupted() would only clear the calling thread's own flag.
+      finishedTaskCleanThread.interrupt();
+    }
+    super.stop();
+    if(workerContext.isYarnContainerMode()) {
+      workerContext.stopWorker(true);
+    }
+  }
+
+  /**
+   * Moves the runner with the given id from the running map to the finished
+   * map. In YARN container mode the whole manager is stopped afterwards.
+   *
+   * @param id runner id as returned by {@link TaskRunner#getId()}
+   */
+  public void stopTask(String id) {
+    LOG.info("Stop Task:" + id);
+    synchronized(taskRunnerMap) {
+      TaskRunner taskRunner = taskRunnerMap.remove(id);
+      if(taskRunner != null) {
+        // Readers and the cleaner thread lock finishedTaskRunnerMap, so the
+        // write has to hold that lock too. This is the only place where both
+        // locks are nested.
+        synchronized(finishedTaskRunnerMap) {
+          finishedTaskRunnerMap.put(id, taskRunner);
+        }
+      }
+    }
+    if(workerContext.isYarnContainerMode()) {
+      stop();
+    }
+  }
+
+  /**
+   * @return an unmodifiable snapshot of the running task runners; it is a copy,
+   *         so it can be iterated without holding the map lock
+   */
+  public Collection<TaskRunner> getTaskRunners() {
+    synchronized(taskRunnerMap) {
+      return Collections.unmodifiableCollection(new ArrayList<TaskRunner>(taskRunnerMap.values()));
+    }
+  }
+
+  /**
+   * @return an unmodifiable snapshot of the finished task runners; it is a
+   *         copy, so it can be iterated without holding the map lock
+   */
+  public Collection<TaskRunner> getFinishedTaskRunners() {
+    synchronized(finishedTaskRunnerMap) {
+      return Collections.unmodifiableCollection(new ArrayList<TaskRunner>(finishedTaskRunnerMap.values()));
+    }
+  }
+
+  /**
+   * Looks a runner up by id, first among running and then among finished runners.
+   *
+   * @param taskRunnerId runner id
+   * @return the runner, or null if neither map contains the id
+   */
+  public TaskRunner findTaskRunner(String taskRunnerId) {
+    synchronized(taskRunnerMap) {
+      TaskRunner running = taskRunnerMap.get(taskRunnerId);
+      if(running != null) {
+        return running;
+      }
+    }
+    synchronized(finishedTaskRunnerMap) {
+      return finishedTaskRunnerMap.get(taskRunnerId);
+    }
+  }
+
+  /**
+   * Finds the task of an attempt, searching running runners before finished ones.
+   *
+   * @param quAttemptId attempt id
+   * @return the task, or null if no runner of the attempt's execution block has it
+   */
+  public Task findTaskByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
+    ExecutionBlockId ebid = quAttemptId.getQueryUnitId().getExecutionBlockId();
+    Task task = findTaskIn(taskRunnerMap, ebid, quAttemptId);
+    if (task != null) {
+      return task;
+    }
+    return findTaskIn(finishedTaskRunnerMap, ebid, quAttemptId);
+  }
+
+  /**
+   * Finds the history of an attempt, searching running runners before finished ones.
+   *
+   * @param quAttemptId attempt id
+   * @return the history, or null if no runner of the attempt's execution block has it
+   */
+  public TaskHistory findTaskHistoryByQueryUnitAttemptId(QueryUnitAttemptId quAttemptId) {
+    ExecutionBlockId ebid = quAttemptId.getQueryUnitId().getExecutionBlockId();
+    TaskHistory taskHistory = findTaskHistoryIn(taskRunnerMap, ebid, quAttemptId);
+    if (taskHistory != null) {
+      return taskHistory;
+    }
+    return findTaskHistoryIn(finishedTaskRunnerMap, ebid, quAttemptId);
+  }
+
+  /** Searches one runner map, under that map's lock, for the task of an attempt. */
+  private static Task findTaskIn(Map<String, TaskRunner> runners, ExecutionBlockId ebid,
+                                 QueryUnitAttemptId quAttemptId) {
+    synchronized(runners) {
+      for (TaskRunner eachTaskRunner: runners.values()) {
+        if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
+          Task task = eachTaskRunner.getContext().getTask(quAttemptId);
+          if (task != null) {
+            return task;
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  /** Searches one runner map, under that map's lock, for the history of an attempt. */
+  private static TaskHistory findTaskHistoryIn(Map<String, TaskRunner> runners, ExecutionBlockId ebid,
+                                               QueryUnitAttemptId quAttemptId) {
+    synchronized(runners) {
+      for (TaskRunner eachTaskRunner: runners.values()) {
+        if (eachTaskRunner.getExecutionBlockId().equals(ebid)) {
+          TaskHistory taskHistory = eachTaskRunner.getContext().getTaskHistory(quAttemptId);
+          if (taskHistory != null) {
+            return taskHistory;
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * @return the number of entries in the running task runner map
+   */
+  public int getNumTasks() {
+    synchronized(taskRunnerMap) {
+      return taskRunnerMap.size();
+    }
+  }
+
+  /**
+   * Creates, registers, initializes and starts a {@link TaskRunner} on a new thread.
+   *
+   * @param params launch arguments handed to the {@link TaskRunner} constructor
+   */
+  public void startTask(final String[] params) {
+    //TODO change to use event dispatcher
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        try {
+          TajoConf systemConf = new TajoConf(tajoConf);
+          TaskRunner taskRunner = new TaskRunner(TaskRunnerManager.this, systemConf, params);
+          LOG.info("Start TaskRunner:" + taskRunner.getId());
+          synchronized(taskRunnerMap) {
+            taskRunnerMap.put(taskRunner.getId(), taskRunner);
+          }
+          taskRunner.init(systemConf);
+          taskRunner.start();
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+          throw new RuntimeException(e.getMessage(), e);
+        }
+      }
+    };
+
+    t.start();
+  }
+
+  /**
+   * Wakes up once an hour and removes finished runners that are older than the
+   * configured expire period (read in minutes). Ends when the manager is
+   * stopped or the thread is interrupted.
+   */
+  class FinishedTaskCleanThread extends Thread {
+    @Override
+    public void run() {
+      int expireIntervalTime = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HISTORY_EXPIRE_PERIOD);
+      LOG.info("FinishedQueryMasterTaskCleanThread started: expire interval minutes = " + expireIntervalTime);
+      while(!stop.get()) {
+        try {
+          Thread.sleep(60 * 1000 * 60);   // hourly check
+        } catch (InterruptedException e) {
+          break;
+        }
+        try {
+          // long arithmetic: minutes * 60 * 1000 overflows int for large periods.
+          long expireTime = System.currentTimeMillis() - expireIntervalTime * 60L * 1000L;
+          cleanExpiredFinishedQueryMasterTask(expireTime);
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+        }
+      }
+    }
+
+    /**
+     * Removes finished runners whose last activity lies before the cutoff.
+     *
+     * @param expireTime cutoff in milliseconds since the epoch
+     */
+    private void cleanExpiredFinishedQueryMasterTask(long expireTime) {
+      synchronized(finishedTaskRunnerMap) {
+        Iterator<TaskRunner> runners = finishedTaskRunnerMap.values().iterator();
+        while(runners.hasNext()) {
+          TaskRunner runner = runners.next();
+          // The finish time stays 0 until TaskRunner#stop() has run, so fall
+          // back to the start time in that case.
+          long lastActiveTime = Math.max(runner.getStartTime(), runner.getFinishTime());
+          // Expired means older than the cutoff; the former '>' comparison
+          // removed recent runners and kept old ones.
+          if(lastActiveTime < expireTime) {
+            runners.remove();
+          }
+        }
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
new file mode 100644
index 0000000..007bcbf
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoMasterProtocol;
+import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.rpc.CallFuture;
+import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcConnectionPool;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.storage.v2.DiskDeviceInfo;
+import org.apache.tajo.storage.v2.DiskMountInfo;
+import org.apache.tajo.storage.v2.DiskUtil;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat;
+
+/**
+ * It periodically sends heartbeat to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous rpc.
+ */
+public class WorkerHeartbeatService extends AbstractService {
+ /** class logger */
+ private final static Log LOG = LogFactory.getLog(WorkerHeartbeatService.class);
+
+ private final TajoWorker.WorkerContext context;
+ private TajoConf systemConf;
+ private RpcConnectionPool connectionPool;
+ private WorkerHeartbeatThread thread;
+ private static final float HDFS_DATANODE_STORAGE_SIZE;
+
+ static {
+ HDFS_DATANODE_STORAGE_SIZE = DiskUtil.getDataNodeStorageSize();
+ }
+
+ /**
+  * Creates the service, named after this class's simple name.
+  *
+  * @param context worker context stored for later use by this service
+  */
+ public WorkerHeartbeatService(TajoWorker.WorkerContext context) {
+   super(WorkerHeartbeatService.class.getSimpleName());
+   this.context = context;
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) {
+ Preconditions.checkArgument(conf instanceof TajoConf, "Configuration must be a TajoConf instance.");
+ this.systemConf = (TajoConf) conf;
+
+ connectionPool = RpcConnectionPool.getPool(systemConf);
+ thread = new WorkerHeartbeatThread();
+ thread.start();
+ super.init(conf);
+ }
+
+ @Override
+ public void serviceStop() {
+ thread.stopped.set(true);
+ synchronized (thread) {
+ thread.notifyAll();
+ }
+ super.stop();
+ }
+
+ class WorkerHeartbeatThread extends Thread {
+ private volatile AtomicBoolean stopped = new AtomicBoolean(false);
+ TajoMasterProtocol.ServerStatusProto.System systemInfo;
+ List<TajoMasterProtocol.ServerStatusProto.Disk> diskInfos =
+ new ArrayList<TajoMasterProtocol.ServerStatusProto.Disk>();
+ float workerDiskSlots;
+ int workerMemoryMB;
+ List<DiskDeviceInfo> diskDeviceInfos;
+
+ /**
+  * Works out the memory, CPU core and disk slot figures this worker reports
+  * and builds the static system-info message.
+  *
+  * <p>In dedicated mode memory is the total memory scaled by the configured
+  * ratio, cores are the available processors and disk slots are the number of
+  * detected disk devices (or the configured default when detection failed).
+  * Otherwise all three come from the configuration, and disk slots are
+  * replaced by the HDFS datanode storage size when DFS-dir awareness is
+  * enabled and that size is positive.</p>
+  */
+ public WorkerHeartbeatThread() {
+   int workerCpuCoreNum;
+
+   boolean dedicatedResource = systemConf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DEDICATED);
+
+   try {
+     diskDeviceInfos = DiskUtil.getDiskDeviceInfos();
+   } catch (Exception e) {
+     // diskDeviceInfos stays null; the dedicated branch below handles that.
+     LOG.error(e.getMessage(), e);
+   }
+
+   if(dedicatedResource) {
+     float dedicatedMemoryRatio = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_RESOURCE_DEDICATED_MEMORY_RATIO);
+     int totalMemory = getTotalMemoryMB();
+     workerMemoryMB = (int) ((float) (totalMemory) * dedicatedMemoryRatio);
+     workerCpuCoreNum = Runtime.getRuntime().availableProcessors();
+
+     if(diskDeviceInfos == null) {
+       workerDiskSlots = TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS.defaultIntVal;
+     } else {
+       workerDiskSlots = diskDeviceInfos.size();
+     }
+   } else {
+     workerMemoryMB = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB);
+     workerCpuCoreNum = systemConf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
+     workerDiskSlots = systemConf.getFloatVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
+
+     if (systemConf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && HDFS_DATANODE_STORAGE_SIZE > 0) {
+       workerDiskSlots = HDFS_DATANODE_STORAGE_SIZE;
+     }
+   }
+
+   systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder()
+       .setAvailableProcessors(workerCpuCoreNum)
+       .setFreeMemoryMB(0)
+       .setMaxMemoryMB(0)
+       .setTotalMemoryMB(getTotalMemoryMB())
+       .build();
+ }
+
+ /**
+  * Heartbeat loop of the worker.
+  *
+  * <p>First waits (at most 30 seconds) until the pull service, if there is one, reports a positive port.
+  * Then, until {@code stopped} is set or the thread is interrupted, it builds a {@code NodeHeartbeat}
+  * describing this worker (addresses, resource slots, JVM heap, disk usage), sends it to the resource
+  * tracker, stores the cluster resource summary from the response in the worker context, and waits
+  * up to 10 seconds before the next round.</p>
+  *
+  * <p>Disk usage is sampled on the first round and then again every 11th round.</p>
+  */
+ public void run() {
+   LOG.info("Worker Resource Heartbeat Thread start.");
+   int sendDiskInfoCount = 0;
+   int pullServerPort = 0;
+   if (context.getPullService() != null) {
+     long startTime = System.currentTimeMillis();
+     while (true) {
+       pullServerPort = context.getPullService().getPort();
+       if (pullServerPort > 0) {
+         break;
+       }
+       // waiting while pull server init
+       try {
+         Thread.sleep(100);
+       } catch (InterruptedException e) {
+         // The heartbeat loop below treats an interrupt as a request to stop; do the same here
+         // instead of swallowing it and spinning until the 30 second limit kills the process.
+         Thread.currentThread().interrupt();
+         LOG.info("Worker Resource Heartbeat Thread stopped.");
+         return;
+       }
+       if (System.currentTimeMillis() - startTime > 30 * 1000) {
+         LOG.fatal("Too long pull server init.");
+         // Non-zero status: this is an abnormal termination.
+         System.exit(1);
+       }
+     }
+   }
+
+   String hostName = null;
+   int peerRpcPort = 0;
+   int queryMasterPort = 0;
+   int clientPort = 0;
+
+   // Each service is optional; a port stays 0 when its service is absent.
+   if (context.getTajoWorkerManagerService() != null) {
+     hostName = context.getTajoWorkerManagerService().getBindAddr().getHostName();
+     peerRpcPort = context.getTajoWorkerManagerService().getBindAddr().getPort();
+   }
+   if (context.getQueryMasterManagerService() != null) {
+     // When both services exist, the query master's host name overrides the worker manager's.
+     hostName = context.getQueryMasterManagerService().getBindAddr().getHostName();
+     queryMasterPort = context.getQueryMasterManagerService().getBindAddr().getPort();
+   }
+   if (context.getTajoWorkerClientService() != null) {
+     clientPort = context.getTajoWorkerClientService().getBindAddr().getPort();
+   }
+   if (context.getPullService() != null) {
+     pullServerPort = context.getPullService().getPort();
+   }
+
+   while (!stopped.get()) {
+     if (sendDiskInfoCount == 0 && diskDeviceInfos != null) {
+       getDiskUsageInfos();
+     }
+     TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap =
+         TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder()
+             .setMaxHeap(Runtime.getRuntime().maxMemory())
+             .setFreeHeap(Runtime.getRuntime().freeMemory())
+             .setTotalHeap(Runtime.getRuntime().totalMemory())
+             .build();
+
+     TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder()
+         .addAllDisk(diskInfos)
+         .setRunningTaskNum(
+             context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks())
+         .setSystem(systemInfo)
+         .setDiskSlots(workerDiskSlots)
+         .setMemoryResourceMB(workerMemoryMB)
+         .setJvmHeap(jvmHeap)
+         .setQueryMasterMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isQueryMasterMode()))
+         .setTaskRunnerMode(PrimitiveProtos.BoolProto.newBuilder().setValue(context.isTaskRunnerMode()))
+         .build();
+
+     NodeHeartbeat heartbeatProto = NodeHeartbeat.newBuilder()
+         .setTajoWorkerHost(hostName)
+         .setTajoQueryMasterPort(queryMasterPort)
+         .setPeerRpcPort(peerRpcPort)
+         .setTajoWorkerClientPort(clientPort)
+         .setTajoWorkerHttpPort(context.getHttpPort())
+         .setTajoWorkerPullServerPort(pullServerPort)
+         .setServerStatus(serverStatus)
+         .build();
+
+     NettyClientBase rmClient = null;
+     try {
+       CallFuture<TajoMasterProtocol.TajoHeartbeatResponse> callBack =
+           new CallFuture<TajoMasterProtocol.TajoHeartbeatResponse>();
+
+       rmClient = connectionPool.getConnection(context.getResourceTrackerAddress(), TajoResourceTrackerProtocol.class, true);
+       TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub();
+       resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack);
+
+       // Wait at most 2 seconds for the response; a timeout is only logged and the loop continues.
+       TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS);
+       if (response != null) {
+         TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary();
+         if (clusterResourceSummary.getNumWorkers() > 0) {
+           context.setNumClusterNodes(clusterResourceSummary.getNumWorkers());
+         }
+         context.setClusterResource(clusterResourceSummary);
+       } else {
+         if (callBack.getController().failed()) {
+           throw new ServiceException(callBack.getController().errorText());
+         }
+       }
+     } catch (InterruptedException e) {
+       break;
+     } catch (TimeoutException te) {
+       LOG.warn("Heartbeat response is being delayed.");
+     } catch (Exception e) {
+       LOG.error(e.getMessage(), e);
+     } finally {
+       connectionPool.releaseConnection(rmClient);
+     }
+
+     try {
+       // Timed wait on this thread object between heartbeats.
+       synchronized (WorkerHeartbeatThread.this) {
+         wait(10 * 1000);
+       }
+     } catch (InterruptedException e) {
+       break;
+     }
+     sendDiskInfoCount++;
+
+     if (sendDiskInfoCount > 10) {
+       sendDiskInfoCount = 0;
+     }
+   }
+
+   LOG.info("Worker Resource Heartbeat Thread stopped.");
+ }
+
+ /**
+  * Rebuilds {@code diskInfos} from scratch: one {@code Disk} entry per mount point of every device in
+  * {@code diskDeviceInfos}, carrying the mount's absolute path and its total, free and usable space
+  * as reported by {@link File}. Devices without mount information are skipped.
+  */
+ private void getDiskUsageInfos() {
+   diskInfos.clear();
+   for (DiskDeviceInfo device : diskDeviceInfos) {
+     List<DiskMountInfo> mounts = device.getMountInfos();
+     if (mounts == null) {
+       continue;
+     }
+     for (DiskMountInfo mount : mounts) {
+       File mountDir = new File(mount.getMountPath());
+       TajoMasterProtocol.ServerStatusProto.Disk.Builder disk =
+           TajoMasterProtocol.ServerStatusProto.Disk.newBuilder();
+       disk.setAbsolutePath(mountDir.getAbsolutePath());
+       disk.setTotalSpace(mountDir.getTotalSpace());
+       disk.setFreeSpace(mountDir.getFreeSpace());
+       disk.setUsableSpace(mountDir.getUsableSpace());
+       diskInfos.add(disk.build());
+     }
+   }
+ }
+ }
+
+ /**
+  * Returns the total physical memory of this machine in megabytes (bytes divided by 1024 * 1024,
+  * truncated), as reported by the platform's operating-system MXBean.
+  *
+  * <p>The bean is cast to {@code com.sun.management.OperatingSystemMXBean}, so this only works on
+  * JVMs whose platform bean implements that interface.</p>
+  */
+ public static int getTotalMemoryMB() {
+   java.lang.management.OperatingSystemMXBean platformBean =
+       java.lang.management.ManagementFactory.getOperatingSystemMXBean();
+   com.sun.management.OperatingSystemMXBean sunBean =
+       (com.sun.management.OperatingSystemMXBean) platformBean;
+   long totalBytes = sunBean.getTotalPhysicalMemorySize();
+   long bytesPerMB = 1024 * 1024;
+   return (int) (totalBytes / bytesPerMB);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
new file mode 100644
index 0000000..1771255
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.master.TaskRunnerGroupEvent;
+import org.apache.tajo.master.TaskRunnerLauncher;
+import org.apache.tajo.master.YarnTaskRunnerLauncherImpl;
+import org.apache.tajo.master.event.ContainerAllocatorEventType;
+import org.apache.tajo.master.querymaster.QueryMasterTask;
+import org.apache.tajo.master.rm.YarnRMContainerAllocator;
+
+ /**
+  * Resource allocator that obtains containers through YARN.
+  *
+  * <p>On {@link #init(Configuration)} it starts a {@link YarnClient}, then creates a
+  * {@link YarnTaskRunnerLauncherImpl} and a {@link YarnRMContainerAllocator}, adds both as child
+  * services and registers them with the query task's dispatcher.</p>
+  */
+ public class YarnResourceAllocator extends AbstractResourceAllocator {
+   private YarnRMContainerAllocator rmAllocator;
+
+   private TaskRunnerLauncher taskRunnerLauncher;
+
+   private YarnRPC yarnRPC;
+
+   private YarnClient yarnClient;
+
+   private static final Log LOG = LogFactory.getLog(YarnResourceAllocator.class.getName());
+
+   private QueryMasterTask.QueryMasterTaskContext queryTaskContext;
+
+   private TajoConf systemConf;
+
+   /**
+    * @param queryTaskContext context of the query master task this allocator serves
+    */
+   public YarnResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) {
+     this.queryTaskContext = queryTaskContext;
+   }
+
+   /** Wraps the given protobuf container id in a YARN {@link ContainerIdPBImpl}. */
+   @Override
+   public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId) {
+     return new ContainerIdPBImpl(containerId);
+   }
+
+   /** Does nothing in this implementation. */
+   @Override
+   public void allocateTaskWorker() {
+   }
+
+   /**
+    * Computes how many containers to request.
+    *
+    * @param workerContext   supplies the known number of cluster nodes and the query master configuration
+    * @param numTasks        number of tasks to run
+    * @param memoryMBPerTask not used by this implementation
+    * @return {@code numTasks} when the number of cluster nodes is 0; otherwise the smaller of
+    *         {@code numTasks} and (cluster nodes x {@code YARN_RM_WORKER_NUMBER_PER_NODE})
+    */
+   @Override
+   public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
+                                            int numTasks,
+                                            int memoryMBPerTask) {
+     int numClusterNodes = workerContext.getNumClusterNodes();
+
+     TajoConf conf = (TajoConf) workerContext.getQueryMaster().getConfig();
+     int workerNum = conf.getIntVar(TajoConf.ConfVars.YARN_RM_WORKER_NUMBER_PER_NODE);
+     return numClusterNodes == 0 ? numTasks : Math.min(numTasks, numClusterNodes * workerNum);
+   }
+
+   /**
+    * Starts the YARN client and wires the task runner launcher and the container allocator.
+    *
+    * @param conf must be a {@link TajoConf}; it is cast without a check
+    */
+   @Override
+   public void init(Configuration conf) {
+     systemConf = (TajoConf) conf;
+
+     yarnRPC = YarnRPC.create(systemConf);
+
+     connectYarnClient();
+
+     taskRunnerLauncher = new YarnTaskRunnerLauncherImpl(queryTaskContext, yarnRPC);
+     addService((Service) taskRunnerLauncher);
+     queryTaskContext.getDispatcher().register(TaskRunnerGroupEvent.EventType.class, taskRunnerLauncher);
+
+     rmAllocator = new YarnRMContainerAllocator(queryTaskContext);
+     addService(rmAllocator);
+     queryTaskContext.getDispatcher().register(ContainerAllocatorEventType.class, rmAllocator);
+     super.init(conf);
+   }
+
+   /**
+    * Stops the YARN client (if one was created) and then the parent service. A failure to stop the
+    * client is logged and does not prevent the parent from stopping.
+    */
+   @Override
+   public void stop() {
+     // yarnClient is only assigned in init(); check it instead of relying on a caught NullPointerException.
+     if (this.yarnClient != null) {
+       try {
+         this.yarnClient.stop();
+       } catch (Exception e) {
+         LOG.error(e.getMessage(), e);
+       }
+     }
+     super.stop();
+   }
+
+   @Override
+   public void start() {
+     super.start();
+   }
+
+   /** Creates, initializes and starts the YARN client using {@code systemConf}. */
+   private void connectYarnClient() {
+     this.yarnClient = new YarnClientImpl();
+     this.yarnClient.init(systemConf);
+     this.yarnClient.start();
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java
new file mode 100644
index 0000000..6c93e4f
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/FileAccessForbiddenException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver;
+
+import java.io.IOException;
+
+ /**
+  * An {@link IOException} signalling that access to a requested file is forbidden.
+  */
+ public class FileAccessForbiddenException extends IOException {
+   private static final long serialVersionUID = -3383272565826389213L;
+
+   /** Creates the exception with neither a message nor a cause. */
+   public FileAccessForbiddenException() {
+     super();
+   }
+
+   /**
+    * @param message description of the failure
+    */
+   public FileAccessForbiddenException(String message) {
+     super(message);
+   }
+
+   /**
+    * @param cause the underlying failure
+    */
+   public FileAccessForbiddenException(Throwable cause) {
+     super(cause);
+   }
+
+   /**
+    * @param message description of the failure
+    * @param cause   the underlying failure
+    */
+   public FileAccessForbiddenException(String message, Throwable cause) {
+     super(message, cause);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java
new file mode 100644
index 0000000..523d6e1
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServer.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.net.NetUtils;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.ChannelGroupFuture;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+
+ /**
+  * A small Netty based HTTP server that serves data located by a {@link DataRetriever}.
+  *
+  * <p>Typical use: construct, call {@link #start()} to bind, and {@link #stop()} to close all
+  * channels and release the Netty resources.</p>
+  */
+ public class HttpDataServer {
+   private final static Log LOG = LogFactory.getLog(HttpDataServer.class);
+
+   /** Address requested by the caller. */
+   private final InetSocketAddress addr;
+   /** Address actually bound; {@code null} until {@link #start()} has completed. */
+   private InetSocketAddress bindAddr;
+   private final ServerBootstrap bootstrap;
+   private final ChannelFactory factory;
+   private final ChannelGroup channelGroup;
+
+   /**
+    * Prepares the server without binding it.
+    *
+    * @param addr      address to bind to when {@link #start()} is called
+    * @param retriever resolves incoming requests to the file chunks to send
+    */
+   public HttpDataServer(final InetSocketAddress addr,
+                         final DataRetriever retriever) {
+     this.addr = addr;
+     this.factory = new NioServerSocketChannelFactory(
+         Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
+         Runtime.getRuntime().availableProcessors() * 2);
+
+     // Configure the server.
+     this.bootstrap = new ServerBootstrap(factory);
+     // Set up the event pipeline factory.
+     this.bootstrap.setPipelineFactory(
+         new HttpDataServerPipelineFactory(retriever));
+     this.channelGroup = new DefaultChannelGroup();
+   }
+
+   /**
+    * @param bindaddr  address string, converted with {@code NetUtils.createSocketAddr}
+    * @param retriever resolves incoming requests to the file chunks to send
+    */
+   public HttpDataServer(String bindaddr, DataRetriever retriever) {
+     this(NetUtils.createSocketAddr(bindaddr), retriever);
+   }
+
+   /** Binds to the configured address, starts accepting connections and records the bound address. */
+   public void start() {
+     // Bind and start to accept incoming connections.
+     Channel channel = bootstrap.bind(addr);
+     channelGroup.add(channel);
+     this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
+     LOG.info("HttpDataServer starts up ("
+         + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort()
+         + ")");
+   }
+
+   /**
+    * @return the address the server is bound to, or {@code null} if {@link #start()} has not completed
+    */
+   public InetSocketAddress getBindAddress() {
+     return this.bindAddr;
+   }
+
+   /**
+    * Closes every channel in the group, waits for the close to finish and releases the channel
+    * factory's resources. Safe to call even if {@link #start()} was never called.
+    */
+   public void stop() {
+     ChannelGroupFuture future = channelGroup.close();
+     future.awaitUninterruptibly();
+     factory.releaseExternalResources();
+
+     if (this.bindAddr == null) {
+       // Nothing was bound, so there is no address to report (and nothing to dereference).
+       LOG.info("HttpDataServer shutdown (not bound)");
+     } else {
+       LOG.info("HttpDataServer shutdown ("
+           + this.bindAddr.getAddress().getHostAddress() + ":"
+           + this.bindAddr.getPort() + ")");
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java
new file mode 100644
index 0000000..6b9eea8
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerHandler.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.*;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.HttpResponse;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.jboss.netty.handler.stream.ChunkedFile;
+import org.jboss.netty.util.CharsetUtil;
+import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
+import org.apache.tajo.worker.dataserver.retriever.FileChunk;
+
+import java.io.*;
+import java.net.URLDecoder;
+
+import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
+import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
+import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
+import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
+import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
+ private final static Log LOG = LogFactory.getLog(HttpDataServer.class);
+ private final DataRetriever retriever;
+
+ public HttpDataServerHandler(DataRetriever retriever) {
+ this.retriever = retriever;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ HttpRequest request = (HttpRequest) e.getMessage();
+ if (request.getMethod() != GET) {
+ sendError(ctx, METHOD_NOT_ALLOWED);
+ return;
+ }
+
+ FileChunk [] file;
+ try {
+ file = retriever.handle(ctx, request);
+ } catch (FileNotFoundException fnf) {
+ LOG.error(fnf);
+ sendError(ctx, NOT_FOUND);
+ return;
+ } catch (IllegalArgumentException iae) {
+ LOG.error(iae);
+ sendError(ctx, BAD_REQUEST);
+ return;
+ } catch (FileAccessForbiddenException fafe) {
+ LOG.error(fafe);
+ sendError(ctx, FORBIDDEN);
+ return;
+ } catch (IOException ioe) {
+ LOG.error(ioe);
+ sendError(ctx, INTERNAL_SERVER_ERROR);
+ return;
+ }
+
+ // Write the content.
+ Channel ch = e.getChannel();
+ if (file == null) {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
+ ch.write(response);
+ if (!isKeepAlive(request)) {
+ ch.close();
+ }
+ } else {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ long totalSize = 0;
+ for (FileChunk chunk : file) {
+ totalSize += chunk.length();
+ }
+ setContentLength(response, totalSize);
+
+ // Write the initial line and the header.
+ ch.write(response);
+
+ ChannelFuture writeFuture = null;
+
+ for (FileChunk chunk : file) {
+ writeFuture = sendFile(ctx, ch, chunk);
+ if (writeFuture == null) {
+ sendError(ctx, NOT_FOUND);
+ return;
+ }
+ }
+
+ // Decide whether to close the connection or not.
+ if (!isKeepAlive(request)) {
+ // Close the connection when the whole content is written out.
+ writeFuture.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+ }
+
+ private ChannelFuture sendFile(ChannelHandlerContext ctx, Channel ch, FileChunk file) throws IOException {
+ RandomAccessFile raf;
+ try {
+ raf = new RandomAccessFile(file.getFile(), "r");
+ } catch (FileNotFoundException fnfe) {
+ return null;
+ }
+
+ ChannelFuture writeFuture;
+ if (ch.getPipeline().get(SslHandler.class) != null) {
+ // Cannot use zero-copy with HTTPS.
+ writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(), file.length(), 8192));
+ } else {
+ // No encryption - use zero-copy.
+ final FileRegion region = new DefaultFileRegion(raf.getChannel(), file.startOffset(), file.length());
+ writeFuture = ch.write(region);
+ writeFuture.addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture future) {
+ region.releaseExternalResources();
+ }
+ });
+ }
+
+ return writeFuture;
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ throws Exception {
+ Channel ch = e.getChannel();
+ Throwable cause = e.getCause();
+ if (cause instanceof TooLongFrameException) {
+ sendError(ctx, BAD_REQUEST);
+ return;
+ }
+
+ cause.printStackTrace();
+ if (ch.isConnected()) {
+ sendError(ctx, INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ public static String sanitizeUri(String uri) {
+ // Decode the path.
+ try {
+ uri = URLDecoder.decode(uri, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ try {
+ uri = URLDecoder.decode(uri, "ISO-8859-1");
+ } catch (UnsupportedEncodingException e1) {
+ throw new Error();
+ }
+ }
+
+ // Convert file separators.
+ uri = uri.replace('/', File.separatorChar);
+
+ // Simplistic dumb security check.
+ // You will have to do something serious in the production environment.
+ if (uri.contains(File.separator + ".")
+ || uri.contains("." + File.separator) || uri.startsWith(".")
+ || uri.endsWith(".")) {
+ return null;
+ }
+
+ // Convert to absolute path.
+ return uri;
+ }
+
+ private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
+ HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+ response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+ response.setContent(ChannelBuffers.copiedBuffer(
+ "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
+
+ // Close the connection as soon as the error message is sent.
+ ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
new file mode 100644
index 0000000..0a47f6b
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
+import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
+import org.jboss.netty.handler.stream.ChunkedWriteHandler;
+import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+ /**
+  * Builds the channel pipeline of the HTTP data server: request decoder, response encoder,
+  * chunked writer and finally an {@link HttpDataServerHandler} bound to the configured retriever.
+  */
+ public class HttpDataServerPipelineFactory implements ChannelPipelineFactory {
+   private final DataRetriever retriever;
+
+   /**
+    * @param ret retriever handed to every {@link HttpDataServerHandler} this factory creates
+    */
+   public HttpDataServerPipelineFactory(DataRetriever ret) {
+     this.retriever = ret;
+   }
+
+   /**
+    * Creates a fresh pipeline with the handlers "decoder", "encoder", "chunkedWriter" and "handler",
+    * added in that order.
+    */
+   public ChannelPipeline getPipeline() throws Exception {
+     ChannelPipeline channelPipeline = pipeline();
+
+     // HTTPS is not enabled. To enable it, an SslHandler (server mode SSLEngine) would be added
+     // here as "ssl", ahead of the decoder.
+     channelPipeline.addLast("decoder", new HttpRequestDecoder());
+     // An "aggregator" (HttpChunkAggregator) is deliberately not installed.
+     channelPipeline.addLast("encoder", new HttpResponseEncoder());
+     channelPipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
+     // A "deflater" (HttpContentCompressor) is deliberately not installed.
+     channelPipeline.addLast("handler", new HttpDataServerHandler(retriever));
+     return channelPipeline;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java
new file mode 100644
index 0000000..e688c39
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/HttpUtil.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver;
+
+import com.google.common.collect.Maps;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.util.Map;
+
+ /**
+  * Helpers for converting between URI query strings and key/value maps.
+  */
+ public class HttpUtil {
+   /**
+    * Parses the query component of the given URI.
+    *
+    * @param uri the URI whose (decoded) query is parsed
+    * @return key/value pairs of the query; empty if the URI has no query
+    * @throws UnsupportedEncodingException declared for compatibility with existing callers
+    */
+   public static Map<String,String> getParams(URI uri) throws UnsupportedEncodingException {
+     return getParamsFromQuery(uri.getQuery());
+   }
+
+   /**
+    * It parses a query string into key/value pairs
+    *
+    * <p>Pairs are separated by {@code '&'}; within a pair the first {@code '='} separates key and
+    * value, so a value may itself contain {@code '='}. A pair without {@code '='} or with nothing
+    * after it maps the key to the empty string. Empty pairs (as in {@code "a=1&&b=2"}) are skipped.
+    * If a key occurs more than once, the last value wins.</p>
+    *
+    * @param queryString decoded query string; may be {@code null} or empty
+    * @return key/value pairs parsed from a given query string; never {@code null}
+    * @throws UnsupportedEncodingException declared for compatibility with existing callers
+    */
+   public static Map<String, String> getParamsFromQuery(String queryString) throws UnsupportedEncodingException {
+     Map<String,String> params = Maps.newHashMap();
+     if (queryString == null || queryString.isEmpty()) {
+       return params;
+     }
+
+     for (String q : queryString.split("&")) {
+       if (q.isEmpty()) {
+         continue;
+       }
+       int eq = q.indexOf('=');
+       if (eq < 0) {
+         params.put(q, "");
+       } else {
+         params.put(q.substring(0, eq), q.substring(eq + 1));
+       }
+     }
+
+     return params;
+   }
+
+   /**
+    * Builds a query string from the given parameters. Keys and values are URL-encoded as UTF-8 and
+    * joined as {@code key=value} pairs separated by {@code '&'}, in the map's iteration order.
+    *
+    * @param params parameters to encode
+    * @return the encoded query string; empty for an empty map
+    * @throws UnsupportedEncodingException if UTF-8 is not supported
+    */
+   public static String buildQuery(Map<String,String> params) throws UnsupportedEncodingException {
+     StringBuilder sb = new StringBuilder();
+
+     boolean first = true;
+     for (Map.Entry<String,String> param : params.entrySet()) {
+       if (!first) {
+         sb.append("&");
+       }
+       sb.append(URLEncoder.encode(param.getKey(), "UTF-8")).
+           append("=").
+           append(URLEncoder.encode(param.getValue(), "UTF-8"));
+       first = false;
+     }
+
+     return sb.toString();
+   }
+ }
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
new file mode 100644
index 0000000..2ef0c4c
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.util.TajoIdUtils;
+import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.jboss.netty.handler.codec.http.QueryStringDecoder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+ /**
+  * A {@link DataRetriever} that dispatches each request to a {@link RetrieverHandler} registered
+  * under the string form of a {@link QueryUnitAttemptId}.
+  */
+ public class AdvancedDataRetriever implements DataRetriever {
+   private static final Log LOG = LogFactory.getLog(AdvancedDataRetriever.class);
+   private final Map<String, RetrieverHandler> handlerMap = Maps.newConcurrentMap();
+
+   public AdvancedDataRetriever() {
+   }
+
+   /**
+    * Registers a handler for the given attempt id. An already registered handler is kept.
+    */
+   public void register(QueryUnitAttemptId id, RetrieverHandler handler) {
+     synchronized (handlerMap) {
+       if (!handlerMap.containsKey(id.toString())) {
+         handlerMap.put(id.toString(), handler);
+       }
+     }
+   }
+
+   /**
+    * Removes the handler registered for the given attempt id, if any.
+    */
+   public void unregister(QueryUnitAttemptId id) {
+     synchronized (handlerMap) {
+       if (handlerMap.containsKey(id.toString())) {
+         handlerMap.remove(id.toString());
+       }
+     }
+   }
+
+   /**
+    * Resolves the request's query parameters to file chunks.
+    *
+    * <p>With a {@code sid} parameter, every comma separated entry of {@code qid} must have the form
+    * {@code <number>_<number>}; together with {@code sid} it identifies the attempt whose handler
+    * produces one chunk. Without {@code sid}, the first {@code qid} value is used directly as the
+    * handler key and the single resulting chunk must be an existing, non-hidden regular file.</p>
+    *
+    * @return the chunks to send, or {@code null} if the handler produced no chunk (no-sid case)
+    * @throws FileNotFoundException        if {@code qid} is missing, no handler is registered for a
+    *                                      requested id, or the file is hidden or does not exist
+    * @throws FileAccessForbiddenException if the resolved path is not a regular file
+    * @throws IllegalArgumentException     if a {@code qid} entry is malformed
+    * @see org.apache.tajo.worker.dataserver.retriever.DataRetriever#handle(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest)
+    */
+   @Override
+   public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+       throws IOException {
+
+     final Map<String, List<String>> params =
+         new QueryStringDecoder(request.getUri()).getParameters();
+
+     if (!params.containsKey("qid")) {
+       throw new FileNotFoundException("No qid parameter in the request: " + request.getUri());
+     }
+
+     if (params.containsKey("sid")) {
+       List<FileChunk> chunks = Lists.newArrayList();
+       List<String> qids = splitMaps(params.get("qid"));
+       for (String qid : qids) {
+         String[] ids = qid.split("_");
+         if (ids.length < 2) {
+           // Reported as IllegalArgumentException, like the NumberFormatException from parseInt below.
+           throw new IllegalArgumentException("Malformed qid (expected <id>_<attempt>): " + qid);
+         }
+         ExecutionBlockId suid = TajoIdUtils.createExecutionBlockId(params.get("sid").get(0));
+         QueryUnitId quid = new QueryUnitId(suid, Integer.parseInt(ids[0]));
+         QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid,
+             Integer.parseInt(ids[1]));
+         RetrieverHandler handler = findHandler(attemptId.toString());
+         FileChunk chunk = handler.get(params);
+         chunks.add(chunk);
+       }
+       return chunks.toArray(new FileChunk[chunks.size()]);
+     } else {
+       RetrieverHandler handler = findHandler(params.get("qid").get(0));
+       FileChunk chunk = handler.get(params);
+       if (chunk == null) {
+         // There is no content corresponding to the query.
+         return null;
+       }
+
+       File file = chunk.getFile();
+       if (file.isHidden() || !file.exists()) {
+         throw new FileNotFoundException("No such file: " + file.getAbsolutePath());
+       }
+       if (!file.isFile()) {
+         throw new FileAccessForbiddenException(file.getAbsolutePath() + " is not file");
+       }
+
+       return new FileChunk[] {chunk};
+     }
+   }
+
+   /**
+    * Looks up the handler registered under the given key.
+    *
+    * @throws FileNotFoundException if no handler is registered under that key
+    */
+   private RetrieverHandler findHandler(String key) throws FileNotFoundException {
+     RetrieverHandler handler = handlerMap.get(key);
+     if (handler == null) {
+       throw new FileNotFoundException("No handler registered for: " + key);
+     }
+     return handler;
+   }
+
+   /**
+    * Splits every given value on commas and returns all pieces in order.
+    *
+    * @return the pieces; an empty list if {@code qids} is {@code null}
+    */
+   private List<String> splitMaps(List<String> qids) {
+     if (null == qids) {
+       LOG.error("QueryUnitId is EMPTY");
+       return Collections.<String>emptyList();
+     }
+
+     final List<String> ret = new ArrayList<String>();
+     for (String qid : qids) {
+       Collections.addAll(ret, qid.split(","));
+     }
+     return ret;
+   }
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
new file mode 100644
index 0000000..b26ba74
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DataRetriever.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+
+import java.io.IOException;
+/**
+ * Contract for components that turn an HTTP request into an array of {@link FileChunk}.
+ */
+public interface DataRetriever {
+ FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request) // receives the channel context and the request
+ throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
new file mode 100644
index 0000000..62dabbd
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/DirectoryRetriever.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.http.HttpRequest;
+import org.apache.tajo.worker.dataserver.FileAccessForbiddenException;
+import org.apache.tajo.worker.dataserver.HttpDataServerHandler;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+/**
+ * {@link DataRetriever} that maps the request URI onto a file below a base directory and
+ * returns that whole file as a single chunk.
+ */
+public class DirectoryRetriever implements DataRetriever {
+ public String baseDir; // directory against which sanitized request paths are resolved
+ /** Creates a retriever that resolves request paths against {@code baseDir}. */
+ public DirectoryRetriever(String baseDir) {
+ this.baseDir = baseDir;
+ }
+ /**
+ * Resolves the request URI to a file under {@code baseDir}.
+ *
+ * @param ctx not used by this implementation
+ * @param request the request whose URI names the file
+ * @return a one-element array whose chunk starts at offset 0 and spans {@code file.length()}
+ * @throws IllegalArgumentException if the sanitized path is null
+ * @throws FileNotFoundException if the file is hidden or does not exist
+ * @throws FileAccessForbiddenException if the path exists but {@code File.isFile()} is false
+ */
+ @Override
+ public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
+ throws IOException {
+ final String path = HttpDataServerHandler.sanitizeUri(request.getUri());
+ if (path == null) {
+ throw new IllegalArgumentException("Wrong path: " +path); // path is null here, so the message always ends with "null"
+ }
+
+ File file = new File(baseDir, path);
+ if (file.isHidden() || !file.exists()) {
+ throw new FileNotFoundException("No such file: " + baseDir + "/" + path);
+ }
+ if (!file.isFile()) { // NOTE(review): the message below says "No such file" although the path exists here
+ throw new FileAccessForbiddenException("No such file: "
+ + baseDir + "/" + path);
+ }
+
+ return new FileChunk[] {new FileChunk(file, 0, file.length())};
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
new file mode 100644
index 0000000..4f11168
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/FileChunk.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+/**
+ * Value holder describing a region of a file: the file, a start offset and a length.
+ * All fields are final and are only exposed through getters.
+ */
+public class FileChunk {
+ private final File file; // file the region belongs to
+ private final long startOffset; // where the region begins (presumably a byte offset; confirm)
+ private final long length; // size of the region (presumably in bytes; confirm)
+ /**
+ * Stores the given values as-is; this body performs no validation and no file access.
+ * NOTE(review): FileNotFoundException is declared but never thrown by this body.
+ */
+ public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
+ this.file = file;
+ this.startOffset = startOffset;
+ this.length = length;
+ }
+ /** Returns the file given at construction. */
+ public File getFile() {
+ return this.file;
+ }
+ /** Returns the start offset given at construction. */
+ public long startOffset() {
+ return this.startOffset;
+ }
+ /** Returns the length given at construction. */
+ public long length() {
+ return this.length;
+ }
+ /** Formats the chunk as " (start=S, length=L) " followed by the file's absolute path. */
+ public String toString() {
+ return " (start=" + startOffset() + ", length=" + length + ") "
+ + file.getAbsolutePath();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
new file mode 100644
index 0000000..e5479cc
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/dataserver/retriever/RetrieverHandler.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.dataserver.retriever;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+/**
+ * Contract for handlers that pick a {@link FileChunk} from decoded request parameters.
+ */
+public interface RetrieverHandler {
+ /**
+ * Selects the file chunk that corresponds to the given request parameters.
+ *
+ * @param kvs url-decoded key/value pairs
+ * @return a desired part of a file (presumably null when nothing matches; confirm with implementations)
+ * @throws IOException if the implementation fails while locating the chunk (implementation-defined)
+ */
+ public FileChunk get(Map<String, List<String>> kvs) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/proto/InternalTypes.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/InternalTypes.proto b/tajo-core/src/main/proto/InternalTypes.proto
new file mode 100644
index 0000000..1a62bc2
--- /dev/null
+++ b/tajo-core/src/main/proto/InternalTypes.proto
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo";
+option java_outer_classname = "InternalTypes";
+option java_generic_services = false;
+option java_generate_equals_and_hash = true;
+// An int64 sum paired with an int64 count; presumably the partial state of an average over integers (confirm).
+message AvgLongProto {
+ required int64 sum = 1;
+ required int64 count = 2;
+}
+// A double sum paired with an int64 count; presumably the partial state of an average over doubles (confirm).
+message AvgDoubleProto {
+ required double sum = 1;
+ required int64 count = 2;
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto
new file mode 100644
index 0000000..e12c9aa
--- /dev/null
+++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "QueryMasterProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+import "TajoWorkerProtocol.proto";
+// RPC service whose calls are grouped below by caller; every call except getTask returns a BoolProto.
+service QueryMasterProtocolService {
+ //from Worker
+ rpc getTask(GetTaskRequestProto) returns (QueryUnitRequestProto);
+ rpc statusUpdate (TaskStatusProto) returns (BoolProto);
+ rpc ping (QueryUnitAttemptIdProto) returns (BoolProto);
+ rpc fatalError(TaskFatalErrorReport) returns (BoolProto);
+ rpc done (TaskCompletionReport) returns (BoolProto);
+
+ //from TajoMaster's QueryJobManager
+ rpc killQuery(QueryIdProto) returns (BoolProto);
+ rpc executeQuery(QueryExecutionRequestProto) returns (BoolProto);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
new file mode 100644
index 0000000..d46d09a
--- /dev/null
+++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "TajoResourceTrackerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "TajoMasterProtocol.proto";
+// Argument of TajoResourceTrackerProtocolService.heartbeat: the worker host and two ports are required, the rest is optional.
+message NodeHeartbeat {
+ required string tajoWorkerHost = 1;
+ required int32 peerRpcPort = 2;
+ required int32 tajoQueryMasterPort = 3;
+ optional ServerStatusProto serverStatus = 4;
+ optional int32 tajoWorkerClientPort = 5;
+ optional string statusMessage = 6;
+ optional int32 tajoWorkerPullServerPort = 7;
+ optional int32 tajoWorkerHttpPort = 8;
+}
+// RPC service with a single call: heartbeat takes a NodeHeartbeat and returns a TajoHeartbeatResponse.
+service TajoResourceTrackerProtocolService {
+ rpc heartbeat(NodeHeartbeat) returns (TajoHeartbeatResponse);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
new file mode 100644
index 0000000..8fccbaf
--- /dev/null
+++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//TajoWorker -> TajoMaster protocol
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "TajoMasterProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+import "yarn_protos.proto";
+import "tajo_protos.proto";
+import "TajoIdProtos.proto";
+import "CatalogProtos.proto";
+import "PrimitiveProtos.proto";
+// Status report of a server: system figures, per-disk space, JVM heap figures, resource amounts and two mode flags.
+message ServerStatusProto {
+ message System { // processor count and memory figures (MB, going by the field names)
+ required int32 availableProcessors = 1;
+ required int32 freeMemoryMB = 2;
+ required int32 maxMemoryMB = 3;
+ required int32 totalMemoryMB = 4;
+ }
+ message Disk { // space figures for one path; units are not stated here (presumably bytes; confirm)
+ required string absolutePath = 1;
+ required int64 totalSpace = 2;
+ required int64 freeSpace = 3;
+ required int64 usableSpace = 4;
+ }
+
+ message JvmHeap { // heap figures; units are not stated here (presumably bytes; confirm)
+ required int64 maxHeap = 1;
+ required int64 totalHeap = 2;
+ required int64 freeHeap = 3;
+ }
+
+ required System system = 1;
+ required float diskSlots = 2;
+ required int32 memoryResourceMB = 3;
+ repeated Disk disk = 4; // zero or more Disk entries
+ required int32 runningTaskNum = 5;
+ required JvmHeap jvmHeap = 6;
+ required BoolProto queryMasterMode = 7;
+ required BoolProto taskRunnerMode = 8;
+}
+// Argument of TajoMasterProtocolService.heartbeat; only the worker host and the query master port are required.
+message TajoHeartbeat {
+ required string tajoWorkerHost = 1;
+ required int32 tajoQueryMasterPort = 2;
+ optional int32 tajoWorkerClientPort = 3;
+ optional QueryIdProto queryId = 4;
+ optional QueryState state = 5;
+ optional string statusMessage = 6;
+ optional float queryProgress = 7;
+ optional int64 queryFinishTime = 8;
+}
+// Return type of TajoMasterProtocolService.heartbeat: a result flag, a cluster resource summary and an optional command.
+message TajoHeartbeatResponse {
+ message ResponseCommand { // a command name plus zero or more string parameters
+ required string command = 1;
+ repeated string params = 2;
+ }
+ required BoolProto heartbeatResult = 1;
+ required ClusterResourceSummary clusterResourceSummary = 2;
+ optional ResponseCommand responseCommand = 3;
+}
+// Cluster-wide counters: worker count, then total and total-available disk slots, CPU core slots and memory (MB).
+message ClusterResourceSummary {
+ required int32 numWorkers = 1;
+ required int32 totalDiskSlots = 2;
+ required int32 totalCpuCoreSlots = 3;
+ required int32 totalMemoryMB = 4;
+
+ required int32 totalAvailableDiskSlots = 5;
+ required int32 totalAvailableCpuCoreSlots = 6;
+ required int32 totalAvailableMemoryMB = 7;
+}
+// Values accepted by WorkerResourceAllocationRequest.resourceRequestPriority: MEMORY (1) or DISK (2).
+enum ResourceRequestPriority {
+ MEMORY = 1;
+ DISK = 2;
+}
+// Argument of allocateWorkerResources: query id, priority, container count and per-container min/max memory (MB) and disk slots.
+message WorkerResourceAllocationRequest {
+ required QueryIdProto queryId = 1;
+ required ResourceRequestPriority resourceRequestPriority = 2;
+
+ required int32 numContainers = 3;
+
+ required int32 maxMemoryMBPerContainer = 4;
+ required int32 minMemoryMBPerContainer = 5;
+
+ required float maxDiskSlotPerContainer = 6;
+ required float minDiskSlotPerContainer = 7;
+}
+// Host, ports, memory (MB) and disk slots of one worker; used as the repeated element of WorkerResourcesRequest.
+message WorkerResourceProto {
+ required string host = 1;
+ required int32 peerRpcPort = 2;
+ required int32 queryMasterPort = 3;
+ required int32 infoPort = 4;
+ required int32 memoryMB = 5 ;
+ required float diskSlots = 6;
+}
+// List of WorkerResourceProto. NOTE(review): despite the "Request" name it is the return type of getAllWorkerResource.
+message WorkerResourcesRequest {
+ repeated WorkerResourceProto workerResources = 1;
+}
+// Argument of releaseWorkerResource: an execution block id and zero or more YARN container ids.
+message WorkerResourceReleaseRequest {
+ required ExecutionBlockIdProto executionBlockId = 1;
+ repeated hadoop.yarn.ContainerIdProto containerIds = 2;
+}
+// One allocation entry: container id, node id, worker host and ports, and the allocated memory (MB) and disk slots.
+message WorkerAllocatedResource {
+ required hadoop.yarn.ContainerIdProto containerId = 1;
+ required string nodeId = 2;
+ required string workerHost = 3;
+ required int32 peerRpcPort = 4;
+ required int32 queryMasterPort = 5;
+ required int32 clientPort = 6;
+ required int32 workerPullServerPort = 7;
+
+ required int32 allocatedMemoryMB = 8;
+ required float allocatedDiskSlots = 9;
+}
+// Return type of allocateWorkerResources: the query id and zero or more WorkerAllocatedResource entries.
+message WorkerResourceAllocationResponse {
+ required QueryIdProto queryId = 1;
+ repeated WorkerAllocatedResource workerAllocatedResource = 2;
+}
+// RPC service of this file (headed "TajoWorker -> TajoMaster protocol"): heartbeat, resource allocation and release, query master stop, worker listing.
+service TajoMasterProtocolService {
+ rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse);
+ rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse);
+ rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto);
+ rpc stopQueryMaster(QueryIdProto) returns (BoolProto);
+ rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest);
+}
\ No newline at end of file