You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/01/28 13:35:50 UTC

[13/18] TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
deleted file mode 100644
index 7802c91..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Value holder for one disk device: a numeric id, a name and the list of
- * {@link DiskMountInfo} entries that have been attached to it.
- */
-public class DiskDeviceInfo {
-  /** Identifier passed to the constructor; there is no setter for it. */
-  private int id;
-
-  /** Device name; remains {@code null} until {@link #setName(String)} is called. */
-  private String name;
-
-  /** Mount entries attached to this device, in insertion order. */
-  private List<DiskMountInfo> mountInfos = new ArrayList<DiskMountInfo>();
-
-  /**
-   * @param id identifier of this device
-   */
-  public DiskDeviceInfo(int id) {
-    this.id = id;
-  }
-
-  /** @return the identifier given at construction time */
-  public int getId() {
-    return this.id;
-  }
-
-  /** @return the device name, or {@code null} when none has been set */
-  public String getName() {
-    return this.name;
-  }
-
-  /** @param name the device name to store */
-  public void setName(String name) {
-    this.name = name;
-  }
-
-  /** @return the live (not copied) list of mount entries */
-  public List<DiskMountInfo> getMountInfos() {
-    return this.mountInfos;
-  }
-
-  /** @param mountInfos list that replaces the current mount entries */
-  public void setMountInfos(List<DiskMountInfo> mountInfos) {
-    this.mountInfos = mountInfos;
-  }
-
-  /** @param diskMountInfo entry appended to the mount list */
-  public void addMountPath(DiskMountInfo diskMountInfo) {
-    this.mountInfos.add(diskMountInfo);
-  }
-
-  /** @return the id and the name separated by a comma */
-  @Override
-  public String toString() {
-    StringBuilder text = new StringBuilder();
-    text.append(id).append(",").append(name);
-    return text.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
deleted file mode 100644
index 1babf99..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.tajo.conf.TajoConf.ConfVars;
-
-/**
- * Scheduler thread for the scan requests of one disk device.
- *
- * <p>Requests are queued through {@link #requestScanFile(FileScannerV2)}. The
- * {@link #run()} loop takes them off the queue and starts one
- * {@link FileScanRunner} thread per request while fewer than
- * {@code scanConcurrency} runners are active. Scanners whose
- * {@link FileScannerV2#isFetchProcessing()} is true are parked and put back on
- * the queue by the {@link FetchWaitingThread}, which wakes up every 100 ms.
- */
-public final class DiskFileScanScheduler extends Thread {
-  private static final Log LOG = LogFactory.getLog(DiskFileScanScheduler.class);
-
-  /** Pending scan requests; modified only while holding {@code requestQueueMonitor}. */
-  private Queue<FileScannerV2> requestQueue = new LinkedList<FileScannerV2>();
-
-  /** Scanners parked by {@link #run()}; modified only while synchronized on the list itself. */
-  List<FileScannerV2> fetchingScanners = new ArrayList<FileScannerV2>();
-
-  /** Maximum number of runners allowed at once; read from configuration in the constructor. */
-  private int scanConcurrency;
-
-  /** Incremented here before a runner starts; the runner receives it as a constructor argument. */
-  private AtomicInteger numOfRunningScanners = new AtomicInteger(0);
-
-  /** Monitor that protects {@code requestQueue} and is used for wait/notify. */
-  private Object requestQueueMonitor = new Object();
-
-  private StorageManagerV2.StorgaeManagerContext smContext;
-
-  private DiskDeviceInfo diskDeviceInfo;
-
-  /** Set by {@link #stopScan()}; checked by both loops. */
-  private AtomicBoolean stopped = new AtomicBoolean(false);
-
-  /** Number of runners started so far; written only by the scheduler thread. */
-  private long totalScanCount = 0;
-
-  private FetchWaitingThread fetchWaitingThread;
-
-  /** Sum of element 0 of every array passed to {@link #incrementReadBytes(long[])}. */
-  private AtomicLong totalReadBytesForFetch = new AtomicLong(0);
-
-  /** Sum of element 1 of every array passed to {@link #incrementReadBytes(long[])}. */
-  private AtomicLong totalReadBytesFromDisk = new AtomicLong(0);
-
-  /** Counter snapshot taken by the previous {@link #printDiskSchedulerInfo()} call. */
-  private long[] lastReportReadBytes;
-
-  /** Wall-clock time of the previous {@link #printDiskSchedulerInfo()} call, 0 before the first. */
-  private long lastReportTime = 0;
-
-  /**
-   * Creates the scheduler and immediately starts its {@link FetchWaitingThread}.
-   * The scheduler thread itself is not started here.
-   *
-   * @param smContext      context used to read the configuration and handed to every runner
-   * @param diskDeviceInfo device this scheduler is responsible for
-   */
-  public DiskFileScanScheduler(
-      StorageManagerV2.StorgaeManagerContext smContext,
-      DiskDeviceInfo diskDeviceInfo) {
-    super("DiskFileScanner:" + diskDeviceInfo);
-    this.smContext = smContext;
-    this.diskDeviceInfo = diskDeviceInfo;
-    initScannerPool();
-    this.fetchWaitingThread = new FetchWaitingThread();
-    this.fetchWaitingThread.start();
-  }
-
-  /**
-   * Adds the given byte counts to the running totals.
-   *
-   * @param readBytes two-element array: index 0 is added to the fetch total,
-   *                  index 1 to the disk total
-   */
-  public void incrementReadBytes(long[] readBytes) {
-    totalReadBytesForFetch.addAndGet(readBytes[0]);
-    totalReadBytesFromDisk.addAndGet(readBytes[1]);
-  }
-
-  /** @return the id of the device this scheduler was created for */
-  public int getDiskId() {
-    return diskDeviceInfo.getId();
-  }
-
-  /**
-   * Scheduling loop. Runs with {@code requestQueueMonitor} held (released only
-   * inside {@code wait}) until {@link #stopScan()} is called or the thread is
-   * interrupted.
-   */
-  @Override
-  public void run() {
-    synchronized (requestQueueMonitor) {
-      while (!stopped.get()) {
-        if (isAllScannerRunning()) {
-          // Every slot is taken: wait for a runner to finish (or at most 2 s).
-          try {
-            requestQueueMonitor.wait(2000);
-            continue;
-          } catch (InterruptedException e) {
-            break;
-          }
-        } else {
-          FileScannerV2 fileScanner = requestQueue.poll();
-          if (fileScanner == null) {
-            // Nothing queued: wait for a new request (or at most 2 s).
-            try {
-              requestQueueMonitor.wait(2000);
-              continue;
-            } catch (InterruptedException e) {
-              break;
-            }
-          }
-          if (fileScanner.isStopScanScheduling()) {
-            // The scanner asked not to be scheduled any more: drop the request.
-            LOG.info("Exit from Disk Queue:" + fileScanner.getId());
-            continue;
-          }
-          if (fileScanner.isFetchProcessing()) {
-            // Park it; FetchWaitingThread puts it back on the queue later.
-            synchronized (fetchingScanners) {
-              fetchingScanners.add(fileScanner);
-            }
-          } else {
-            numOfRunningScanners.incrementAndGet();
-            FileScanRunner fileScanRunner = new FileScanRunner(
-                DiskFileScanScheduler.this, smContext,
-                fileScanner, requestQueueMonitor,
-                numOfRunningScanners);
-            totalScanCount++;
-            fileScanRunner.start();
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Queues a scanner and wakes the scheduling loop.
-   *
-   * @param fileScannerV2 scanner to schedule
-   */
-  protected void requestScanFile(FileScannerV2 fileScannerV2) {
-    synchronized (requestQueueMonitor) {
-      requestQueue.offer(fileScannerV2);
-      requestQueueMonitor.notifyAll();
-    }
-  }
-
-  /**
-   * Helper thread that, every 100 ms, moves all parked scanners from
-   * {@code fetchingScanners} back onto the request queue.
-   */
-  public class FetchWaitingThread extends Thread {
-    /** Scratch list reused on every iteration. */
-    List<FileScannerV2> workList = new ArrayList<FileScannerV2>(20);
-
-    @Override
-    public void run() {
-      while (!stopped.get()) {
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException e) {
-          break;
-        }
-        workList.clear();
-        // Drain under the list's own lock, re-queue under the queue monitor,
-        // so the two locks are never held at the same time by this thread.
-        synchronized (fetchingScanners) {
-          workList.addAll(fetchingScanners);
-          fetchingScanners.clear();
-        }
-        synchronized (requestQueueMonitor) {
-          for (FileScannerV2 eachScanner : workList) {
-            requestQueue.offer(eachScanner);
-          }
-          requestQueueMonitor.notifyAll();
-        }
-      }
-    }
-  }
-
-  /** Reads the per-disk concurrency limit from the configuration. */
-  private void initScannerPool() {
-    // TODO finally implements heuristic, currently set with property
-    scanConcurrency = smContext.getConf().getIntVar(ConfVars.STORAGE_MANAGER_CONCURRENCY_PER_DISK);
-  }
-
-  /** @return the current size of the request queue (read without locking) */
-  public int getTotalQueueSize() {
-    return requestQueue.size();
-  }
-
-  /** @return true when the number of running scanners has reached the concurrency limit */
-  boolean isAllScannerRunning() {
-    return numOfRunningScanners.get() >= scanConcurrency;
-  }
-
-  /** @return how many runners this scheduler has started so far */
-  public long getTotalScanCount() {
-    return totalScanCount;
-  }
-
-  /** Marks the scheduler as stopped and interrupts both of its threads. */
-  public void stopScan() {
-    stopped.set(true);
-    if (fetchWaitingThread != null) {
-      fetchWaitingThread.interrupt();
-    }
-
-    this.interrupt();
-  }
-
-  /**
-   * Logs queue sizes and the read throughput (bytes per whole second, printed
-   * in KB) since the previous call. When less than one whole second has
-   * elapsed, or on the first call, both throughput figures are reported as 0;
-   * previously a second call within the same second divided by zero.
-   */
-  public void printDiskSchedulerInfo() {
-    long now = System.currentTimeMillis();
-    long[] currentReadBytes = new long[]{totalReadBytesForFetch.get(), totalReadBytesFromDisk.get()};
-    // long instead of int: a byte delta per second may exceed Integer.MAX_VALUE.
-    long[] throughput = new long[2];
-    if (lastReportTime != 0 && lastReportReadBytes != null) {
-      long elapsedSec = (now - lastReportTime) / 1000;
-      if (elapsedSec > 0) {
-        throughput[0] = (currentReadBytes[0] - lastReportReadBytes[0]) / elapsedSec;
-        throughput[1] = (currentReadBytes[1] - lastReportReadBytes[1]) / elapsedSec;
-      }
-    }
-    lastReportTime = now;
-
-    LOG.info("===>" + DiskFileScanScheduler.this.diskDeviceInfo
-        + ", request=" + requestQueue.size()
-        + ", fetching=" + fetchingScanners.size()
-        + ", running=" + numOfRunningScanners.get()
-        + ", totalScan=" + totalScanCount
-        + ", FetchThroughput=" + throughput[0]/1024 + "KB"
-        + ", DiskScanThroughput=" + throughput[1]/1024 + "KB");
-
-    lastReportReadBytes = currentReadBytes;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
deleted file mode 100644
index d71154c..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-/**
- * Mutable value holder for a disk partition: id, partition name, mount path,
- * capacity and used amount. The class applies no validation and no unit
- * conversion to any of the values.
- */
-public class DiskInfo {
-  private int id;
-  private String partitionName;
-
-  /** Stays {@code null} until {@link #setMountPath(String)} is called. */
-  private String mountPath;
-
-  /** Stays 0 until {@link #setCapacity(long)} is called. */
-  private long capacity;
-
-  /** Stays 0 until {@link #setUsed(long)} is called. */
-  private long used;
-
-  /**
-   * @param id            identifier of this entry
-   * @param partitionName name of the partition
-   */
-  public DiskInfo(int id, String partitionName) {
-    this.partitionName = partitionName;
-    this.id = id;
-  }
-
-  // ---- id ----
-
-  public int getId() {
-    return this.id;
-  }
-
-  public void setId(int id) {
-    this.id = id;
-  }
-
-  // ---- partition name ----
-
-  public String getPartitionName() {
-    return this.partitionName;
-  }
-
-  public void setPartitionName(String partitionName) {
-    this.partitionName = partitionName;
-  }
-
-  // ---- mount path ----
-
-  public String getMountPath() {
-    return this.mountPath;
-  }
-
-  public void setMountPath(String mountPath) {
-    this.mountPath = mountPath;
-  }
-
-  // ---- capacity / used ----
-
-  public long getCapacity() {
-    return this.capacity;
-  }
-
-  public void setCapacity(long capacity) {
-    this.capacity = capacity;
-  }
-
-  public long getUsed() {
-    return this.used;
-  }
-
-  public void setUsed(long used) {
-    this.used = used;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
deleted file mode 100644
index 56100f2..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import com.google.common.base.Objects;
-
-/**
- * A mount path together with the id of the device it belongs to, plus
- * capacity and used figures that callers may fill in.
- *
- * <p>Ordering ({@link #compareTo(DiskMountInfo)}) puts deeper paths first, then
- * longer paths, then falls back to {@link String#compareTo(String)}.
- * {@link #equals(Object)} is defined through that ordering and
- * {@link #hashCode()} through the mount path, so only the mount path takes
- * part in equality.
- */
-public class DiskMountInfo implements Comparable<DiskMountInfo> {
-  private String mountPath;
-
-  private long capacity;
-  private long used;
-
-  private int deviceId;
-
-  /**
-   * @param deviceId  id of the device this mount path belongs to
-   * @param mountPath the mount path
-   */
-  public DiskMountInfo(int deviceId, String mountPath) {
-    // The id used to be dropped here, which left getDeviceId() at 0 for every instance.
-    this.deviceId = deviceId;
-    this.mountPath = mountPath;
-  }
-
-  public String getMountPath() {
-    return mountPath;
-  }
-
-  public void setMountPath(String mountPath) {
-    this.mountPath = mountPath;
-  }
-
-  public long getCapacity() {
-    return capacity;
-  }
-
-  public void setCapacity(long capacity) {
-    this.capacity = capacity;
-  }
-
-  public long getUsed() {
-    return used;
-  }
-
-  public void setUsed(long used) {
-    this.used = used;
-  }
-
-  /** @return the device id passed to the constructor */
-  public int getDeviceId() {
-    return deviceId;
-  }
-
-  /** Two instances are equal when {@link #compareTo(DiskMountInfo)} returns 0. */
-  @Override
-  public boolean equals(Object obj) {
-    if (!(obj instanceof DiskMountInfo)) {
-      return false;
-    }
-    return compareTo((DiskMountInfo) obj) == 0;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(mountPath);
-  }
-
-  /**
-   * Orders by path depth (deeper first), then by path length (longer first),
-   * then lexicographically.
-   *
-   * @return -1, 1, or the result of {@code String.compareTo} for paths of
-   *         equal depth and length
-   */
-  @Override
-  public int compareTo(DiskMountInfo other) {
-    String path1 = mountPath;
-    String path2 = other.mountPath;
-
-    int path1Depth = depthOf(path1);
-    int path2Depth = depthOf(path2);
-    if (path1Depth != path2Depth) {
-      return path1Depth > path2Depth ? -1 : 1;
-    }
-
-    int path1Length = path1.length();
-    int path2Length = path2.length();
-    if (path1Length != path2Length) {
-      return path1Length > path2Length ? -1 : 1;
-    }
-
-    return path1.compareTo(path2);
-  }
-
-  /** Depth of a path: 0 for "/", otherwise the number of "/" separators it contains. */
-  private static int depthOf(String path) {
-    // split with limit -1 keeps trailing empty strings, so every "/" counts.
-    return "/".equals(path) ? 0 : path.split("/", -1).length - 1;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
deleted file mode 100644
index bb90c39..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-/**
- * Static helpers that discover disk devices and their mount paths.
- *
- * <p>On operating systems classified as {@link OSType#OS_TYPE_UNIX} the device
- * list is read from {@code /proc/partitions} and the mount paths are taken
- * from the output of the {@code mount} command. On every other operating
- * system a single device named "default" is returned.
- */
-public class DiskUtil {
-
-  static String UNIX_DISK_DEVICE_PATH = "/proc/partitions";
-
-  public enum OSType {
-    OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC
-  }
-
-  /**
-   * Classifies the running operating system from the {@code os.name} system
-   * property. Every Windows version maps to {@link OSType#OS_TYPE_WINXP};
-   * versions outside the old hard-coded list used to be treated as UNIX and
-   * therefore went down the {@code mount}-based path.
-   */
-  static private OSType getOSType() {
-    String osName = System.getProperty("os.name");
-    if (osName.contains("Windows")) {
-      return OSType.OS_TYPE_WINXP;
-    } else if (osName.contains("SunOS") || osName.contains("Solaris")) {
-      return OSType.OS_TYPE_SOLARIS;
-    } else if (osName.contains("Mac")) {
-      return OSType.OS_TYPE_MAC;
-    } else {
-      return OSType.OS_TYPE_UNIX;
-    }
-  }
-
-  /**
-   * Returns the disk devices of this machine.
-   *
-   * @return the devices with their mount paths on UNIX-type systems, otherwise
-   *         a list holding one device named "default"
-   * @throws IOException if running {@code mount} or reading its output fails
-   */
-  public static List<DiskDeviceInfo> getDiskDeviceInfos() throws IOException {
-    List<DiskDeviceInfo> deviceInfos;
-
-    if (getOSType() == OSType.OS_TYPE_UNIX) {
-      deviceInfos = getUnixDiskDeviceInfos();
-      setDeviceMountInfo(deviceInfos);
-    } else {
-      deviceInfos = getDefaultDiskDeviceInfos();
-    }
-
-    return deviceInfos;
-  }
-
-  /**
-   * Builds one {@link DiskDeviceInfo} per distinct device name found in
-   * {@link #UNIX_DISK_DEVICE_PATH}. Ids are assigned in the sorted order of
-   * the names. If the file does not exist the default list is returned; if
-   * reading fails the stack trace is printed and the devices collected so far
-   * (possibly none) are returned.
-   */
-  private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() {
-    List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
-
-    File file = new File(UNIX_DISK_DEVICE_PATH);
-    if (!file.exists()) {
-      System.out.println("No partition file:" + file.getAbsolutePath());
-      return getDefaultDiskDeviceInfos();
-    }
-
-    BufferedReader reader = null;
-    try {
-      reader = new BufferedReader(new InputStreamReader(new FileInputStream(UNIX_DISK_DEVICE_PATH)));
-      String line = null;
-
-      boolean firstLine = true;
-      // TreeSet: removes duplicates (several partitions per device) and sorts the names.
-      Set<String> deviceNames = new TreeSet<String>();
-      while ((line = reader.readLine()) != null) {
-        // The first line is skipped unconditionally; blank lines are ignored.
-        if (!firstLine && !line.trim().isEmpty()) {
-          String[] tokens = line.trim().split(" +");
-          // Only four-column rows are used; the fourth column is taken as the partition name.
-          if (tokens.length == 4) {
-            deviceNames.add(getDiskDeviceName(tokens[3]));
-          }
-        }
-        firstLine = false;
-      }
-
-      int id = 0;
-      for (String eachDeviceName : deviceNames) {
-        DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++);
-        diskDeviceInfo.setName(eachDeviceName);
-
-        //TODO set additional info
-        // /sys/block/sda/queue
-        infos.add(diskDeviceInfo);
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-    } finally {
-      if (reader != null) {
-        try {
-          reader.close();
-        } catch (IOException ignored) {
-          // Nothing useful can be done when closing a read-only stream fails.
-        }
-      }
-    }
-
-    return infos;
-  }
-
-  /**
-   * Returns the part of {@code partitionName} that precedes its first ASCII
-   * digit, e.g. "sda1" gives "sda". A name without digits is returned whole.
-   * Works on characters rather than platform-charset bytes, so the result
-   * does not depend on the default charset.
-   */
-  private static String getDiskDeviceName(String partitionName) {
-    int end = 0;
-    while (end < partitionName.length()) {
-      char c = partitionName.charAt(end);
-      if (c >= '0' && c <= '9') {
-        break;
-      }
-      end++;
-    }
-    return partitionName.substring(0, end);
-  }
-
-  /** Returns a list holding a single device with id 0 and the name "default". */
-  private static List<DiskDeviceInfo> getDefaultDiskDeviceInfos() {
-    DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(0);
-    diskDeviceInfo.setName("default");
-
-    List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
-    infos.add(diskDeviceInfo);
-    return infos;
-  }
-
-  /**
-   * Runs {@code mount} and attaches every mount path whose source has the
-   * form {@code /dev/<name>} to the device in {@code deviceInfos} whose name
-   * equals {@code <name>} cut at its first digit.
-   *
-   * @throws IOException if the command cannot be started or its output cannot be read
-   */
-  private static void setDeviceMountInfo(List<DiskDeviceInfo> deviceInfos) throws IOException {
-    Map<String, DiskDeviceInfo> deviceMap = new HashMap<String, DiskDeviceInfo>();
-    for (DiskDeviceInfo eachDevice : deviceInfos) {
-      deviceMap.put(eachDevice.getName(), eachDevice);
-    }
-
-    Process mountProcess = null;
-    BufferedReader mountOutput = null;
-    try {
-      mountProcess = Runtime.getRuntime().exec("mount");
-      mountOutput = new BufferedReader(new InputStreamReader(
-          mountProcess.getInputStream()));
-      String line;
-      while ((line = mountOutput.readLine()) != null) {
-        int indexStart = line.indexOf(" on /");
-        if (indexStart < 0) {
-          // Not of the form "<source> on /<path> ..."; substring(0, -1) would throw.
-          continue;
-        }
-        // indexStart + 4 is the position of the "/" that starts the mount path.
-        int indexEnd = line.indexOf(" ", indexStart + 4);
-        if (indexEnd < 0) {
-          // Nothing follows the path: it runs to the end of the line.
-          indexEnd = line.length();
-        }
-
-        String deviceName = line.substring(0, indexStart).trim();
-        // "/dev/sda1".split("/") gives ["", "dev", "sda1"].
-        String[] deviceNameTokens = deviceName.split("/");
-        if (deviceNameTokens.length == 3 && "dev".equals(deviceNameTokens[1])) {
-          String realDeviceName = getDiskDeviceName(deviceNameTokens[2]);
-          String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath();
-
-          DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName);
-          if (diskDeviceInfo != null) {
-            diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath));
-          }
-        }
-      }
-    } finally {
-      try {
-        if (mountOutput != null) {
-          mountOutput.close();
-        }
-      } finally {
-        if (mountProcess != null) {
-          // Output has been read (or reading failed); release the child process.
-          mountProcess.destroy();
-        }
-      }
-    }
-  }
-
-  /** Ad-hoc check that prints how "/dev/sde1" is split on "/". */
-  public static void main(String[] args) throws Exception {
-    System.out.println("/dev/sde1".split("/").length);
-    for (String eachToken : "/dev/sde1".split("/")) {
-      System.out.println(eachToken);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
deleted file mode 100644
index 07fbe6c..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Thread that performs a single {@link FileScannerV2#scan(int)} call and then
- * decrements the shared running-scanner counter and notifies everyone waiting
- * on the request-queue monitor.
- */
-public class FileScanRunner extends Thread {
-  private static final Log LOG = LogFactory.getLog(FileScanRunner.class);
-
-  StorageManagerV2.StorgaeManagerContext smContext;
-  FileScannerV2 fileScanner;
-  Object requestQueueMonitor;
-  AtomicInteger numOfRunningScanners;
-  DiskFileScanScheduler diskFileScanScheduler;
-
-  /** Byte budget passed to the scan call; read from the context at construction time. */
-  int maxReadBytes;
-
-  /**
-   * @param diskFileScanScheduler scheduler that created this runner
-   * @param smContext             context that supplies the per-slot byte budget
-   * @param fileScanner           scanner to run; its id becomes part of the thread name
-   * @param requestQueueMonitor   monitor notified when the scan has ended
-   * @param numOfRunningScanners  counter decremented when the scan has ended
-   */
-  public FileScanRunner(DiskFileScanScheduler diskFileScanScheduler,
-      StorageManagerV2.StorgaeManagerContext smContext,
-      FileScannerV2 fileScanner, Object requestQueueMonitor,
-      AtomicInteger numOfRunningScanners) {
-    super("FileScanRunner:" + fileScanner.getId());
-    this.smContext = smContext;
-    this.maxReadBytes = smContext.getMaxReadBytesPerScheduleSlot();
-    this.diskFileScanScheduler = diskFileScanScheduler;
-    this.fileScanner = fileScanner;
-    this.numOfRunningScanners = numOfRunningScanners;
-    this.requestQueueMonitor = requestQueueMonitor;
-  }
-
-  /**
-   * Runs the scan. Any exception is logged and not rethrown; the slot is
-   * released whether the scan succeeded or failed.
-   */
-  @Override
-  public void run() {
-    try {
-      fileScanner.scan(maxReadBytes);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    } finally {
-      releaseSlot();
-    }
-  }
-
-  /** Decrements the running counter and wakes all waiters, both under the queue monitor. */
-  private void releaseSlot() {
-    synchronized (requestQueueMonitor) {
-      numOfRunningScanners.decrementAndGet();
-      requestQueueMonitor.notifyAll();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
deleted file mode 100644
index 0d5b33d..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.Scanner;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Base class for scanners whose reading is driven by a storage-manager
- * context: {@link #init()} and {@link #scan(int)} ask the context (via
- * {@code requestFileScan}) to schedule this scanner, and {@link #scan(int)} is
- * the method a scheduled run is expected to call.
- *
- * <p>The first {@code scan} call runs {@link #initFirstScan(int)}, every later
- * call runs {@link #scanNext(int)}. {@link #next()} blocks until the first
- * scan has finished and then delegates to {@link #nextTuple()}.
- * A context must be supplied through
- * {@link #setStorageManagerContext(StorageManagerV2.StorgaeManagerContext)}
- * before {@link #init()}, {@link #scan(int)} or {@link #close()} is used,
- * because those methods dereference it without a null check.
- */
-public abstract class FileScannerV2 implements Scanner {
-  private static final Log LOG = LogFactory.getLog(FileScannerV2.class);
-
-  protected AtomicBoolean closed = new AtomicBoolean(false);
-
-  protected FileSystem fs;
-
-  protected boolean inited = false;
-  protected final Configuration conf;
-  protected final TableMeta meta;
-  protected final Schema schema;
-  protected final FileFragment fragment;
-  protected final int columnNum;
-  protected Column[] targets;
-  /** Accumulated wall-clock milliseconds spent inside {@link #scan(int)}. */
-  protected long totalScanTime = 0;
-  protected int allocatedDiskId;
-
-  protected StorageManagerV2.StorgaeManagerContext smContext;
-
-  /**
-   * True until the first {@link #scan(int)} has completed its
-   * {@link #initFirstScan(int)} step. Also used as the monitor that
-   * {@link #next()} waits on.
-   */
-  protected AtomicBoolean firstSchdeuled = new AtomicBoolean(true);
-
-  /** Runs one scan step after the first; returns true to be scheduled again. */
-  protected abstract boolean scanNext(int length) throws IOException;
-
-  /** Runs the very first scan step; returns true to be scheduled again. */
-  protected abstract boolean initFirstScan(int maxBytesPerSchedule) throws IOException;
-
-  protected abstract long getFilePosition() throws IOException;
-
-  /** Produces the tuple returned by {@link #next()}. */
-  protected abstract Tuple nextTuple() throws IOException;
-
-  public abstract boolean isFetchProcessing();
-
-  public abstract boolean isStopScanScheduling();
-
-  /** Called by {@link #reset()} before the scanner is closed and initialised again. */
-  public abstract void scannerReset();
-
-  /** Returns the array handed to the context's {@code incrementReadBytes} on {@link #close()}. */
-  protected abstract long[] reportReadBytes();
-
-  /**
-   * Stores the arguments and resolves the file system of the fragment's path.
-   *
-   * @throws IOException if the file system cannot be obtained
-   */
-  public FileScannerV2(final Configuration conf,
-                       final TableMeta meta,
-                       final Schema schema,
-                       final FileFragment fragment) throws IOException {
-    this.conf = conf;
-    this.meta = meta;
-    this.schema = schema;
-    this.fragment = fragment;
-    this.columnNum = this.schema.getColumnNum();
-
-    this.fs = fragment.getPath().getFileSystem(conf);
-  }
-
-  /**
-   * Re-arms the closed and first-scheduled flags and, unless already
-   * initialised, asks the context to schedule this scanner.
-   */
-  public void init() throws IOException {
-    closed.set(false);
-    firstSchdeuled.set(true);
-
-    if (!inited) {
-      smContext.requestFileScan(this);
-    }
-    inited = true;
-  }
-
-  /** Resets the subclass state, closes the scanner and initialises it again. */
-  @Override
-  public void reset() throws IOException {
-    scannerReset();
-    close();
-    inited = false;
-    init();
-  }
-
-  public void setAllocatedDiskId(int allocatedDiskId) {
-    this.allocatedDiskId = allocatedDiskId;
-  }
-
-  /**
-   * Builds an identifier from the file name, the fragment's start and end
-   * keys and the current time. Because the current time is part of it, two
-   * calls generally return different strings.
-   */
-  public String getId() {
-    return fragment.getPath().getName() + ":" + fragment.getStartKey() + ":" +
-        fragment.getEndKey() + "_" + System.currentTimeMillis();
-  }
-
-  @Override
-  public Schema getSchema() {
-    return schema;
-  }
-
-  /**
-   * @throws IllegalStateException if called after {@link #init()}
-   */
-  @Override
-  public void setTarget(Column[] targets) {
-    if (inited) {
-      throw new IllegalStateException("Should be called before init()");
-    }
-    this.targets = targets;
-  }
-
-  public Path getPath() {
-    return fragment.getPath();
-  }
-
-  /** @return the first disk id of the fragment, or -1 when the fragment has none */
-  public int getDiskId() {
-    if (fragment.getDiskIds().length <= 0) {
-      return -1;
-    } else {
-      return fragment.getDiskIds()[0];
-    }
-  }
-
-  /**
-   * Only checks the call order; the expression itself is not stored here.
-   *
-   * @throws IllegalStateException if called after {@link #init()}
-   */
-  public void setSearchCondition(Object expr) {
-    if (inited) {
-      throw new IllegalStateException("Should be called before init()");
-    }
-  }
-
-  public void setStorageManagerContext(StorageManagerV2.StorgaeManagerContext context) {
-    this.smContext = context;
-  }
-
-  @Override
-  public String toString() {
-    return fragment.getPath() + ":" + fragment.getStartKey();
-  }
-
-  /**
-   * Performs one scheduled scan step and, when the step reports more data,
-   * asks the context to schedule this scanner again. The elapsed time is
-   * added to {@code totalScanTime} whether or not the step succeeds.
-   *
-   * @param maxBytesPerSchedule byte budget passed through to the scan step
-   */
-  public void scan(int maxBytesPerSchedule) throws IOException {
-    long startTime = System.currentTimeMillis();
-    try {
-      synchronized (firstSchdeuled) {
-        if (firstSchdeuled.get()) {
-          boolean moreData = initFirstScan(maxBytesPerSchedule);
-          firstSchdeuled.set(false);
-          // Release callers blocked in next().
-          firstSchdeuled.notifyAll();
-          if (moreData) {
-            smContext.requestFileScan(this);
-          }
-          return;
-        }
-      }
-      boolean moreData = scanNext(maxBytesPerSchedule);
-
-      if (moreData) {
-        smContext.requestFileScan(this);
-      }
-    } finally {
-      totalScanTime += System.currentTimeMillis() - startTime;
-    }
-  }
-
-  /**
-   * Reports the read-byte counters to the context and marks the scanner
-   * closed. Calling it again while closed does nothing.
-   */
-  @Override
-  public void close() throws IOException {
-    if (closed.get()) {
-      return;
-    }
-    long[] readBytes = reportReadBytes();
-    smContext.incrementReadBytes(allocatedDiskId, readBytes);
-    closed.set(true);
-    LOG.info(toString() + " closed, totalScanTime=" + totalScanTime);
-  }
-
-  public boolean isClosed() {
-    return closed.get();
-  }
-
-  /**
-   * Waits until the first scan step has completed, then returns
-   * {@link #nextTuple()}. If the waiting thread is interrupted, the interrupt
-   * flag is restored and the method proceeds to {@link #nextTuple()} without
-   * waiting any longer.
-   */
-  public Tuple next() throws IOException {
-    synchronized (firstSchdeuled) {
-      // Loop, not "if": Object.wait() may return without a notify.
-      while (firstSchdeuled.get()) {
-        try {
-          firstSchdeuled.wait();
-        } catch (InterruptedException e) {
-          // Keep the interrupt visible to callers instead of swallowing it.
-          Thread.currentThread().interrupt();
-          break;
-        }
-      }
-    }
-    return nextTuple();
-  }
-}