You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/02 04:56:37 UTC

[04/14] tajo git commit: TAJO-1166: S3 related storage causes compilation error in Hadoop 2.6.0-SNAPSHOT. (jaehwa)

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 0e3ccad..0cc87fc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.master.querymaster.QueryInProgress;
@@ -48,7 +49,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import static org.apache.tajo.ipc.TajoMasterProtocol.*;
 
 
@@ -80,7 +80,8 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
 
   private TajoConf systemConf;
 
-  private ConcurrentMap<ContainerIdProto, AllocatedWorkerResource> allocatedResourceMap = Maps.newConcurrentMap();
+  private ConcurrentMap<ContainerProtocol.TajoContainerIdProto, AllocatedWorkerResource> allocatedResourceMap = Maps
+    .newConcurrentMap();
 
   /** It receives status messages from workers and their resources. */
   private TajoResourceTracker resourceTracker;
@@ -194,7 +195,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
 
   private WorkerResourceAllocationRequest createQMResourceRequest(QueryId queryId) {
     float queryMasterDefaultDiskSlot = masterContext.getConf().getFloatVar(
-        TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT);
+      TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT);
     int queryMasterDefaultMemoryMB = masterContext.getConf().getIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB);
 
     WorkerResourceAllocationRequest.Builder builder = WorkerResourceAllocationRequest.newBuilder();
@@ -235,7 +236,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
     return resource;
   }
 
-  private void registerQueryMaster(QueryId queryId, ContainerIdProto containerId) {
+  private void registerQueryMaster(QueryId queryId, ContainerProtocol.TajoContainerIdProto containerId) {
     rmContext.getQueryMasterContainer().putIfAbsent(queryId, containerId);
   }
 
@@ -256,9 +257,9 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
     WorkerResourceAllocationRequest request;
     RpcCallback<WorkerResourceAllocationResponse> callBack;
     WorkerResourceRequest(
-        QueryId queryId,
-        boolean queryMasterRequest, WorkerResourceAllocationRequest request,
-        RpcCallback<WorkerResourceAllocationResponse> callBack) {
+      QueryId queryId,
+      boolean queryMasterRequest, WorkerResourceAllocationRequest request,
+      RpcCallback<WorkerResourceAllocationResponse> callBack) {
       this.queryId = queryId;
       this.queryMasterRequest = queryMasterRequest;
       this.request = request;
@@ -282,14 +283,14 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("allocateWorkerResources:" +
-                (new QueryId(resourceRequest.request.getQueryId())) +
-                ", requiredMemory:" + resourceRequest.request.getMinMemoryMBPerContainer() +
-                "~" + resourceRequest.request.getMaxMemoryMBPerContainer() +
-                ", requiredContainers:" + resourceRequest.request.getNumContainers() +
-                ", requiredDiskSlots:" + resourceRequest.request.getMinDiskSlotPerContainer() +
-                "~" + resourceRequest.request.getMaxDiskSlotPerContainer() +
-                ", queryMasterRequest=" + resourceRequest.queryMasterRequest +
-                ", liveWorkers=" + rmContext.getWorkers().size());
+              (new QueryId(resourceRequest.request.getQueryId())) +
+              ", requiredMemory:" + resourceRequest.request.getMinMemoryMBPerContainer() +
+              "~" + resourceRequest.request.getMaxMemoryMBPerContainer() +
+              ", requiredContainers:" + resourceRequest.request.getNumContainers() +
+              ", requiredDiskSlots:" + resourceRequest.request.getMinDiskSlotPerContainer() +
+              "~" + resourceRequest.request.getMaxDiskSlotPerContainer() +
+              ", queryMasterRequest=" + resourceRequest.queryMasterRequest +
+              ", liveWorkers=" + rmContext.getWorkers().size());
           }
 
           // TajoWorkerResourceManager can't return allocated disk slots occasionally.
@@ -300,25 +301,25 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
 
             if(allocatedWorkerResources.size() > 0) {
               List<WorkerAllocatedResource> allocatedResources =
-                  new ArrayList<WorkerAllocatedResource>();
+                new ArrayList<WorkerAllocatedResource>();
 
               for(AllocatedWorkerResource allocatedResource: allocatedWorkerResources) {
                 NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getConnectionInfo().getHost(),
-                    allocatedResource.worker.getConnectionInfo().getPeerRpcPort());
+                  allocatedResource.worker.getConnectionInfo().getPeerRpcPort());
 
                 TajoWorkerContainerId containerId = new TajoWorkerContainerId();
 
                 containerId.setApplicationAttemptId(
-                    ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
+                  ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
                 containerId.setId(containerIdSeq.incrementAndGet());
 
-                ContainerIdProto containerIdProto = containerId.getProto();
+                ContainerProtocol.TajoContainerIdProto containerIdProto = containerId.getProto();
                 allocatedResources.add(WorkerAllocatedResource.newBuilder()
-                    .setContainerId(containerIdProto)
-                    .setConnectionInfo(allocatedResource.worker.getConnectionInfo().getProto())
-                    .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB)
-                    .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots)
-                    .build());
+                  .setContainerId(containerIdProto)
+                  .setConnectionInfo(allocatedResource.worker.getConnectionInfo().getProto())
+                  .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB)
+                  .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots)
+                  .build());
 
 
                 allocatedResourceMap.putIfAbsent(containerIdProto, allocatedResource);
@@ -358,7 +359,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
     int allocatedResources = 0;
 
     TajoMasterProtocol.ResourceRequestPriority resourceRequestPriority
-        = resourceRequest.request.getResourceRequestPriority();
+      = resourceRequest.request.getResourceRequestPriority();
 
     if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) {
       synchronized(rmContext) {
@@ -369,7 +370,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
         int minMemoryMB = resourceRequest.request.getMinMemoryMBPerContainer();
         int maxMemoryMB = resourceRequest.request.getMaxMemoryMBPerContainer();
         float diskSlot = Math.max(resourceRequest.request.getMaxDiskSlotPerContainer(),
-            resourceRequest.request.getMinDiskSlotPerContainer());
+          resourceRequest.request.getMinDiskSlotPerContainer());
 
         int liveWorkerSize = randomWorkers.size();
         Set<Integer> insufficientWorkers = new HashSet<Integer>();
@@ -418,7 +419,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
               }
 
               workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
-                  allocatedWorkerResource.allocatedMemoryMB);
+                allocatedWorkerResource.allocatedMemoryMB);
 
               selectedWorkers.add(allocatedWorkerResource);
 
@@ -438,7 +439,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
         float minDiskSlots = resourceRequest.request.getMinDiskSlotPerContainer();
         float maxDiskSlots = resourceRequest.request.getMaxDiskSlotPerContainer();
         int memoryMB = Math.max(resourceRequest.request.getMaxMemoryMBPerContainer(),
-            resourceRequest.request.getMinMemoryMBPerContainer());
+          resourceRequest.request.getMinMemoryMBPerContainer());
 
         int liveWorkerSize = randomWorkers.size();
         Set<Integer> insufficientWorkers = new HashSet<Integer>();
