You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/10/23 13:37:33 UTC

[5/8] TAJO-1121: Remove the 'v2' storage package.

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
deleted file mode 100644
index e15ca6e..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.io.compress.*;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.LazyTuple;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.compress.CodecPool;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.BytesUtils;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class CSVFileScanner extends FileScannerV2 {
-  public static final String DELIMITER = "csvfile.delimiter";
-  public static final String DELIMITER_DEFAULT = "|";
-  public static final byte LF = '\n';
-  private static final Log LOG = LogFactory.getLog(CSVFileScanner.class);
-
-  private final static int DEFAULT_BUFFER_SIZE = 256 * 1024;
-  private int bufSize;
-  private char delimiter;
-  private ScheduledInputStream sin;
-  private InputStream is; // decompressd stream
-  private CompressionCodecFactory factory;
-  private CompressionCodec codec;
-  private Decompressor decompressor;
-  private Seekable filePosition;
-  private boolean splittable = true;
-  private long startOffset, length;
-  private byte[] buf = null;
-  private byte[][] tuples = null;
-  private long[] tupleOffsets = null;
-  private int currentIdx = 0, validIdx = 0;
-  private byte[] tail = null;
-  private long pageStart = -1;
-  private long prevTailLen = -1;
-  private int[] targetColumnIndexes;
-  private boolean eof = false;
-  private boolean first = true;
-
-  private long totalReadBytesForFetch;
-  private long totalReadBytesFromDisk;
-
-  public CSVFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
-      throws IOException {
-    super(conf, meta, schema, fragment);
-    factory = new CompressionCodecFactory(conf);
-    codec = factory.getCodec(this.fragment.getPath());
-    if (isCompress() && !(codec instanceof SplittableCompressionCodec)) {
-      splittable = false;
-    }
-  }
-
-  @Override
-  public void init() throws IOException {
-    // Buffer size, Delimiter
-    this.bufSize = DEFAULT_BUFFER_SIZE;
-    String delim  = meta.getOption(DELIMITER, DELIMITER_DEFAULT);
-    this.delimiter = delim.charAt(0);
-
-    super.init();
-  }
-
-  @Override
-  protected boolean initFirstScan(int maxBytesPerSchedule) throws IOException {
-    synchronized(this) {
-      eof = false;
-      first = true;
-      if(sin == null) {
-        FSDataInputStream fin = fs.open(fragment.getPath(), 128 * 1024);
-        sin = new ScheduledInputStream(fragment.getPath(), fin,
-            fragment.getStartKey(), fragment.getEndKey(), fs.getLength(fragment.getPath()));
-        startOffset = fragment.getStartKey();
-        length = fragment.getEndKey();
-
-        if (startOffset > 0) {
-          startOffset--; // prev line feed
-        }
-      }
-    }
-    return true;
-  }
-
-  private boolean scanFirst() throws IOException {
-    if (codec != null) {
-      decompressor = CodecPool.getDecompressor(codec);
-      if (codec instanceof SplittableCompressionCodec) {
-        SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
-            sin, decompressor, startOffset, startOffset + length,
-            SplittableCompressionCodec.READ_MODE.BYBLOCK);
-
-        startOffset = cIn.getAdjustedStart();
-        length = cIn.getAdjustedEnd() - startOffset;
-        filePosition = cIn;
-        is = cIn;
-      } else {
-        is = new DataInputStream(codec.createInputStream(sin, decompressor));
-      }
-    } else {
-      sin.seek(startOffset);
-      filePosition = sin;
-      is = sin;
-    }
-
-    tuples = new byte[0][];
-    if (targets == null) {
-      targets = schema.toArray();
-    }
-
-    targetColumnIndexes = new int[targets.length];
-    for (int i = 0; i < targets.length; i++) {
-      targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + length +
-          "," + fs.getFileStatus(fragment.getPath()).getLen());
-    }
-
-    if (startOffset != 0) {
-      int rbyte;
-      while ((rbyte = is.read()) != LF) {
-        if(rbyte == -1) break;
-      }
-    }
-
-    if (fragmentable() < 1) {
-      close();
-      return false;
-    }
-    return true;
-  }
-
-  @Override
-  public boolean isStopScanScheduling() {
-    if(sin != null && sin.isEndOfStream()) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  private long fragmentable() throws IOException {
-    return startOffset + length - getFilePosition();
-  }
-
-  @Override
-  protected long getFilePosition() throws IOException {
-    long retVal;
-    if (filePosition != null) {
-      retVal = filePosition.getPos();
-    } else {
-      retVal = sin.getPos();
-    }
-    return retVal;
-  }
-
-  @Override
-  public boolean isFetchProcessing() {
-    if(sin != null &&
-        (sin.getAvaliableSize() >= 64 * 1024 * 1024)) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  private void page() throws IOException {
-    // Index initialization
-    currentIdx = 0;
-
-    // Buffer size set
-    if (isSplittable() && fragmentable() < DEFAULT_BUFFER_SIZE) {
-      bufSize = (int) fragmentable();
-    }
-
-    if (this.tail == null || this.tail.length == 0) {
-      this.pageStart = getFilePosition();
-      this.prevTailLen = 0;
-    } else {
-      this.pageStart = getFilePosition() - this.tail.length;
-      this.prevTailLen = this.tail.length;
-    }
-
-    // Read
-    int rbyte;
-    buf = new byte[bufSize];
-    rbyte = is.read(buf);
-
-    if (rbyte < 0) {
-      eof = true; // EOF
-      return;
-    }
-
-    if (prevTailLen == 0) {
-      tail = new byte[0];
-      tuples = BytesUtils.splitPreserveAllTokens(buf, rbyte, (char) LF);
-    } else {
-      byte[] lastRow = ArrayUtils.addAll(tail, buf);
-      tuples = BytesUtils.splitPreserveAllTokens(lastRow, rbyte + tail.length, (char) LF);
-      tail = null;
-    }
-
-    // Check tail
-    if ((char) buf[rbyte - 1] != LF) {
-      if ((fragmentable() < 1 || rbyte != bufSize)) {
-        int lineFeedPos = 0;
-        byte[] temp = new byte[DEFAULT_BUFFER_SIZE];
-
-        // find line feed
-        while ((temp[lineFeedPos] = (byte)is.read()) != (byte)LF) {
-          if(temp[lineFeedPos] < 0) {
-            break;
-          }
-          lineFeedPos++;
-        }
-
-        tuples[tuples.length - 1] = ArrayUtils.addAll(tuples[tuples.length - 1],
-            ArrayUtils.subarray(temp, 0, lineFeedPos));
-        validIdx = tuples.length;
-      } else {
-        tail = tuples[tuples.length - 1];
-        validIdx = tuples.length - 1;
-      }
-    } else {
-      tail = new byte[0];
-      validIdx = tuples.length - 1;
-    }
-
-    if(!isCompress()) makeTupleOffset();
-  }
-
-  private void makeTupleOffset() {
-    long curTupleOffset = 0;
-    this.tupleOffsets = new long[this.validIdx];
-    for (int i = 0; i < this.validIdx; i++) {
-      this.tupleOffsets[i] = curTupleOffset + this.pageStart;
-      curTupleOffset += this.tuples[i].length + 1;//tuple byte +  1byte line feed
-    }
-  }
-
-  protected Tuple nextTuple() throws IOException {
-    if(first) {
-      boolean more = scanFirst();
-      first = false;
-      if(!more) {
-        return null;
-      }
-    }
-    try {
-      if (currentIdx == validIdx) {
-        if (isSplittable() && fragmentable() < 1) {
-          close();
-          return null;
-        } else {
-          page();
-        }
-
-        if(eof){
-          close();
-          return null;
-        }
-      }
-
-      long offset = -1;
-      if(!isCompress()){
-        offset = this.tupleOffsets[currentIdx];
-      }
-
-      byte[][] cells = BytesUtils.splitPreserveAllTokens(tuples[currentIdx++], delimiter, targetColumnIndexes);
-      return new LazyTuple(schema, cells, offset);
-    } catch (Throwable t) {
-      LOG.error(t.getMessage(), t);
-    }
-    return null;
-  }
-
-  private boolean isCompress() {
-    return codec != null;
-  }
-
-  @Override
-  public void scannerReset() {
-    if(sin != null) {
-      try {
-        filePosition.seek(0);
-      } catch (IOException e) {
-        LOG.error(e.getMessage(), e);
-      }
-    }
-    if(sin != null) {
-      try {
-        sin.seek(0);
-        sin.reset();
-      } catch (IOException e) {
-        LOG.error(e.getMessage(), e);
-      }
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if(closed.get()) {
-      return;
-    }
-    if(sin != null) {
-      totalReadBytesForFetch = sin.getTotalReadBytesForFetch();
-      totalReadBytesFromDisk = sin.getTotalReadBytesFromDisk();
-    }
-    try {
-      if(is != null) {
-        is.close();
-      }
-      is = null;
-      sin = null;
-    } finally {
-      if (decompressor != null) {
-        CodecPool.returnDecompressor(decompressor);
-        decompressor = null;
-      }
-      tuples = null;
-      super.close();
-    }
-  }
-
-  @Override
-  protected boolean scanNext(int length) throws IOException {
-    synchronized(this) {
-      if(isClosed()) {
-        return false;
-      }
-      return sin.readNext(length);
-    }
-  }
-
-  @Override
-  public boolean isProjectable() {
-    return true;
-  }
-
-  @Override
-  public boolean isSelectable() {
-    return false;
-  }
-
-  @Override
-  public void setSearchCondition(Object expr) {
-  }
-
-  @Override
-  public boolean isSplittable(){
-    return splittable;
-  }
-
-  @Override
-  protected long[] reportReadBytes() {
-    return new long[]{totalReadBytesForFetch, totalReadBytesFromDisk};
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
deleted file mode 100644
index 7802c91..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class DiskDeviceInfo {
-	private int id;
-	private String name;
-	
-	private List<DiskMountInfo> mountInfos = new ArrayList<DiskMountInfo>();
-
-	public DiskDeviceInfo(int id) {
-		this.id = id;
-	}
-	
-	public int getId() {
-		return id;
-	}
-
-	public String getName() {
-		return name;
-	}
-
-	public void setName(String name) {
-		this.name = name;
-	}
-	
-	@Override
-	public String toString() {
-		return id + "," + name;
-	}
-
-	public void addMountPath(DiskMountInfo diskMountInfo) {
-		mountInfos.add(diskMountInfo);
-	}
-
-	public List<DiskMountInfo> getMountInfos() {
-		return mountInfos;
-	}
-
-	public void setMountInfos(List<DiskMountInfo> mountInfos) {
-		this.mountInfos = mountInfos;
-	}
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
deleted file mode 100644
index 1babf99..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.tajo.conf.TajoConf.ConfVars;
-
-public final class DiskFileScanScheduler extends Thread {
-  private static final Log LOG = LogFactory.getLog(DiskFileScanScheduler.class);
-
-	private Queue<FileScannerV2> requestQueue = new LinkedList<FileScannerV2>();
-
-  List<FileScannerV2> fetchingScanners = new ArrayList<FileScannerV2>();
-
-  private int scanConcurrency;
-
-	private AtomicInteger numOfRunningScanners = new AtomicInteger(0);
-
-	private Object requestQueueMonitor = new Object(); // c++ code style
-
-	private StorageManagerV2.StorgaeManagerContext smContext;
-
-	private DiskDeviceInfo diskDeviceInfo;
-
-	private AtomicBoolean stopped = new AtomicBoolean(false);
-
-  private long totalScanCount = 0;
-
-  private FetchWaitingThread fetchWaitingThread;
-
-  private AtomicLong totalReadBytesForFetch = new AtomicLong(0);
-
-  private AtomicLong totalReadBytesFromDisk = new AtomicLong(0);
-
-  private long[] lastReportReadBytes;
-
-  private long lastReportTime = 0;
-
-	public DiskFileScanScheduler(
-			StorageManagerV2.StorgaeManagerContext smContext,
-			DiskDeviceInfo diskDeviceInfo) {
-		super("DiskFileScanner:" + diskDeviceInfo);
-		this.smContext = smContext;
-		this.diskDeviceInfo = diskDeviceInfo;
-		initScannerPool();
-		this.fetchWaitingThread = new FetchWaitingThread();
-		this.fetchWaitingThread.start();
-	}
-
-  public void incrementReadBytes(long[] readBytes) {
-    totalReadBytesForFetch.addAndGet(readBytes[0]);
-    totalReadBytesFromDisk.addAndGet(readBytes[1]);
-  }
-
-  public int getDiskId() {
-    return diskDeviceInfo.getId();
-  }
-
-  public void run() {
-    synchronized (requestQueueMonitor) {
-      while(!stopped.get()) {
-        if(isAllScannerRunning()) {
-          try {
-            requestQueueMonitor.wait(2000);
-            continue;
-          } catch (InterruptedException e) {
-            break;
-          }
-        } else {
-          FileScannerV2 fileScanner = requestQueue.poll();
-          if(fileScanner == null) {
-            try {
-              requestQueueMonitor.wait(2000);
-              continue;
-            } catch (InterruptedException e) {
-              break;
-            }
-          }
-          if(fileScanner.isStopScanScheduling()) {
-            LOG.info("Exit from Disk Queue:" + fileScanner.getId());
-            continue;
-          }
-          if(fileScanner.isFetchProcessing()) {
-            synchronized(fetchingScanners) {
-              fetchingScanners.add(fileScanner);
-              //fetchingScanners.notifyAll();
-            }
-          } else {
-            numOfRunningScanners.incrementAndGet();
-            FileScanRunner fileScanRunner = new FileScanRunner(
-                DiskFileScanScheduler.this, smContext,
-                fileScanner, requestQueueMonitor,
-                numOfRunningScanners);
-            totalScanCount++;
-            fileScanRunner.start();
-          }
-        }
-      }
-    }
-  }
-
-	protected void requestScanFile(FileScannerV2 fileScannerV2) {
-		synchronized (requestQueueMonitor) {
-			requestQueue.offer(fileScannerV2);
-			requestQueueMonitor.notifyAll();
-		}
-	}
-
-  public class FetchWaitingThread extends Thread {
-    List<FileScannerV2> workList = new ArrayList<FileScannerV2>(20);
-    public void run() {
-      while(!stopped.get()) {
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException e) {
-          break;
-        }
-        workList.clear();
-        synchronized(fetchingScanners) {
-          workList.addAll(fetchingScanners);
-          fetchingScanners.clear();
-        }
-        synchronized(requestQueueMonitor) {
-          for(FileScannerV2 eachScanner: workList) {
-            requestQueue.offer(eachScanner);
-          }
-          requestQueueMonitor.notifyAll();
-        }
-      }
-    }
-  }
-
-	private void initScannerPool() {
-		// TODO finally implements heuristic, currently set with property
-		scanConcurrency = smContext.getConf().getIntVar(ConfVars.STORAGE_MANAGER_CONCURRENCY_PER_DISK);
-	}
-
-  public int getTotalQueueSize() {
-      return requestQueue.size();
-  }
-
-  boolean isAllScannerRunning() {
-    return numOfRunningScanners.get() >= scanConcurrency;
-  }
-
-  public long getTotalScanCount() {
-    return totalScanCount;
-  }
-
-	public void stopScan() {
-		stopped.set(true);
-		if (fetchWaitingThread != null) {
-      fetchWaitingThread.interrupt();
-		}
-
-		this.interrupt();
-	}
-
-  public void printDiskSchedulerInfo() {
-    long currentReadBytes[] = new long[]{totalReadBytesForFetch.get(), totalReadBytesFromDisk.get()};
-    int[] throughput = new int[2];
-    if(lastReportTime != 0 && lastReportReadBytes != null) {
-      int sec = (int)((System.currentTimeMillis() - lastReportTime)/1000);
-      throughput[0] = (int)((currentReadBytes[0] - lastReportReadBytes[0])/sec);
-      throughput[1] = (int)((currentReadBytes[1] - lastReportReadBytes[1])/sec);
-    }
-    lastReportTime = System.currentTimeMillis();
-
-    LOG.info("===>" + DiskFileScanScheduler.this.diskDeviceInfo
-        + ", request=" + requestQueue.size()
-        + ", fetching=" + fetchingScanners.size()
-        + ", running=" + numOfRunningScanners.get()
-        + ", totalScan=" + totalScanCount
-        + ", FetchThroughput=" + throughput[0]/1024 + "KB"
-        + ", DiskScanThroughput=" + throughput[1]/1024 + "KB");
-
-    lastReportReadBytes = currentReadBytes;
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
deleted file mode 100644
index d71154c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-public class DiskInfo {
-	private int id;
-	private String partitionName;
-	private String mountPath;
-	
-	private long capacity;
-	private long used;
-	
-	public DiskInfo(int id, String partitionName) {
-		this.id = id;
-		this.partitionName = partitionName;
-	}
-
-	public int getId() {
-		return id;
-	}
-
-	public void setId(int id) {
-		this.id = id;
-	}
-
-	public String getPartitionName() {
-		return partitionName;
-	}
-
-	public void setPartitionName(String partitionName) {
-		this.partitionName = partitionName;
-	}
-
-	public String getMountPath() {
-		return mountPath;
-	}
-
-	public void setMountPath(String mountPath) {
-		this.mountPath = mountPath;
-	}
-
-	public long getCapacity() {
-		return capacity;
-	}
-
-	public void setCapacity(long capacity) {
-		this.capacity = capacity;
-	}
-
-	public long getUsed() {
-		return used;
-	}
-
-	public void setUsed(long used) {
-		this.used = used;
-	}
-	
-	
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
deleted file mode 100644
index 56100f2..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import com.google.common.base.Objects;
-
-public class DiskMountInfo implements Comparable<DiskMountInfo> {
-	private String mountPath;
-	
-	private long capacity;
-	private long used;
-	
-	private int deviceId;
-	
-	public DiskMountInfo(int deviceId, String mountPath) {
-		this.mountPath = mountPath;
-	}
-
-	public String getMountPath() {
-		return mountPath;
-	}
-
-	public void setMountPath(String mountPath) {
-		this.mountPath = mountPath;
-	}
-
-	public long getCapacity() {
-		return capacity;
-	}
-
-	public void setCapacity(long capacity) {
-		this.capacity = capacity;
-	}
-
-	public long getUsed() {
-		return used;
-	}
-
-	public void setUsed(long used) {
-		this.used = used;
-	}
-
-	public int getDeviceId() {
-		return deviceId;
-	}
-
-  @Override
-  public boolean equals(Object obj){
-    if (!(obj instanceof DiskMountInfo)) return false;
-
-    if (compareTo((DiskMountInfo) obj) == 0) return true;
-    else return false;
-  }
-
-  @Override
-  public int hashCode(){
-    return Objects.hashCode(mountPath);
-  }
-
-	@Override
-	public int compareTo(DiskMountInfo other) {
-		String path1 = mountPath;
-		String path2 = other.mountPath;
-		
-		int path1Depth = "/".equals(path1) ? 0 : path1.split("/", -1).length - 1 ;
-		int path2Depth = "/".equals(path2) ? 0 : path2.split("/", -1).length - 1 ;
-		
-		if(path1Depth > path2Depth) {
-			return -1;
-		} else if(path1Depth < path2Depth) {
-			return 1;
-		} else {
-			int path1Length = path1.length();
-			int path2Length = path2.length();
-			
-			if(path1Length < path2Length) {
-				return 1;
-			} else if(path1Length > path2Length) {
-				return -1;
-			} else {
-				return path1.compareTo(path2);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
deleted file mode 100644
index 66827c2..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.server.common.Util;
-
-import java.io.*;
-import java.net.URI;
-import java.util.*;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
-
-public class DiskUtil {
-
-  static String UNIX_DISK_DEVICE_PATH = "/proc/partitions";
-
-  public enum OSType {
-		OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC
-	}
-
-	static private OSType getOSType() {
-		String osName = System.getProperty("os.name");
-		if (osName.contains("Windows")
-				&& (osName.contains("XP") || osName.contains("2003")
-						|| osName.contains("Vista")
-						|| osName.contains("Windows_7")
-						|| osName.contains("Windows 7") || osName
-							.contains("Windows7"))) {
-			return OSType.OS_TYPE_WINXP;
-		} else if (osName.contains("SunOS") || osName.contains("Solaris")) {
-			return OSType.OS_TYPE_SOLARIS;
-		} else if (osName.contains("Mac")) {
-			return OSType.OS_TYPE_MAC;
-		} else {
-			return OSType.OS_TYPE_UNIX;
-		}
-	}
-	
-	public static List<DiskDeviceInfo> getDiskDeviceInfos() throws IOException {
-		List<DiskDeviceInfo> deviceInfos;
-		
-		if(getOSType() == OSType.OS_TYPE_UNIX) {
-			deviceInfos = getUnixDiskDeviceInfos();
-			setDeviceMountInfo(deviceInfos);
-		} else {
-			deviceInfos = getDefaultDiskDeviceInfos();
-		}
-		
-		return deviceInfos;
-	}
-
-	private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() {
-		List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
-		
-		File file = new File(UNIX_DISK_DEVICE_PATH);
-		if(!file.exists()) {
-			System.out.println("No partition file:" + file.getAbsolutePath());
-			return getDefaultDiskDeviceInfos();
-		}
-		
-		BufferedReader reader = null;
-		try {
-			reader = new BufferedReader(new InputStreamReader(new FileInputStream(UNIX_DISK_DEVICE_PATH)));
-			String line = null;
-			
-			int count = 0;
-			Set<String> deviceNames = new TreeSet<String>();
-			while((line = reader.readLine()) != null) {
-				if(count > 0 && !line.trim().isEmpty()) {
-					String[] tokens = line.trim().split(" +");
-					if(tokens.length == 4) {
-						String deviceName = getDiskDeviceName(tokens[3]);
-						deviceNames.add(deviceName);
-					}
-				}
-				count++;
-			}
-			
-			int id = 0;
-			for(String eachDeviceName: deviceNames) {
-				DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++);
-				diskDeviceInfo.setName(eachDeviceName);
-				
-				//TODO set addtional info
-				// /sys/block/sda/queue
-				infos.add(diskDeviceInfo);
-			}
-		} catch (Exception e) {
-			e.printStackTrace();
-		} finally {
-			if(reader != null) {
-				try {
-					reader.close();
-				} catch (IOException e) {
-				}
-			}
-		}
-		
-		return infos;
-	}
-	
-	private static String getDiskDeviceName(String partitionName) {
-		byte[] bytes = partitionName.getBytes();
-		
-		byte[] result = new byte[bytes.length];
-		int length = 0;
-		for(int i = 0; i < bytes.length; i++, length++) {
-			if(bytes[i] >= '0' && bytes[i] <= '9') {
-				break;
-			} else {
-				result[i] = bytes[i];
-			}
-		}
-		
-		return new String(result, 0, length);
-	}
-	
-	public static List<DiskDeviceInfo> getDefaultDiskDeviceInfos() {
-		DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(0);
-		diskDeviceInfo.setName("default");
-		
-		List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
-		
-		infos.add(diskDeviceInfo);
-		
-		return infos;
-	}
-	
-	
-	private static void setDeviceMountInfo(List<DiskDeviceInfo> deviceInfos) throws IOException {
-		Map<String, DiskDeviceInfo> deviceMap = new HashMap<String, DiskDeviceInfo>();
-		for(DiskDeviceInfo eachDevice: deviceInfos) {
-			deviceMap.put(eachDevice.getName(), eachDevice);
-		}
-		
-		BufferedReader mountOutput = null;
-		try {
-			Process mountProcess = Runtime.getRuntime().exec("mount");
-			mountOutput = new BufferedReader(new InputStreamReader(
-					mountProcess.getInputStream()));
-			while (true) {
-				String line = mountOutput.readLine();
-				if (line == null) {
-					break;
-				}
-
-				int indexStart = line.indexOf(" on /");
-				int indexEnd = line.indexOf(" ", indexStart + 4);
-
-				String deviceName = line.substring(0, indexStart).trim();
-				String[] deviceNameTokens = deviceName.split("/");
-				if(deviceNameTokens.length == 3) {
-					if("dev".equals(deviceNameTokens[1])) {
-						String realDeviceName = getDiskDeviceName(deviceNameTokens[2]);
-						String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath();
-						
-						DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName);
-						if(diskDeviceInfo != null) {
-							diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath));
-						}
-					}
-				}
-			}
-		} catch (IOException e) {
-			throw e;
-		} finally {
-			if (mountOutput != null) {
-				mountOutput.close();
-			}
-		}
-	}
-
-  public static int getDataNodeStorageSize(){
-    return getStorageDirs().size();
-  }
-
-  public static List<URI> getStorageDirs(){
-    Configuration conf = new HdfsConfiguration();
-    Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
-    return Util.stringCollectionAsURIs(dirNames);
-  }
-
-	public static void main(String[] args) throws Exception {
-		System.out.println("/dev/sde1".split("/").length);
-		for(String eachToken: "/dev/sde1".split("/")) {
-			System.out.println(eachToken);
-		}
- 	}
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
deleted file mode 100644
index 07fbe6c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class FileScanRunner extends Thread {
-  private static final Log LOG = LogFactory.getLog(FileScanRunner.class);
-
-  StorageManagerV2.StorgaeManagerContext smContext;
-	FileScannerV2 fileScanner;
-	Object requestQueueMonitor;
-	AtomicInteger numOfRunningScanners;
-	DiskFileScanScheduler diskFileScanScheduler;
-	
-	int maxReadBytes;
-	
-	public FileScanRunner(DiskFileScanScheduler diskFileScanScheduler, 
-			StorageManagerV2.StorgaeManagerContext smContext,
-      FileScannerV2 fileScanner, Object requestQueueMonitor,
-			AtomicInteger numOfRunningScanners) {
-		super("FileScanRunner:" + fileScanner.getId());
-		this.diskFileScanScheduler = diskFileScanScheduler;
-		this.fileScanner = fileScanner;
-		this.smContext = smContext;
-		this.requestQueueMonitor = requestQueueMonitor;
-		this.numOfRunningScanners = numOfRunningScanners;
-		
-		this.maxReadBytes = smContext.getMaxReadBytesPerScheduleSlot();
-	}
-
-	public void run() {
-    try {
-//      long startTime = System.currentTimeMillis();
-//      boolean fetching = fileScanner.isFetchProcessing();
-      fileScanner.scan(maxReadBytes);
-//      if(diskFileScanScheduler.getDiskId() == 1) {
-//        LOG.info("========>" + diskFileScanScheduler.getDiskId() + "," + fileScanner.getId() +
-//            ",fetching=" + fetching +
-//            ", scanTime:" + (System.currentTimeMillis() - startTime) + " ms");
-//      }
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    } finally {
-      synchronized(requestQueueMonitor) {
-        numOfRunningScanners.decrementAndGet();
-        requestQueueMonitor.notifyAll();
-      }
-    }
-	}
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
deleted file mode 100644
index da7084c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.ColumnStats;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.storage.Scanner;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public abstract class FileScannerV2 implements Scanner {
-  private static final Log LOG = LogFactory.getLog(FileScannerV2.class);
-
-	protected AtomicBoolean closed = new AtomicBoolean(false);
-
-	protected FileSystem fs;
-
-  protected boolean inited = false;
-  protected final Configuration conf;
-  protected final TableMeta meta;
-  protected final Schema schema;
-  protected final FileFragment fragment;
-  protected final int columnNum;
-  protected Column[] targets;
-  protected long totalScanTime = 0;
-  protected int allocatedDiskId;
-
-  protected StorageManagerV2.StorgaeManagerContext smContext;
-
-  protected AtomicBoolean firstSchdeuled = new AtomicBoolean(true);
-  
-  protected float progress;
-  
-  protected TableStats tableStats;
-
-  protected abstract boolean scanNext(int length) throws IOException;
-
-  protected abstract boolean initFirstScan(int maxBytesPerSchedule) throws IOException;
-
-  protected abstract long getFilePosition() throws IOException;
-
-  protected abstract Tuple nextTuple() throws IOException;
-
-  public abstract boolean isFetchProcessing();
-
-  public abstract boolean isStopScanScheduling();
-
-  public abstract void scannerReset();
-
-  protected abstract long[] reportReadBytes();
-
-	public FileScannerV2(final Configuration conf,
-                       final TableMeta meta,
-                       final Schema schema,
-                       final FileFragment fragment) throws IOException {
-    this.conf = conf;
-    this.meta = meta;
-    this.schema = schema;
-    this.fragment = fragment;
-    this.columnNum = this.schema.size();
-
-    this.fs = fragment.getPath().getFileSystem(conf);
-	}
-
-  public void init() throws IOException {
-    closed.set(false);
-    firstSchdeuled.set(true);
-
-    if(!inited) {
-      smContext.requestFileScan(this);
-    }
-    inited = true;
-    progress = 0.0f;
-
-    tableStats = new TableStats();
-    if (fragment != null) {
-      tableStats.setNumBytes(fragment.getEndKey());
-      tableStats.setNumBlocks(1);
-    }
-
-    if (schema != null) {
-      for(Column eachColumn: schema.getColumns()) {
-        ColumnStats columnStats = new ColumnStats(eachColumn);
-        tableStats.addColumnStat(columnStats);
-      }
-    }
-  }
-
-  @Override
-  public void reset() throws IOException {
-    scannerReset();
-    close();
-    inited = false;
-    init();
-  }
-
-  public void setAllocatedDiskId(int allocatedDiskId) {
-    this.allocatedDiskId = allocatedDiskId;
-  }
-
-  public String getId() {
-    return fragment.getPath().getName() + ":" + fragment.getStartKey() + ":" +
-        fragment.getEndKey() + "_" + System.currentTimeMillis();
-  }
-
-  @Override
-  public Schema getSchema() {
-    return schema;
-  }
-
-  @Override
-  public void setTarget(Column[] targets) {
-    if (inited) {
-      throw new IllegalStateException("Should be called before init()");
-    }
-    this.targets = targets;
-  }
-
-  public Path getPath() {
-    return fragment.getPath();
-  }
-
-  public int getDiskId() {
-    if(fragment.getDiskIds().length <= 0) {
-      //LOG.warn("===> No DiskId:" + fragment.getPath() + ":" + fragment.getStartKey());
-      return -1;
-    } else {
-      return fragment.getDiskIds()[0];
-    }
-  }
-
-  public void setSearchCondition(Object expr) {
-    if (inited) {
-      throw new IllegalStateException("Should be called before init()");
-    }
-  }
-
-  public void setStorageManagerContext(StorageManagerV2.StorgaeManagerContext context) {
-    this.smContext = context;
-  }
-
-  public String toString() {
-    return fragment.getPath() + ":" + fragment.getStartKey();
-  }
-
-  public void scan(int maxBytesPerSchedule) throws IOException {
-    long startTime = System.currentTimeMillis();
-    try {
-    synchronized(firstSchdeuled) {
-      if(firstSchdeuled.get()) {
-        boolean moreData = initFirstScan(maxBytesPerSchedule);
-        firstSchdeuled.set(false);
-        firstSchdeuled.notifyAll();
-        if(moreData) {
-          smContext.requestFileScan(this);
-        }
-        return;
-      }
-    }
-    boolean moreData = scanNext(maxBytesPerSchedule);
-
-    if(moreData) {
-      smContext.requestFileScan(this);
-    }
-    } finally {
-      totalScanTime += System.currentTimeMillis() - startTime;
-    }
-  }
-
-  @Override
-  public void close() throws IOException {
-    if(closed.get()) {
-      return;
-    }
-    long[] readBytes = reportReadBytes();
-    smContext.incrementReadBytes(allocatedDiskId, readBytes);
-    closed.set(true);
-    progress = 1.0f;
-    LOG.info(toString() + " closed, totalScanTime=" + totalScanTime);
-  }
-
-  public boolean isClosed() {
-    return closed.get();
-  }
-
-  public Tuple next() throws IOException {
-    synchronized(firstSchdeuled) {
-      if(firstSchdeuled.get()) {
-        try {
-          firstSchdeuled.wait();
-        } catch (InterruptedException e) {
-        }
-      }
-    }
-    return nextTuple();
-  }
-
-  @Override
-  public float getProgress() {
-    return progress;
-  }
-
-  @Override
-  public TableStats getInputStats() {
-    return tableStats;
-  }
-}