Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/02 04:56:37 UTC
[04/14] tajo git commit: TAJO-1166: S3 related storage causes compilation error in Hadoop 2.6.0-SNAPSHOT. (jaehwa)
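
In short, the patch removes tajo-core's compile-time dependency on YARN's container-id types (org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto and org.apache.hadoop.yarn.api.records.ContainerId), which break the build against Hadoop 2.6.0-SNAPSHOT, and replaces them with Tajo-owned equivalents (ContainerProtocol.TajoContainerIdProto and TajoContainerId) defined in a new ContainerProtocol.proto. A rough before/after sketch of the signature change that recurs throughout the diff below, with method bodies elided:

    // Before: bound to YARN's protobuf container id
    public void releaseWorkerResource(YarnProtos.ContainerIdProto containerId);

    // After: bound to Tajo's own protobuf container id
    public void releaseWorkerResource(ContainerProtocol.TajoContainerIdProto containerId);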
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
index 0e3ccad..0cc87fc 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tajo.QueryId;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.querymaster.QueryInProgress;
@@ -48,7 +49,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import static org.apache.tajo.ipc.TajoMasterProtocol.*;
@@ -80,7 +80,8 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
private TajoConf systemConf;
- private ConcurrentMap<ContainerIdProto, AllocatedWorkerResource> allocatedResourceMap = Maps.newConcurrentMap();
+ private ConcurrentMap<ContainerProtocol.TajoContainerIdProto, AllocatedWorkerResource> allocatedResourceMap = Maps
+ .newConcurrentMap();
/** It receives status messages from workers and their resources. */
private TajoResourceTracker resourceTracker;
@@ -194,7 +195,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
private WorkerResourceAllocationRequest createQMResourceRequest(QueryId queryId) {
float queryMasterDefaultDiskSlot = masterContext.getConf().getFloatVar(
- TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT);
+ TajoConf.ConfVars.TAJO_QUERYMASTER_DISK_SLOT);
int queryMasterDefaultMemoryMB = masterContext.getConf().getIntVar(TajoConf.ConfVars.TAJO_QUERYMASTER_MEMORY_MB);
WorkerResourceAllocationRequest.Builder builder = WorkerResourceAllocationRequest.newBuilder();
@@ -235,7 +236,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
return resource;
}
- private void registerQueryMaster(QueryId queryId, ContainerIdProto containerId) {
+ private void registerQueryMaster(QueryId queryId, ContainerProtocol.TajoContainerIdProto containerId) {
rmContext.getQueryMasterContainer().putIfAbsent(queryId, containerId);
}
@@ -256,9 +257,9 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
WorkerResourceAllocationRequest request;
RpcCallback<WorkerResourceAllocationResponse> callBack;
WorkerResourceRequest(
- QueryId queryId,
- boolean queryMasterRequest, WorkerResourceAllocationRequest request,
- RpcCallback<WorkerResourceAllocationResponse> callBack) {
+ QueryId queryId,
+ boolean queryMasterRequest, WorkerResourceAllocationRequest request,
+ RpcCallback<WorkerResourceAllocationResponse> callBack) {
this.queryId = queryId;
this.queryMasterRequest = queryMasterRequest;
this.request = request;
@@ -282,14 +283,14 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
if (LOG.isDebugEnabled()) {
LOG.debug("allocateWorkerResources:" +
- (new QueryId(resourceRequest.request.getQueryId())) +
- ", requiredMemory:" + resourceRequest.request.getMinMemoryMBPerContainer() +
- "~" + resourceRequest.request.getMaxMemoryMBPerContainer() +
- ", requiredContainers:" + resourceRequest.request.getNumContainers() +
- ", requiredDiskSlots:" + resourceRequest.request.getMinDiskSlotPerContainer() +
- "~" + resourceRequest.request.getMaxDiskSlotPerContainer() +
- ", queryMasterRequest=" + resourceRequest.queryMasterRequest +
- ", liveWorkers=" + rmContext.getWorkers().size());
+ (new QueryId(resourceRequest.request.getQueryId())) +
+ ", requiredMemory:" + resourceRequest.request.getMinMemoryMBPerContainer() +
+ "~" + resourceRequest.request.getMaxMemoryMBPerContainer() +
+ ", requiredContainers:" + resourceRequest.request.getNumContainers() +
+ ", requiredDiskSlots:" + resourceRequest.request.getMinDiskSlotPerContainer() +
+ "~" + resourceRequest.request.getMaxDiskSlotPerContainer() +
+ ", queryMasterRequest=" + resourceRequest.queryMasterRequest +
+ ", liveWorkers=" + rmContext.getWorkers().size());
}
// TajoWorkerResourceManager can't return allocated disk slots occasionally.
@@ -300,25 +301,25 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
if(allocatedWorkerResources.size() > 0) {
List<WorkerAllocatedResource> allocatedResources =
- new ArrayList<WorkerAllocatedResource>();
+ new ArrayList<WorkerAllocatedResource>();
for(AllocatedWorkerResource allocatedResource: allocatedWorkerResources) {
NodeId nodeId = NodeId.newInstance(allocatedResource.worker.getConnectionInfo().getHost(),
- allocatedResource.worker.getConnectionInfo().getPeerRpcPort());
+ allocatedResource.worker.getConnectionInfo().getPeerRpcPort());
TajoWorkerContainerId containerId = new TajoWorkerContainerId();
containerId.setApplicationAttemptId(
- ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
+ ApplicationIdUtils.createApplicationAttemptId(resourceRequest.queryId));
containerId.setId(containerIdSeq.incrementAndGet());
- ContainerIdProto containerIdProto = containerId.getProto();
+ ContainerProtocol.TajoContainerIdProto containerIdProto = containerId.getProto();
allocatedResources.add(WorkerAllocatedResource.newBuilder()
- .setContainerId(containerIdProto)
- .setConnectionInfo(allocatedResource.worker.getConnectionInfo().getProto())
- .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB)
- .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots)
- .build());
+ .setContainerId(containerIdProto)
+ .setConnectionInfo(allocatedResource.worker.getConnectionInfo().getProto())
+ .setAllocatedMemoryMB(allocatedResource.allocatedMemoryMB)
+ .setAllocatedDiskSlots(allocatedResource.allocatedDiskSlots)
+ .build());
allocatedResourceMap.putIfAbsent(containerIdProto, allocatedResource);
@@ -358,7 +359,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
int allocatedResources = 0;
TajoMasterProtocol.ResourceRequestPriority resourceRequestPriority
- = resourceRequest.request.getResourceRequestPriority();
+ = resourceRequest.request.getResourceRequestPriority();
if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) {
synchronized(rmContext) {
@@ -369,7 +370,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
int minMemoryMB = resourceRequest.request.getMinMemoryMBPerContainer();
int maxMemoryMB = resourceRequest.request.getMaxMemoryMBPerContainer();
float diskSlot = Math.max(resourceRequest.request.getMaxDiskSlotPerContainer(),
- resourceRequest.request.getMinDiskSlotPerContainer());
+ resourceRequest.request.getMinDiskSlotPerContainer());
int liveWorkerSize = randomWorkers.size();
Set<Integer> insufficientWorkers = new HashSet<Integer>();
@@ -418,7 +419,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
}
workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
- allocatedWorkerResource.allocatedMemoryMB);
+ allocatedWorkerResource.allocatedMemoryMB);
selectedWorkers.add(allocatedWorkerResource);
@@ -438,7 +439,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
float minDiskSlots = resourceRequest.request.getMinDiskSlotPerContainer();
float maxDiskSlots = resourceRequest.request.getMaxDiskSlotPerContainer();
int memoryMB = Math.max(resourceRequest.request.getMaxMemoryMBPerContainer(),
- resourceRequest.request.getMinMemoryMBPerContainer());
+ resourceRequest.request.getMinMemoryMBPerContainer());
int liveWorkerSize = randomWorkers.size();
Set<Integer> insufficientWorkers = new HashSet<Integer>();
@@ -487,7 +488,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
allocatedWorkerResource.allocatedMemoryMB = workerResource.getAvailableMemoryMB();
}
workerResource.allocateResource(allocatedWorkerResource.allocatedDiskSlots,
- allocatedWorkerResource.allocatedMemoryMB);
+ allocatedWorkerResource.allocatedMemoryMB);
selectedWorkers.add(allocatedWorkerResource);
@@ -508,7 +509,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
* @param containerId ContainerIdProto to be released
*/
@Override
- public void releaseWorkerResource(ContainerIdProto containerId) {
+ public void releaseWorkerResource(ContainerProtocol.TajoContainerIdProto containerId) {
AllocatedWorkerResource allocated = allocatedResourceMap.get(containerId);
if(allocated != null) {
LOG.info("Release Resource: " + allocated.allocatedDiskSlots + "," + allocated.allocatedMemoryMB);
@@ -530,7 +531,7 @@ public class TajoWorkerResourceManager extends CompositeService implements Worke
LOG.warn("No QueryMaster resource info for " + queryId);
return;
} else {
- ContainerIdProto containerId = rmContext.getQueryMasterContainer().remove(queryId);
+ ContainerProtocol.TajoContainerIdProto containerId = rmContext.getQueryMasterContainer().remove(queryId);
releaseWorkerResource(containerId);
rmContext.getStoppedQueryIds().add(queryId);
LOG.info(String.format("Released QueryMaster (%s) resource." , queryId.toString()));
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
index 8e8ac51..9c2b71b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java
@@ -20,8 +20,8 @@ package org.apache.tajo.master.rm;
import com.google.protobuf.RpcCallback;
import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.tajo.QueryId;
+import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.master.querymaster.QueryInProgress;
@@ -63,7 +63,7 @@ public interface WorkerResourceManager extends Service {
*
* @param containerId ContainerIdProto to be released
*/
- public void releaseWorkerResource(ContainerIdProto containerId);
+ public void releaseWorkerResource(ContainerProtocol.TajoContainerIdProto containerId);
public String getSeedQueryId() throws IOException;
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
index ca71c53..68c57f2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/AbstractResourceAllocator.java
@@ -20,9 +20,9 @@ package org.apache.tajo.worker;
import com.google.common.collect.Maps;
import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tajo.master.ContainerProxy;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.container.TajoContainerId;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@@ -41,29 +41,29 @@ public abstract class AbstractResourceAllocator extends CompositeService impleme
workerInfoMap.putIfAbsent(connectionInfo.getId(), connectionInfo);
}
- private Map<ContainerId, ContainerProxy> containers = Maps.newConcurrentMap();
+ private Map<TajoContainerId, ContainerProxy> containers = Maps.newConcurrentMap();
public AbstractResourceAllocator() {
super(AbstractResourceAllocator.class.getName());
}
- public void addContainer(ContainerId cId, ContainerProxy container) {
+ public void addContainer(TajoContainerId cId, ContainerProxy container) {
containers.put(cId, container);
}
- public void removeContainer(ContainerId cId) {
+ public void removeContainer(TajoContainerId cId) {
containers.remove(cId);
}
- public boolean containsContainer(ContainerId cId) {
+ public boolean containsContainer(TajoContainerId cId) {
return containers.containsKey(cId);
}
- public ContainerProxy getContainer(ContainerId cId) {
+ public ContainerProxy getContainer(TajoContainerId cId) {
return containers.get(cId);
}
- public Map<ContainerId, ContainerProxy> getContainers() {
+ public Map<TajoContainerId, ContainerProxy> getContainers() {
return containers;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
index 8b9219c..b713e70 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/ResourceAllocator.java
@@ -18,12 +18,12 @@
package org.apache.tajo.worker;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.tajo.ipc.ContainerProtocol;
+import org.apache.tajo.master.container.TajoContainerId;
public interface ResourceAllocator {
public void allocateTaskWorker();
- public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerId);
+ public TajoContainerId makeContainerId(ContainerProtocol.TajoContainerIdProto containerId);
public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext,
int numTasks, int memoryMBPerTask);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
index 2220089..9345885 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java
@@ -23,28 +23,25 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.TajoMasterProtocol;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.*;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
+import org.apache.tajo.master.container.TajoContainer;
+import org.apache.tajo.master.container.TajoContainerId;
import org.apache.tajo.master.event.ContainerAllocationEvent;
import org.apache.tajo.master.event.ContainerAllocatorEventType;
import org.apache.tajo.master.event.SubQueryContainerAllocationEvent;
import org.apache.tajo.master.querymaster.QueryMasterTask;
import org.apache.tajo.master.querymaster.SubQuery;
import org.apache.tajo.master.querymaster.SubQueryState;
-import org.apache.tajo.master.rm.TajoWorkerContainer;
-import org.apache.tajo.master.rm.TajoWorkerContainerId;
-import org.apache.tajo.master.rm.Worker;
-import org.apache.tajo.master.rm.WorkerResource;
+import org.apache.tajo.master.rm.*;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NettyClientBase;
import org.apache.tajo.rpc.NullCallback;
@@ -72,11 +69,11 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
public TajoResourceAllocator(QueryMasterTask.QueryMasterTaskContext queryTaskContext) {
this.queryTaskContext = queryTaskContext;
executorService = Executors.newFixedThreadPool(
- queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
+ queryTaskContext.getConf().getIntVar(TajoConf.ConfVars.YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM));
}
@Override
- public ContainerId makeContainerId(YarnProtos.ContainerIdProto containerIdProto) {
+ public TajoContainerId makeContainerId(ContainerProtocol.TajoContainerIdProto containerIdProto) {
TajoWorkerContainerId containerId = new TajoWorkerContainerId();
ApplicationAttemptId appAttemptId = new ApplicationAttemptIdPBImpl(containerIdProto.getAppAttemptId());
containerId.setApplicationAttemptId(appAttemptId);
@@ -98,7 +95,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask;
clusterSlots = Math.max(1, clusterSlots - 1); // reserve query master slot
LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks +
- ", Number of Cluster Slots=" + clusterSlots);
+ ", Number of Cluster Slots=" + clusterSlots);
return Math.min(numTasks, clusterSlots);
}
@@ -121,7 +118,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
executorService.shutdownNow();
- Map<ContainerId, ContainerProxy> containers = queryTaskContext.getResourceAllocator().getContainers();
+ Map<TajoContainerId, ContainerProxy> containers = queryTaskContext.getResourceAllocator()
+ .getContainers();
List<ContainerProxy> list = new ArrayList<ContainerProxy>(containers.values());
for(ContainerProxy eachProxy: list) {
try {
@@ -156,16 +154,17 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
private void launchTaskRunners(LaunchTaskRunnersEvent event) {
// Query in standby mode doesn't need launch Worker.
// But, Assign ExecutionBlock to assigned tajo worker
- for(Container eachContainer: event.getContainers()) {
+ for(TajoContainer eachContainer: event.getContainers()) {
TajoContainerProxy containerProxy = new TajoContainerProxy(queryTaskContext, tajoConf,
- eachContainer, event.getQueryContext(), event.getExecutionBlockId(), event.getPlanJson());
+ eachContainer, event.getQueryContext(), event.getExecutionBlockId(), event.getPlanJson());
executorService.submit(new LaunchRunner(eachContainer.getId(), containerProxy));
}
}
- public void stopExecutionBlock(final ExecutionBlockId executionBlockId, Collection<Container> containers) {
+ public void stopExecutionBlock(final ExecutionBlockId executionBlockId,
+ Collection<TajoContainer> containers) {
Set<NodeId> workers = Sets.newHashSet();
- for (Container container : containers){
+ for (TajoContainer container : containers){
workers.add(container.getNodeId());
}
@@ -196,8 +195,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
protected static class LaunchRunner implements Runnable {
private final ContainerProxy proxy;
- private final ContainerId id;
- public LaunchRunner(ContainerId id, ContainerProxy proxy) {
+ private final TajoContainerId id;
+ public LaunchRunner(TajoContainerId id, ContainerProxy proxy) {
this.proxy = proxy;
this.id = id;
}
@@ -210,8 +209,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
}
}
- private void stopContainers(Collection<Container> containers) {
- for (Container container : containers) {
+ private void stopContainers(Collection<TajoContainer> containers) {
+ for (TajoContainer container : containers) {
final ContainerProxy proxy = queryTaskContext.getResourceAllocator().getContainer(container.getId());
executorService.submit(new StopContainerRunner(container.getId(), proxy));
}
@@ -219,8 +218,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
private static class StopContainerRunner implements Runnable {
private final ContainerProxy proxy;
- private final ContainerId id;
- public StopContainerRunner(ContainerId id, ContainerProxy proxy) {
+ private final TajoContainerId id;
+ public StopContainerRunner(TajoContainerId id, ContainerProxy proxy) {
this.id = id;
this.proxy = proxy;
}
@@ -251,23 +250,23 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
public void run() {
LOG.info("Start TajoWorkerAllocationThread");
CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse> callBack =
- new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
+ new CallFuture<TajoMasterProtocol.WorkerResourceAllocationResponse>();
//TODO consider task's resource usage pattern
int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY);
float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK);
TajoMasterProtocol.WorkerResourceAllocationRequest request =
- TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
- .setMinMemoryMBPerContainer(requiredMemoryMB)
- .setMaxMemoryMBPerContainer(requiredMemoryMB)
- .setNumContainers(event.getRequiredNum())
- .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY
- : TajoMasterProtocol.ResourceRequestPriority.DISK)
- .setMinDiskSlotPerContainer(requiredDiskSlots)
- .setMaxDiskSlotPerContainer(requiredDiskSlots)
- .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
- .build();
+ TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder()
+ .setMinMemoryMBPerContainer(requiredMemoryMB)
+ .setMaxMemoryMBPerContainer(requiredMemoryMB)
+ .setNumContainers(event.getRequiredNum())
+ .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY
+ : TajoMasterProtocol.ResourceRequestPriority.DISK)
+ .setMinDiskSlotPerContainer(requiredDiskSlots)
+ .setMaxDiskSlotPerContainer(requiredDiskSlots)
+ .setQueryId(event.getExecutionBlockId().getQueryId().getProto())
+ .build();
RpcConnectionPool connPool = RpcConnectionPool.getPool(queryTaskContext.getConf());
NettyClientBase tmClient = null;
@@ -280,21 +279,21 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
if (tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
try {
tmClient = connPool.getConnection(
- queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+ TajoMasterProtocol.class, true);
} catch (Exception e) {
queryTaskContext.getQueryMasterContext().getWorkerContext().
- setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf));
+ setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf));
queryTaskContext.getQueryMasterContext().getWorkerContext().
- setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf));
+ setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf));
tmClient = connPool.getConnection(
- queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+ TajoMasterProtocol.class, true);
}
} else {
tmClient = connPool.getConnection(
- queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
- TajoMasterProtocol.class, true);
+ queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(),
+ TajoMasterProtocol.class, true);
}
TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub();
@@ -325,17 +324,17 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
List<TajoMasterProtocol.WorkerAllocatedResource> allocatedResources = response.getWorkerAllocatedResourceList();
ExecutionBlockId executionBlockId = event.getExecutionBlockId();
- List<Container> containers = new ArrayList<Container>();
+ List<TajoContainer> containers = new ArrayList<TajoContainer>();
for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) {
TajoWorkerContainer container = new TajoWorkerContainer();
NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(),
- eachAllocatedResource.getConnectionInfo().getPeerRpcPort());
+ eachAllocatedResource.getConnectionInfo().getPeerRpcPort());
TajoWorkerContainerId containerId = new TajoWorkerContainerId();
containerId.setApplicationAttemptId(
- ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(),
- eachAllocatedResource.getContainerId().getAppAttemptId().getAttemptId()));
+ ApplicationIdUtils.createApplicationAttemptId(executionBlockId.getQueryId(),
+ eachAllocatedResource.getContainerId().getAppAttemptId().getAttemptId()));
containerId.setId(eachAllocatedResource.getContainerId().getId());
container.setId(containerId);
@@ -347,7 +346,7 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
workerResource.setDiskSlots(eachAllocatedResource.getAllocatedDiskSlots());
Worker worker = new Worker(null, workerResource,
- new WorkerConnectionInfo(eachAllocatedResource.getConnectionInfo()));
+ new WorkerConnectionInfo(eachAllocatedResource.getConnectionInfo()));
container.setWorkerResource(worker);
addWorkerConnectionInfo(worker.getConnectionInfo());
containers.add(container);
@@ -356,8 +355,8 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
SubQueryState state = queryTaskContext.getSubQuery(executionBlockId).getSynchronizedState();
if (!SubQuery.isRunningState(state)) {
try {
- List<ContainerId> containerIds = new ArrayList<ContainerId>();
- for(Container eachContainer: containers) {
+ List<TajoContainerId> containerIds = new ArrayList<TajoContainerId>();
+ for(TajoContainer eachContainer: containers) {
containerIds.add(eachContainer.getId());
}
TajoContainerProxy.releaseWorkerResource(queryTaskContext, executionBlockId, containerIds);
@@ -378,10 +377,10 @@ public class TajoResourceAllocator extends AbstractResourceAllocator {
}
if(event.getRequiredNum() > numAllocatedContainers) {
ContainerAllocationEvent shortRequestEvent = new ContainerAllocationEvent(
- event.getType(), event.getExecutionBlockId(), event.getPriority(),
- event.getResource(),
- event.getRequiredNum() - numAllocatedContainers,
- event.isLeafQuery(), event.getProgress()
+ event.getType(), event.getExecutionBlockId(), event.getPriority(),
+ event.getResource(),
+ event.getRequiredNum() - numAllocatedContainers,
+ event.isLeafQuery(), event.getProgress()
);
queryTaskContext.getEventHandler().handle(shortRequestEvent);
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
index 1910575..4e9860b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java
@@ -24,15 +24,15 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.engine.query.QueryUnitRequestImpl;
import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService;
+import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.master.container.impl.pb.TajoContainerIdPBImpl;
+import org.apache.tajo.master.container.TajoConverterUtils;
import org.apache.tajo.rpc.CallFuture;
import org.apache.tajo.rpc.NullCallback;
import org.jboss.netty.channel.ConnectTimeoutException;
@@ -53,7 +53,7 @@ public class TaskRunner extends AbstractService {
private volatile boolean stopped = false;
private Path baseDirPath;
- private ContainerId containerId;
+ private TajoContainerId containerId;
// for Fetcher
private ExecutorService fetchLauncher;
@@ -77,7 +77,7 @@ public class TaskRunner extends AbstractService {
this.fetchLauncher = Executors.newFixedThreadPool(
systemConf.getIntVar(ConfVars.SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM), fetcherFactory);
try {
- this.containerId = ConverterUtils.toContainerId(containerId);
+ this.containerId = TajoConverterUtils.toTajoContainerId(containerId);
this.executionBlockContext = executionBlockContext;
this.history = executionBlockContext.createTaskRunnerHistory(this);
this.history.setState(getServiceState());
@@ -91,11 +91,11 @@ public class TaskRunner extends AbstractService {
return getId(getContext().getExecutionBlockId(), containerId);
}
- public ContainerId getContainerId(){
+ public TajoContainerId getContainerId(){
return containerId;
}
- public static String getId(ExecutionBlockId executionBlockId, ContainerId containerId) {
+ public static String getId(ExecutionBlockId executionBlockId, TajoContainerId containerId) {
return executionBlockId + "," + containerId;
}
@@ -211,7 +211,7 @@ public class TaskRunner extends AbstractService {
LOG.info("Request GetTask: " + getId());
GetTaskRequestProto request = GetTaskRequestProto.newBuilder()
.setExecutionBlockId(getExecutionBlockId().getProto())
- .setContainerId(((ContainerIdPBImpl) containerId).getProto())
+ .setContainerId(((TajoContainerIdPBImpl) containerId).getProto())
.setWorkerId(getContext().getWorkerContext().getConnectionInfo().getId())
.build();
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
index a8a11c1..364348f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerHistory.java
@@ -21,11 +21,11 @@ package org.apache.tajo.worker;
import com.google.common.base.Objects;
import com.google.common.collect.Maps;
import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.master.container.TajoContainerId;
+import org.apache.tajo.master.container.TajoConverterUtils;
import java.util.Collections;
import java.util.Map;
@@ -39,13 +39,13 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.TaskRunnerHistoryProto;
public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> {
private Service.STATE state;
- private ContainerId containerId;
+ private TajoContainerId containerId;
private long startTime;
private long finishTime;
private ExecutionBlockId executionBlockId;
private Map<QueryUnitAttemptId, TaskHistory> taskHistoryMap = null;
- public TaskRunnerHistory(ContainerId containerId, ExecutionBlockId executionBlockId) {
+ public TaskRunnerHistory(TajoContainerId containerId, ExecutionBlockId executionBlockId) {
init();
this.containerId = containerId;
this.executionBlockId = executionBlockId;
@@ -53,7 +53,7 @@ public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> {
public TaskRunnerHistory(TaskRunnerHistoryProto proto) {
this.state = Service.STATE.valueOf(proto.getState());
- this.containerId = ConverterUtils.toContainerId(proto.getContainerId());
+ this.containerId = TajoConverterUtils.toTajoContainerId(proto.getContainerId());
this.startTime = proto.getStartTime();
this.finishTime = proto.getFinishTime();
this.executionBlockId = new ExecutionBlockId(proto.getExecutionBlockId());
@@ -129,11 +129,11 @@ public class TaskRunnerHistory implements ProtoObject<TaskRunnerHistoryProto> {
this.state = state;
}
- public ContainerId getContainerId() {
+ public TajoContainerId getContainerId() {
return containerId;
}
- public void setContainerId(ContainerId containerId) {
+ public void setContainerId(TajoContainerId containerId) {
this.containerId = containerId;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/proto/ContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ContainerProtocol.proto b/tajo-core/src/main/proto/ContainerProtocol.proto
new file mode 100644
index 0000000..df7a450
--- /dev/null
+++ b/tajo-core/src/main/proto/ContainerProtocol.proto
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are public and stable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *stable* .proto interface.
+ */
+
+option java_package = "org.apache.tajo.ipc";
+option java_outer_classname = "ContainerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+package hadoop.yarn;
+
+import "Security.proto";
+import "yarn_protos.proto";
+
+message TajoContainerIdProto {
+ optional ApplicationIdProto app_id = 1;
+ optional ApplicationAttemptIdProto app_attempt_id = 2;
+ optional int32 id = 3;
+}
+
+message TajoContainerProto {
+ optional TajoContainerIdProto id = 1;
+ optional NodeIdProto nodeId = 2;
+ optional string node_http_address = 3;
+ optional ResourceProto resource = 4;
+ optional PriorityProto priority = 5;
+ optional hadoop.common.TokenProto container_token = 6;
+}
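
For reference, protoc compiles the messages above into nested classes of the declared outer class, so the Java code in this patch refers to them as ContainerProtocol.TajoContainerIdProto and constructs instances through the standard generated builder. A minimal sketch, with illustrative field values that are not taken from the patch:

    import org.apache.tajo.ipc.ContainerProtocol;

    // All fields of TajoContainerIdProto are optional, so a caller
    // sets only what it needs; here just the numeric id.
    ContainerProtocol.TajoContainerIdProto containerId =
        ContainerProtocol.TajoContainerIdProto.newBuilder()
            .setId(42) // illustrative sequence number
            .build();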
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/proto/QueryMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/QueryMasterProtocol.proto b/tajo-core/src/main/proto/QueryMasterProtocol.proto
index 06d2a42..494d296 100644
--- a/tajo-core/src/main/proto/QueryMasterProtocol.proto
+++ b/tajo-core/src/main/proto/QueryMasterProtocol.proto
@@ -27,6 +27,9 @@ import "TajoIdProtos.proto";
import "CatalogProtos.proto";
import "PrimitiveProtos.proto";
import "TajoWorkerProtocol.proto";
+import "ContainerProtocol.proto";
+
+package hadoop.yarn;
service QueryMasterProtocolService {
//from Worker
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
index b117cac..b2db46a 100644
--- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
+++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto
@@ -23,8 +23,11 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
import "TajoMasterProtocol.proto";
+import "ContainerProtocol.proto";
import "tajo_protos.proto";
+package hadoop.yarn;
+
message NodeHeartbeat {
required WorkerConnectionInfoProto connectionInfo = 1;
optional ServerStatusProto serverStatus = 2;
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/proto/TajoMasterProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto
index 7283543..e5eab4f 100644
--- a/tajo-core/src/main/proto/TajoMasterProtocol.proto
+++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto
@@ -28,6 +28,9 @@ import "tajo_protos.proto";
import "TajoIdProtos.proto";
import "CatalogProtos.proto";
import "PrimitiveProtos.proto";
+import "ContainerProtocol.proto";
+
+package hadoop.yarn;
message ServerStatusProto {
message System {
@@ -119,11 +122,11 @@ message WorkerResourcesRequest {
message WorkerResourceReleaseRequest {
required ExecutionBlockIdProto executionBlockId = 1;
- repeated hadoop.yarn.ContainerIdProto containerIds = 2;
+ repeated TajoContainerIdProto containerIds = 2;
}
message WorkerAllocatedResource {
- required hadoop.yarn.ContainerIdProto containerId = 1;
+ required TajoContainerIdProto containerId = 1;
required WorkerConnectionInfoProto connectionInfo = 2;
required int32 allocatedMemoryMB = 3;
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/main/proto/TajoWorkerProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
index e515438..989b0e3 100644
--- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto
+++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto
@@ -28,6 +28,9 @@ import "TajoIdProtos.proto";
import "CatalogProtos.proto";
import "PrimitiveProtos.proto";
import "Plan.proto";
+import "ContainerProtocol.proto";
+
+package hadoop.yarn;
message SessionProto {
required string session_id = 1;
@@ -170,7 +173,7 @@ message QueryExecutionRequestProto {
message GetTaskRequestProto {
required int32 workerId = 1;
- required hadoop.yarn.ContainerIdProto containerId = 2;
+ required TajoContainerIdProto containerId = 2;
required ExecutionBlockIdProto executionBlockId = 3;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
index 0423894..b8fbd67 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ContainerProtocol;
import org.apache.tajo.ipc.TajoMasterProtocol.*;
import org.apache.tajo.master.cluster.WorkerConnectionInfo;
import org.apache.tajo.rpc.NullCallback;
@@ -150,7 +151,8 @@ public class TestTajoResourceManager {
.build();
final CountDownLatch barrier = new CountDownLatch(1);
- final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
+ final List<ContainerProtocol.TajoContainerIdProto> containerIds = new
+ ArrayList<ContainerProtocol.TajoContainerIdProto>();
RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
@@ -190,7 +192,7 @@ public class TestTajoResourceManager {
containerIds.add(eachResource.getContainerId());
}
- for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+ for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) {
tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
}
@@ -318,7 +320,8 @@ public class TestTajoResourceManager {
.build();
final CountDownLatch barrier = new CountDownLatch(1);
- final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
+ final List<ContainerProtocol.TajoContainerIdProto> containerIds = new
+ ArrayList<ContainerProtocol.TajoContainerIdProto>();
RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
@@ -356,7 +359,7 @@ public class TestTajoResourceManager {
assertEquals(numWorkers * 3, response.getWorkerAllocatedResourceList().size());
- for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+ for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) {
tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
}
@@ -399,7 +402,8 @@ public class TestTajoResourceManager {
.build();
final CountDownLatch barrier = new CountDownLatch(1);
- final List<YarnProtos.ContainerIdProto> containerIds = new ArrayList<YarnProtos.ContainerIdProto>();
+ final List<ContainerProtocol.TajoContainerIdProto> containerIds = new
+ ArrayList<ContainerProtocol.TajoContainerIdProto>();
RpcCallback<WorkerResourceAllocationResponse> callBack = new RpcCallback<WorkerResourceAllocationResponse>() {
@@ -431,7 +435,7 @@ public class TestTajoResourceManager {
assertEquals(0, totalUsedDisks, 0);
- for(YarnProtos.ContainerIdProto eachContainerId: containerIds) {
+ for(ContainerProtocol.TajoContainerIdProto eachContainerId: containerIds) {
tajoWorkerResourceManager.releaseWorkerResource(eachContainerId);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
index 87b4197..220eb6c 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -567,6 +567,7 @@ public class StorageManager {
for (Path p : inputs) {
FileSystem fs = p.getFileSystem(conf);
+
ArrayList<FileStatus> files = Lists.newArrayList();
if (fs.isFile(p)) {
files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index a355a94..bec0daf 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -1,129 +1,138 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3.S3FileSystem;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.s3.InMemoryFileSystemStore;
-import org.apache.tajo.storage.s3.SmallBlockS3FileSystem;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(Parameterized.class)
-public class TestFileSystems {
-
- protected byte[] data = null;
-
- private static String TEST_PATH = "target/test-data/TestFileSystem";
- private TajoConf conf = null;
- private StorageManager sm = null;
- private FileSystem fs = null;
- Path testDir;
-
- public TestFileSystems(FileSystem fs) throws IOException {
- conf = new TajoConf();
-
- if(fs instanceof S3FileSystem){
- conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "10");
- fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
- }
- this.fs = fs;
- sm = StorageManager.getStorageManager(conf);
- testDir = getTestDir(this.fs, TEST_PATH);
- }
-
- public Path getTestDir(FileSystem fs, String dir) throws IOException {
- Path path = new Path(dir);
- if(fs.exists(path))
- fs.delete(path, true);
-
- fs.mkdirs(path);
-
- return fs.makeQualified(path);
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> generateParameters() {
- return Arrays.asList(new Object[][] {
- {new SmallBlockS3FileSystem(new InMemoryFileSystemStore())},
- });
- }
-
- @Test
- public void testBlockSplit() throws IOException {
-
- Schema schema = new Schema();
- schema.addColumn("id", Type.INT4);
- schema.addColumn("age", Type.INT4);
- schema.addColumn("name", Type.TEXT);
-
- TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
-
- Tuple[] tuples = new Tuple[4];
- for (int i = 0; i < tuples.length; i++) {
- tuples[i] = new VTuple(3);
- tuples[i]
- .put(new Datum[] { DatumFactory.createInt4(i),
- DatumFactory.createInt4(i + 32),
- DatumFactory.createText("name" + i) });
- }
-
- Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender",
- "table.csv");
- fs.mkdirs(path.getParent());
-
- Appender appender = sm.getAppender(meta, schema, path);
- appender.init();
- for (Tuple t : tuples) {
- appender.addTuple(t);
- }
- appender.close();
- FileStatus fileStatus = fs.getFileStatus(path);
-
- List<FileFragment> splits = sm.getSplits("table", meta, schema, path);
- int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
- assertEquals(splitSize, splits.size());
-
- for (FileFragment fragment : splits) {
- assertTrue(fragment.getEndKey() <= fileStatus.getBlockSize());
- }
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class TestFileSystems {
+
+ private static String TEST_PATH = "target/test-data/TestFileSystem";
+ private Configuration conf;
+ private StorageManager sm;
+ private FileSystem fs;
+ private Path testDir;
+
+ public TestFileSystems(FileSystem fs) throws IOException {
+ this.fs = fs;
+ this.conf = fs.getConf();
+ this.testDir = getTestDir(this.fs, TEST_PATH);
+ this.sm = StorageManager.getStorageManager(new TajoConf(this.conf));
+ }
+
+ public Path getTestDir(FileSystem fs, String dir) throws IOException {
+ Path path = new Path(dir);
+ if (fs.exists(path))
+ fs.delete(path, true);
+
+ fs.mkdirs(path);
+
+ return fs.makeQualified(path);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> generateParameters() throws IOException {
+ return Arrays.asList(new Object[][]{
+ {FileSystem.getLocal(new TajoConf())},
+ });
+ }
+
+ @Before
+ public void setup() throws IOException {
+ if (!(fs instanceof LocalFileSystem)) {
+ conf.set("fs.local.block.size", "10");
+ fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
+ fs.setConf(conf);
+ }
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ if (!(fs instanceof LocalFileSystem)) {
+ fs.setConf(new TajoConf());
+ }
+ }
+
+ @Test
+ public void testBlockSplit() throws IOException {
+
+ Schema schema = new Schema();
+ schema.addColumn("id", Type.INT4);
+ schema.addColumn("age", Type.INT4);
+ schema.addColumn("name", Type.TEXT);
+
+ TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
+
+ Tuple[] tuples = new Tuple[4];
+ for (int i = 0; i < tuples.length; i++) {
+ tuples[i] = new VTuple(3);
+ tuples[i]
+ .put(new Datum[]{DatumFactory.createInt4(i),
+ DatumFactory.createInt4(i + 32),
+ DatumFactory.createText("name" + i)});
+ }
+
+ Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender",
+ "table.csv");
+ fs.mkdirs(path.getParent());
+
+ Appender appender = sm.getAppender(meta, schema, path);
+ appender.init();
+ for (Tuple t : tuples) {
+ appender.addTuple(t);
+ }
+ appender.close();
+ FileStatus fileStatus = fs.getFileStatus(path);
+
+ List<FileFragment> splits = sm.getSplits("table", meta, schema, path);
+ int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
+ assertEquals(splitSize, splits.size());
+
+ for (FileFragment fragment : splits) {
+ assertTrue(fragment.getEndKey() <= fileStatus.getBlockSize());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java
deleted file mode 100644
index 7b09937..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/INode.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.s3;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.s3.Block;
-import org.apache.hadoop.io.IOUtils;
-
-import java.io.*;
-
-/**
- * Holds file metadata including type (regular file, or directory),
- * and the list of blocks that are pointers to the data.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class INode {
-
- enum FileType {
- DIRECTORY, FILE
- }
-
- public static final FileType[] FILE_TYPES = {
- FileType.DIRECTORY,
- FileType.FILE
- };
-
- public static final INode DIRECTORY_INODE = new INode(FileType.DIRECTORY, null);
-
- private FileType fileType;
- private Block[] blocks;
-
- public INode(FileType fileType, Block[] blocks) {
- this.fileType = fileType;
- if (isDirectory() && blocks != null) {
- throw new IllegalArgumentException("A directory cannot contain blocks.");
- }
- this.blocks = blocks;
- }
-
- public Block[] getBlocks() {
- return blocks;
- }
-
- public FileType getFileType() {
- return fileType;
- }
-
- public boolean isDirectory() {
- return fileType == FileType.DIRECTORY;
- }
-
- public boolean isFile() {
- return fileType == FileType.FILE;
- }
-
- public long getSerializedLength() {
- return 1L + (blocks == null ? 0 : 4 + blocks.length * 16);
- }
-
-
- public InputStream serialize() throws IOException {
- ByteArrayOutputStream bytes = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(bytes);
- try {
- out.writeByte(fileType.ordinal());
- if (isFile()) {
- out.writeInt(blocks.length);
- for (int i = 0; i < blocks.length; i++) {
- out.writeLong(blocks[i].getId());
- out.writeLong(blocks[i].getLength());
- }
- }
- out.close();
- out = null;
- } finally {
- IOUtils.closeStream(out);
- }
- return new ByteArrayInputStream(bytes.toByteArray());
- }
-
- public static INode deserialize(InputStream in) throws IOException {
- if (in == null) {
- return null;
- }
- DataInputStream dataIn = new DataInputStream(in);
- FileType fileType = INode.FILE_TYPES[dataIn.readByte()];
- switch (fileType) {
- case DIRECTORY:
- in.close();
- return INode.DIRECTORY_INODE;
- case FILE:
- int numBlocks = dataIn.readInt();
- Block[] blocks = new Block[numBlocks];
- for (int i = 0; i < numBlocks; i++) {
- long id = dataIn.readLong();
- long length = dataIn.readLong();
- blocks[i] = new Block(id, length);
- }
- in.close();
- return new INode(fileType, blocks);
- default:
- throw new IllegalArgumentException("Cannot deserialize inode.");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java
deleted file mode 100644
index 40decc2..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/InMemoryFileSystemStore.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.s3;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3.Block;
-import org.apache.hadoop.fs.s3.FileSystemStore;
-import org.apache.hadoop.fs.s3.INode;
-import org.apache.tajo.common.exception.NotImplementedException;
-
-import java.io.*;
-import java.net.URI;
-import java.util.*;
-
-/**
- * A stub implementation of {@link FileSystemStore} for testing
- * {@link S3FileSystem} without actually connecting to S3.
- */
-public class InMemoryFileSystemStore implements FileSystemStore {
-
- private Configuration conf;
- private SortedMap<Path, INode> inodes = new TreeMap<Path, INode>();
- private Map<Long, byte[]> blocks = new HashMap<Long, byte[]>();
-
- @Override
- public void initialize(URI uri, Configuration conf) {
- this.conf = conf;
- }
-
- @Override
- public String getVersion() throws IOException {
- return "0";
- }
-
- @Override
- public void deleteINode(Path path) throws IOException {
- inodes.remove(normalize(path));
- }
-
- @Override
- public void deleteBlock(Block block) throws IOException {
- blocks.remove(block.getId());
- }
-
- @Override
- public boolean inodeExists(Path path) throws IOException {
- return inodes.containsKey(normalize(path));
- }
-
- @Override
- public boolean blockExists(long blockId) throws IOException {
- return blocks.containsKey(blockId);
- }
-
- @Override
- public INode retrieveINode(Path path) throws IOException {
- return inodes.get(normalize(path));
- }
-
- @Override
- public File retrieveBlock(Block block, long byteRangeStart) throws IOException {
- byte[] data = blocks.get(block.getId());
- File file = createTempFile();
- BufferedOutputStream out = null;
- try {
- out = new BufferedOutputStream(new FileOutputStream(file));
- out.write(data, (int) byteRangeStart, data.length - (int) byteRangeStart);
- } finally {
- if (out != null) {
- out.close();
- }
- }
- return file;
- }
-
- private File createTempFile() throws IOException {
- File dir = new File(conf.get("fs.s3.buffer.dir"));
- if (!dir.exists() && !dir.mkdirs()) {
- throw new IOException("Cannot create S3 buffer directory: " + dir);
- }
- File result = File.createTempFile("test-", ".tmp", dir);
- result.deleteOnExit();
- return result;
- }
-
- @Override
- public Set<Path> listSubPaths(Path path) throws IOException {
- Path normalizedPath = normalize(path);
- // This is inefficient but more than adequate for testing purposes.
- Set<Path> subPaths = new LinkedHashSet<Path>();
- for (Path p : inodes.tailMap(normalizedPath).keySet()) {
- if (normalizedPath.equals(p.getParent())) {
- subPaths.add(p);
- }
- }
- return subPaths;
- }
-
- @Override
- public Set<Path> listDeepSubPaths(Path path) throws IOException {
- Path normalizedPath = normalize(path);
- String pathString = normalizedPath.toUri().getPath();
- if (!pathString.endsWith("/")) {
- pathString += "/";
- }
- // This is inefficient but more than adequate for testing purposes.
- Set<Path> subPaths = new LinkedHashSet<Path>();
- for (Path p : inodes.tailMap(normalizedPath).keySet()) {
- if (p.toUri().getPath().startsWith(pathString)) {
- subPaths.add(p);
- }
- }
- return subPaths;
- }
-
- @Override
- public void storeINode(Path path, INode inode) throws IOException {
- inodes.put(normalize(path), inode);
- }
-
- @Override
- public void storeBlock(Block block, File file) throws IOException {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- byte[] buf = new byte[8192];
- int numRead;
- BufferedInputStream in = null;
- try {
- in = new BufferedInputStream(new FileInputStream(file));
- while ((numRead = in.read(buf)) >= 0) {
- out.write(buf, 0, numRead);
- }
- } finally {
- if (in != null) {
- in.close();
- }
- }
- blocks.put(block.getId(), out.toByteArray());
- }
-
- private Path normalize(Path path) {
- if (!path.isAbsolute()) {
- throw new IllegalArgumentException("Path must be absolute: " + path);
- }
- return new Path(path.toUri().getPath());
- }
-
- @Override
- public void purge() throws IOException {
- inodes.clear();
- blocks.clear();
- }
-
- @Override
- public void dump() throws IOException {
- throw new NotImplementedException();
- }
-
-}
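
The two listing methods above lean on TreeMap's sorted iteration:
tailMap(path) positions the scan at the path itself, listSubPaths keeps
the following keys whose parent is the path, and listDeepSubPaths keeps
those whose string form starts with "path/". A tiny sketch of the deep
variant, using plain strings in place of Hadoop Paths:

    import java.util.LinkedHashSet;
    import java.util.Set;
    import java.util.SortedMap;
    import java.util.TreeMap;

    // Sketch of the tailMap-based deep listing in InMemoryFileSystemStore.
    public class TailMapListingSketch {
      public static void main(String[] args) {
        SortedMap<String, String> inodes = new TreeMap<String, String>();
        inodes.put("/a", "dir");
        inodes.put("/a/b", "file");
        inodes.put("/a/c", "dir");
        inodes.put("/a/c/d", "file");
        inodes.put("/z", "file");

        // Everything at or after "/a" in sort order, filtered by prefix.
        Set<String> deep = new LinkedHashSet<String>();
        for (String p : inodes.tailMap("/a").keySet()) {
          if (p.startsWith("/a/")) {
            deep.add(p);
          }
        }
        System.out.println(deep); // [/a/b, /a/c, /a/c/d]
      }
    }
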
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java
deleted file mode 100644
index d4034b9..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/S3OutputStream.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.s3;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3.Block;
-import org.apache.hadoop.fs.s3.FileSystemStore;
-import org.apache.hadoop.fs.s3.INode;
-import org.apache.hadoop.util.Progressable;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-class S3OutputStream extends OutputStream {
-
- private Configuration conf;
-
- private int bufferSize;
-
- private FileSystemStore store;
-
- private Path path;
-
- private long blockSize;
-
- private File backupFile;
-
- private OutputStream backupStream;
-
- private Random r = new Random();
-
- private boolean closed;
-
- private int pos = 0;
-
- private long filePos = 0;
-
- private int bytesWrittenToBlock = 0;
-
- private byte[] outBuf;
-
- private List<Block> blocks = new ArrayList<Block>();
-
- private Block nextBlock;
-
- private static final Log LOG =
- LogFactory.getLog(S3OutputStream.class.getName());
-
-
- public S3OutputStream(Configuration conf, FileSystemStore store,
- Path path, long blockSize, Progressable progress,
- int buffersize) throws IOException {
-
- this.conf = conf;
- this.store = store;
- this.path = path;
- this.blockSize = blockSize;
- this.backupFile = newBackupFile();
- this.backupStream = new FileOutputStream(backupFile);
- this.bufferSize = buffersize;
- this.outBuf = new byte[bufferSize];
-
- }
-
- private File newBackupFile() throws IOException {
- File dir = new File(conf.get("fs.s3.buffer.dir"));
- if (!dir.exists() && !dir.mkdirs()) {
- throw new IOException("Cannot create S3 buffer directory: " + dir);
- }
- File result = File.createTempFile("output-", ".tmp", dir);
- result.deleteOnExit();
- return result;
- }
-
- public long getPos() throws IOException {
- return filePos;
- }
-
- @Override
- public synchronized void write(int b) throws IOException {
- if (closed) {
- throw new IOException("Stream closed");
- }
-
- if ((bytesWrittenToBlock + pos == blockSize) || (pos >= bufferSize)) {
- flush();
- }
- outBuf[pos++] = (byte) b;
- filePos++;
- }
-
- @Override
- public synchronized void write(byte b[], int off, int len) throws IOException {
- if (closed) {
- throw new IOException("Stream closed");
- }
- while (len > 0) {
- int remaining = bufferSize - pos;
- int toWrite = Math.min(remaining, len);
- System.arraycopy(b, off, outBuf, pos, toWrite);
- pos += toWrite;
- off += toWrite;
- len -= toWrite;
- filePos += toWrite;
-
- if ((bytesWrittenToBlock + pos >= blockSize) || (pos == bufferSize)) {
- flush();
- }
- }
- }
-
- @Override
- public synchronized void flush() throws IOException {
- if (closed) {
- throw new IOException("Stream closed");
- }
-
- if (bytesWrittenToBlock + pos >= blockSize) {
- flushData((int) blockSize - bytesWrittenToBlock);
- }
- if (bytesWrittenToBlock == blockSize) {
- endBlock();
- }
- flushData(pos);
- }
-
- private synchronized void flushData(int maxPos) throws IOException {
- int workingPos = Math.min(pos, maxPos);
-
- if (workingPos > 0) {
- //
- // To the local block backup, write just the bytes
- //
- backupStream.write(outBuf, 0, workingPos);
-
- //
- // Track position
- //
- bytesWrittenToBlock += workingPos;
- System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);
- pos -= workingPos;
- }
- }
-
- private synchronized void endBlock() throws IOException {
- //
- // Done with local copy
- //
- backupStream.close();
-
- //
- // Send it to S3
- //
- // TODO: Use passed in Progressable to report progress.
- nextBlockOutputStream();
- store.storeBlock(nextBlock, backupFile);
- Block[] arr = new Block[blocks.size()];
- arr = blocks.toArray(arr);
- store.storeINode(path, new INode(INode.FILE_TYPES[1], arr));
-
- //
- // Delete local backup, start new one
- //
- boolean b = backupFile.delete();
- if (!b) {
- LOG.warn("Ignoring failed delete");
- }
- backupFile = newBackupFile();
- backupStream = new FileOutputStream(backupFile);
- bytesWrittenToBlock = 0;
- }
-
- private synchronized void nextBlockOutputStream() throws IOException {
- long blockId = r.nextLong();
- while (store.blockExists(blockId)) {
- blockId = r.nextLong();
- }
- nextBlock = new Block(blockId, bytesWrittenToBlock);
- blocks.add(nextBlock);
- bytesWrittenToBlock = 0;
- }
-
-
- @Override
- public synchronized void close() throws IOException {
- if (closed) {
- return;
- }
-
- flush();
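- // A zero-length file still gets one (empty) block; otherwise ship the partial tail.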
- if (filePos == 0 || bytesWrittenToBlock != 0) {
- endBlock();
- }
-
- backupStream.close();
- boolean b = backupFile.delete();
- if (!b) {
- LOG.warn("Ignoring failed delete");
- }
-
- super.close();
-
- closed = true;
- }
-
-}
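
S3OutputStream never talks to the store per write(): bytes accumulate in
outBuf, spill to a local backup file, and a block is handed to
FileSystemStore.storeBlock() once blockSize bytes are down (close()
ships the partial tail). A deliberately simplified sketch of that
chunking idea, with store calls replaced by a println (the real stream's
flush bookkeeping can let a block run slightly past blockSize):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;

    // Sketch of fixed-size block chunking: buffer bytes, emit a "block"
    // every blockSize bytes, and flush the partial tail at the end.
    public class BlockChunkingSketch {
      public static void main(String[] args) throws IOException {
        int blockSize = 10; // SmallBlockS3FileSystem's default
        ByteArrayInputStream in = new ByteArrayInputStream(new byte[23]);

        byte[] block = new byte[blockSize];
        int filled = 0, blockId = 0, b;
        while ((b = in.read()) != -1) {
          block[filled++] = (byte) b;
          if (filled == blockSize) { // stand-in for endBlock()
            System.out.println("store block " + blockId++ + ": " + filled + " bytes");
            filled = 0;
          }
        }
        if (filled > 0) { // stand-in for the flush done by close()
          System.out.println("store block " + blockId + ": " + filled + " bytes");
        }
        // 23 bytes at blockSize 10 -> blocks of 10, 10, and 3 bytes.
      }
    }
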
http://git-wip-us.apache.org/repos/asf/tajo/blob/3d485ecb/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java b/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java
deleted file mode 100644
index fc1c908..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/s3/SmallBlockS3FileSystem.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.s3;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.s3.Block;
-import org.apache.hadoop.fs.s3.FileSystemStore;
-import org.apache.hadoop.fs.s3.INode;
-import org.apache.hadoop.fs.s3.S3FileSystem;
-import org.apache.hadoop.util.Progressable;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-public class SmallBlockS3FileSystem extends S3FileSystem {
-
- private URI uri;
-
- private FileSystemStore store;
-
- private Path workingDir;
-
- static class Holder {
- private static InMemoryFileSystemStore s;
-
- public synchronized static FileSystemStore get() {
- if(s != null) {
- return s;
- }
- s = new InMemoryFileSystemStore();
- return s;
- }
-
- public synchronized static void set(InMemoryFileSystemStore inMemoryFileSystemStore) {
- s = inMemoryFileSystemStore;
- }
- }
-
- public SmallBlockS3FileSystem() {
- }
-
-
- public SmallBlockS3FileSystem(
- InMemoryFileSystemStore inMemoryFileSystemStore) {
- Holder.set(inMemoryFileSystemStore);
- this.store = inMemoryFileSystemStore;
- }
-
- @Override
- public URI getUri() {
- return uri;
- }
- @Override
- public long getDefaultBlockSize() {
- return 10;
- }
-
- @Override
- public void initialize(URI uri, Configuration conf) throws IOException {
- if (store == null) {
- store = Holder.get();
- }
- store.initialize(uri, conf);
- setConf(conf);
- this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
- this.workingDir =
- new Path("/user", System.getProperty("user.name")).makeQualified(this);
- }
- @Override
- public boolean isFile(Path path) throws IOException {
- INode inode = store.retrieveINode(makeAbsolute(path));
- if (inode == null) {
- return false;
- }
- return inode.isFile();
- }
-
- private INode checkFile(Path path) throws IOException {
- INode inode = store.retrieveINode(makeAbsolute(path));
- if (inode == null) {
- throw new IOException("No such file.");
- }
- if (inode.isDirectory()) {
- throw new IOException("Path " + path + " is a directory.");
- }
- return inode;
- }
-
- @Override
- public FileStatus[] listStatus(Path f) throws IOException {
- Path absolutePath = makeAbsolute(f);
- INode inode = store.retrieveINode(absolutePath);
- if (inode == null) {
- throw new FileNotFoundException("File " + f + " does not exist.");
- }
- if (inode.isFile()) {
- return new FileStatus[] {
- new S3FileStatus(f.makeQualified(this), inode)
- };
- }
- ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
- for (Path p : store.listSubPaths(absolutePath)) {
- ret.add(getFileStatus(p.makeQualified(this)));
- }
- return ret.toArray(new FileStatus[0]);
- }
- @Override
- public FSDataOutputStream create(Path file, FsPermission permission,
- boolean overwrite, int bufferSize,
- short replication, long blockSize, Progressable progress)
- throws IOException {
-
- INode inode = store.retrieveINode(makeAbsolute(file));
- if (inode != null) {
- if (overwrite) {
- delete(file, true);
- } else {
- throw new IOException("File already exists: " + file);
- }
- } else {
- Path parent = file.getParent();
- if (parent != null) {
- if (!mkdirs(parent)) {
- throw new IOException("Mkdirs failed to create " + parent.toString());
- }
- }
- }
- return new FSDataOutputStream
- (new S3OutputStream(getConf(), store, makeAbsolute(file),
- blockSize, progress, bufferSize),
- statistics);
- }
- @Override
- public boolean mkdirs(Path path, FsPermission permission) throws IOException {
- Path absolutePath = makeAbsolute(path);
- List<Path> paths = new ArrayList<Path>();
- do {
- paths.add(0, absolutePath);
- absolutePath = absolutePath.getParent();
- } while (absolutePath != null);
-
- boolean result = true;
- for (Path p : paths) {
- result &= mkdir(p);
- }
- return result;
- }
-
- @Override
- public Path getWorkingDirectory() {
- return workingDir;
- }
-
- @Override
- public boolean rename(Path src, Path dst) throws IOException {
- Path absoluteSrc = makeAbsolute(src);
- INode srcINode = store.retrieveINode(absoluteSrc);
- if (srcINode == null) {
- // src path doesn't exist
- return false;
- }
- Path absoluteDst = makeAbsolute(dst);
- INode dstINode = store.retrieveINode(absoluteDst);
- if (dstINode != null && dstINode.isDirectory()) {
- absoluteDst = new Path(absoluteDst, absoluteSrc.getName());
- dstINode = store.retrieveINode(absoluteDst);
- }
- if (dstINode != null) {
- // dst path already exists - can't overwrite
- return false;
- }
- Path dstParent = absoluteDst.getParent();
- if (dstParent != null) {
- INode dstParentINode = store.retrieveINode(dstParent);
- if (dstParentINode == null || dstParentINode.isFile()) {
- // dst parent doesn't exist or is a file
- return false;
- }
- }
- return renameRecursive(absoluteSrc, absoluteDst);
- }
-
- private boolean renameRecursive(Path src, Path dst) throws IOException {
- INode srcINode = store.retrieveINode(src);
- store.storeINode(dst, srcINode);
- store.deleteINode(src);
- if (srcINode.isDirectory()) {
- for (Path oldSrc : store.listDeepSubPaths(src)) {
- INode inode = store.retrieveINode(oldSrc);
- if (inode == null) {
- return false;
- }
- String oldSrcPath = oldSrc.toUri().getPath();
- String srcPath = src.toUri().getPath();
- String dstPath = dst.toUri().getPath();
- Path newDst = new Path(oldSrcPath.replaceFirst(srcPath, dstPath));
- store.storeINode(newDst, inode);
- store.deleteINode(oldSrc);
- }
- }
- return true;
- }
-
- @Override
- public boolean delete(Path path, boolean recursive) throws IOException {
- Path absolutePath = makeAbsolute(path);
- INode inode = store.retrieveINode(absolutePath);
- if (inode == null) {
- return false;
- }
- if (inode.isFile()) {
- store.deleteINode(absolutePath);
- for (Block block: inode.getBlocks()) {
- store.deleteBlock(block);
- }
- } else {
- FileStatus[] contents = null;
- try {
- contents = listStatus(absolutePath);
- } catch(FileNotFoundException fnfe) {
- return false;
- }
-
- if ((contents.length != 0) && (!recursive)) {
- throw new IOException("Directory " + path.toString()
- + " is not empty.");
- }
- for (FileStatus p : contents) {
- if (!delete(p.getPath(), recursive)) {
- return false;
- }
- }
- store.deleteINode(absolutePath);
- }
- return true;
- }
-
- /**
- * Returns the {@link FileStatus} for the given path.
- */
- @Override
- public FileStatus getFileStatus(Path f) throws IOException {
- INode inode = store.retrieveINode(makeAbsolute(f));
- if (inode == null) {
- throw new FileNotFoundException(f + ": No such file or directory.");
- }
- return new S3FileStatus(f.makeQualified(this), inode);
- }
- private boolean mkdir(Path path) throws IOException {
- Path absolutePath = makeAbsolute(path);
- INode inode = store.retrieveINode(absolutePath);
- if (inode == null) {
- store.storeINode(absolutePath, INode.DIRECTORY_INODE);
- } else if (inode.isFile()) {
- throw new IOException(String.format(
- "Can't make directory for path %s since it is a file.",
- absolutePath));
- }
- return true;
- }
- private Path makeAbsolute(Path path) {
- if (path.isAbsolute()) {
- return path;
- }
- return new Path(workingDir, path);
- }
-
- private static class S3FileStatus extends FileStatus {
-
- S3FileStatus(Path f, INode inode) throws IOException {
- super(findLength(inode), inode.isDirectory(), 1,
- findBlocksize(inode), 0, f);
- }
-
- private static long findLength(INode inode) {
- if (!inode.isDirectory()) {
- long length = 0L;
- for (Block block : inode.getBlocks()) {
- length += block.getLength();
- }
- return length;
- }
- return 0;
- }
-
- private static long findBlocksize(INode inode) {
- final Block[] ret = inode.getBlocks();
- return ret == null ? 0L : ret[0].getLength();
- }
- }
-}
\ No newline at end of file
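
A hypothetical usage sketch of the two removed test classes together,
compiling only against the pre-change tree (the fs.s3.buffer.dir key is
the one newBackupFile() and createTempFile() read above):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.storage.s3.InMemoryFileSystemStore;
    import org.apache.tajo.storage.s3.SmallBlockS3FileSystem;

    // Writes a small file through the in-memory S3 stand-in; the 10-byte
    // default block size forces even tiny files to span several blocks.
    public class SmallBlockUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.s3.buffer.dir", System.getProperty("java.io.tmpdir"));
        SmallBlockS3FileSystem fs =
            new SmallBlockS3FileSystem(new InMemoryFileSystemStore());
        fs.initialize(URI.create("s3://test-bucket"), conf);

        FSDataOutputStream out = fs.create(new Path("/user/test/data"));
        out.write(new byte[23]); // spans multiple blocks at blockSize 10
        out.close();
        System.out.println(fs.getFileStatus(new Path("/user/test/data")).getLen()); // 23
      }
    }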