@@ -487,7 +488,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
                 allocatedWorkerResource.allocatedMemoryMB = workerResource.getAvailableMemoryMB();
               }
               workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
-                  allocatedWorkerResource.allocatedMemoryMB);
+                allocatedWorkerResource.allocatedMemoryMB);
 
               selectedWorkers.add(allocatedWorkerResource);
 
@@ -508,7 +509,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
    * @param containerId ContainerIdProto to be released
    */
   @Override
-  public void releaseWorkerResource(ContainerIdProto containerId) {
+  public void releaseWorkerResource(ContainerProtocol.TajoContainerIdProto containerId) {
     AllocatedWorkerResource allocated = allocatedResourceMap.get(containerId);
     if(allocated != null) {
       LOG.info("Release Resource: " + allocated.allocatedDiskSlots + "," + allocated.allocatedMemoryMB);
@@ -530,7 +531,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
       LOG.warn("No QueryMaster resource info for " + queryId);
       return;
     } else {
-      ContainerIdProto containerId = rmContext.getQueryMasterContainer().remove(queryId);
+      ContainerProtocol.TajoContainerIdProto containerId = rmContext.getQueryMasterContainer().remove(queryId);
       releaseWorkerResource(containerId);
       rmContext.getStoppedQueryIds().add(queryId);
       LOG.info(String.format("Released QueryMaster (%s) resource." , queryId.toString()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 8e8ac51..9c2b71b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -20,8 +20,8 @@ package org.apache.tajo.master.rm;
 
 import com.google.protobuf.RpcCallback;
 import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.master.querymaster.QueryInProgress;
 
@@ -63,7 +63,7 @@ public interface WorkerResourceManager extends Service {
    *
    * @param containerId ContainerIdProto to be released
    */
-  public void releaseWorkerResource(ContainerIdProto containerId);
+  public void releaseWorkerResource(ContainerProtocol.TajoContainerIdProto containerId);
 
   public String getSeedQueryId() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
index ca71c53..68c57f2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
@@ -20,9 +20,9 @@ package org.apache.tajo.worker;
 
 import com.google.common.collect.Maps;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tajo.master.ContainerProxy;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.container.TajoContainerId;
 
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
@@ -41,29 +41,29 @@ public abstract class AbstractResourceAllocator extends CompositeService impleme
     workerInfoMap.putIfAbsent(connectionInfo.getId(), connectionInfo);
   }
 
-  private Map<ContainerId, ContainerProxy> containers = Maps.newConcurrentMap();
+  private Map<TajoContainerId, ContainerProxy> containers = Maps.newConcurrentMap();
 
   public AbstractResourceAllocator() {
     super(AbstractResourceAllocator.class.getName());
   }
 
-  public void addContainer(ContainerId cId, ContainerProxy container) {
+  public void addContainer(TajoContainerId cId, ContainerProxy container) {
     containers.put(cId, container);
   }
 
-  public void removeContainer(ContainerId cId) {
+  public void removeContainer(TajoContainerId cId) {
     containers.remove(cId);
   }
 
-  public boolean containsContainer(ContainerId cId) {
+  public boolean containsContainer(TajoContainerId cId) {
     return containers.containsKey(cId);
   }
 
-  public ContainerProxy getContainer(ContainerId cId) {
+  public ContainerProxy getContainer(TajoContainerId cId) {
     return containers.get(cId);
   }
 
-  public Map<ContainerId, ContainerProxy> getContainers() {
+  public Map<TajoContainerId, ContainerProxy> getContainers() {
     return containers;
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
index 8b9219c..b713e70 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
@@ -18,12 +18,12 @@
 
 package org.apache.tajo.worker;
 
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.ipc.ContainerProtocol;
+import org.apache.tajo.master.container.TajoContainerId;
 
 public interface ResourceAllocator {
   public void allocateTaskWorker();
-  public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId);
+  public TajoContainerId makeContainerId(ContainerProtocol.TajoContainerIdProto containerId);
   public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
                                            int numTasks, int memoryMBPerTask);
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 2220089..9345885 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -23,28 +23,25 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.TajoMasterProtocol;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.*;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
 import org.apache.tajo.master.event.ContainerAllocationEvent;
 import org.apache.tajo.master.event.ContainerAllocatorEventType;
 import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
 import org.apache.tajo.master.querymaster.SubQuery;
 import org.apache.tajo.master.querymaster.SubQueryState;
-import org.apache.tajo.master.rm.TajoWorkerContainer;
-import org.apache.tajo.master.rm.TajoWorkerContainerId;
-import org.apache.tajo.master.rm.Worker;
-import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.master.rm.*;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
@@ -72,11 +69,11 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
   public TajoResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) {
     this.queryTaskContext = queryTaskContext;
     executorService = Executors.newFixedThreadPool(
-        queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
+      queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
   }
 
   @Override
-  public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerIdProto) {
+  public TajoContainerId makeContainerId(ContainerProtocol.TajoContainerIdProto containerIdProto) {
     TajoWorkerContainerId containerId = new TajoWorkerContainerId();
     ApplicationAttemptId appAttemptId = new ApplicationAttemptIdPBImpl(containerIdProto.getAppAttemptId());
     containerId.setApplicationAttemptId(appAttemptId);
@@ -98,7 +95,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask;
     clusterSlots =  Math.max(1, clusterSlots - 1); // reserve query master slot
     LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks +
-        ", Number of Cluster Slots=" + clusterSlots);
+      ", Number of Cluster Slots=" + clusterSlots);
     return  Math.min(numTasks, clusterSlots);
   }
 
@@ -121,7 +118,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
 
     executorService.shutdownNow();
 
-    Map<ContainerId, ContainerProxy> containers = queryTaskContext.getResourceAllocator().getContainers();
+    Map<TajoContainerId, ContainerProxy> containers = queryTaskContext.getResourceAllocator()
+      .getContainers();
     List<ContainerProxy> list = new ArrayList<ContainerProxy>(containers.values());
     for(ContainerProxy eachProxy: list) {
       try {
@@ -156,16 +154,17 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
   private void launchTaskRunners(LaunchTaskRunnersEvent event) {
     // Query in standby mode doesn't need launch Worker.
     // But, Assign ExecutionBlock to assigned tajo worker
-    for(Container eachContainer: event.getContainers()) {
+    for(TajoContainer eachContainer: event.getContainers()) {
       TajoContainerProxy containerProxy = new TajoContainerProxy(queryTaskContext, tajoConf,
-          eachContainer, event.getQueryContext(), event.getExecutionBlockId(), event.getPlanJson());
+        eachContainer, event.getQueryContext(), event.getExecutionBlockId(), event.getPlanJson());
       executorService.submit(new LaunchRunner(eachContainer.getId(), containerProxy));
     }
   }
 
-  public void stopExecutionBlock(final ExecutionBlockId executionBlockId, Collection<Container> containers) {
+  public void stopExecutionBlock(final ExecutionBlockId executionBlockId,
+                                 Collection<TajoContainer> containers) {
     Set<NodeId> workers = Sets.newHashSet();
-    for (Container container : containers){
+    for (TajoContainer container : containers){
       workers.add(container.getNodeId());
     }
 
@@ -196,8 +195,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
 
   protected static class LaunchRunner implements Runnable {
     private final ContainerProxy proxy;
-    private final ContainerId id;
-    public LaunchRunner(ContainerId id, ContainerProxy proxy) {
+    private final TajoContainerId id;
+    public LaunchRunner(TajoContainerId id, ContainerProxy proxy) {
       this.proxy = proxy;
       this.id = id;
     }
@@ -210,8 +209,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     }
   }
 
-  private void stopContainers(Collection<Container> containers) {
-    for (Container container : containers) {
+  private void stopContainers(Collection<TajoContainer> containers) {
+    for (TajoContainer container : containers) {
       final ContainerProxy proxy = queryTaskContext.getResourceAllocator().getContainer(container.getId());
       executorService.submit(new StopContainerRunner(container.getId(), proxy));
     }
@@ -219,8 +218,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
 
   private static class StopContainerRunner implements Runnable {
     private final ContainerProxy proxy;
-    private final ContainerId id;
-    public StopContainerRunner(ContainerId id, ContainerProxy proxy) {
+    private final TajoContainerId id;
+    public StopContainerRunner(TajoContainerId id, ContainerProxy proxy) {
       this.id = id;
       this.proxy = proxy;
     }
@@ -251,23 +250,23 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
     public void run() {
       LOG.info("Start TajoWorkerAllocationThread");
       CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
-          new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
+        new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
 
       //TODO consider task's resource usage pattern
       int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
       float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK);
 
       TajoMasterProtocol.WorkerResourceAllocationRequest request =
-          TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
-              .setMinMemoryMBPerContainer(requiredMemoryMB)
-              .setMaxMemoryMBPerContainer(requiredMemoryMB)
-              .setNumContainers(event.getRequiredNum())
-              .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY
-                  : TajoMasterProtocol.ResourceRequestPriority.DISK)
-              .setMinDiskSlotPerContainer(requiredDiskSlots)
-              .setMaxDiskSlotPerContainer(requiredDiskSlots)
-              .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
-              .build();
+        TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
+          .setMinMemoryMBPerContainer(requiredMemoryMB)
+          .setMaxMemoryMBPerContainer(requiredMemoryMB)
+          .setNumContainers(event.getRequiredNum())
+          .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY
+            : TajoMasterProtocol.ResourceRequestPriority.DISK)
+          .setMinDiskSlotPerContainer(requiredDiskSlots)
+          .setMaxDiskSlotPerContainer(requiredDiskSlots)
+          .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
+          .build();
 
       RpcConnectionPool connPool = RpcConnectionPool.getPool(queryTaskContext.getConf());
       NettyClientBase tmClient = null;
@@ -280,21 +279,21 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
         if (tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
           try {
             tmClient = connPool.getConnection(
-                queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-                TajoMasterProtocol.class, true);
+              queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+              TajoMasterProtocol.class, true);
           } catch (Exception e) {
             queryTaskContext.getQueryMasterContext().getWorkerContext().
-                setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf));
+              setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf));
             queryTaskContext.getQueryMasterContext().getWorkerContext().
-                setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf));
+              setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf));
             tmClient = connPool.getConnection(
-                queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-                TajoMasterProtocol.class, true);
+              queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+              TajoMasterProtocol.class, true);
           }
         } else {
           tmClient = connPool.getConnection(
-              queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
-              TajoMasterProtocol.class, true);
+            queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+            TajoMasterProtocol.class, true);
         }
 
         TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
@@ -325,17 +324,17 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
         List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
         ExecutionBlockId executionBlockId = event.getExecutionBlockId();
 
-        List<Container> containers = new ArrayList<Container>();
+        List<TajoContainer> containers = new ArrayList<TajoContainer>();
         for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
           TajoWorkerContainer container = new TajoWorkerContainer();
           NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(),
-              eachAllocatedResource.getConnectionInfo().getPeerRpcPort());
+            eachAllocatedResource.getConnectionInfo().getPeerRpcPort());
 
           TajoWorkerContainerId containerId = new TajoWorkerContainerId();
 
           containerId.setApplicationAttemptId(
-              ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(),
-                  eachAllocatedResource.getContainerId().getAppAttemptId().getAttemptId()));
+            ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(),
+              eachAllocatedResource.getContainerId().getAppAttemptId().getAttemptId()));
           containerId.setId(eachAllocatedResource.getContainerId().getId());
 
           container.setId(containerId);
