You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2014/01/28 13:35:50 UTC
[13/18] TAJO-520: Move tajo-core-storage to tajo-storage. (jinho)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
deleted file mode 100644
index 7802c91..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Describes a single disk device: its numeric id, its device name and the
- * mount points that have been registered for it.
- */
-public class DiskDeviceInfo {
-  private List<DiskMountInfo> mountInfos = new ArrayList<DiskMountInfo>();
-  private String name;
-  private int id;
-
-  /**
-   * Creates a device entry. The name is left unset ({@code null}) and the
-   * mount list starts out empty.
-   *
-   * @param id identifier of the device
-   */
-  public DiskDeviceInfo(int id) {
-    this.id = id;
-  }
-
-  /** @return the identifier given at construction time */
-  public int getId() {
-    return this.id;
-  }
-
-  /** @return the device name, or {@code null} if none has been set */
-  public String getName() {
-    return this.name;
-  }
-
-  /** @param name the device name to store */
-  public void setName(String name) {
-    this.name = name;
-  }
-
-  /** @return the live, mutable list of mount points (not a copy) */
-  public List<DiskMountInfo> getMountInfos() {
-    return this.mountInfos;
-  }
-
-  /** Replaces the whole mount list with the given list, stored by reference. */
-  public void setMountInfos(List<DiskMountInfo> mountInfos) {
-    this.mountInfos = mountInfos;
-  }
-
-  /** Appends one mount point to this device's mount list. */
-  public void addMountPath(DiskMountInfo diskMountInfo) {
-    this.mountInfos.add(diskMountInfo);
-  }
-
-  /** Renders the device as the id, a comma, and the name. */
-  @Override
-  public String toString() {
-    StringBuilder text = new StringBuilder();
-    text.append(id).append(",").append(name);
-    return text.toString();
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
deleted file mode 100644
index 1babf99..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.tajo.conf.TajoConf.ConfVars;
-
-/**
- * Scheduler thread for the scanners of one disk device.
- *
- * <p>Scanners are queued through {@link #requestScanFile(FileScannerV2)}. The
- * scheduler thread takes them off the queue and starts one
- * {@link FileScanRunner} thread per scan, as long as fewer than
- * {@code scanConcurrency} runners are active. Scanners that report
- * {@code isFetchProcessing()} are parked in {@link #fetchingScanners} and put
- * back on the queue by the {@link FetchWaitingThread}.
- */
-public final class DiskFileScanScheduler extends Thread {
-  private static final Log LOG = LogFactory.getLog(DiskFileScanScheduler.class);
-
-  /** Pending scan requests; every mutation happens while holding {@link #requestQueueMonitor}. */
-  private final Queue<FileScannerV2> requestQueue = new LinkedList<FileScannerV2>();
-
-  /** Scanners parked while fetching; mutated while synchronized on the list itself. */
-  List<FileScannerV2> fetchingScanners = new ArrayList<FileScannerV2>();
-
-  /** Maximum number of concurrently running scans, read from the configuration. */
-  private int scanConcurrency;
-
-  /** Incremented here before a runner starts; the runner decrements it when done. */
-  private final AtomicInteger numOfRunningScanners = new AtomicInteger(0);
-
-  /** Monitor used for both locking the request queue and wait/notify signalling. */
-  private final Object requestQueueMonitor = new Object();
-
-  private final StorageManagerV2.StorgaeManagerContext smContext;
-
-  private final DiskDeviceInfo diskDeviceInfo;
-
-  private final AtomicBoolean stopped = new AtomicBoolean(false);
-
-  private long totalScanCount = 0;
-
-  private final FetchWaitingThread fetchWaitingThread;
-
-  private final AtomicLong totalReadBytesForFetch = new AtomicLong(0);
-
-  private final AtomicLong totalReadBytesFromDisk = new AtomicLong(0);
-
-  /** Byte counters captured by the previous {@link #printDiskSchedulerInfo()} call. */
-  private long[] lastReportReadBytes;
-
-  /** Wall-clock time (ms) of the previous report; 0 means no report yet. */
-  private long lastReportTime = 0;
-
-  /**
-   * Creates the scheduler and immediately starts its {@link FetchWaitingThread}.
-   * The scheduler thread itself is not started here.
-   *
-   * @param smContext      storage manager context supplying the configuration
-   * @param diskDeviceInfo the disk this scheduler serves
-   */
-  public DiskFileScanScheduler(
-      StorageManagerV2.StorgaeManagerContext smContext,
-      DiskDeviceInfo diskDeviceInfo) {
-    super("DiskFileScanner:" + diskDeviceInfo);
-    this.smContext = smContext;
-    this.diskDeviceInfo = diskDeviceInfo;
-    initScannerPool();
-    this.fetchWaitingThread = new FetchWaitingThread();
-    this.fetchWaitingThread.start();
-  }
-
-  /**
-   * Adds the given byte counts to the running totals.
-   *
-   * @param readBytes element 0 is added to the fetch total, element 1 to the disk total
-   */
-  public void incrementReadBytes(long[] readBytes) {
-    totalReadBytesForFetch.addAndGet(readBytes[0]);
-    totalReadBytesFromDisk.addAndGet(readBytes[1]);
-  }
-
-  /** @return the id of the disk device this scheduler serves */
-  public int getDiskId() {
-    return diskDeviceInfo.getId();
-  }
-
-  /**
-   * Scheduling loop. Holds {@link #requestQueueMonitor} for its whole lifetime and
-   * only releases it inside {@code wait(2000)}, which is when producers can enqueue.
-   * The loop ends when {@link #stopScan()} sets the stop flag or interrupts the thread.
-   */
-  @Override
-  public void run() {
-    synchronized (requestQueueMonitor) {
-      try {
-        while (!stopped.get()) {
-          if (isAllScannerRunning()) {
-            // No free slot: sleep until a runner finishes or the timeout expires.
-            requestQueueMonitor.wait(2000);
-            continue;
-          }
-
-          FileScannerV2 fileScanner = requestQueue.poll();
-          if (fileScanner == null) {
-            // Nothing queued: sleep until a request arrives or the timeout expires.
-            requestQueueMonitor.wait(2000);
-            continue;
-          }
-
-          if (fileScanner.isStopScanScheduling()) {
-            LOG.info("Exit from Disk Queue:" + fileScanner.getId());
-            continue;
-          }
-
-          if (fileScanner.isFetchProcessing()) {
-            // Park it; FetchWaitingThread will re-queue it later.
-            synchronized (fetchingScanners) {
-              fetchingScanners.add(fileScanner);
-            }
-          } else {
-            numOfRunningScanners.incrementAndGet();
-            FileScanRunner fileScanRunner = new FileScanRunner(
-                DiskFileScanScheduler.this, smContext,
-                fileScanner, requestQueueMonitor,
-                numOfRunningScanners);
-            totalScanCount++;
-            fileScanRunner.start();
-          }
-        }
-      } catch (InterruptedException e) {
-        // stopScan() interrupts this thread; leaving the loop ends the scheduler.
-      }
-    }
-  }
-
-  /** Queues a scanner for scheduling and wakes the scheduler thread. */
-  protected void requestScanFile(FileScannerV2 fileScannerV2) {
-    synchronized (requestQueueMonitor) {
-      requestQueue.offer(fileScannerV2);
-      requestQueueMonitor.notifyAll();
-    }
-  }
-
-  /**
-   * Every 100 ms moves all parked scanners from {@code fetchingScanners} back to
-   * the request queue and wakes the scheduler.
-   */
-  public class FetchWaitingThread extends Thread {
-    List<FileScannerV2> workList = new ArrayList<FileScannerV2>(20);
-
-    @Override
-    public void run() {
-      while (!stopped.get()) {
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException e) {
-          break;
-        }
-        workList.clear();
-        // Drain under the list's own lock, then re-queue under the queue lock,
-        // so the two monitors are never held at the same time.
-        synchronized (fetchingScanners) {
-          workList.addAll(fetchingScanners);
-          fetchingScanners.clear();
-        }
-        synchronized (requestQueueMonitor) {
-          for (FileScannerV2 eachScanner : workList) {
-            requestQueue.offer(eachScanner);
-          }
-          requestQueueMonitor.notifyAll();
-        }
-      }
-    }
-  }
-
-  private void initScannerPool() {
-    // TODO finally implements heuristic, currently set with property
-    scanConcurrency = smContext.getConf().getIntVar(ConfVars.STORAGE_MANAGER_CONCURRENCY_PER_DISK);
-  }
-
-  /**
-   * @return the current number of queued requests; read without holding the
-   *         queue monitor, so the value is only an approximation
-   */
-  public int getTotalQueueSize() {
-    return requestQueue.size();
-  }
-
-  /** @return true when the number of running scans has reached the configured limit */
-  boolean isAllScannerRunning() {
-    return numOfRunningScanners.get() >= scanConcurrency;
-  }
-
-  /** @return how many scan runners this scheduler has started so far */
-  public long getTotalScanCount() {
-    return totalScanCount;
-  }
-
-  /** Sets the stop flag and interrupts both the fetch-waiting thread and this thread. */
-  public void stopScan() {
-    stopped.set(true);
-    if (fetchWaitingThread != null) {
-      fetchWaitingThread.interrupt();
-    }
-
-    this.interrupt();
-  }
-
-  /**
-   * Logs queue sizes, running scan count and the read throughput since the
-   * previous call. Throughput is reported as 0 on the first call and whenever
-   * less than one full second has elapsed since the previous report.
-   */
-  public void printDiskSchedulerInfo() {
-    long now = System.currentTimeMillis();
-    long[] currentReadBytes = new long[]{totalReadBytesForFetch.get(), totalReadBytesFromDisk.get()};
-    long[] throughput = new long[2];
-    if (lastReportTime != 0 && lastReportReadBytes != null) {
-      long elapsedSec = (now - lastReportTime) / 1000;
-      // Guard: two reports within the same second would otherwise divide by zero.
-      if (elapsedSec > 0) {
-        throughput[0] = (currentReadBytes[0] - lastReportReadBytes[0]) / elapsedSec;
-        throughput[1] = (currentReadBytes[1] - lastReportReadBytes[1]) / elapsedSec;
-      }
-    }
-    lastReportTime = now;
-
-    LOG.info("===>" + DiskFileScanScheduler.this.diskDeviceInfo
-        + ", request=" + requestQueue.size()
-        + ", fetching=" + fetchingScanners.size()
-        + ", running=" + numOfRunningScanners.get()
-        + ", totalScan=" + totalScanCount
-        + ", FetchThroughput=" + throughput[0] / 1024 + "KB"
-        + ", DiskScanThroughput=" + throughput[1] / 1024 + "KB");
-
-    lastReportReadBytes = currentReadBytes;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
deleted file mode 100644
index d71154c..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-/**
- * Mutable value holder for one disk partition: id, partition name, mount path,
- * capacity and used space.
- */
-public class DiskInfo {
-  private long used;
-  private long capacity;
-
-  private String mountPath;
-  private String partitionName;
-  private int id;
-
-  /**
-   * Creates an entry with the given id and partition name. The mount path is
-   * left {@code null}; capacity and used space start at 0.
-   */
-  public DiskInfo(int id, String partitionName) {
-    this.partitionName = partitionName;
-    this.id = id;
-  }
-
-  /** @return the partition id */
-  public int getId() {
-    return this.id;
-  }
-
-  /** @param id the new partition id */
-  public void setId(int id) {
-    this.id = id;
-  }
-
-  /** @return the partition name */
-  public String getPartitionName() {
-    return this.partitionName;
-  }
-
-  /** @param partitionName the new partition name */
-  public void setPartitionName(String partitionName) {
-    this.partitionName = partitionName;
-  }
-
-  /** @return the mount path, or {@code null} if none has been set */
-  public String getMountPath() {
-    return this.mountPath;
-  }
-
-  /** @param mountPath the new mount path */
-  public void setMountPath(String mountPath) {
-    this.mountPath = mountPath;
-  }
-
-  /** @return the stored capacity value */
-  public long getCapacity() {
-    return this.capacity;
-  }
-
-  /** @param capacity the new capacity value */
-  public void setCapacity(long capacity) {
-    this.capacity = capacity;
-  }
-
-  /** @return the stored used-space value */
-  public long getUsed() {
-    return this.used;
-  }
-
-  /** @param used the new used-space value */
-  public void setUsed(long used) {
-    this.used = used;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
deleted file mode 100644
index 56100f2..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import com.google.common.base.Objects;
-
-/**
- * A mount point belonging to a disk device.
- *
- * <p>The natural ordering sorts deeper mount paths first, then longer paths
- * first, then lexicographically. Equality and the hash code depend on the
- * mount path only.
- */
-public class DiskMountInfo implements Comparable<DiskMountInfo> {
-  private String mountPath;
-
-  private long capacity;
-  private long used;
-
-  private int deviceId;
-
-  /**
-   * @param deviceId  id of the device this mount point belongs to
-   * @param mountPath path at which the device is mounted
-   */
-  public DiskMountInfo(int deviceId, String mountPath) {
-    // The device id was previously dropped, so getDeviceId() always returned 0.
-    this.deviceId = deviceId;
-    this.mountPath = mountPath;
-  }
-
-  /** @return the mount path */
-  public String getMountPath() {
-    return mountPath;
-  }
-
-  /** @param mountPath the new mount path */
-  public void setMountPath(String mountPath) {
-    this.mountPath = mountPath;
-  }
-
-  /** @return the stored capacity value */
-  public long getCapacity() {
-    return capacity;
-  }
-
-  /** @param capacity the new capacity value */
-  public void setCapacity(long capacity) {
-    this.capacity = capacity;
-  }
-
-  /** @return the stored used-space value */
-  public long getUsed() {
-    return used;
-  }
-
-  /** @param used the new used-space value */
-  public void setUsed(long used) {
-    this.used = used;
-  }
-
-  /** @return the id of the owning device, as passed to the constructor */
-  public int getDeviceId() {
-    return deviceId;
-  }
-
-  /** Two mount infos are equal when {@link #compareTo} yields 0, i.e. same mount path. */
-  @Override
-  public boolean equals(Object obj) {
-    if (!(obj instanceof DiskMountInfo)) {
-      return false;
-    }
-    return compareTo((DiskMountInfo) obj) == 0;
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hashCode(mountPath);
-  }
-
-  /**
-   * Orders by path depth (deeper first), then by path length (longer first),
-   * then by plain string comparison.
-   * NOTE(review): presumably this lets callers match the most specific mount
-   * point first; confirm against the callers.
-   *
-   * @throws NullPointerException if either mount path is {@code null}
-   */
-  @Override
-  public int compareTo(DiskMountInfo other) {
-    String path1 = mountPath;
-    String path2 = other.mountPath;
-
-    int path1Depth = pathDepth(path1);
-    int path2Depth = pathDepth(path2);
-    if (path1Depth != path2Depth) {
-      return path1Depth > path2Depth ? -1 : 1;
-    }
-
-    int path1Length = path1.length();
-    int path2Length = path2.length();
-    if (path1Length != path2Length) {
-      return path1Length > path2Length ? -1 : 1;
-    }
-
-    return path1.compareTo(path2);
-  }
-
-  /** Counts the '/' separators in the path; the root path "/" counts as depth 0. */
-  private static int pathDepth(String path) {
-    return "/".equals(path) ? 0 : path.split("/", -1).length - 1;
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
deleted file mode 100644
index bb90c39..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-/**
- * Helpers that discover the local disk devices and their mount points.
- *
- * <p>When the OS is classified as {@code OS_TYPE_UNIX}, devices are read from
- * {@link #UNIX_DISK_DEVICE_PATH} and mount points from the output of the
- * {@code mount} command. Every other OS type gets a single device named
- * "default".
- */
-public class DiskUtil {
-
-  static String UNIX_DISK_DEVICE_PATH = "/proc/partitions";
-
-  public enum OSType {
-    OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC
-  }
-
-  /** Classifies the running OS from the {@code os.name} system property. */
-  static private OSType getOSType() {
-    String osName = System.getProperty("os.name");
-    if (osName.contains("Windows")
-        && (osName.contains("XP") || osName.contains("2003")
-        || osName.contains("Vista")
-        || osName.contains("Windows_7")
-        || osName.contains("Windows 7") || osName
-        .contains("Windows7"))) {
-      return OSType.OS_TYPE_WINXP;
-    } else if (osName.contains("SunOS") || osName.contains("Solaris")) {
-      return OSType.OS_TYPE_SOLARIS;
-    } else if (osName.contains("Mac")) {
-      return OSType.OS_TYPE_MAC;
-    } else {
-      return OSType.OS_TYPE_UNIX;
-    }
-  }
-
-  /**
-   * Returns the disk devices of this machine.
-   *
-   * @return the detected devices with their mount points on a Unix-type OS,
-   *         otherwise a single "default" device
-   * @throws IOException if running or reading the {@code mount} command fails
-   */
-  public static List<DiskDeviceInfo> getDiskDeviceInfos() throws IOException {
-    List<DiskDeviceInfo> deviceInfos;
-
-    if (getOSType() == OSType.OS_TYPE_UNIX) {
-      deviceInfos = getUnixDiskDeviceInfos();
-      setDeviceMountInfo(deviceInfos);
-    } else {
-      deviceInfos = getDefaultDiskDeviceInfos();
-    }
-
-    return deviceInfos;
-  }
-
-  /**
-   * Reads the partition table file and returns one device per distinct device
-   * name, with ids assigned in sorted name order. Falls back to the default
-   * device when the file does not exist. A read failure is printed and whatever
-   * was collected so far is returned.
-   */
-  private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() {
-    List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
-
-    File file = new File(UNIX_DISK_DEVICE_PATH);
-    if (!file.exists()) {
-      System.out.println("No partition file:" + file.getAbsolutePath());
-      return getDefaultDiskDeviceInfos();
-    }
-
-    BufferedReader reader = null;
-    try {
-      reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
-
-      Set<String> deviceNames = new TreeSet<String>();
-      boolean firstLine = true;
-      String line;
-      while ((line = reader.readLine()) != null) {
-        if (firstLine) {
-          // The first line is the column header.
-          firstLine = false;
-          continue;
-        }
-        String trimmed = line.trim();
-        if (trimmed.isEmpty()) {
-          continue;
-        }
-        // Only rows with exactly four columns are used; the fourth is the partition name.
-        String[] tokens = trimmed.split(" +");
-        if (tokens.length == 4) {
-          deviceNames.add(getDiskDeviceName(tokens[3]));
-        }
-      }
-
-      int id = 0;
-      for (String eachDeviceName : deviceNames) {
-        DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++);
-        diskDeviceInfo.setName(eachDeviceName);
-
-        //TODO set additional info
-        // /sys/block/sda/queue
-        infos.add(diskDeviceInfo);
-      }
-    } catch (Exception e) {
-      e.printStackTrace();
-    } finally {
-      if (reader != null) {
-        try {
-          reader.close();
-        } catch (IOException ignored) {
-          // Nothing useful can be done if closing a read-only stream fails.
-        }
-      }
-    }
-
-    return infos;
-  }
-
-  /**
-   * Returns the part of the partition name that precedes its first ASCII
-   * digit, e.g. "sda1" gives "sda". A name without digits is returned whole.
-   */
-  private static String getDiskDeviceName(String partitionName) {
-    int end = 0;
-    while (end < partitionName.length()) {
-      char ch = partitionName.charAt(end);
-      if (ch >= '0' && ch <= '9') {
-        break;
-      }
-      end++;
-    }
-    return partitionName.substring(0, end);
-  }
-
-  /** Returns a one-element list holding a device with id 0 named "default". */
-  private static List<DiskDeviceInfo> getDefaultDiskDeviceInfos() {
-    DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(0);
-    diskDeviceInfo.setName("default");
-
-    List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
-
-    infos.add(diskDeviceInfo);
-
-    return infos;
-  }
-
-  /**
-   * Runs {@code mount} and attaches each {@code /dev/<name>} mount point to the
-   * matching device in {@code deviceInfos}. Output lines that do not have the
-   * {@code "<device> on /<path>"} shape are skipped.
-   *
-   * @throws IOException if the command cannot be started or its output cannot be read
-   */
-  private static void setDeviceMountInfo(List<DiskDeviceInfo> deviceInfos) throws IOException {
-    Map<String, DiskDeviceInfo> deviceMap = new HashMap<String, DiskDeviceInfo>();
-    for (DiskDeviceInfo eachDevice : deviceInfos) {
-      deviceMap.put(eachDevice.getName(), eachDevice);
-    }
-
-    Process mountProcess = null;
-    BufferedReader mountOutput = null;
-    try {
-      mountProcess = Runtime.getRuntime().exec("mount");
-      mountOutput = new BufferedReader(new InputStreamReader(
-          mountProcess.getInputStream()));
-
-      String line;
-      while ((line = mountOutput.readLine()) != null) {
-        int indexStart = line.indexOf(" on /");
-        if (indexStart < 0) {
-          // Not a "<device> on /<path>" line; substring(0, -1) would throw.
-          continue;
-        }
-        // indexStart + 4 is the position of the '/' that begins the mount path.
-        int indexEnd = line.indexOf(" ", indexStart + 4);
-        if (indexEnd < 0) {
-          // Nothing follows the path, so it runs to the end of the line.
-          indexEnd = line.length();
-        }
-
-        String deviceName = line.substring(0, indexStart).trim();
-        String[] deviceNameTokens = deviceName.split("/");
-        // "/dev/sda1" splits into ["", "dev", "sda1"].
-        if (deviceNameTokens.length != 3 || !"dev".equals(deviceNameTokens[1])) {
-          continue;
-        }
-
-        String realDeviceName = getDiskDeviceName(deviceNameTokens[2]);
-        String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath();
-
-        DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName);
-        if (diskDeviceInfo != null) {
-          diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath));
-        }
-      }
-    } finally {
-      try {
-        if (mountOutput != null) {
-          mountOutput.close();
-        }
-      } finally {
-        if (mountProcess != null) {
-          // Releases the remaining stdin/stderr handles of the child process.
-          mountProcess.destroy();
-        }
-      }
-    }
-  }
-
-  /** Ad-hoc check that prints how "/dev/sde1" splits on '/'. */
-  public static void main(String[] args) throws Exception {
-    String[] tokens = "/dev/sde1".split("/");
-    System.out.println(tokens.length);
-    for (String eachToken : tokens) {
-      System.out.println(eachToken);
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
deleted file mode 100644
index 07fbe6c..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Thread that performs one scheduled scan slice of a {@link FileScannerV2} and,
- * when finished, decrements the shared running-scanner counter and wakes
- * whoever waits on the scheduler's request-queue monitor.
- */
-public class FileScanRunner extends Thread {
-  private static final Log LOG = LogFactory.getLog(FileScanRunner.class);
-
-  StorageManagerV2.StorgaeManagerContext smContext;
-  FileScannerV2 fileScanner;
-  Object requestQueueMonitor;
-  AtomicInteger numOfRunningScanners;
-  DiskFileScanScheduler diskFileScanScheduler;
-
-  int maxReadBytes;
-
-  /**
-   * @param diskFileScanScheduler scheduler that created this runner
-   * @param smContext             context queried for the per-slot read limit
-   * @param fileScanner           scanner to run
-   * @param requestQueueMonitor   monitor to notify once the scan slice ends
-   * @param numOfRunningScanners  counter to decrement once the scan slice ends
-   */
-  public FileScanRunner(DiskFileScanScheduler diskFileScanScheduler,
-                        StorageManagerV2.StorgaeManagerContext smContext,
-                        FileScannerV2 fileScanner, Object requestQueueMonitor,
-                        AtomicInteger numOfRunningScanners) {
-    super("FileScanRunner:" + fileScanner.getId());
-    this.smContext = smContext;
-    this.fileScanner = fileScanner;
-    this.requestQueueMonitor = requestQueueMonitor;
-    this.numOfRunningScanners = numOfRunningScanners;
-    this.diskFileScanScheduler = diskFileScanScheduler;
-
-    this.maxReadBytes = smContext.getMaxReadBytesPerScheduleSlot();
-  }
-
-  /**
-   * Runs the scan. Any exception is logged rather than propagated, and the
-   * counter/notify bookkeeping always runs.
-   */
-  public void run() {
-    try {
-      fileScanner.scan(maxReadBytes);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    } finally {
-      // Free the slot and signal under the monitor so a waiting scheduler sees it.
-      synchronized (requestQueueMonitor) {
-        numOfRunningScanners.decrementAndGet();
-        requestQueueMonitor.notifyAll();
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bbf9b7bf/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
deleted file mode 100644
index 0d5b33d..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.Scanner;
-import org.apache.tajo.storage.Tuple;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * Base class for scanners whose reads are driven in slices by the storage
- * manager. {@link #init()} registers the scanner through
- * {@code smContext.requestFileScan}; each call to {@link #scan(int)} reads one
- * slice and re-registers the scanner while more data remains. {@link #next()}
- * blocks until the first slice has been processed.
- */
-public abstract class FileScannerV2 implements Scanner {
-  private static final Log LOG = LogFactory.getLog(FileScannerV2.class);
-
-  protected AtomicBoolean closed = new AtomicBoolean(false);
-
-  protected FileSystem fs;
-
-  protected boolean inited = false;
-  protected final Configuration conf;
-  protected final TableMeta meta;
-  protected final Schema schema;
-  protected final FileFragment fragment;
-  protected final int columnNum;
-  protected Column[] targets;
-  /** Accumulated wall-clock milliseconds spent inside {@link #scan(int)}. */
-  protected long totalScanTime = 0;
-  protected int allocatedDiskId;
-
-  protected StorageManagerV2.StorgaeManagerContext smContext;
-
-  /**
-   * True until the first {@link #scan(int)} call has run {@code initFirstScan}.
-   * The object also serves as the monitor that {@link #next()} waits on.
-   */
-  protected AtomicBoolean firstSchdeuled = new AtomicBoolean(true);
-
-  protected abstract boolean scanNext(int length) throws IOException;
-
-  protected abstract boolean initFirstScan(int maxBytesPerSchedule) throws IOException;
-
-  protected abstract long getFilePosition() throws IOException;
-
-  protected abstract Tuple nextTuple() throws IOException;
-
-  public abstract boolean isFetchProcessing();
-
-  public abstract boolean isStopScanScheduling();
-
-  public abstract void scannerReset();
-
-  protected abstract long[] reportReadBytes();
-
-  /**
-   * Stores the arguments and resolves the file system of the fragment's path.
-   *
-   * @throws IOException if the file system cannot be obtained
-   */
-  public FileScannerV2(final Configuration conf,
-                       final TableMeta meta,
-                       final Schema schema,
-                       final FileFragment fragment) throws IOException {
-    this.conf = conf;
-    this.meta = meta;
-    this.schema = schema;
-    this.fragment = fragment;
-    this.columnNum = this.schema.getColumnNum();
-
-    this.fs = fragment.getPath().getFileSystem(conf);
-  }
-
-  /**
-   * Marks the scanner as open and awaiting its first slice; on the first call
-   * (or the first after {@link #reset()}) also requests scheduling. The
-   * storage manager context must have been set beforehand.
-   */
-  public void init() throws IOException {
-    closed.set(false);
-    firstSchdeuled.set(true);
-
-    if (!inited) {
-      smContext.requestFileScan(this);
-    }
-    inited = true;
-  }
-
-  /** Resets the subclass state, closes the scanner and initializes it again. */
-  @Override
-  public void reset() throws IOException {
-    scannerReset();
-    close();
-    inited = false;
-    init();
-  }
-
-  public void setAllocatedDiskId(int allocatedDiskId) {
-    this.allocatedDiskId = allocatedDiskId;
-  }
-
-  /**
-   * Builds an id from the file name, the fragment's start and end keys and the
-   * current time. Because of the timestamp, successive calls can return
-   * different strings.
-   */
-  public String getId() {
-    return fragment.getPath().getName() + ":" + fragment.getStartKey() + ":" +
-        fragment.getEndKey() + "_" + System.currentTimeMillis();
-  }
-
-  @Override
-  public Schema getSchema() {
-    return schema;
-  }
-
-  /**
-   * @throws IllegalStateException if called after {@link #init()}
-   */
-  @Override
-  public void setTarget(Column[] targets) {
-    if (inited) {
-      throw new IllegalStateException("Should be called before init()");
-    }
-    this.targets = targets;
-  }
-
-  public Path getPath() {
-    return fragment.getPath();
-  }
-
-  /** @return the fragment's first disk id, or -1 when the fragment lists none */
-  public int getDiskId() {
-    if (fragment.getDiskIds().length <= 0) {
-      return -1;
-    } else {
-      return fragment.getDiskIds()[0];
-    }
-  }
-
-  /**
-   * Only validates the call order; the expression itself is not stored.
-   *
-   * @throws IllegalStateException if called after {@link #init()}
-   */
-  public void setSearchCondition(Object expr) {
-    if (inited) {
-      throw new IllegalStateException("Should be called before init()");
-    }
-  }
-
-  public void setStorageManagerContext(StorageManagerV2.StorgaeManagerContext context) {
-    this.smContext = context;
-  }
-
-  public String toString() {
-    return fragment.getPath() + ":" + fragment.getStartKey();
-  }
-
-  /**
-   * Processes one scheduling slice. The first call runs {@code initFirstScan}
-   * and wakes threads blocked in {@link #next()}; later calls run
-   * {@code scanNext}. In both cases the scanner re-requests scheduling when
-   * more data remains.
-   *
-   * @param maxBytesPerSchedule byte limit handed to the subclass for this slice
-   */
-  public void scan(int maxBytesPerSchedule) throws IOException {
-    long startTime = System.currentTimeMillis();
-    try {
-      synchronized (firstSchdeuled) {
-        if (firstSchdeuled.get()) {
-          boolean moreData = initFirstScan(maxBytesPerSchedule);
-          firstSchdeuled.set(false);
-          firstSchdeuled.notifyAll();
-          if (moreData) {
-            smContext.requestFileScan(this);
-          }
-          return;
-        }
-      }
-      boolean moreData = scanNext(maxBytesPerSchedule);
-
-      if (moreData) {
-        smContext.requestFileScan(this);
-      }
-    } finally {
-      totalScanTime += System.currentTimeMillis() - startTime;
-    }
-  }
-
-  /** Reports the read byte counts to the storage manager once, then marks the scanner closed. */
-  @Override
-  public void close() throws IOException {
-    if (closed.get()) {
-      return;
-    }
-    long[] readBytes = reportReadBytes();
-    smContext.incrementReadBytes(allocatedDiskId, readBytes);
-    closed.set(true);
-    LOG.info(toString() + " closed, totalScanTime=" + totalScanTime);
-  }
-
-  public boolean isClosed() {
-    return closed.get();
-  }
-
-  /**
-   * Returns the next tuple, first blocking until the initial scan slice has
-   * been processed. If the waiting thread is interrupted, the interrupt status
-   * is restored and the method goes on to {@code nextTuple()} without waiting
-   * further.
-   */
-  public Tuple next() throws IOException {
-    synchronized (firstSchdeuled) {
-      // Loop, not if: wait() may return spuriously before the first scan is done.
-      while (firstSchdeuled.get()) {
-        try {
-          firstSchdeuled.wait();
-        } catch (InterruptedException e) {
-          // Keep the interrupt visible to the caller instead of swallowing it.
-          Thread.currentThread().interrupt();
-          break;
-        }
-      }
-    }
-    return nextTuple();
-  }
-}