Posted to commits@tajo.apache.org by hy...@apache.org on 2013/07/02 16:16:00 UTC

[06/51] [partial] TAJO-22: The package prefix should be org.apache.tajo. (DaeMyung Kang via hyunsik)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
deleted file mode 100644
index 0fd534e..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/TaskRunner.java
+++ /dev/null
@@ -1,394 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.service.AbstractService;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import tajo.QueryConf;
-import tajo.QueryUnitAttemptId;
-import tajo.SubQueryId;
-import tajo.TajoProtos.TaskAttemptState;
-import tajo.conf.TajoConf.ConfVars;
-import tajo.engine.MasterWorkerProtos.QueryUnitRequestProto;
-import tajo.engine.query.QueryUnitRequestImpl;
-import tajo.ipc.MasterWorkerProtocol;
-import tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService;
-import tajo.ipc.MasterWorkerProtocol.MasterWorkerProtocolService.Interface;
-import tajo.rpc.CallFuture2;
-import tajo.rpc.NullCallback;
-import tajo.rpc.ProtoAsyncRpcClient;
-import tajo.util.TajoIdUtils;
-
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.security.PrivilegedExceptionAction;
-import java.util.Map;
-import java.util.concurrent.*;
-
-import static tajo.engine.MasterWorkerProtos.TaskFatalErrorReport;
-
-/**
- * The driver class for Tajo QueryUnit processing.
- */
-public class TaskRunner extends AbstractService {
-  /** class logger */
-  private static final Log LOG = LogFactory.getLog(TaskRunner.class);
-
-  private QueryConf conf;
-
-  private volatile boolean stopped = false;
-
-  private final SubQueryId subQueryId;
-  private ApplicationId appId;
-  private final NodeId nodeId;
-  private final ContainerId containerId;
-
-  // Cluster Management
-  private MasterWorkerProtocolService.Interface master;
-
-  // for temporary or intermediate files
-  private FileSystem localFS;
-  // for input files
-  private FileSystem defaultFS;
-
-  private TajoQueryEngine queryEngine;
-
-  // TODO - this should be configurable
-  private final int coreNum = 4;
-
-  // for Fetcher
-  private final ExecutorService fetchLauncher =
-      Executors.newFixedThreadPool(coreNum * 4);
-  // It keeps all of the query unit attempts while a TaskRunner is running.
-  private final Map<QueryUnitAttemptId, Task> tasks =
-      new ConcurrentHashMap<QueryUnitAttemptId, Task>();
-  private LocalDirAllocator lDirAllocator;
-
-  // A thread that receives each assigned query unit and executes it
-  private Thread taskLauncher;
-
-  // Contains the object references related to this TaskRunner
-  private WorkerContext workerContext;
-  // for the doAs block
-  private UserGroupInformation taskOwner;
-
-  // for the local temporary dir
-  private String baseDir;
-  private Path baseDirPath;
-
-  public TaskRunner(
-      final SubQueryId subQueryId,
-      final NodeId nodeId,
-      UserGroupInformation taskOwner,
-      Interface master, ContainerId containerId) {
-    super(TaskRunner.class.getName());
-    this.subQueryId = subQueryId;
-    this.appId = subQueryId.getQueryId().getApplicationId();
-    this.nodeId = nodeId;
-    this.taskOwner = taskOwner;
-    this.master = master;
-    this.containerId = containerId;
-  }
-
-  @Override
-  public void init(Configuration _conf) {
-    this.conf = (QueryConf) _conf;
-
-    try {
-      this.workerContext = new WorkerContext();
-
-      // initialize DFS and LocalFileSystems
-      defaultFS = FileSystem.get(URI.create(conf.get("tajo.rootdir")),conf);
-      localFS = FileSystem.getLocal(conf);
-
-      // the base dir for an output dir
-      baseDir = ConverterUtils.toString(appId)
-          + "/output" + "/" + subQueryId.getId();
-
-      // initialize LocalDirAllocator
-      lDirAllocator = new LocalDirAllocator(ConfVars.TASK_LOCAL_DIR.varname);
-
-      baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(baseDir, conf));
-      LOG.info("TaskRunner basedir is created (" + baseDir +")");
-
-      // Setup QueryEngine according to the query plan
-      // Here, we can setup row-based query engine or columnar query engine.
-      this.queryEngine = new TajoQueryEngine(conf);
-
-      Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook()));
-    } catch (Throwable t) {
-      LOG.error(t);
-    }
-
-    super.init(conf);
-  }
-
-  @Override
-  public void start() {
-    run();
-  }
-
-  @Override
-  public void stop() {
-    if (!isStopped()) {
-      // If TaskRunner is stopped, all running or pending tasks will be marked as failed.
-      for (Task task : tasks.values()) {
-        if (task.getStatus() == TaskAttemptState.TA_PENDING ||
-            task.getStatus() == TaskAttemptState.TA_RUNNING) {
-          task.setState(TaskAttemptState.TA_FAILED);
-        }
-      }
-
-      // If this flag becomes true, taskLauncher will be terminated.
-      this.stopped = true;
-
-      LOG.info("STOPPED: " + nodeId);
-      synchronized (this) {
-        notifyAll();
-      }
-    }
-  }
-
-  class WorkerContext {
-    public QueryConf getConf() {
-      return conf;
-    }
-
-    public String getNodeId() {
-      return nodeId.toString();
-    }
-
-    public MasterWorkerProtocolService.Interface getMaster() {
-      return master;
-    }
-
-    public FileSystem getLocalFS() {
-      return localFS;
-    }
-
-    public FileSystem getDefaultFS() {
-      return defaultFS;
-    }
-
-    public LocalDirAllocator getLocalDirAllocator() {
-      return lDirAllocator;
-    }
-
-    public TajoQueryEngine getTQueryEngine() {
-      return queryEngine;
-    }
-
-    public Map<QueryUnitAttemptId, Task> getTasks() {
-      return tasks;
-    }
-
-    public Task getTask(QueryUnitAttemptId taskId) {
-      return tasks.get(taskId);
-    }
-
-    public ExecutorService getFetchLauncher() {
-      return fetchLauncher;
-    }
-
-    public Path getBaseDir() {
-      return baseDirPath;
-    }
-  }
-
-  static void fatalError(MasterWorkerProtocolService.Interface proxy,
-                         QueryUnitAttemptId taskAttemptId, String message) {
-    TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder()
-        .setId(taskAttemptId.getProto())
-        .setErrorMessage(message);
-    proxy.fatalError(null, builder.build(), NullCallback.get());
-  }
-
-  public void run() {
-    LOG.info("TaskRunner startup");
-
-    try {
-
-      taskLauncher = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          int receivedNum = 0;
-          CallFuture2<QueryUnitRequestProto> callFuture = null;
-          QueryUnitRequestProto taskRequest = null;
-
-          while(!stopped) {
-            try {
-              if (callFuture == null) {
-                callFuture = new CallFuture2<QueryUnitRequestProto>();
-                master.getTask(null, ((ContainerIdPBImpl) containerId).getProto(),
-                    callFuture);
-              }
-              try {
-                // wait up to 3 seconds for a task assignment
-                taskRequest = callFuture.get(3, TimeUnit.SECONDS);
-              } catch (TimeoutException te) {
-                // if no task has been assigned within the given period,
-                // TaskRunner retries the request.
-                LOG.error(te);
-
-                continue;
-              }
-
-              if (taskRequest != null) {
-                // QueryMaster can send a termination signal to TaskRunner.
-                // When TaskRunner receives it, it terminates immediately.
-                if (taskRequest.getShouldDie()) {
-                  LOG.info("received ShouldDie flag");
-                  stop();
-
-                } else {
-
-                  LOG.info("Accumulated Received Task: " + (++receivedNum));
-
-                  QueryUnitAttemptId taskAttemptId = new QueryUnitAttemptId(taskRequest.getId());
-                  if (tasks.containsKey(taskAttemptId)) {
-                    fatalError(master, taskAttemptId, "Duplicate Task Attempt: " + taskAttemptId);
-                    continue;
-                  }
-
-                  LOG.info("Initializing: " + taskAttemptId);
-                  Task task = new Task(taskAttemptId, workerContext, master,
-                      new QueryUnitRequestImpl(taskRequest));
-                  tasks.put(taskAttemptId, task);
-
-                  task.init();
-                  if (task.hasFetchPhase()) {
-                    task.fetch(); // The fetch is performed in an asynchronous way.
-                  }
-                  // task.run() is a blocking call.
-                  task.run();
-
-                  callFuture = null;
-                  taskRequest = null;
-                }
-              }
-            } catch (Throwable t) {
-              LOG.error(t);
-            }
-          }
-        }
-      });
-      taskLauncher.start();
-      taskLauncher.join();
-
-    } catch (Throwable t) {
-      LOG.fatal("Unhandled exception. Starting shutdown.", t);
-    } finally {
-      for (Task t : tasks.values()) {
-        if (t.getStatus() != TaskAttemptState.TA_SUCCEEDED) {
-          t.abort();
-        }
-      }
-    }
-  }
-
-  private class ShutdownHook implements Runnable {
-    @Override
-    public void run() {
-      LOG.info("received SIGINT Signal");
-      stop();
-    }
-  }
-
-  /**
-   * @return true if a stop has been requested.
-   */
-  public boolean isStopped() {
-    return this.stopped;
-  }
-
-  /**
-   * TaskRunner takes 5 arguments as follows:
-   * <ol>
-   * <li>1st: TaskRunnerListener hostname</li>
-   * <li>2nd: TaskRunnerListener port</li>
-   * <li>3rd: SubQueryId</li>
-   * <li>4th: NodeId</li>
-   * <li>5th: ContainerId</li>
-   * </ol>
-   */
-  public static void main(String[] args) throws Exception {
-    // Restore QueryConf
-    final QueryConf conf = new QueryConf();
-    conf.addResource(new Path(QueryConf.FILENAME));
-
-    LOG.info("MiniTajoYarn NM Local Dir: " + conf.get(ConfVars.TASK_LOCAL_DIR.varname));
-    LOG.info("OUTPUT DIR: " + conf.getOutputPath());
-    LOG.info("Tajo Root Dir: " + conf.get("tajo.rootdir"));
-
-    UserGroupInformation.setConfiguration(conf);
-
-    // TaskRunnerListener's address
-    String host = args[0];
-    int port = Integer.parseInt(args[1]);
-    final InetSocketAddress masterAddr =
-        NetUtils.createSocketAddrForHost(host, port);
-
-    // SubQueryId from String
-    final SubQueryId subQueryId = TajoIdUtils.newSubQueryId(args[2]);
-    // NodeId has a form of hostname:port.
-    NodeId nodeId = ConverterUtils.toNodeId(args[3]);
-    ContainerId containerId = ConverterUtils.toContainerId(args[4]);
-
-    // TODO - 'load credential' should be implemented
-    // Getting taskOwner
-    UserGroupInformation taskOwner =
-        UserGroupInformation.createRemoteUser(conf.getVar(ConfVars.QUERY_USERNAME));
-    //taskOwner.addToken(token);
-
-    // TaskRunnerListener RPC
-    ProtoAsyncRpcClient client;
-    MasterWorkerProtocolService.Interface master;
-
-    // initialize MasterWorkerProtocol as an actual task owner.
-    client =
-        taskOwner.doAs(new PrivilegedExceptionAction<ProtoAsyncRpcClient>() {
-          @Override
-          public ProtoAsyncRpcClient run() throws Exception {
-            return new ProtoAsyncRpcClient(MasterWorkerProtocol.class, masterAddr);
-          }
-        });
-    master = client.getStub();
-
-
-    TaskRunner taskRunner = new TaskRunner(subQueryId, nodeId, taskOwner, master, containerId);
-    taskRunner.init(conf);
-    taskRunner.start();
-    client.close();
-    LOG.info("TaskRunner (" + nodeId + ") main thread exiting");
-    System.exit(0);
-  }
-}
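
For reference, the removed main() above takes five positional arguments (TaskRunnerListener hostname and port, SubQueryId, NodeId, ContainerId). A minimal sketch of how a launcher could assemble that command line; the class name comes from this file, while every argument value below is a made-up placeholder:

import java.io.IOException;

public class LaunchTaskRunnerExample {
  public static void main(String[] args) throws IOException {
    new ProcessBuilder(
        "java", "tajo.worker.TaskRunner",
        "master-host",                            // 1st: TaskRunnerListener hostname
        "21005",                                  // 2nd: TaskRunnerListener port
        "sq_placeholder_0001_001",                // 3rd: SubQueryId in string form
        "worker01:28091",                         // 4th: NodeId (hostname:port)
        "container_0000000000000_0001_01_000002"  // 5th: ContainerId
    ).inheritIO().start();
  }
}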

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/FileAccessForbiddenException.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/FileAccessForbiddenException.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/FileAccessForbiddenException.java
deleted file mode 100644
index 5f0c772..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/FileAccessForbiddenException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver;
-
-import java.io.IOException;
-
-public class FileAccessForbiddenException extends IOException {
-  private static final long serialVersionUID = -3383272565826389213L;
-
-  public FileAccessForbiddenException() {
-  }
-
-  public FileAccessForbiddenException(String message) {
-    super(message);
-  }
-
-  public FileAccessForbiddenException(Throwable cause) {
-    super(cause);
-  }
-
-  public FileAccessForbiddenException(String message, Throwable cause) {
-    super(message, cause);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServer.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServer.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServer.java
deleted file mode 100644
index a35922e..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServer.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.net.NetUtils;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.ChannelGroupFuture;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import tajo.worker.dataserver.retriever.DataRetriever;
-
-import java.net.InetSocketAddress;
-import java.util.concurrent.Executors;
-
-public class HttpDataServer {
-  private final static Log LOG = LogFactory.getLog(HttpDataServer.class);
-
-  private final InetSocketAddress addr;
-  private InetSocketAddress bindAddr;
-  private ServerBootstrap bootstrap = null;
-  private ChannelFactory factory = null;
-  private ChannelGroup channelGroup = null;
-
-  public HttpDataServer(final InetSocketAddress addr, 
-      final DataRetriever retriever) {
-    this.addr = addr;
-    this.factory = new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
-        Runtime.getRuntime().availableProcessors() * 2);
-
-    // Configure the server.
-    this.bootstrap = new ServerBootstrap(factory);
-    // Set up the event pipeline factory.
-    this.bootstrap.setPipelineFactory(
-        new HttpDataServerPipelineFactory(retriever));    
-    this.channelGroup = new DefaultChannelGroup();
-  }
-
-  public HttpDataServer(String bindaddr, DataRetriever retriever) {
-    this(NetUtils.createSocketAddr(bindaddr), retriever);
-  }
-
-  public void start() {
-    // Bind and start to accept incoming connections.
-    Channel channel = bootstrap.bind(addr);
-    channelGroup.add(channel);    
-    this.bindAddr = (InetSocketAddress) channel.getLocalAddress();
-    LOG.info("HttpDataServer starts up ("
-        + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort()
-        + ")");
-  }
-  
-  public InetSocketAddress getBindAddress() {
-    return this.bindAddr;
-  }
-
-  public void stop() {
-    ChannelGroupFuture future = channelGroup.close();
-    future.awaitUninterruptibly();
-    factory.releaseExternalResources();
-
-    LOG.info("HttpDataServer shutdown ("
-        + this.bindAddr.getAddress().getHostAddress() + ":"
-        + this.bindAddr.getPort() + ")");
-  }
-}
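
For illustration, a minimal sketch of wiring the server above to the DirectoryRetriever that appears later in this diff; the bind address and base directory are placeholders:

import tajo.worker.dataserver.HttpDataServer;
import tajo.worker.dataserver.retriever.DirectoryRetriever;

public class HttpDataServerExample {
  public static void main(String[] args) {
    // Bind to an ephemeral port and serve files under a scratch directory.
    HttpDataServer server =
        new HttpDataServer("0.0.0.0:0", new DirectoryRetriever("/tmp/tajo-scratch"));
    server.start();
    System.out.println("listening on " + server.getBindAddress());
    server.stop();
  }
}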

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServerHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServerHandler.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServerHandler.java
deleted file mode 100644
index 31ab6e1..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServerHandler.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.*;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedFile;
-import org.jboss.netty.util.CharsetUtil;
-import tajo.worker.dataserver.retriever.DataRetriever;
-import tajo.worker.dataserver.retriever.FileChunk;
-
-import java.io.*;
-import java.net.URLDecoder;
-
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-public class HttpDataServerHandler extends SimpleChannelUpstreamHandler {
-  private final static Log LOG = LogFactory.getLog(HttpDataServerHandler.class);
-  private final DataRetriever retriever;
-
-  public HttpDataServerHandler(DataRetriever retriever) {
-    this.retriever = retriever;
-  }
-
-  @Override
-  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
-      throws Exception {
-    HttpRequest request = (HttpRequest) e.getMessage();
-    if (request.getMethod() != GET) {
-      sendError(ctx, METHOD_NOT_ALLOWED);
-      return;
-    }
-
-    FileChunk [] file;
-    try {
-      file = retriever.handle(ctx, request);
-    } catch (FileNotFoundException fnf) {
-      LOG.error(fnf);
-      sendError(ctx, NOT_FOUND);
-      return;
-    } catch (IllegalArgumentException iae) {
-      LOG.error(iae);
-      sendError(ctx, BAD_REQUEST);
-      return;
-    } catch (FileAccessForbiddenException fafe) {
-      LOG.error(fafe);
-      sendError(ctx, FORBIDDEN);
-      return;
-    } catch (IOException ioe) {
-      LOG.error(ioe);
-      sendError(ctx, INTERNAL_SERVER_ERROR);
-      return;
-    }
-
-    // Write the content.
-    Channel ch = e.getChannel();
-    if (file == null) {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT);
-      ch.write(response);
-      if (!isKeepAlive(request)) {
-        ch.close();
-      }
-    }  else {
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-      long totalSize = 0;
-      for (FileChunk chunk : file) {
-        totalSize += chunk.length();
-      }
-      setContentLength(response, totalSize);
-
-      // Write the initial line and the header.
-      ch.write(response);
-
-      ChannelFuture writeFuture = null;
-
-      for (FileChunk chunk : file) {
-        writeFuture = sendFile(ctx, ch, chunk);
-        if (writeFuture == null) {
-          sendError(ctx, NOT_FOUND);
-          return;
-        }
-      }
-
-      // Decide whether to close the connection or not.
-      if (!isKeepAlive(request)) {
-        // Close the connection when the whole content is written out.
-        writeFuture.addListener(ChannelFutureListener.CLOSE);
-      }
-    }
-  }
-
-  private ChannelFuture sendFile(ChannelHandlerContext ctx, Channel ch, FileChunk file) throws IOException {
-    RandomAccessFile raf;
-    try {
-      raf = new RandomAccessFile(file.getFile(), "r");
-    } catch (FileNotFoundException fnfe) {
-      return null;
-    }
-
-    ChannelFuture writeFuture;
-    if (ch.getPipeline().get(SslHandler.class) != null) {
-      // Cannot use zero-copy with HTTPS.
-      writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(), file.length(), 8192));
-    } else {
-      // No encryption - use zero-copy.
-      final FileRegion region = new DefaultFileRegion(raf.getChannel(), file.startOffset(), file.length());
-      writeFuture = ch.write(region);
-      writeFuture.addListener(new ChannelFutureListener() {
-        public void operationComplete(ChannelFuture future) {
-          region.releaseExternalResources();
-        }
-      });
-    }
-
-    return writeFuture;
-  }
-
-  @Override
-  public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
-      throws Exception {
-    Channel ch = e.getChannel();
-    Throwable cause = e.getCause();
-    if (cause instanceof TooLongFrameException) {
-      sendError(ctx, BAD_REQUEST);
-      return;
-    }
-
-    cause.printStackTrace();
-    if (ch.isConnected()) {
-      sendError(ctx, INTERNAL_SERVER_ERROR);
-    }
-  }
-
-  public static String sanitizeUri(String uri) {
-    // Decode the path.
-    try {
-      uri = URLDecoder.decode(uri, "UTF-8");
-    } catch (UnsupportedEncodingException e) {
-      try {
-        uri = URLDecoder.decode(uri, "ISO-8859-1");
-      } catch (UnsupportedEncodingException e1) {
-        throw new Error();
-      }
-    }
-
-    // Convert file separators.
-    uri = uri.replace('/', File.separatorChar);
-
-    // Simplistic dumb security check.
-    // You will have to do something more robust in a production environment.
-    if (uri.contains(File.separator + ".")
-        || uri.contains("." + File.separator) || uri.startsWith(".")
-        || uri.endsWith(".")) {
-      return null;
-    }
-
-    // Convert to absolute path.
-    return uri;
-  }
-
-  private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
-    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
-    response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
-    response.setContent(ChannelBuffers.copiedBuffer(
-        "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8));
-
-    // Close the connection as soon as the error message is sent.
-    ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
-  }
-}
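
A quick illustration of the filtering performed by sanitizeUri() above; the paths are arbitrary examples:

import tajo.worker.dataserver.HttpDataServerHandler;

public class SanitizeUriExample {
  public static void main(String[] args) {
    // A plain relative path passes through, with '/' converted to the platform separator.
    System.out.println(HttpDataServerHandler.sanitizeUri("/output/part-00000"));
    // Paths containing "." segments (e.g. traversal attempts) are rejected and yield null.
    System.out.println(HttpDataServerHandler.sanitizeUri("/../etc/passwd"));
  }
}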

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServerPipelineFactory.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
deleted file mode 100644
index e4fd554..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpDataServerPipelineFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver;
-
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import tajo.worker.dataserver.retriever.DataRetriever;
-
-import static org.jboss.netty.channel.Channels.pipeline;
-
-public class HttpDataServerPipelineFactory implements ChannelPipelineFactory {
-  private final DataRetriever ret;
-
-  public HttpDataServerPipelineFactory(DataRetriever ret) {
-    this.ret = ret;
-  }
-
-  public ChannelPipeline getPipeline() throws Exception {
-    // Create a default pipeline implementation.
-    ChannelPipeline pipeline = pipeline();
-
-    // Uncomment the following lines if you want HTTPS
-    // SSLEngine engine =
-    // SecureChatSslContextFactory.getServerContext().createSSLEngine();
-    // engine.setUseClientMode(false);
-    // pipeline.addLast("ssl", new SslHandler(engine));
-
-    pipeline.addLast("decoder", new HttpRequestDecoder());
-    //pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
-    pipeline.addLast("encoder", new HttpResponseEncoder());
-    pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
-    //pipeline.addLast("deflater", new HttpContentCompressor());
-    pipeline.addLast("handler", new HttpDataServerHandler(ret));
-    return pipeline;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpUtil.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpUtil.java
deleted file mode 100644
index 4c18a0f..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/HttpUtil.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver;
-
-import com.google.common.collect.Maps;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URLEncoder;
-import java.util.Map;
-
-public class HttpUtil {
-  public static Map<String,String> getParams(URI uri) throws UnsupportedEncodingException {
-    return getParamsFromQuery(uri.getQuery());
-  }
-
-  /**
-   * It parses a query string into key/value pairs
-   *
-   * @param queryString decoded query string
-   * @return key/value pairs parsed from a given query string
-   * @throws UnsupportedEncodingException
-   */
-  public static Map<String, String> getParamsFromQuery(String queryString) throws UnsupportedEncodingException {
-    String [] queries = queryString.split("&");
-
-    Map<String,String> params = Maps.newHashMap();
-    String [] param;
-    for (String q : queries) {
-      param = q.split("=");
-      params.put(param[0], param[1]);
-    }
-
-    return params;
-  }
-
-  public static String buildQuery(Map<String,String> params) throws UnsupportedEncodingException {
-    StringBuilder sb = new StringBuilder();
-
-    boolean first = true;
-    for (Map.Entry<String,String> param : params.entrySet()) {
-      if (!first) {
-        sb.append("&");
-      }
-      sb.append(URLEncoder.encode(param.getKey(), "UTF-8")).
-          append("=").
-          append(URLEncoder.encode(param.getValue(), "UTF-8"));
-      first = false;
-    }
-
-    return sb.toString();
-  }
-}
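
A short sketch of how the two helpers above fit together; the key/value pairs are arbitrary:

import tajo.worker.dataserver.HttpUtil;

import java.util.Map;

public class HttpUtilExample {
  public static void main(String[] args) throws Exception {
    // Parse an already-decoded query string into a key/value map ...
    Map<String, String> params = HttpUtil.getParamsFromQuery("qid=5_0&sid=sq_0001");
    System.out.println(params.get("qid"));    // prints 5_0
    // ... and build a URL-encoded query string back from the map.
    System.out.println(HttpUtil.buildQuery(params));
  }
}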

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
deleted file mode 100644
index fef717b..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/AdvancedDataRetriever.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver.retriever;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-import tajo.QueryUnitAttemptId;
-import tajo.QueryUnitId;
-import tajo.SubQueryId;
-import tajo.util.TajoIdUtils;
-import tajo.worker.dataserver.FileAccessForbiddenException;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class AdvancedDataRetriever implements DataRetriever {
-  private final Log LOG = LogFactory.getLog(AdvancedDataRetriever.class);
-  private final Map<String, RetrieverHandler> handlerMap = Maps.newConcurrentMap();
-
-  public AdvancedDataRetriever() {
-  }
-  
-  public void register(QueryUnitAttemptId id, RetrieverHandler handler) {
-    synchronized (handlerMap) {
-      if (!handlerMap.containsKey(id.toString())) {
-        handlerMap.put(id.toString(), handler);
-      }
-    } 
-  }
-  
-  public void unregister(QueryUnitAttemptId id) {
-    synchronized (handlerMap) {
-      if (handlerMap.containsKey(id.toString())) {
-        handlerMap.remove(id.toString());
-      }
-    }
-  }
-
-  /* (non-Javadoc)
-   * @see tajo.worker.dataserver.retriever.DataRetriever#handle(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.handler.codec.http.HttpRequest)
-   */
-  @Override
-  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
-      throws IOException {
-
-    final Map<String, List<String>> params =
-      new QueryStringDecoder(request.getUri()).getParameters();
-
-    if (!params.containsKey("qid")) {
-      throw new FileNotFoundException("No such qid: " + params.containsKey("qid"));
-    }
-
-    if (params.containsKey("sid")) {
-      List<FileChunk> chunks = Lists.newArrayList();
-      List<String> qids = splitMaps(params.get("qid"));
-      for (String qid : qids) {
-        String[] ids = qid.split("_");
-        SubQueryId suid = TajoIdUtils.newSubQueryId(params.get("sid").get(0));
-        QueryUnitId quid = new QueryUnitId(suid, Integer.parseInt(ids[0]));
-        QueryUnitAttemptId attemptId = new QueryUnitAttemptId(quid,
-            Integer.parseInt(ids[1]));
-        RetrieverHandler handler = handlerMap.get(attemptId.toString());
-        FileChunk chunk = handler.get(params);
-        chunks.add(chunk);
-      }
-      return chunks.toArray(new FileChunk[chunks.size()]);
-    } else {
-      RetrieverHandler handler = handlerMap.get(params.get("qid").get(0));
-      FileChunk chunk = handler.get(params);
-      if (chunk == null) {
-        if (params.containsKey("qid")) { // if there is no content corresponding to the query
-          return null;
-        } else { // if there is no such qid
-          throw new FileNotFoundException("No such file corresponding to " + params.get("qid"));
-        }
-      }
-
-      File file = chunk.getFile();
-      if (file.isHidden() || !file.exists()) {
-        throw new FileNotFoundException("No such file: " + file.getAbsolutePath());
-      }
-      if (!file.isFile()) {
-        throw new FileAccessForbiddenException(file.getAbsolutePath() + " is not a file");
-      }
-
-      return new FileChunk[] {chunk};
-    }
-  }
-
-  private List<String> splitMaps(List<String> qids) {
-    if (null == qids) {
-      LOG.error("QueryUnitId is EMPTY");
-      return null;
-    }
-
-    final List<String> ret = new ArrayList<String>();
-    for (String qid : qids) {
-      Collections.addAll(ret, qid.split(","));
-    }
-    return ret;
-  }
-}
\ No newline at end of file
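
As a sketch of the intended lifecycle, a worker registers one handler per query unit attempt and removes it when the attempt finishes; the id string below is a placeholder and the handler is stubbed out:

import tajo.QueryUnitAttemptId;
import tajo.QueryUnitId;
import tajo.util.TajoIdUtils;
import tajo.worker.dataserver.retriever.AdvancedDataRetriever;
import tajo.worker.dataserver.retriever.FileChunk;
import tajo.worker.dataserver.retriever.RetrieverHandler;

import java.util.List;
import java.util.Map;

public class AdvancedDataRetrieverExample {
  public static void main(String[] args) {
    AdvancedDataRetriever retriever = new AdvancedDataRetriever();
    // Build the attempt id the same way handle() above reconstructs it from request parameters.
    QueryUnitAttemptId attemptId = new QueryUnitAttemptId(
        new QueryUnitId(TajoIdUtils.newSubQueryId("sq_placeholder"), 0), 0);
    retriever.register(attemptId, new RetrieverHandler() {
      @Override
      public FileChunk get(Map<String, List<String>> kvs) {
        return null;  // a real handler would locate the requested part of the attempt's output
      }
    });
    // ... hand 'retriever' to an HttpDataServer and serve requests, then:
    retriever.unregister(attemptId);
  }
}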

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/DataRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/DataRetriever.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/DataRetriever.java
deleted file mode 100644
index 83d0540..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/DataRetriever.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver.retriever;
-
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-
-import java.io.IOException;
-
-public interface DataRetriever {
-  FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
-      throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/DirectoryRetriever.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/DirectoryRetriever.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/DirectoryRetriever.java
deleted file mode 100644
index bd44878..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/DirectoryRetriever.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver.retriever;
-
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import tajo.worker.dataserver.FileAccessForbiddenException;
-import tajo.worker.dataserver.HttpDataServerHandler;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-public class DirectoryRetriever implements DataRetriever {
-  public String baseDir;
-  
-  public DirectoryRetriever(String baseDir) {
-    this.baseDir = baseDir;
-  }
-
-  @Override
-  public FileChunk [] handle(ChannelHandlerContext ctx, HttpRequest request)
-      throws IOException {
-    final String path = HttpDataServerHandler.sanitizeUri(request.getUri());
-    if (path == null) {
-      throw new IllegalArgumentException("Wrong path: " +path);
-    }
-
-    File file = new File(baseDir, path);
-    if (file.isHidden() || !file.exists()) {
-      throw new FileNotFoundException("No such file: " + baseDir + "/" + path);
-    }
-    if (!file.isFile()) {
-      throw new FileAccessForbiddenException("No such file: " 
-          + baseDir + "/" + path); 
-    }
-    
-    return new FileChunk[] {new FileChunk(file, 0, file.length())};
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/FileChunk.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/FileChunk.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/FileChunk.java
deleted file mode 100644
index 9af274b..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/FileChunk.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver.retriever;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-
-public class FileChunk {
-  private final File file;
-  private final long startOffset;
-  private final long length;
-
-  public FileChunk(File file, long startOffset, long length) throws FileNotFoundException {
-    this.file = file;
-    this.startOffset = startOffset;
-    this.length = length;
-  }
-
-  public File getFile() {
-    return this.file;
-  }
-
-  public long startOffset() {
-    return this.startOffset;
-  }
-
-  public long length() {
-    return this.length;
-  }
-
-  public String toString() {
-    return " (start=" + startOffset() + ", length=" + length + ") "
-        + file.getAbsolutePath();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/RetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/RetrieverHandler.java b/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/RetrieverHandler.java
deleted file mode 100644
index ae2d5c4..0000000
--- a/tajo-core/tajo-core-backend/src/main/java/tajo/worker/dataserver/retriever/RetrieverHandler.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package tajo.worker.dataserver.retriever;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-public interface RetrieverHandler {
-  /**
-   *
-   * @param kvs url-decoded key/value pairs
-   * @return a desired part of a file
-   * @throws IOException
-   */
-  public FileChunk get(Map<String, List<String>> kvs) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/CatalogProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/CatalogProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/CatalogProtocol.proto
index d273059..a7866ac 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/CatalogProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/CatalogProtocol.proto
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-option java_package = "tajo.catalog";
+option java_package = "org.apache.tajo.catalog";
 option java_outer_classname = "CatalogProtocol";
 option java_generic_services = true;
 option java_generate_equals_and_hash = true;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
index 36b33c1..ee7e277 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/CatalogProtos.proto
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-option java_package = "tajo.catalog.proto";
+option java_package = "org.apache.tajo.catalog.proto";
 option java_outer_classname = "CatalogProtos";
 option optimize_for = SPEED;
 option java_generic_services = false;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
index e78c87f..cbcccd3 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/ClientProtocol.proto
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-option java_package = "tajo.client";
+option java_package = "org.apache.tajo.client";
 option java_outer_classname = "ClientProtocol";
 option java_generic_services = true;
 option java_generate_equals_and_hash = true;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/DataTypes.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/DataTypes.proto b/tajo-core/tajo-core-backend/src/main/proto/DataTypes.proto
index 262ecac..3dbf6fa 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/DataTypes.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/DataTypes.proto
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-option java_package = "tajo.common";
+option java_package = "org.apache.tajo.common";
 option java_outer_classname = "TajoDataTypes";
 option optimize_for = SPEED;
 option java_generic_services = false;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtocol.proto b/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtocol.proto
index c2f2aab..1ddf24a 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtocol.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtocol.proto
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-option java_package = "tajo.ipc";
+option java_package = "org.apache.tajo.ipc";
 option java_outer_classname = "MasterWorkerProtocol";
 option java_generic_services = true;
 option java_generate_equals_and_hash = true;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtos.proto
index d8863e4..9fea5be 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/MasterWorkerProtos.proto
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-option java_package = "tajo.engine";
+option java_package = "org.apache.tajo.engine";
 option java_outer_classname = "MasterWorkerProtos";
 option java_generic_services = false;
 option java_generate_equals_and_hash = true;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/PrimitiveProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/PrimitiveProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/PrimitiveProtos.proto
index 5fa6903..e722190 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/PrimitiveProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/PrimitiveProtos.proto
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-option java_package = "tajo.rpc.protocolrecords";
+option java_package = "org.apache.tajo.rpc.protocolrecords";
 option java_outer_classname = "PrimitiveProtos";
 option java_generic_services = true;
 option java_generate_equals_and_hash = true;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto b/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto
index abd47c0..04c67f2 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/TajoIdProtos.proto
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-option java_package = "tajo";
+option java_package = "org.apache.tajo";
 option java_outer_classname = "TajoIdProtos";
 option java_generic_services = false;
 option java_generate_equals_and_hash = true;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/proto/tajo_protos.proto b/tajo-core/tajo-core-backend/src/main/proto/tajo_protos.proto
index d4d2fee..3d22ff5 100644
--- a/tajo-core/tajo-core-backend/src/main/proto/tajo_protos.proto
+++ b/tajo-core/tajo-core-backend/src/main/proto/tajo_protos.proto
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-option java_package = "tajo";
+option java_package = "org.apache.tajo";
 option java_outer_classname = "TajoProtos";
 option java_generic_services = false;
 option java_generate_equals_and_hash = true;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
new file mode 100644
index 0000000..ea711b8
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.tajo;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.QueryAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.physical.PhysicalExec;
+import org.apache.tajo.engine.query.ResultSetImpl;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.TUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.UUID;
+
+public class BackendTestingUtil {
+	public final static Schema mockupSchema;
+	public final static TableMeta mockupMeta;
+
+	static {
+    mockupSchema = new Schema();
+    mockupSchema.addColumn("deptname", Type.TEXT);
+    mockupSchema.addColumn("score", Type.INT4);
+    mockupMeta = CatalogUtil.newTableMeta(mockupSchema, StoreType.CSV);
+	}
+
+  public static void writeTmpTable(TajoConf conf, Path path,
+                                   String tableName, boolean writeMeta)
+      throws IOException {
+    StorageManager sm = StorageManager.get(conf, path);
+    FileSystem fs = sm.getFileSystem();
+    Appender appender;
+
+    Path tablePath = StorageUtil.concatPath(path, tableName, "table.csv");
+    if (fs.exists(tablePath.getParent())) {
+      fs.delete(tablePath.getParent(), true);
+    }
+    fs.mkdirs(tablePath.getParent());
+
+    if (writeMeta) {
+      FileUtil.writeProto(fs, new Path(tablePath.getParent(), ".meta"), mockupMeta.getProto());
+    }
+    appender = StorageManager.getAppender(conf, mockupMeta, tablePath);
+    appender.init();
+
+    int deptSize = 10000;
+    int tupleNum = 100;
+    Tuple tuple;
+    for (int i = 0; i < tupleNum; i++) {
+      tuple = new VTuple(2);
+      String key = "test" + (i % deptSize);
+      tuple.put(0, DatumFactory.createText(key));
+      tuple.put(1, DatumFactory.createInt4(i + 1));
+      appender.addTuple(tuple);
+    }
+    appender.close();
+  }
+
+  public static void writeTmpTable(TajoConf conf, String parent,
+                                   String tableName, boolean writeMeta) throws IOException {
+    writeTmpTable(conf, new Path(parent), tableName, writeMeta);
+  }
+
+  private TajoConf conf;
+  private CatalogService catalog;
+  private QueryAnalyzer analyzer;
+  private LogicalPlanner planner;
+  public BackendTestingUtil(TajoConf conf) throws IOException {
+    this.conf = conf;
+    this.catalog = new LocalCatalog(conf);
+    analyzer = new QueryAnalyzer(catalog);
+    planner = new LogicalPlanner(catalog);
+  }
+
+  public ResultSet run(String [] tableNames, File [] tables, Schema [] schemas, String query)
+      throws IOException {
+    Path workDir = createTmpTestDir();
+    StorageManager sm = StorageManager.get(new TajoConf(), workDir);
+
+    // Split every input table into fragments so the physical executor can scan them.
+    List<Fragment> frags = Lists.newArrayList();
+    for (int i = 0; i < tableNames.length; i++) {
+      Fragment [] splits = sm.split(tableNames[i], new Path(tables[i].getAbsolutePath()));
+      for (Fragment f : splits) {
+        frags.add(f);
+      }
+    }
+
+    // Parse, plan, and optimize the query, then build a physical executor over the fragments.
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        TUtil.newQueryUnitAttemptId(),
+        frags.toArray(new Fragment[frags.size()]), workDir);
+    PlanningContext context = analyzer.parse(query);
+    LogicalNode plan = planner.createPlan(context);
+    plan = LogicalOptimizer.optimize(context, plan);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    // The result set is read back from the "out" directory under the working directory.
+    return new ResultSetImpl(conf, new Path(workDir, "out"));
+  }
+
+  public static Path createTmpTestDir() throws IOException {
+    String randomStr = UUID.randomUUID().toString();
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    Path dir = new Path("target/test-data", randomStr);
+    // Remove any leftover directory from a previous run.
+    if (fs.exists(dir)) {
+      fs.delete(dir, true);
+    }
+    return dir;
+  }
+}

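A brief usage sketch of the utility above: it writes the mock-up (deptname, score) table into a fresh test directory using only the static helpers defined in this file. The driver class name is illustrative and not part of the commit.

import org.apache.hadoop.fs.Path;
import org.apache.tajo.BackendTestingUtil;
import org.apache.tajo.conf.TajoConf;

// Illustrative driver class; only createTmpTestDir() and writeTmpTable() come from the code above.
public class BackendTestingUtilExample {
  public static void main(String[] args) throws Exception {
    TajoConf conf = new TajoConf();
    // Create an isolated directory under target/test-data and write the
    // 100-row mock-up table into <dir>/employee/table.csv, including the .meta file.
    Path testDir = BackendTestingUtil.createTmpTestDir();
    BackendTestingUtil.writeTmpTable(conf, testDir, "employee", true);
    System.out.println("mock-up table written under " + testDir);
  }
}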
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/IntegrationTest.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/IntegrationTest.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/IntegrationTest.java
new file mode 100644
index 0000000..755b90a
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/IntegrationTest.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+public interface IntegrationTest {
+}

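IntegrationTest above is an empty marker interface; presumably it is intended as a JUnit 4 @Category marker so that integration tests can be included or excluded at build time. A hypothetical sketch under that assumption (the test class and its body are illustrative):

import org.apache.tajo.IntegrationTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;

// Hypothetical test class; the @Category annotation is what marks it as an integration test.
@Category(IntegrationTest.class)
public class TestQueryOnCluster {
  @Test
  public void testRunsOnlyWhenIntegrationTestsAreEnabled() {
    // Body intentionally left empty; a category filter in the build would
    // decide whether classes annotated with IntegrationTest are executed.
  }
}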
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
new file mode 100644
index 0000000..6425192
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.conf.TajoConf;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+
+public class LocalTajoTestingUtility {
+  private TajoTestingCluster util;
+  private TajoConf conf;
+  private TajoClient client;
+
+  /**
+   * Starts a one-node mini cluster and registers each of the given local
+   * table files as a CSV table with the corresponding name and schema.
+   */
+  public void setup(String[] names,
+                    String[] tablepaths,
+                    Schema[] schemas,
+                    Options option) throws Exception {
+
+    util = new TajoTestingCluster();
+    util.startMiniCluster(1);
+    conf = util.getConfiguration();
+    client = new TajoClient(conf);
+
+    FileSystem fs = util.getDefaultFileSystem();
+    Path rootDir = util.getMaster().
+        getStorageManager().getBaseDir();
+    fs.mkdirs(rootDir);
+    // Copy each local file into <rootDir>/<tableName>/data and create the table in the catalog.
+    for (int i = 0; i < tablepaths.length; i++) {
+      Path localPath = new Path(tablepaths[i]);
+      Path tablePath = new Path(rootDir, names[i]);
+      fs.mkdirs(tablePath);
+      Path dataPath = new Path(tablePath, "data");
+      fs.mkdirs(dataPath);
+      Path dfsPath = new Path(dataPath, localPath.getName());
+      fs.copyFromLocalFile(localPath, dfsPath);
+      TableMeta meta = CatalogUtil.newTableMeta(schemas[i],
+          CatalogProtos.StoreType.CSV, option);
+      client.createTable(names[i], tablePath, meta);
+    }
+  }
+
+  public TajoTestingCluster getTestingCluster() {
+    return util;
+  }
+
+  public ResultSet execute(String query) throws IOException, ServiceException {
+    return client.executeQueryAndGetResult(query);
+  }
+
+  public void shutdown() throws IOException {
+    client.close();
+    util.shutdownMiniCluster();
+  }
+}

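A sketch of how the helper above might drive an end-to-end query. The table name, local CSV path, and query are illustrative, and new Options() is assumed to construct an empty key-value options object from org.apache.tajo.catalog (covered by the wildcard import above).

import java.sql.ResultSet;
import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.catalog.Options;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.common.TajoDataTypes.Type;

// Illustrative driver class; setup(), execute(), and shutdown() come from the code above.
public class LocalTajoTestingUtilityExample {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema();
    schema.addColumn("deptname", Type.TEXT);
    schema.addColumn("score", Type.INT4);

    LocalTajoTestingUtility util = new LocalTajoTestingUtility();
    // Start a one-node mini cluster, copy the local CSV into the cluster,
    // and register it in the catalog as table "score".
    util.setup(new String[]{"score"},
               new String[]{"src/test/resources/dataset/score.csv"},  // illustrative path
               new Schema[]{schema},
               new Options());  // assumed empty options object
    try {
      ResultSet res = util.execute(
          "select deptname, sum(score) from score group by deptname");
      while (res.next()) {
        System.out.println(res.getString(1) + " : " + res.getInt(2));
      }
    } finally {
      util.shutdown();
    }
  }
}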
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc6359b8/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
new file mode 100644
index 0000000..8de0f0b
--- /dev/null
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/MiniTajoYarnCluster.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.LocalContainerLauncher;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.PullServerAuxService;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Configures and starts the Tajo-specific components (notably the
+ * pull-server auxiliary service) in a mini YARN cluster for tests.
+ */
+public class MiniTajoYarnCluster extends MiniYARNCluster {
+
+  public static final String APPJAR = JarFinder
+      .getJar(LocalContainerLauncher.class);
+
+  private static final Log LOG = LogFactory.getLog(MiniTajoYarnCluster.class);
+
+  public MiniTajoYarnCluster(String testName) {
+    this(testName, 1);
+  }
+
+  public MiniTajoYarnCluster(String testName, int noOfNMs) {
+    super(testName, noOfNMs, 1, 1);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+    if (conf.get(MRJobConfig.MR_AM_STAGING_DIR) == null) {
+      conf.set(MRJobConfig.MR_AM_STAGING_DIR, new File(getTestWorkDir(),
+          "apps_staging_dir/").getAbsolutePath());
+    }
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,  "000");
+
+    try {
+      Path stagingPath = FileContext.getFileContext(conf).makeQualified(
+          new Path(conf.get(MRJobConfig.MR_AM_STAGING_DIR)));
+      FileContext fc=FileContext.getFileContext(stagingPath.toUri(), conf);
+      if (fc.util().exists(stagingPath)) {
+        LOG.info(stagingPath + " exists! deleting...");
+        fc.delete(stagingPath, true);
+      }
+      LOG.info("mkdir: " + stagingPath);
+      // mkdir the staging directory so that the right permissions are set while running as a proxy user
+      fc.mkdir(stagingPath, null, true);
+      // mkdir the job-history "done" directory as well
+      String doneDir = JobHistoryUtils
+          .getConfiguredHistoryServerDoneDirPrefix(conf);
+      Path doneDirPath = fc.makeQualified(new Path(doneDir));
+      fc.mkdir(doneDirPath, null, true);
+    } catch (IOException e) {
+      throw new YarnException("Could not create staging directory. ", e);
+    }
+    // The default master address is "local", in which case no shuffle happens.
+    conf.set(MRConfig.MASTER_ADDRESS, "test");
+
+    // Configure the pull-server (shuffle) auxiliary service in the NodeManager.
+    conf.setStrings(YarnConfiguration.NM_AUX_SERVICES, PullServerAuxService.PULLSERVER_SERVICEID);
+    conf.setClass(String.format(YarnConfiguration.NM_AUX_SERVICE_FMT,
+        PullServerAuxService.PULLSERVER_SERVICEID), PullServerAuxService.class,
+        Service.class);
+
+    // Use an ephemeral pull-server (shuffle) port; 0 means pick any free port.
+    conf.setInt(TajoConf.ConfVars.PULLSERVER_PORT.name(), 0);
+
+    conf.setClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
+        DefaultContainerExecutor.class, ContainerExecutor.class);
+
+    // Uber-task mode is a MapReduce feature and is not relevant here; keep it disabled.
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    super.start();
+
+    LOG.info("MiniTajoYarn NM Local Dir: " + getConfig().get(YarnConfiguration.NM_LOCAL_DIRS));
+  }
+
+  /**
+   * Wraps the JobHistoryServer configuration so that, unless fixed ports are
+   * requested, free random ports are picked for its addresses before start().
+   */
+  private class JobHistoryServerWrapper extends AbstractService {
+    public JobHistoryServerWrapper() {
+      super(JobHistoryServerWrapper.class.getName());
+    }
+
+    @Override
+    public synchronized void start() {
+      try {
+        if (!getConfig().getBoolean(
+            JHAdminConfig.MR_HISTORY_MINICLUSTER_FIXED_PORTS,
+            JHAdminConfig.DEFAULT_MR_HISTORY_MINICLUSTER_FIXED_PORTS)) {
+          // pick free random ports.
+          getConfig().set(JHAdminConfig.MR_HISTORY_ADDRESS,
+              MiniYARNCluster.getHostname() + ":0");
+          getConfig().set(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS,
+              MiniYARNCluster.getHostname() + ":0");
+        }
+        super.start();
+      } catch (Throwable t) {
+        throw new YarnException(t);
+      }
+
+      LOG.info("MiniMRYARN ResourceManager address: " +
+          getConfig().get(YarnConfiguration.RM_ADDRESS));
+      LOG.info("MiniMRYARN ResourceManager web address: " +
+          getConfig().get(YarnConfiguration.RM_WEBAPP_ADDRESS));
+      LOG.info("MiniMRYARN HistoryServer address: " +
+          getConfig().get(JHAdminConfig.MR_HISTORY_ADDRESS));
+      LOG.info("MiniMRYARN HistoryServer web address: " +
+          getConfig().get(JHAdminConfig.MR_HISTORY_WEBAPP_ADDRESS));
+    }
+
+    @Override
+    public synchronized void stop() {
+      super.stop();
+    }
+  }
+
+  public static void main(String [] args) {
+    MiniTajoYarnCluster cluster = new MiniTajoYarnCluster(MiniTajoYarnCluster.class.getName());
+    cluster.init(new TajoConf());
+    cluster.start();
+  }
+}
+
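A brief sketch of embedding the mini cluster in a test harness instead of invoking main() directly; it relies only on the constructor and the service lifecycle calls (init/start/stop) used above, and the printed key is the standard YARN NodeManager local-dirs setting.

import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tajo.MiniTajoYarnCluster;
import org.apache.tajo.conf.TajoConf;

// Illustrative driver class; the cluster API calls mirror main() in the code above.
public class MiniTajoYarnClusterExample {
  public static void main(String[] args) {
    // Bring up a single-NodeManager YARN cluster with the Tajo pull-server
    // auxiliary service configured, print where it keeps local data, then stop it.
    MiniTajoYarnCluster cluster = new MiniTajoYarnCluster("example", 1);
    cluster.init(new TajoConf());
    cluster.start();
    try {
      System.out.println("NM local dirs: "
          + cluster.getConfig().get(YarnConfiguration.NM_LOCAL_DIRS));
    } finally {
      cluster.stop();
    }
  }
}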