@@ -347,7 +346,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
           workerResource.setDiskSlots(eachAllocatedResource.getAllocatedDiskSlots());
 
           Worker worker = new Worker(null, workerResource,
-              new WorkerConnectionInfo(eachAllocatedResource.getConnectionInfo()));
+            new WorkerConnectionInfo(eachAllocatedResource.getConnectionInfo()));
           container.setWorkerResource(worker);
           addWorkerConnectionInfo(worker.getConnectionInfo());
           containers.add(container);
@@ -356,8 +355,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
         SubQueryState state = queryTaskContext.getSubQuery(executionBlockId).getSynchronizedState();
         if (!SubQuery.isRunningState(state)) {
           try {
-            List<ContainerId> containerIds = new ArrayList<ContainerId>();
-            for(Container eachContainer: containers) {
+            List<TajoContainerId> containerIds = new ArrayList<TajoContainerId>();
+            for(TajoContainer eachContainer: containers) {
               containerIds.add(eachContainer.getId());
             }
             TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds);
@@ -378,10 +377,10 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
       }
       if(event.getRequiredNum() > numAllocatedContainers) {
         ContainerAllocationEvent shortRequestEvent = new ContainerAllocationEvent(
-            event.getType(), event.getExecutionBlockId(), event.getPriority(),
-            event.getResource(),
-            event.getRequiredNum() - numAllocatedContainers,
-            event.isLeafQuery(), event.getProgress()
+          event.getType(), event.getExecutionBlockId(), event.getPriority(),
+          event.getResource(),
+          event.getRequiredNum() - numAllocatedContainers,
+          event.isLeafQuery(), event.getProgress()
         );
         queryTaskContext.getEventHandler().handle(shortRequestEvent);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 1910575..4e9860b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -24,15 +24,15 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.query.QueryUnitRequestImpl;
 import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.master.container.impl.pb.TajoContainerIdPBImpl;
+import org.apache.tajo.master.container.TajoConverterUtils;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NullCallback;
 import org.jboss.netty.channel.ConnectTimeoutException;
@@ -53,7 +53,7 @@ public class TaskRunner extends AbstractService {
   private volatile boolean stopped = false;
   private Path baseDirPath;
 
-  private ContainerId containerId;
+  private TajoContainerId containerId;
 
   // for Fetcher
   private ExecutorService fetchLauncher;
@@ -77,7 +77,7 @@ public class TaskRunner extends AbstractService {
     this.fetchLauncher = Executors.newFixedThreadPool(
         systemConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM), fetcherFactory);
     try {
-      this.containerId = ConverterUtils.toContainerId(containerId);
+      this.containerId = TajoConverterUtils.toTajoContainerId(containerId);
       this.executionBlockContext = executionBlockContext;
       this.history = executionBlockContext.createTaskRunnerHistory(this);
       this.history.setState(getServiceState());
@@ -91,11 +91,11 @@ public class TaskRunner extends AbstractService {
     return getId(getContext().getExecutionBlockId(), containerId);
   }
 
-  public ContainerId getContainerId(){
+  public TajoContainerId getContainerId(){
     return containerId;
   }
 
-  public static String getId(ExecutionBlockId executionBlockId, ContainerId containerId) {
+  public static String getId(ExecutionBlockId executionBlockId, TajoContainerId containerId) {
     return executionBlockId + "," + containerId;
   }
 
@@ -211,7 +211,7 @@ public class TaskRunner extends AbstractService {
                 LOG.info("Request GetTask: " + getId());
                 GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
                     .setExecutionBlockId(getExecutionBlockId().getProto())
-                    .setContainerId(((ContainerIdPBImpl) containerId).getProto())
+                    .setContainerId(((TajoContainerIdPBImpl) containerId).getProto())
                     .setWorkerId(getContext().getWorkerContext().getConnectionInfo().getId())
                     .build();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
index a8a11c1..364348f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
@@ -21,11 +21,11 @@ package org.apache.tajo.worker;
 import com.google.common.base.Objects;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.QueryUnitAttemptId;
 import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.master.container.TajoConverterUtils;
 
 import java.util.Collections;
 import java.util.Map;
@@ -39,13 +39,13 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskRunnerHistoryProto;
 public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> {
 
   private Service.STATE state;
-  private ContainerId containerId;
+  private TajoContainerId containerId;
   private long startTime;
   private long finishTime;
   private ExecutionBlockId executionBlockId;
   private Map<QueryUnitAttemptId, TaskHistory> taskHistoryMap = null;
 
-  public TaskRunnerHistory(ContainerId containerId, ExecutionBlockId executionBlockId) {
+  public TaskRunnerHistory(TajoContainerId containerId, ExecutionBlockId executionBlockId) {
     init();
     this.containerId = containerId;
     this.executionBlockId = executionBlockId;
@@ -53,7 +53,7 @@ public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> {
 
   public TaskRunnerHistory(TaskRunnerHistoryProto proto) {
     this.state = Service.STATE.valueOf(proto.getState());
-    this.containerId = ConverterUtils.toContainerId(proto.getContainerId());
+    this.containerId = TajoConverterUtils.toTajoContainerId(proto.getContainerId());
     this.startTime = proto.getStartTime();
     this.finishTime = proto.getFinishTime();
     this.executionBlockId = new ExecutionBlockId(proto.getExecutionBlockId());
@@ -129,11 +129,11 @@ public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> {
     this.state = state;
   }
 
-  public ContainerId getContainerId() {
+  public TajoContainerId getContainerId() {
     return containerId;
   }
 
-  public void setContainerId(ContainerId containerId) {
+  public void setContainerId(TajoContainerId containerId) {
     this.containerId = containerId;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/proto/ContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ContainerProtocol.proto b/tajo-core/src/main/proto/ContainerProtocol.proto
new file mode 100644
index 0000000..df7a450
--- /dev/null
+++ b/tajo-core/src/main/proto/ContainerProtocol.proto
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are public and stable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *stable* .proto interface.
+ */
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "ContainerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+package hadoop.yarn;
+
+import "Security.proto";
+import "yarn_protos.proto";
+
+message TajoContainerIdProto {
+  optional ApplicationIdProto app_id = 1;
+  optional ApplicationAttemptIdProto app_attempt_id = 2;
+  optional int32 id = 3;
+}
+
+message TajoContainerProto {
+  optional TajoContainerIdProto id = 1;
+  optional NodeIdProto nodeId = 2;
+  optional string node_http_address = 3;
+  optional ResourceProto resource = 4;
+  optional PriorityProto priority = 5;
+  optional hadoop.common.TokenProto container_token = 6;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto
index 06d2a42..494d296 100644
--- a/tajo-core/src/main/proto/QueryMasterProtocol.proto
+++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto
@@ -27,6 +27,9 @@ import "TajoIdProtos.proto";
 import "CatalogProtos.proto";
 import "PrimitiveProtos.proto";
 import "TajoWorkerProtocol.proto";
+import "ContainerProtocol.proto";
+
+package hadoop.yarn;
 
 service QueryMasterProtocolService {
   //from Worker

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
index b117cac..b2db46a 100644
--- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
+++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
@@ -23,8 +23,11 @@ option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 
 import "TajoMasterProtocol.proto";
+import "ContainerProtocol.proto";
 import "tajo_protos.proto";
 
+package hadoop.yarn;
+
 message NodeHeartbeat {
   required WorkerConnectionInfoProto connectionInfo = 1;
   optional ServerStatusProto serverStatus = 2;

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
index 7283543..e5eab4f 100644
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto
@@ -28,6 +28,9 @@ import "tajo_protos.proto";
 import "TajoIdProtos.proto";
 import "CatalogProtos.proto";
 import "PrimitiveProtos.proto";
+import "ContainerProtocol.proto";
+
+package hadoop.yarn;
 
 message ServerStatusProto {
     message System {
@@ -119,11 +122,11 @@ message WorkerResourcesRequest {
 
 message WorkerResourceReleaseRequest {
     required ExecutionBlockIdProto executionBlockId = 1;
-    repeated hadoop.yarn.ContainerIdProto containerIds = 2;
+    repeated TajoContainerIdProto containerIds = 2;
 }
 
 message WorkerAllocatedResource {
-    required hadoop.yarn.ContainerIdProto containerId = 1;
+    required TajoContainerIdProto containerId = 1;
     required WorkerConnectionInfoProto connectionInfo = 2;
 
     required int32 allocatedMemoryMB = 3;

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index e515438..989b0e3 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -28,6 +28,9 @@ import "TajoIdProtos.proto";
 import "CatalogProtos.proto";
 import "PrimitiveProtos.proto";
 import "Plan.proto";
+import "ContainerProtocol.proto";
+
+package hadoop.yarn;
 
 message SessionProto {
   required string session_id = 1;
@@ -170,7 +173,7 @@ message QueryExecutionRequestProto {
 
 message GetTaskRequestProto {
     required int32 workerId = 1;
-    required hadoop.yarn.ContainerIdProto containerId = 2;
+    required TajoContainerIdProto containerId = 2;
     required ExecutionBlockIdProto executionBlockId = 3;
 }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
index 0423894..b8fbd67 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
 import org.apache.tajo.ipc.TajoMasterProtocol.*;
 import org.apache.tajo.master.cluster.WorkerConnectionInfo;
 import org.apache.tajo.rpc.NullCallback;
@@ -150,7 +151,8 @@ public class TestTajoResourceManager {
           .build();
 
       final CountDownLatch barrier = new CountDownLatch(1);
-      final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
+      final List<ContainerProtocol.TajoContainerIdProto> containerIds = new
+        ArrayList<ContainerProtocol.TajoContainerIdProto>();
 
       RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
 
@@ -190,7 +192,7 @@ public class TestTajoResourceManager {
         containerIds.add(eachResource.getContainerId());
       }
 
-      for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+      for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) {
         tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
       }
 
@@ -318,7 +320,8 @@ public class TestTajoResourceManager {
           .build();
 
       final CountDownLatch barrier = new CountDownLatch(1);
-      final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
+      final List<ContainerProtocol.TajoContainerIdProto> containerIds = new
+        ArrayList<ContainerProtocol.TajoContainerIdProto>();
 
 
       RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
@@ -356,7 +359,7 @@ public class TestTajoResourceManager {
 
       assertEquals(numWorkers * 3, response.getWorkerAllocatedResourceList().size());
 
-      for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+      for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) {
         tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
       }
 
@@ -399,7 +402,8 @@ public class TestTajoResourceManager {
           .build();
 
       final CountDownLatch barrier = new CountDownLatch(1);
-      final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
+      final List<ContainerProtocol.TajoContainerIdProto> containerIds = new
+        ArrayList<ContainerProtocol.TajoContainerIdProto>();
 
 
       RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
@@ -431,7 +435,7 @@ public class TestTajoResourceManager {
 
       assertEquals(0, totalUsedDisks, 0);
 
-      for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+      for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) {
         tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
       }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
index 87b4197..220eb6c 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -567,6 +567,7 @@ public class StorageManager {
 
     for (Path p : inputs) {
       FileSystem fs = p.getFileSystem(conf);
+
       ArrayList<FileStatus> files = Lists.newArrayList();
       if (fs.isFile(p)) {
         files.addAll(Lists.newArrayList(fs.getFileStatus(p)));

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index a355a94..bec0daf 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -1,129 +1,138 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3.S3FileSystem;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.s3.InMemoryFileSystemStore;
-import org.apache.tajo.storage.s3.SmallBlockS3FileSystem;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(Parameterized.class)
-public class TestFileSystems {
-
-  protected byte[] data = null;
-
-  private static String TEST_PATH = "target/test-data/TestFileSystem";
-  private TajoConf conf = null;
-  private StorageManager sm = null;
-  private FileSystem fs = null;
-  Path testDir;
-
-  public TestFileSystems(FileSystem fs) throws IOException {
-    conf = new TajoConf();
-
-    if(fs instanceof S3FileSystem){
-      conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "10");
-      fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
-    }
-    this.fs = fs;
-    sm = StorageManager.getStorageManager(conf);
-    testDir = getTestDir(this.fs, TEST_PATH);
-  }
-
-  public Path getTestDir(FileSystem fs, String dir) throws IOException {
-    Path path = new Path(dir);
-    if(fs.exists(path))
-      fs.delete(path, true);
-
-    fs.mkdirs(path);
-
-    return fs.makeQualified(path);
-  }
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> generateParameters() {
-    return Arrays.asList(new Object[][] {
-        {new SmallBlockS3FileSystem(new InMemoryFileSystemStore())},
-    });
-  }
-
-  @Test
-  public void testBlockSplit() throws IOException {
-
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age", Type.INT4);
-    schema.addColumn("name", Type.TEXT);
-
-    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
-    Tuple[] tuples = new Tuple[4];
-    for (int i = 0; i < tuples.length; i++) {
-      tuples[i] = new VTuple(3);
-      tuples[i]
-          .put(new Datum[] { DatumFactory.createInt4(i),
-              DatumFactory.createInt4(i + 32),
-              DatumFactory.createText("name" + i) });
-    }
-
-    Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender",
-        "table.csv");
-    fs.mkdirs(path.getParent());
-
-    Appender appender = sm.getAppender(meta, schema, path);
-    appender.init();
-    for (Tuple t : tuples) {
-      appender.addTuple(t);
-    }
-    appender.close();
-    FileStatus fileStatus = fs.getFileStatus(path);
-
-    List<FileFragment> splits = sm.getSplits("table", meta, schema, path);
-    int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
-    assertEquals(splitSize, splits.size());
-
-    for (FileFragment fragment : splits) {
-      assertTrue(fragment.getEndKey() <= fileStatus.getBlockSize());
-    }
-  }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.??See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.??The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.??You may obtain a copy of the License at
+ *
+ *?????http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestFileSystems {
+
+  private static String TEST_PATH = "target/test-data/TestFileSystem";
+  private Configuration conf;
+  private StorageManager sm;
+  private FileSystem fs;
+  private Path testDir;
+
+  public TestFileSystems(FileSystem fs) throws IOException {
+    this.fs = fs;
+    this.conf = fs.getConf();
+    this.testDir = getTestDir(this.fs, TEST_PATH);
+    this.sm = StorageManager.getStorageManager(new TajoConf(this.conf));
+  }
+
+  public Path getTestDir(FileSystem fs, String dir) throws IOException {
+    Path path = new Path(dir);
+    if (fs.exists(path))
+      fs.delete(path, true);
+
+    fs.mkdirs(path);
+
+    return fs.makeQualified(path);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> generateParameters() throws IOException {
+    return Arrays.asList(new Object[][]{
+        {FileSystem.getLocal(new TajoConf())},
+    });
+  }
+
+  @Before
+  public void setup() throws IOException {
+    if (!(fs instanceof LocalFileSystem)) {
+      conf.set("fs.local.block.size", "10");
+      fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
+      fs.setConf(conf);
+    }
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (!(fs instanceof LocalFileSystem)) {
+      fs.setConf(new TajoConf());
+    }
+  }
+
+  @Test
+  public void testBlockSplit() throws IOException {
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+
+    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+    Tuple[] tuples = new Tuple[4];
+    for (int i = 0; i < tuples.length; i++) {
+      tuples[i] = new VTuple(3);
+      tuples[i]
+          .put(new Datum[]{DatumFactory.createInt4(i),
+              DatumFactory.createInt4(i + 32),
+              DatumFactory.createText("name" + i)});
+    }
+
+    Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender",
+        "table.csv");
+    fs.mkdirs(path.getParent());
+
+    Appender appender = sm.getAppender(meta, schema, path);
+    appender.init();
+    for (Tuple t : tuples) {
+      appender.addTuple(t);
+    }
+    appender.close();
+    FileStatus fileStatus = fs.getFileStatus(path);
+
+    List<FileFragment> splits = sm.getSplits("table", meta, schema, path);
+    int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
+    assertEquals(splitSize, splits.size());
+
+    for (FileFragment fragment : splits) {
+      assertTrue(fragment.getEndKey() <= fileStatus.getBlockSize());
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java
deleted file mode 100644
index 7b09937..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.s3;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.s3.Block;
-import org.apache.hadoop.io.IOUtils;
-
-import java.io.*;
-
-/**
- * Holds file metadata including type (regular file, or directory),
- * and the list of blocks that are pointers to the data.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class INode {
-
-  enum FileType {
-    DIRECTORY, FILE
-  }
-
-  public static final FileType[] FILE_TYPES = {
-      FileType.DIRECTORY,
-      FileType.FILE
-  };
-
-  public static final INode DIRECTORY_INODE = new INode(FileType.DIRECTORY, null);
-
-  private FileType fileType;
-  private Block[] blocks;
-
-  public INode(FileType fileType, Block[] blocks) {
-    this.fileType = fileType;
-    if (isDirectory() && blocks != null) {
-      throw new IllegalArgumentException("A directory cannot contain blocks.");
-    }
-    this.blocks = blocks;
-  }
-
-  public Block[] getBlocks() {
-    return blocks;
-  }
-
-  public FileType getFileType() {
-    return fileType;
-  }
-
-  public boolean isDirectory() {
-    return fileType == FileType.DIRECTORY;
-  }
-
-  public boolean isFile() {
-    return fileType == FileType.FILE;
-  }
-
-  public long getSerializedLength() {
-    return 1L + (blocks == null ? 0 : 4 + blocks.length * 16);
-  }
-
-
-  public InputStream serialize() throws IOException {
-    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-    DataOutputStream out = new DataOutputStream(bytes);
-    try {
-      out.writeByte(fileType.ordinal());
-      if (isFile()) {
-        out.writeInt(blocks.length);
-        for (int i = 0; i < blocks.length; i++) {
-          out.writeLong(blocks[i].getId());
-          out.writeLong(blocks[i].getLength());
-        }
-      }
-      out.close();
-      out = null;
-    } finally {
-      IOUtils.closeStream(out);
-    }
-    return new ByteArrayInputStream(bytes.toByteArray());
-  }
-
-  public static INode deserialize(InputStream in) throws IOException {
-    if (in == null) {
-      return null;
-    }
-    DataInputStream dataIn = new DataInputStream(in);
-    FileType fileType = INode.FILE_TYPES[dataIn.readByte()];
-    switch (fileType) {
-      case DIRECTORY:
-        in.close();
-        return INode.DIRECTORY_INODE;
-      case FILE:
-        int numBlocks = dataIn.readInt();
-        Block[] blocks = new Block[numBlocks];
-        for (int i = 0; i < numBlocks; i++) {
-          long id = dataIn.readLong();
-          long length = dataIn.readLong();
-          blocks[i] = new Block(id, length);
-        }
-        in.close();
-        return new INode(fileType, blocks);
-      default:
-        throw new IllegalArgumentException("Cannot deserialize inode.");
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java
deleted file mode 100644
index 40decc2..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.s3;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3.Block;
-import org.apache.hadoop.fs.s3.FileSystemStore;
-import org.apache.hadoop.fs.s3.INode;
-import org.apache.tajo.common.exception.NotImplementedException;
-
-import java.io.*;
-import java.net.URI;
-import java.util.*;
-
-/**
- * A stub implementation of {@link FileSystemStore} for testing
- * {@link S3FileSystem} without actually connecting to S3.
- */
-public class InMemoryFileSystemStore implements FileSystemStore {
-
-  private Configuration conf;
-  private SortedMap<Path, INode> inodes = new TreeMap<Path, INode>();
-  private Map<Long, byte[]> blocks = new HashMap<Long, byte[]>();
-
-  @Override
-  public void initialize(URI uri, Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public String getVersion() throws IOException {
-    return "0";
-  }
-
-  @Override
-  public void deleteINode(Path path) throws IOException {
-    inodes.remove(normalize(path));
-  }
-
-  @Override
-  public void deleteBlock(Block block) throws IOException {
-    blocks.remove(block.getId());
-  }
-
-  @Override
-  public boolean inodeExists(Path path) throws IOException {
-    return inodes.containsKey(normalize(path));
-  }
-
-  @Override
-  public boolean blockExists(long blockId) throws IOException {
-    return blocks.containsKey(blockId);
-  }
-
-  @Override
-  public INode retrieveINode(Path path) throws IOException {
-    return inodes.get(normalize(path));
-  }
-
-  @Override
-  public File retrieveBlock(Block block, long byteRangeStart) throws IOException {
-    byte[] data = blocks.get(block.getId());
-    File file = createTempFile();
-    BufferedOutputStream out = null;
-    try {
-      out = new BufferedOutputStream(new FileOutputStream(file));
-      out.write(data, (int) byteRangeStart, data.length - (int) byteRangeStart);
-    } finally {
-      if (out != null) {
-        out.close();
-      }
-    }
-    return file;
-  }
-
-  private File createTempFile() throws IOException {
-    File dir = new File(conf.get("fs.s3.buffer.dir"));
-    if (!dir.exists() && !dir.mkdirs()) {
-      throw new IOException("Cannot create S3 buffer directory: " + dir);
-    }
-    File result = File.createTempFile("test-", ".tmp", dir);
-    result.deleteOnExit();
-    return result;
-  }
-
-  @Override
-  public Set<Path> listSubPaths(Path path) throws IOException {
-    Path normalizedPath = normalize(path);
-    // This is inefficient but more than adequate for testing purposes.
-    Set<Path> subPaths = new LinkedHashSet<Path>();
-    for (Path p : inodes.tailMap(normalizedPath).keySet()) {
-      if (normalizedPath.equals(p.getParent())) {
-        subPaths.add(p);
-      }
-    }
-    return subPaths;
-  }
-
-  @Override
-  public Set<Path> listDeepSubPaths(Path path) throws IOException {
-    Path normalizedPath = normalize(path);
-    String pathString = normalizedPath.toUri().getPath();
-    if (!pathString.endsWith("/")) {
-      pathString += "/";
-    }
-    // This is inefficient but more than adequate for testing purposes.
-    Set<Path> subPaths = new LinkedHashSet<Path>();
-    for (Path p : inodes.tailMap(normalizedPath).keySet()) {
-      if (p.toUri().getPath().startsWith(pathString)) {
-        subPaths.add(p);
-      }
-    }
-    return subPaths;
-  }
-
-  @Override
-  public void storeINode(Path path, INode inode) throws IOException {
-    inodes.put(normalize(path), inode);
-  }
-
-  @Override
-  public void storeBlock(Block block, File file) throws IOException {
-    ByteArrayOutputStream out = new ByteArrayOutputStream();
-    byte[] buf = new byte[8192];
-    int numRead;
-    BufferedInputStream in = null;
-    try {
-      in = new BufferedInputStream(new FileInputStream(file));
-      while ((numRead = in.read(buf)) >= 0) {
-        out.write(buf, 0, numRead);
-      }
-    } finally {
-      if (in != null) {
-        in.close();
-      }
-    }
-    blocks.put(block.getId(), out.toByteArray());
-  }
-
-  private Path normalize(Path path) {
-    if (!path.isAbsolute()) {
-      throw new IllegalArgumentException("Path must be absolute: " + path);
-    }
-    return new Path(path.toUri().getPath());
-  }
-
-  @Override
-  public void purge() throws IOException {
-    inodes.clear();
-    blocks.clear();
-  }
-
-  @Override
-  public void dump() throws IOException {
-    throw new NotImplementedException();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java
deleted file mode 100644
index d4034b9..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.s3;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3.Block;
-import org.apache.hadoop.fs.s3.FileSystemStore;
-import org.apache.hadoop.fs.s3.INode;
-import org.apache.hadoop.util.Progressable;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-class S3OutputStream extends OutputStream {
-
-  private Configuration conf;
-
-  private int bufferSize;
-
-  private FileSystemStore store;
-
-  private Path path;
-
-  private long blockSize;
-
-  private File backupFile;
-
-  private OutputStream backupStream;
-
-  private Random r = new Random();
-
-  private boolean closed;
-
-  private int pos = 0;
-
-  private long filePos = 0;
-
-  private int bytesWrittenToBlock = 0;
-
-  private byte[] outBuf;
-
-  private List<Block> blocks = new ArrayList<Block>();
-
-  private Block nextBlock;
-
-  private static final Log LOG =
-      LogFactory.getLog(S3OutputStream.class.getName());
-
-
-  public S3OutputStream(Configuration conf, FileSystemStore store,
-                        Path path, long blockSize, Progressable progress,
-                        int buffersize) throws IOException {
-
-    this.conf = conf;
-    this.store = store;
-    this.path = path;
-    this.blockSize = blockSize;
-    this.backupFile = newBackupFile();
-    this.backupStream = new FileOutputStream(backupFile);
-    this.bufferSize = buffersize;
-    this.outBuf = new byte[bufferSize];
-
-  }
-
-  private File newBackupFile() throws IOException {
-    File dir = new File(conf.get("fs.s3.buffer.dir"));
-    if (!dir.exists() && !dir.mkdirs()) {
-      throw new IOException("Cannot create S3 buffer directory: " + dir);
-    }
-    File result = File.createTempFile("output-", ".tmp", dir);
-    result.deleteOnExit();
-    return result;
-  }
-
-  public long getPos() throws IOException {
-    return filePos;
-  }
-
-  @Override
-  public synchronized void write(int b) throws IOException {
-    if (closed) {
-      throw new IOException("Stream closed");
-    }
-
-    if ((bytesWrittenToBlock + pos == blockSize) || (pos >= bufferSize)) {
-      flush();
-    }
-    outBuf[pos++] = (byte) b;
-    filePos++;
-  }
-
-  @Override
-  public synchronized void write(byte b[], int off, int len) throws IOException {
-    if (closed) {
-      throw new IOException("Stream closed");
-    }
-    while (len > 0) {
-      int remaining = bufferSize - pos;
-      int toWrite = Math.min(remaining, len);
-      System.arraycopy(b, off, outBuf, pos, toWrite);
-      pos += toWrite;
-      off += toWrite;
-      len -= toWrite;
-      filePos += toWrite;
-
-      if ((bytesWrittenToBlock + pos >= blockSize) || (pos == bufferSize)) {
-        flush();
-      }
-    }
-  }
-
-  @Override
-  public synchronized void flush() throws IOException {
-    if (closed) {
-      throw new IOException("Stream closed");
-    }
-
-    if (bytesWrittenToBlock + pos >= blockSize) {
-      flushData((int) blockSize - bytesWrittenToBlock);
-    }
-    if (bytesWrittenToBlock == blockSize) {
-      endBlock();
-    }
-    flushData(pos);
-  }
-
-  private synchronized void flushData(int maxPos) throws IOException {
-    int workingPos = Math.min(pos, maxPos);
-
-    if (workingPos > 0) {
-      //
-      // To the local block backup, write just the bytes
-      //
-      backupStream.write(outBuf, 0, workingPos);
-
-      //
-      // Track position
-      //
-      bytesWrittenToBlock += workingPos;
-      System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
-      pos -= workingPos;
-    }
-  }
-
-  private synchronized void endBlock() throws IOException {
-    //
-    // Done with local copy
-    //
-    backupStream.close();
-
-    //
-    // Send it to S3
-    //
-    // TODO: Use passed in Progressable to report progress.
-    nextBlockOutputStream();
-    store.storeBlock(nextBlock, backupFile);
-    Block[] arr = new Block[blocks.size()];
-    arr = blocks.toArray(arr);
-    store.storeINode(path, new INode(INode.FILE_TYPES[1], arr));
-
-    //
-    // Delete local backup, start new one
-    //
-    boolean b = backupFile.delete();
-    if (!b) {
-      LOG.warn("Ignoring failed delete");
-    }
-    backupFile = newBackupFile();
-    backupStream = new FileOutputStream(backupFile);
-    bytesWrittenToBlock = 0;
-  }
-
-  private synchronized void nextBlockOutputStream() throws IOException {
-    long blockId = r.nextLong();
-    while (store.blockExists(blockId)) {
-      blockId = r.nextLong();
-    }
-    nextBlock = new Block(blockId, bytesWrittenToBlock);
-    blocks.add(nextBlock);
-    bytesWrittenToBlock = 0;
-  }
-
-
-  @Override
-  public synchronized void close() throws IOException {
-    if (closed) {
-      return;
-    }
-
-    flush();
-    if (filePos == 0 || bytesWrittenToBlock != 0) {
-      endBlock();
-    }
-
-    backupStream.close();
-    boolean b = backupFile.delete();
-    if (!b) {
-      LOG.warn("Ignoring failed delete");
-    }
-
-    super.close();
-
-    closed = true;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java
deleted file mode 100644
index fc1c908..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.s3;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.s3.Block;
-import org.apache.hadoop.fs.s3.FileSystemStore;
-import org.apache.hadoop.fs.s3.INode;
-import org.apache.hadoop.fs.s3.S3FileSystem;
-import org.apache.hadoop.util.Progressable;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-public class SmallBlockS3FileSystem extends S3FileSystem {
-
-  private URI uri;
-
-  private FileSystemStore store;
-
-  private Path workingDir;
-
-  static class Holder {
-    private static InMemoryFileSystemStore s;
-
-    public synchronized static FileSystemStore get() {
-      if(s != null) {
-        return s;
-      }
-      s = new InMemoryFileSystemStore();
-      return s;
-    }
-
-    public synchronized static void set(InMemoryFileSystemStore inMemoryFileSystemStore) {
-      s = inMemoryFileSystemStore;
-    }
-  }
-
-  public SmallBlockS3FileSystem() {
-  }
-
-
-  public SmallBlockS3FileSystem(
-      InMemoryFileSystemStore inMemoryFileSystemStore) {
-    Holder.set(inMemoryFileSystemStore);
-    this.store = inMemoryFileSystemStore;
-  }
-
-  @Override
-  public URI getUri() {
-    return uri;
-  }
-  @Override
-  public long getDefaultBlockSize() {
-    return 10;
-  }
-
-  @Override
-  public void initialize(URI uri, Configuration conf) throws IOException {
-    if (store == null) {
-      store = Holder.get();
-    }
-    store.initialize(uri, conf);
-    setConf(conf);
-    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
-    this.workingDir =
-        new Path("/user", System.getProperty("user.name")).makeQualified(this);
-  }
-  @Override
-  public boolean isFile(Path path) throws IOException {
-    INode inode = store.retrieveINode(makeAbsolute(path));
-    if (inode == null) {
-      return false;
-    }
-    return inode.isFile();
-  }
-
-  private INode checkFile(Path path) throws IOException {
-    INode inode = store.retrieveINode(makeAbsolute(path));
-    if (inode == null) {
-      throw new IOException("No such file.");
-    }
-    if (inode.isDirectory()) {
-      throw new IOException("Path " + path + " is a directory.");
-    }
-    return inode;
-  }
-
-  @Override
-  public FileStatus[] listStatus(Path f) throws IOException {
-    Path absolutePath = makeAbsolute(f);
-    INode inode = store.retrieveINode(absolutePath);
-    if (inode == null) {
-      throw new FileNotFoundException("File " + f + " does not exist.");
-    }
-    if (inode.isFile()) {
-      return new FileStatus[] {
-          new S3FileStatus(f.makeQualified(this), inode)
-      };
-    }
-    ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
-    for (Path p : store.listSubPaths(absolutePath)) {
-      ret.add(getFileStatus(p.makeQualified(this)));
-    }
-    return ret.toArray(new FileStatus[0]);
-  }
-  @Override
-  public FSDataOutputStream create(Path file, FsPermission permission,
-                                   boolean overwrite, int bufferSize,
-                                   short replication, long blockSize, Progressable progress)
-      throws IOException {
-
-    INode inode = store.retrieveINode(makeAbsolute(file));
-    if (inode != null) {
-      if (overwrite) {
-        delete(file, true);
-      } else {
-        throw new IOException("File already exists: " + file);
-      }
-    } else {
-      Path parent = file.getParent();
-      if (parent != null) {
-        if (!mkdirs(parent)) {
-          throw new IOException("Mkdirs failed to create " + parent.toString());
-        }
-      }
-    }
-    return new FSDataOutputStream
-        (new S3OutputStream(getConf(), store, makeAbsolute(file),
-            blockSize, progress, bufferSize),
-            statistics);
-  }
-  @Override
-  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
-    Path absolutePath = makeAbsolute(path);
-    List<Path> paths = new ArrayList<Path>();
-    do {
-      paths.add(0, absolutePath);
-      absolutePath = absolutePath.getParent();
-    } while (absolutePath != null);
-
-    boolean result = true;
-    for (Path p : paths) {
-      result &= mkdir(p);
-    }
-    return result;
-  }
-
-  @Override
-  public Path getWorkingDirectory() {
-    return workingDir;
-  }
-
-  @Override
-  public boolean rename(Path src, Path dst) throws IOException {
-    Path absoluteSrc = makeAbsolute(src);
-    INode srcINode = store.retrieveINode(absoluteSrc);
-    if (srcINode == null) {
-      // src path doesn't exist
-      return false;
-    }
-    Path absoluteDst = makeAbsolute(dst);
-    INode dstINode = store.retrieveINode(absoluteDst);
-    if (dstINode != null && dstINode.isDirectory()) {
-      absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
-      dstINode = store.retrieveINode(absoluteDst);
-    }
-    if (dstINode != null) {
-      // dst path already exists - can't overwrite
-      return false;
-    }
-    Path dstParent = absoluteDst.getParent();
-    if (dstParent != null) {
-      INode dstParentINode = store.retrieveINode(dstParent);
-      if (dstParentINode == null || dstParentINode.isFile()) {
-        // dst parent doesn't exist or is a file
-        return false;
-      }
-    }
-    return renameRecursive(absoluteSrc, absoluteDst);
-  }
-
-  private boolean renameRecursive(Path src, Path dst) throws IOException {
-    INode srcINode = store.retrieveINode(src);
-    store.storeINode(dst, srcINode);
-    store.deleteINode(src);
-    if (srcINode.isDirectory()) {
-      for (Path oldSrc : store.listDeepSubPaths(src)) {
-        INode inode = store.retrieveINode(oldSrc);
-        if (inode == null) {
-          return false;
-        }
-        String oldSrcPath = oldSrc.toUri().getPath();
-        String srcPath = src.toUri().getPath();
-        String dstPath = dst.toUri().getPath();
-        Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
-        store.storeINode(newDst, inode);
-        store.deleteINode(oldSrc);
-      }
-    }
-    return true;
-  }
-
-  @Override
-  public boolean delete(Path path, boolean recursive) throws IOException {
-    Path absolutePath = makeAbsolute(path);
-    INode inode = store.retrieveINode(absolutePath);
-    if (inode == null) {
-      return false;
-    }
-    if (inode.isFile()) {
-      store.deleteINode(absolutePath);
-      for (Block block: inode.getBlocks()) {
-        store.deleteBlock(block);
-      }
-    } else {
-      FileStatus[] contents = null;
-      try {
-        contents = listStatus(absolutePath);
-      } catch(FileNotFoundException fnfe) {
-        return false;
-      }
-
-      if ((contents.length !=0) && (!recursive)) {
-        throw new IOException("Directory " + path.toString()
-            + " is not empty.");
-      }
-      for (FileStatus p:contents) {
-        if (!delete(p.getPath(), recursive)) {
-          return false;
-        }
-      }
-      store.deleteINode(absolutePath);
-    }
-    return true;
-  }
-
-  /**
-   * FileStatus for S3 file systems.
-   */
-  @Override
-  public FileStatus getFileStatus(Path f)  throws IOException {
-    INode inode = store.retrieveINode(makeAbsolute(f));
-    if (inode == null) {
-      throw new FileNotFoundException(f + ": No such file or directory.");
-    }
-    return new S3FileStatus(f.makeQualified(this), inode);
-  }
-  private boolean mkdir(Path path) throws IOException {
-    Path absolutePath = makeAbsolute(path);
-    INode inode = store.retrieveINode(absolutePath);
-    if (inode == null) {
-      store.storeINode(absolutePath, INode.DIRECTORY_INODE);
-    } else if (inode.isFile()) {
-      throw new IOException(String.format(
-          "Can't make directory for path %s since it is a file.",
-          absolutePath));
-    }
-    return true;
-  }
-  private Path makeAbsolute(Path path) {
-    if (path.isAbsolute()) {
-      return path;
-    }
-    return new Path(workingDir, path);
-  }
-
-  private static class S3FileStatus extends FileStatus {
-
-    S3FileStatus(Path f, INode inode) throws IOException {
-      super(findLength(inode), inode.isDirectory(), 1,
-          findBlocksize(inode), 0, f);
-    }
-
-    private static long findLength(INode inode) {
-      if (!inode.isDirectory()) {
-        long length = 0L;
-        for (Block block : inode.getBlocks()) {
-          length += block.getLength();
-        }
-        return length;
-      }
-      return 0;
-    }
-
-    private static long findBlocksize(INode inode) {
-      final Block[] ret = inode.getBlocks();
-      return ret == null ? 0L : ret[0].getLength();
-    }
-  }
-}
\ No newline at end of file