You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/10/23 13:37:33 UTC
[5/8] TAJO-1121: Remove the 'v2' storage package.
http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
deleted file mode 100644
index e15ca6e..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.io.compress.*;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.LazyTuple;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.compress.CodecPool;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.BytesUtils;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-public class CSVFileScanner extends FileScannerV2 {
- public static final String DELIMITER = "csvfile.delimiter";
- public static final String DELIMITER_DEFAULT = "|";
- public static final byte LF = '\n';
- private static final Log LOG = LogFactory.getLog(CSVFileScanner.class);
-
- private final static int DEFAULT_BUFFER_SIZE = 256 * 1024;
- private int bufSize;
- private char delimiter;
- private ScheduledInputStream sin;
- private InputStream is; // decompressd stream
- private CompressionCodecFactory factory;
- private CompressionCodec codec;
- private Decompressor decompressor;
- private Seekable filePosition;
- private boolean splittable = true;
- private long startOffset, length;
- private byte[] buf = null;
- private byte[][] tuples = null;
- private long[] tupleOffsets = null;
- private int currentIdx = 0, validIdx = 0;
- private byte[] tail = null;
- private long pageStart = -1;
- private long prevTailLen = -1;
- private int[] targetColumnIndexes;
- private boolean eof = false;
- private boolean first = true;
-
- private long totalReadBytesForFetch;
- private long totalReadBytesFromDisk;
-
- /**
-  * Creates a scanner over one fragment of a delimited text file.
-  *
-  * The compression codec is looked up from the fragment's path. If a codec is found and it is
-  * not a {@link SplittableCompressionCodec}, the scanner is marked as non-splittable.
-  *
-  * @param conf configuration passed to the superclass and to the codec factory
-  * @param schema table schema, passed to the superclass
-  * @param meta table metadata, passed to the superclass
-  * @param fragment the file fragment to scan, passed to the superclass
-  * @throws IOException if initialization fails
-  */
- public CSVFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
- throws IOException {
- super(conf, meta, schema, fragment);
- factory = new CompressionCodecFactory(conf);
- codec = factory.getCodec(this.fragment.getPath());
- // splittable defaults to true; only a compressed file whose codec cannot be split turns it off
- if (isCompress() && !(codec instanceof SplittableCompressionCodec)) {
- splittable = false;
- }
- }
-
- /**
-  * Sets the read buffer size to {@code DEFAULT_BUFFER_SIZE}, reads the field delimiter from the
-  * table options (key {@code DELIMITER}, default {@code DELIMITER_DEFAULT}) and then delegates
-  * to the superclass.
-  *
-  * @throws IOException if the superclass initialization fails
-  */
- @Override
- public void init() throws IOException {
- // Buffer size, Delimiter
- this.bufSize = DEFAULT_BUFFER_SIZE;
- String delim = meta.getOption(DELIMITER, DELIMITER_DEFAULT);
- // only the first character of the option is used; an empty option value makes charAt(0) throw
- this.delimiter = delim.charAt(0);
-
- super.init();
- }
-
- /**
-  * Resets the scan state and, on the first call, opens the fragment's file.
-  *
-  * While holding this scanner's monitor it clears {@code eof}, sets {@code first} and, if no
-  * scheduled stream exists yet, opens the file with a 128 KB buffer, wraps it in a
-  * {@link ScheduledInputStream} and records the fragment's start key and end key as
-  * {@code startOffset} and {@code length}.
-  *
-  * @param maxBytesPerSchedule not used by this implementation
-  * @return always {@code true}
-  * @throws IOException if the file cannot be opened or its length cannot be read
-  */
- @Override
- protected boolean initFirstScan(int maxBytesPerSchedule) throws IOException {
- synchronized(this) {
- eof = false;
- first = true;
- if(sin == null) {
- FSDataInputStream fin = fs.open(fragment.getPath(), 128 * 1024);
- sin = new ScheduledInputStream(fragment.getPath(), fin,
- fragment.getStartKey(), fragment.getEndKey(), fs.getLength(fragment.getPath()));
- startOffset = fragment.getStartKey();
- // NOTE(review): the end key is stored as a length here - confirm that getEndKey() returns a byte count
- length = fragment.getEndKey();
-
- if (startOffset > 0) {
- // step back one byte; scanFirst() then skips up to the next line feed when startOffset != 0
- startOffset--; // prev line feed
- }
- }
- }
- return true;
- }
-
- /**
-  * Prepares the input stream for reading rows; called by {@code nextTuple()} before the first row.
-  *
-  * It selects the stream to read from (split-aware decompression, plain decompression, or the raw
-  * scheduled stream), resolves the projected columns to schema column ids, and, when the
-  * fragment does not start at offset 0, discards bytes up to and including the next line feed.
-  *
-  * @return {@code false} (after closing the scanner) when no bytes of the fragment remain,
-  *         {@code true} otherwise
-  * @throws IOException if a stream cannot be created, positioned or read
-  */
- private boolean scanFirst() throws IOException {
- if (codec != null) {
- decompressor = CodecPool.getDecompressor(codec);
- if (codec instanceof SplittableCompressionCodec) {
- SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
- sin, decompressor, startOffset, startOffset + length,
- SplittableCompressionCodec.READ_MODE.BYBLOCK);
-
- // the codec may move the range; continue with the adjusted values
- startOffset = cIn.getAdjustedStart();
- length = cIn.getAdjustedEnd() - startOffset;
- filePosition = cIn;
- is = cIn;
- } else {
- // non-splittable codec: filePosition stays unset, so getFilePosition() falls back to sin
- is = new DataInputStream(codec.createInputStream(sin, decompressor));
- }
- } else {
- sin.seek(startOffset);
- filePosition = sin;
- is = sin;
- }
-
- tuples = new byte[0][];
- // no projection given: read every column of the schema
- if (targets == null) {
- targets = schema.toArray();
- }
-
- targetColumnIndexes = new int[targets.length];
- for (int i = 0; i < targets.length; i++) {
- targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName());
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + length +
- "," + fs.getFileStatus(fragment.getPath()).getLen());
- }
-
- // skip the remainder of the row that straddles the fragment start (stops at LF or end of stream)
- if (startOffset != 0) {
- int rbyte;
- while ((rbyte = is.read()) != LF) {
- if(rbyte == -1) break;
- }
- }
-
- if (fragmentable() < 1) {
- close();
- return false;
- }
- return true;
- }
-
- /**
-  * Tells the scheduler whether this scanner needs no further scheduling.
-  *
-  * @return {@code true} only when the scheduled stream exists and reports end of stream
-  */
- @Override
- public boolean isStopScanScheduling() {
-   return sin != null && sin.isEndOfStream();
- }
-
- /**
-  * Returns the number of bytes between the current file position and the end of the scan range
-  * ({@code startOffset + length}); the value is zero or negative once the range is used up.
-  */
- private long fragmentable() throws IOException {
- return startOffset + length - getFilePosition();
- }
-
- /**
-  * Reports the current read position.
-  *
-  * @return the position of {@code filePosition} when it is set, otherwise the position of the
-  *         scheduled stream
-  * @throws IOException if the underlying stream cannot report its position
-  */
- @Override
- protected long getFilePosition() throws IOException {
-   if (filePosition == null) {
-     return sin.getPos();
-   }
-   return filePosition.getPos();
- }
-
- /**
-  * Tells the scheduler whether enough data is already buffered for this scanner.
-  *
-  * @return {@code true} only when the scheduled stream exists and holds at least 64 MB
-  *         (64 * 1024 * 1024 bytes) of available data
-  */
- @Override
- public boolean isFetchProcessing() {
-   return sin != null && sin.getAvaliableSize() >= 64 * 1024 * 1024;
- }
-
- /**
-  * Reads the next buffer of bytes from the stream and splits it into rows at line feeds.
-  *
-  * On return {@code tuples} holds the split rows, {@code validIdx} is the number of rows that
-  * {@code nextTuple()} may hand out, and {@code tail} holds an incomplete last row that is
-  * prepended to the next page. When the stream has no more data, {@code eof} is set and the
-  * other fields are left as they were. For uncompressed input the file offset of every valid
-  * row is computed as well.
-  *
-  * @throws IOException if reading from the stream fails
-  */
- private void page() throws IOException {
-   // Index initialization
-   currentIdx = 0;
-
-   // Buffer size set
-   if (isSplittable() && fragmentable() < DEFAULT_BUFFER_SIZE) {
-     bufSize = (int) fragmentable();
-   }
-
-   // The page logically starts where the carried-over tail started, not where this read starts.
-   if (this.tail == null || this.tail.length == 0) {
-     this.pageStart = getFilePosition();
-     this.prevTailLen = 0;
-   } else {
-     this.pageStart = getFilePosition() - this.tail.length;
-     this.prevTailLen = this.tail.length;
-   }
-
-   // Read
-   int rbyte;
-   buf = new byte[bufSize];
-   rbyte = is.read(buf);
-
-   if (rbyte < 0) {
-     eof = true; // EOF
-     return;
-   }
-
-   if (prevTailLen == 0) {
-     tail = new byte[0];
-     tuples = BytesUtils.splitPreserveAllTokens(buf, rbyte, (char) LF);
-   } else {
-     // Prepend the incomplete row left over from the previous page before splitting.
-     byte[] lastRow = ArrayUtils.addAll(tail, buf);
-     tuples = BytesUtils.splitPreserveAllTokens(lastRow, rbyte + tail.length, (char) LF);
-     tail = null;
-   }
-
-   // Check tail
-   if ((char) buf[rbyte - 1] != LF) {
-     if ((fragmentable() < 1 || rbyte != bufSize)) {
-       // The last row is cut off and no further page will complete it, so read the remainder
-       // of the row byte by byte. The value of read() is kept as an int: casting it to byte
-       // first would make every byte >= 0x80 look like the -1 end-of-stream marker and cut
-       // rows that contain non-ASCII data. The growing buffer also removes the former
-       // fixed-size limit on the remainder's length.
-       java.io.ByteArrayOutputStream remainder = new java.io.ByteArrayOutputStream();
-       int next;
-       while ((next = is.read()) != -1 && next != LF) {
-         remainder.write(next);
-       }
-
-       tuples[tuples.length - 1] = ArrayUtils.addAll(tuples[tuples.length - 1],
-           remainder.toByteArray());
-       validIdx = tuples.length;
-     } else {
-       // The buffer was filled completely and more data follows: keep the partial row for the
-       // next page and do not hand it out yet.
-       tail = tuples[tuples.length - 1];
-       validIdx = tuples.length - 1;
-     }
-   } else {
-     // The buffer ends exactly on a line feed; the token after it is not handed out.
-     tail = new byte[0];
-     validIdx = tuples.length - 1;
-   }
-
-   if(!isCompress()) makeTupleOffset();
- }
-
- /**
-  * Computes the file offset of every valid row of the current page.
-  *
-  * The first row starts at {@code pageStart}; each following row starts after the previous
-  * row's bytes plus one byte for its line feed.
-  */
- private void makeTupleOffset() {
-   this.tupleOffsets = new long[this.validIdx];
-   long rowStart = this.pageStart;
-   int row = 0;
-   while (row < this.validIdx) {
-     this.tupleOffsets[row] = rowStart;
-     rowStart += this.tuples[row].length + 1; // row bytes + 1 byte line feed
-     row++;
-   }
- }
-
- /**
-  * Returns the next row of the fragment as a {@link LazyTuple}, or {@code null} when there is none.
-  *
-  * The first call runs {@code scanFirst()}. When all rows of the current page have been handed
-  * out, the next page is read; the scanner is closed and {@code null} is returned once a
-  * splittable fragment has no bytes left or the stream reports end of file.
-  *
-  * NOTE(review): any Throwable raised while paging or splitting is logged and then reported as
-  * {@code null}, which a caller cannot tell apart from the end of the data - confirm this is intended.
-  *
-  * @return the next tuple, or {@code null} at end of data or after a logged failure
-  * @throws IOException if {@code scanFirst()} fails
-  */
- protected Tuple nextTuple() throws IOException {
- if(first) {
- boolean more = scanFirst();
- first = false;
- if(!more) {
- return null;
- }
- }
- try {
- // every row of the current page has been consumed
- if (currentIdx == validIdx) {
- if (isSplittable() && fragmentable() < 1) {
- close();
- return null;
- } else {
- page();
- }
-
- if(eof){
- close();
- return null;
- }
- }
-
- // row offsets are only computed for uncompressed input; -1 is passed otherwise
- long offset = -1;
- if(!isCompress()){
- offset = this.tupleOffsets[currentIdx];
- }
-
- byte[][] cells = BytesUtils.splitPreserveAllTokens(tuples[currentIdx++], delimiter, targetColumnIndexes);
- return new LazyTuple(schema, cells, offset);
- } catch (Throwable t) {
- LOG.error(t.getMessage(), t);
- }
- return null;
- }
-
- /** Returns {@code true} when a compression codec was found for the fragment's path. */
- private boolean isCompress() {
- return codec != null;
- }
-
- /**
-  * Moves the scanner back to the beginning of the file.
-  *
-  * Nothing is done when the scheduled stream is not open. Otherwise the position tracker (when
-  * one has been set up) is sought to 0, then the scheduled stream is sought to 0 and reset.
-  * I/O failures are logged and not propagated.
-  */
- @Override
- public void scannerReset() {
-   if (sin == null) {
-     return;
-   }
-   // filePosition is only assigned once scanning has started, and never for a non-splittable
-   // codec, so it has to be checked on its own; testing sin alone caused a NullPointerException.
-   if (filePosition != null) {
-     try {
-       filePosition.seek(0);
-     } catch (IOException e) {
-       LOG.error(e.getMessage(), e);
-     }
-   }
-   try {
-     sin.seek(0);
-     sin.reset();
-   } catch (IOException e) {
-     LOG.error(e.getMessage(), e);
-   }
- }
-
- /**
-  * Releases the streams and the decompressor held by this scanner.
-  *
-  * Returns immediately when the {@code closed} flag is already set. Before the streams are
-  * dropped, the byte counters of the scheduled stream are copied so that
-  * {@code reportReadBytes()} can still return them afterwards.
-  *
-  * @throws IOException if closing the input stream or the superclass fails
-  */
- @Override
- public void close() throws IOException {
- if(closed.get()) {
- return;
- }
- if(sin != null) {
- totalReadBytesForFetch = sin.getTotalReadBytesForFetch();
- totalReadBytesFromDisk = sin.getTotalReadBytesFromDisk();
- }
- try {
- if(is != null) {
- is.close();
- }
- is = null;
- sin = null;
- } finally {
- // runs even when is.close() throws: return the pooled decompressor and close the superclass
- if (decompressor != null) {
- CodecPool.returnDecompressor(decompressor);
- decompressor = null;
- }
- tuples = null;
- super.close();
- }
- }
-
- /**
-  * Asks the scheduled stream to read the next chunk, holding this scanner's monitor.
-  *
-  * @param length passed through to {@code ScheduledInputStream.readNext}
-  * @return {@code false} when the scanner is closed, otherwise the result of {@code readNext}
-  * @throws IOException if the read fails
-  */
- @Override
- protected boolean scanNext(int length) throws IOException {
- synchronized(this) {
- if(isClosed()) {
- return false;
- }
- return sin.readNext(length);
- }
- }
-
- /** Always {@code true}; {@code scanFirst()} resolves the target columns to schema column ids. */
- @Override
- public boolean isProjectable() {
- return true;
- }
-
- /** Always {@code false}; {@code setSearchCondition} is a no-op in this scanner. */
- @Override
- public boolean isSelectable() {
- return false;
- }
-
- /** Does nothing; the given expression is ignored. */
- @Override
- public void setSearchCondition(Object expr) {
- }
-
- /**
-  * Returns the {@code splittable} flag, which the constructor clears for a compressed file whose
-  * codec is not a {@code SplittableCompressionCodec}.
-  */
- @Override
- public boolean isSplittable(){
- return splittable;
- }
-
- /**
-  * Returns the byte counters captured in {@code close()}.
-  *
-  * @return a two-element array: index 0 is the total read for fetch, index 1 the total read from disk
-  */
- @Override
- protected long[] reportReadBytes() {
- return new long[]{totalReadBytesForFetch, totalReadBytesFromDisk};
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
deleted file mode 100644
index 7802c91..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class DiskDeviceInfo {
- private int id;
- private String name;
-
- private List<DiskMountInfo> mountInfos = new ArrayList<DiskMountInfo>();
-
- public DiskDeviceInfo(int id) {
- this.id = id;
- }
-
- public int getId() {
- return id;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- @Override
- public String toString() {
- return id + "," + name;
- }
-
- public void addMountPath(DiskMountInfo diskMountInfo) {
- mountInfos.add(diskMountInfo);
- }
-
- public List<DiskMountInfo> getMountInfos() {
- return mountInfos;
- }
-
- public void setMountInfos(List<DiskMountInfo> mountInfos) {
- this.mountInfos = mountInfos;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
deleted file mode 100644
index 1babf99..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.tajo.conf.TajoConf.ConfVars;
-
-public final class DiskFileScanScheduler extends Thread {
- private static final Log LOG = LogFactory.getLog(DiskFileScanScheduler.class);
-
- private Queue<FileScannerV2> requestQueue = new LinkedList<FileScannerV2>();
-
- List<FileScannerV2> fetchingScanners = new ArrayList<FileScannerV2>();
-
- private int scanConcurrency;
-
- private AtomicInteger numOfRunningScanners = new AtomicInteger(0);
-
- private Object requestQueueMonitor = new Object(); // c++ code style
-
- private StorageManagerV2.StorgaeManagerContext smContext;
-
- private DiskDeviceInfo diskDeviceInfo;
-
- private AtomicBoolean stopped = new AtomicBoolean(false);
-
- private long totalScanCount = 0;
-
- private FetchWaitingThread fetchWaitingThread;
-
- private AtomicLong totalReadBytesForFetch = new AtomicLong(0);
-
- private AtomicLong totalReadBytesFromDisk = new AtomicLong(0);
-
- private long[] lastReportReadBytes;
-
- private long lastReportTime = 0;
-
- /**
-  * Creates the scheduler thread for one disk device.
-  *
-  * The thread is named after the device, the scan concurrency is read from the configuration,
-  * and the companion {@link FetchWaitingThread} is created and started right away. The scheduler
-  * thread itself is not started here.
-  *
-  * @param smContext storage manager context, used for configuration and passed to scan runners
-  * @param diskDeviceInfo the disk device this scheduler serves
-  */
- public DiskFileScanScheduler(
- StorageManagerV2.StorgaeManagerContext smContext,
- DiskDeviceInfo diskDeviceInfo) {
- super("DiskFileScanner:" + diskDeviceInfo);
- this.smContext = smContext;
- this.diskDeviceInfo = diskDeviceInfo;
- initScannerPool();
- this.fetchWaitingThread = new FetchWaitingThread();
- this.fetchWaitingThread.start();
- }
-
- /**
-  * Adds a scanner's byte counts to this scheduler's totals.
-  *
-  * @param readBytes index 0 is added to the fetch total, index 1 to the disk total
-  */
- public void incrementReadBytes(long[] readBytes) {
- totalReadBytesForFetch.addAndGet(readBytes[0]);
- totalReadBytesFromDisk.addAndGet(readBytes[1]);
- }
-
- /** Returns the id of the disk device this scheduler serves. */
- public int getDiskId() {
- return diskDeviceInfo.getId();
- }
-
- /**
-  * Scheduling loop: takes scanners from the request queue and starts a {@link FileScanRunner}
-  * for each one until the scheduler is stopped or interrupted.
-  *
-  * The whole loop runs while holding {@code requestQueueMonitor}; the monitor is only released
-  * inside {@code wait(2000)}.
-  */
- public void run() {
- synchronized (requestQueueMonitor) {
- while(!stopped.get()) {
- // concurrency limit reached: wait for a notification or for 2 seconds, then re-check
- if(isAllScannerRunning()) {
- try {
- requestQueueMonitor.wait(2000);
- continue;
- } catch (InterruptedException e) {
- break;
- }
- } else {
- FileScannerV2 fileScanner = requestQueue.poll();
- // nothing queued: wait for a request or for 2 seconds, then re-check
- if(fileScanner == null) {
- try {
- requestQueueMonitor.wait(2000);
- continue;
- } catch (InterruptedException e) {
- break;
- }
- }
- // the scanner needs no more scheduling: drop it from the queue for good
- if(fileScanner.isStopScanScheduling()) {
- LOG.info("Exit from Disk Queue:" + fileScanner.getId());
- continue;
- }
- if(fileScanner.isFetchProcessing()) {
- // park the scanner; FetchWaitingThread moves parked scanners back into the request queue
- synchronized(fetchingScanners) {
- fetchingScanners.add(fileScanner);
- //fetchingScanners.notifyAll();
- }
- } else {
- // the running count is incremented here; the runner receives the counter and the monitor
- numOfRunningScanners.incrementAndGet();
- FileScanRunner fileScanRunner = new FileScanRunner(
- DiskFileScanScheduler.this, smContext,
- fileScanner, requestQueueMonitor,
- numOfRunningScanners);
- totalScanCount++;
- fileScanRunner.start();
- }
- }
- }
- }
- }
-
- /**
-  * Queues a scanner for scheduling and wakes up every thread waiting on the request queue monitor.
-  *
-  * @param fileScannerV2 the scanner to schedule
-  */
- protected void requestScanFile(FileScannerV2 fileScannerV2) {
- synchronized (requestQueueMonitor) {
- requestQueue.offer(fileScannerV2);
- requestQueueMonitor.notifyAll();
- }
- }
-
- /**
-  * Helper thread that, every 100 ms, moves all scanners parked in {@code fetchingScanners} back
-  * into the request queue and notifies the threads waiting on the request queue monitor. It ends
-  * when the scheduler is stopped or the thread is interrupted.
-  */
- public class FetchWaitingThread extends Thread {
- // reused between iterations; only touched by this thread's run()
- List<FileScannerV2> workList = new ArrayList<FileScannerV2>(20);
- public void run() {
- while(!stopped.get()) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- break;
- }
- workList.clear();
- // copy and clear under the fetchingScanners lock, re-queue under the monitor; the two locks are never held together here
- synchronized(fetchingScanners) {
- workList.addAll(fetchingScanners);
- fetchingScanners.clear();
- }
- synchronized(requestQueueMonitor) {
- for(FileScannerV2 eachScanner: workList) {
- requestQueue.offer(eachScanner);
- }
- requestQueueMonitor.notifyAll();
- }
- }
- }
- }
-
- /**
-  * Reads the scan concurrency from the configuration entry
-  * {@code STORAGE_MANAGER_CONCURRENCY_PER_DISK}; no pool object is created.
-  */
- private void initScannerPool() {
- // TODO finally implements heuristic, currently set with property
- scanConcurrency = smContext.getConf().getIntVar(ConfVars.STORAGE_MANAGER_CONCURRENCY_PER_DISK);
- }
-
- /**
-  * Returns the current number of queued scanners.
-  * The queue is read here without holding {@code requestQueueMonitor}.
-  */
- public int getTotalQueueSize() {
- return requestQueue.size();
- }
-
- /** Returns {@code true} when the number of running scanners has reached the configured concurrency. */
- boolean isAllScannerRunning() {
- return numOfRunningScanners.get() >= scanConcurrency;
- }
-
- /** Returns how many scan runners this scheduler has started so far. */
- public long getTotalScanCount() {
- return totalScanCount;
- }
-
- /**
-  * Stops the scheduler: sets the stopped flag, then interrupts the fetch-waiting thread (when
-  * present) and this thread so that a pending sleep or wait ends at once.
-  */
- public void stopScan() {
- stopped.set(true);
- if (fetchWaitingThread != null) {
- fetchWaitingThread.interrupt();
- }
-
- this.interrupt();
- }
-
- /**
-  * Logs queue sizes, the number of running scanners, the total scan count and the read
-  * throughput since the previous report.
-  *
-  * Throughput is the byte difference since the last call, scaled to bytes per second using the
-  * elapsed milliseconds. On the first call, or when no time has elapsed, both throughput values
-  * are reported as 0.
-  */
- public void printDiskSchedulerInfo() {
-   long now = System.currentTimeMillis();
-   long currentReadBytes[] = new long[]{totalReadBytesForFetch.get(), totalReadBytesFromDisk.get()};
-   long[] throughput = new long[2];
-   if (lastReportTime != 0 && lastReportReadBytes != null) {
-     long elapsedMillis = now - lastReportTime;
-     // Two reports within the same second used to divide by zero whole seconds; working in
-     // milliseconds and skipping a non-positive interval avoids the ArithmeticException.
-     if (elapsedMillis > 0) {
-       throughput[0] = (currentReadBytes[0] - lastReportReadBytes[0]) * 1000 / elapsedMillis;
-       throughput[1] = (currentReadBytes[1] - lastReportReadBytes[1]) * 1000 / elapsedMillis;
-     }
-   }
-   lastReportTime = now;
-
-   LOG.info("===>" + DiskFileScanScheduler.this.diskDeviceInfo
-       + ", request=" + requestQueue.size()
-       + ", fetching=" + fetchingScanners.size()
-       + ", running=" + numOfRunningScanners.get()
-       + ", totalScan=" + totalScanCount
-       + ", FetchThroughput=" + throughput[0]/1024 + "KB"
-       + ", DiskScanThroughput=" + throughput[1]/1024 + "KB");
-
-   lastReportReadBytes = currentReadBytes;
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
deleted file mode 100644
index d71154c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-/**
- * Plain value holder for one disk partition: id, partition name, mount path, capacity and
- * used space. All values except the id and the partition name start at their Java defaults.
- */
-public class DiskInfo {
-  private int id;
-  private String partitionName;
-  private String mountPath;
-
-  private long capacity;
-  private long used;
-
-  /**
-   * @param id identifier of the disk
-   * @param partitionName name of the partition
-   */
-  public DiskInfo(int id, String partitionName) {
-    setId(id);
-    setPartitionName(partitionName);
-  }
-
-  /** @return the identifier of the disk */
-  public final int getId() {
-    return this.id;
-  }
-
-  /** @param id the new identifier */
-  public final void setId(int id) {
-    this.id = id;
-  }
-
-  /** @return the partition name */
-  public final String getPartitionName() {
-    return this.partitionName;
-  }
-
-  /** @param partitionName the new partition name */
-  public final void setPartitionName(String partitionName) {
-    this.partitionName = partitionName;
-  }
-
-  /** @return the mount path, or {@code null} when none has been set */
-  public String getMountPath() {
-    return this.mountPath;
-  }
-
-  /** @param mountPath the new mount path */
-  public void setMountPath(String mountPath) {
-    this.mountPath = mountPath;
-  }
-
-  /** @return the capacity value, 0 when none has been set */
-  public long getCapacity() {
-    return this.capacity;
-  }
-
-  /** @param capacity the new capacity value */
-  public void setCapacity(long capacity) {
-    this.capacity = capacity;
-  }
-
-  /** @return the used value, 0 when none has been set */
-  public long getUsed() {
-    return this.used;
-  }
-
-  /** @param used the new used value */
-  public void setUsed(long used) {
-    this.used = used;
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
deleted file mode 100644
index 56100f2..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import com.google.common.base.Objects;
-
-/**
- * One mount point of a disk device: the mount path, capacity and used space, and the id of the
- * device it belongs to.
- *
- * Ordering, equality and hashing depend on the mount path only. Paths with more components
- * sort first, then longer paths, then paths in lexicographic order.
- */
-public class DiskMountInfo implements Comparable<DiskMountInfo> {
-  private String mountPath;
-
-  private long capacity;
-  private long used;
-
-  private int deviceId;
-
-  /**
-   * @param deviceId id of the device this mount point belongs to
-   * @param mountPath path at which the device is mounted
-   */
-  public DiskMountInfo(int deviceId, String mountPath) {
-    // The device id used to be dropped here, so getDeviceId() always returned 0.
-    this.deviceId = deviceId;
-    this.mountPath = mountPath;
-  }
-
-  /** @return the mount path */
-  public String getMountPath() {
-    return mountPath;
-  }
-
-  /** @param mountPath the new mount path */
-  public void setMountPath(String mountPath) {
-    this.mountPath = mountPath;
-  }
-
-  /** @return the capacity value, 0 when none has been set */
-  public long getCapacity() {
-    return capacity;
-  }
-
-  /** @param capacity the new capacity value */
-  public void setCapacity(long capacity) {
-    this.capacity = capacity;
-  }
-
-  /** @return the used value, 0 when none has been set */
-  public long getUsed() {
-    return used;
-  }
-
-  /** @param used the new used value */
-  public void setUsed(long used) {
-    this.used = used;
-  }
-
-  /** @return the id of the device given at construction time */
-  public int getDeviceId() {
-    return deviceId;
-  }
-
-  /**
-   * Two mount infos are equal when {@link #compareTo(DiskMountInfo)} yields 0, which only
-   * happens for equal mount paths.
-   */
-  @Override
-  public boolean equals(Object obj){
-    if (!(obj instanceof DiskMountInfo)) return false;
-
-    return compareTo((DiskMountInfo) obj) == 0;
-  }
-
-  /**
-   * Hashes the mount path only, in line with {@link #equals(Object)}. The value is computed
-   * over a one-element array, which is what Guava's varargs
-   * {@code Objects.hashCode(mountPath)} does, so hash values are unchanged.
-   */
-  @Override
-  public int hashCode(){
-    return java.util.Arrays.hashCode(new Object[] {mountPath});
-  }
-
-  /**
-   * Orders mount points so that more specific paths come first.
-   *
-   * @param other the mount info to compare with
-   * @return a negative value when this path has more components than the other, or the same
-   *         number of components but more characters; a positive value in the opposite cases;
-   *         otherwise the lexicographic comparison of the two paths
-   */
-  @Override
-  public int compareTo(DiskMountInfo other) {
-    String path1 = mountPath;
-    String path2 = other.mountPath;
-
-    // The root path has depth 0; every other path has one level per '/' it contains.
-    int path1Depth = "/".equals(path1) ? 0 : path1.split("/", -1).length - 1 ;
-    int path2Depth = "/".equals(path2) ? 0 : path2.split("/", -1).length - 1 ;
-
-    if(path1Depth > path2Depth) {
-      return -1;
-    } else if(path1Depth < path2Depth) {
-      return 1;
-    } else {
-      int path1Length = path1.length();
-      int path2Length = path2.length();
-
-      if(path1Length < path2Length) {
-        return 1;
-      } else if(path1Length > path2Length) {
-        return -1;
-      } else {
-        return path1.compareTo(path2);
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
deleted file mode 100644
index 66827c2..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.server.common.Util;
-
-import java.io.*;
-import java.net.URI;
-import java.util.*;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
-
-public class DiskUtil {
-
- static String UNIX_DISK_DEVICE_PATH = "/proc/partitions";
-
- /** Operating system families that {@code getOSType()} distinguishes by the {@code os.name} property. */
- public enum OSType {
- OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC
- }
-
- /**
-  * Classifies the running operating system from the {@code os.name} system property.
-  *
-  * A name containing "Windows" together with one of the listed version markers is reported as
-  * {@code OS_TYPE_WINXP}; names containing "SunOS" or "Solaris" as {@code OS_TYPE_SOLARIS};
-  * names containing "Mac" as {@code OS_TYPE_MAC}; everything else, including other Windows
-  * versions, as {@code OS_TYPE_UNIX}.
-  */
- static private OSType getOSType() {
-   String osName = System.getProperty("os.name");
-
-   if (osName.contains("Windows")) {
-     String[] versionMarkers = {"XP", "2003", "Vista", "Windows_7", "Windows 7", "Windows7"};
-     for (String marker : versionMarkers) {
-       if (osName.contains(marker)) {
-         return OSType.OS_TYPE_WINXP;
-       }
-     }
-   }
-   if (osName.contains("SunOS") || osName.contains("Solaris")) {
-     return OSType.OS_TYPE_SOLARIS;
-   }
-   if (osName.contains("Mac")) {
-     return OSType.OS_TYPE_MAC;
-   }
-   return OSType.OS_TYPE_UNIX;
- }
-
- /**
-  * Lists the disk devices of this machine.
-  *
-  * On a system classified as {@code OS_TYPE_UNIX} the devices are read from the partition
-  * file and their mount points are filled in; on any other system a single default device is
-  * returned.
-  *
-  * @return the list of disk devices
-  * @throws IOException if the mount points cannot be determined
-  */
- public static List<DiskDeviceInfo> getDiskDeviceInfos() throws IOException {
-   if (getOSType() != OSType.OS_TYPE_UNIX) {
-     return getDefaultDiskDeviceInfos();
-   }
-
-   List<DiskDeviceInfo> unixDevices = getUnixDiskDeviceInfos();
-   setDeviceMountInfo(unixDevices);
-   return unixDevices;
- }
-
- /**
-  * Builds the device list from the partition file at {@code UNIX_DISK_DEVICE_PATH}.
-  *
-  * Every non-empty line after the first that has exactly four space-separated fields
-  * contributes the device name derived from its fourth field. Device names are de-duplicated
-  * and sorted, then numbered from 0 in that order.
-  *
-  * @return the devices found; the single default device when the partition file does not
-  *         exist; whatever was collected so far (possibly nothing) when reading fails
-  */
- private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() {
- List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
-
- File file = new File(UNIX_DISK_DEVICE_PATH);
- if(!file.exists()) {
- System.out.println("No partition file:" + file.getAbsolutePath());
- return getDefaultDiskDeviceInfos();
- }
-
- BufferedReader reader = null;
- try {
- reader = new BufferedReader(new InputStreamReader(new FileInputStream(UNIX_DISK_DEVICE_PATH)));
- String line = null;
-
- int count = 0;
- // a TreeSet removes duplicates (several partitions of one device) and sorts the names
- Set<String> deviceNames = new TreeSet<String>();
- while((line = reader.readLine()) != null) {
- // count > 0 skips the first line of the file
- if(count > 0 && !line.trim().isEmpty()) {
- String[] tokens = line.trim().split(" +");
- if(tokens.length == 4) {
- String deviceName = getDiskDeviceName(tokens[3]);
- deviceNames.add(deviceName);
- }
- }
- count++;
- }
-
- int id = 0;
- for(String eachDeviceName: deviceNames) {
- DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++);
- diskDeviceInfo.setName(eachDeviceName);
-
- //TODO set additional info
- // /sys/block/sda/queue
- infos.add(diskDeviceInfo);
- }
- } catch (Exception e) {
- // NOTE(review): a failure is only printed to stderr and the list built so far is returned - confirm this is intended
- e.printStackTrace();
- } finally {
- if(reader != null) {
- try {
- reader.close();
- } catch (IOException e) {
- }
- }
- }
-
- return infos;
- }
-
- /**
-  * Derives a device name from a partition name by cutting it off at the first ASCII digit,
-  * for example "sda1" becomes "sda". A name without digits is returned unchanged.
-  *
-  * @param partitionName name of a partition
-  * @return the part of the name before its first digit
-  */
- private static String getDiskDeviceName(String partitionName) {
-   byte[] raw = partitionName.getBytes();
-
-   // advance until the first byte in the range '0'..'9' or the end of the name
-   int cut = 0;
-   while (cut < raw.length && (raw[cut] < '0' || raw[cut] > '9')) {
-     cut++;
-   }
-
-   return new String(raw, 0, cut);
- }
-
- /**
-  * Builds the fallback device list used when real devices cannot be determined.
-  *
-  * @return a new list holding exactly one device with id 0 and the name "default"
-  */
- public static List<DiskDeviceInfo> getDefaultDiskDeviceInfos() {
-   List<DiskDeviceInfo> fallback = new ArrayList<DiskDeviceInfo>();
-
-   DiskDeviceInfo defaultDevice = new DiskDeviceInfo(0);
-   defaultDevice.setName("default");
-   fallback.add(defaultDevice);
-
-   return fallback;
- }
-
-
- /**
-  * Runs the {@code mount} command and attaches every reported mount point to its device.
-  *
-  * A line is used when it has the form {@code /dev/<partition> on /<path> ...}: the device
-  * name is derived from the partition name and looked up among the given devices, and the
-  * mount path is the text after " on " up to the next space (or the end of the line). Lines of
-  * any other shape and partitions of unknown devices are skipped.
-  *
-  * @param deviceInfos devices to add mount points to, looked up by name
-  * @throws IOException if the command cannot be started or its output cannot be read
-  */
- private static void setDeviceMountInfo(List<DiskDeviceInfo> deviceInfos) throws IOException {
-   Map<String, DiskDeviceInfo> deviceMap = new HashMap<String, DiskDeviceInfo>();
-   for(DiskDeviceInfo eachDevice: deviceInfos) {
-     deviceMap.put(eachDevice.getName(), eachDevice);
-   }
-
-   Process mountProcess = null;
-   BufferedReader mountOutput = null;
-   try {
-     mountProcess = Runtime.getRuntime().exec("mount");
-     mountOutput = new BufferedReader(new InputStreamReader(
-         mountProcess.getInputStream()));
-     String line;
-     while ((line = mountOutput.readLine()) != null) {
-       int indexStart = line.indexOf(" on /");
-       if (indexStart < 0) {
-         // Not a "<device> on /<path>" line; substring(0, -1) would throw here.
-         continue;
-       }
-       // indexStart + 4 is the position of the '/' that starts the mount path.
-       int indexEnd = line.indexOf(" ", indexStart + 4);
-       if (indexEnd < 0) {
-         // Nothing follows the mount path, so it runs to the end of the line.
-         indexEnd = line.length();
-       }
-
-       String deviceName = line.substring(0, indexStart).trim();
-       String[] deviceNameTokens = deviceName.split("/");
-       // "/dev/sda1" splits into "", "dev", "sda1"
-       if(deviceNameTokens.length == 3) {
-         if("dev".equals(deviceNameTokens[1])) {
-           String realDeviceName = getDiskDeviceName(deviceNameTokens[2]);
-           String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath();
-
-           DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName);
-           if(diskDeviceInfo != null) {
-             diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath));
-           }
-         }
-       }
-     }
-   } finally {
-     try {
-       if (mountOutput != null) {
-         mountOutput.close();
-       }
-     } finally {
-       // Release the child process and its remaining streams even when reading failed.
-       if (mountProcess != null) {
-         mountProcess.destroy();
-       }
-     }
-   }
- }
-
- /** Returns the number of storage directories reported by {@code getStorageDirs()}. */
- public static int getDataNodeStorageSize(){
- return getStorageDirs().size();
- }
-
- /**
-  * Reads the directory names configured under {@code DFS_DATANODE_DATA_DIR_KEY} from a newly
-  * created {@link HdfsConfiguration} and converts them to URIs.
-  *
-  * @return the configured storage directories as URIs
-  */
- public static List<URI> getStorageDirs(){
- Configuration conf = new HdfsConfiguration();
- Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
- return Util.stringCollectionAsURIs(dirNames);
- }
-
- /**
-  * Prints how "/dev/sde1" is split at '/' (the token count, then each token); it does not call
-  * any other method of this class.
-  *
-  * @param args ignored
-  */
- public static void main(String[] args) throws Exception {
- System.out.println("/dev/sde1".split("/").length);
- for(String eachToken: "/dev/sde1".split("/")) {
- System.out.println(eachToken);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
deleted file mode 100644
index 07fbe6c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class FileScanRunner extends Thread {
- private static final Log LOG = LogFactory.getLog(FileScanRunner.class);
-
- StorageManagerV2.StorgaeManagerContext smContext;
- FileScannerV2 fileScanner;
- Object requestQueueMonitor;
- AtomicInteger numOfRunningScanners;
- DiskFileScanScheduler diskFileScanScheduler;
-
- int maxReadBytes;
-
- public FileScanRunner(DiskFileScanScheduler diskFileScanScheduler,
- StorageManagerV2.StorgaeManagerContext smContext,
- FileScannerV2 fileScanner, Object requestQueueMonitor,
- AtomicInteger numOfRunningScanners) {
- super("FileScanRunner:" + fileScanner.getId());
- this.diskFileScanScheduler = diskFileScanScheduler;
- this.fileScanner = fileScanner;
- this.smContext = smContext;
- this.requestQueueMonitor = requestQueueMonitor;
- this.numOfRunningScanners = numOfRunningScanners;
-
- this.maxReadBytes = smContext.getMaxReadBytesPerScheduleSlot();
- }
-
- public void run() {
- try {
-// long startTime = System.currentTimeMillis();
-// boolean fetching = fileScanner.isFetchProcessing();
- fileScanner.scan(maxReadBytes);
-// if(diskFileScanScheduler.getDiskId() == 1) {
-// LOG.info("========>" + diskFileScanScheduler.getDiskId() + "," + fileScanner.getId() +
-// ",fetching=" + fetching +
-// ", scanTime:" + (System.currentTimeMillis() - startTime) + " ms");
-// }
- } catch (Exception e) {
- LOG.error(e.getMessage(), e);
- } finally {
- synchronized(requestQueueMonitor) {
- numOfRunningScanners.decrementAndGet();
- requestQueueMonitor.notifyAll();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
deleted file mode 100644
index da7084c..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.ColumnStats;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.storage.Scanner;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.fragment.FileFragment;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public abstract class FileScannerV2 implements Scanner {
- private static final Log LOG = LogFactory.getLog(FileScannerV2.class);
-
- protected AtomicBoolean closed = new AtomicBoolean(false);
-
- protected FileSystem fs;
-
- protected boolean inited = false;
- protected final Configuration conf;
- protected final TableMeta meta;
- protected final Schema schema;
- protected final FileFragment fragment;
- protected final int columnNum;
- protected Column[] targets;
- protected long totalScanTime = 0;
- protected int allocatedDiskId;
-
- protected StorageManagerV2.StorgaeManagerContext smContext;
-
- protected AtomicBoolean firstSchdeuled = new AtomicBoolean(true);
-
- protected float progress;
-
- protected TableStats tableStats;
-
- protected abstract boolean scanNext(int length) throws IOException;
-
- protected abstract boolean initFirstScan(int maxBytesPerSchedule) throws IOException;
-
- protected abstract long getFilePosition() throws IOException;
-
- protected abstract Tuple nextTuple() throws IOException;
-
- public abstract boolean isFetchProcessing();
-
- public abstract boolean isStopScanScheduling();
-
- public abstract void scannerReset();
-
- protected abstract long[] reportReadBytes();
-
- public FileScannerV2(final Configuration conf,
- final TableMeta meta,
- final Schema schema,
- final FileFragment fragment) throws IOException {
- this.conf = conf;
- this.meta = meta;
- this.schema = schema;
- this.fragment = fragment;
- this.columnNum = this.schema.size();
-
- this.fs = fragment.getPath().getFileSystem(conf);
- }
-
- public void init() throws IOException {
- closed.set(false);
- firstSchdeuled.set(true);
-
- if(!inited) {
- smContext.requestFileScan(this);
- }
- inited = true;
- progress = 0.0f;
-
- tableStats = new TableStats();
- if (fragment != null) {
- tableStats.setNumBytes(fragment.getEndKey());
- tableStats.setNumBlocks(1);
- }
-
- if (schema != null) {
- for(Column eachColumn: schema.getColumns()) {
- ColumnStats columnStats = new ColumnStats(eachColumn);
- tableStats.addColumnStat(columnStats);
- }
- }
- }
-
- @Override
- public void reset() throws IOException {
- scannerReset();
- close();
- inited = false;
- init();
- }
-
- public void setAllocatedDiskId(int allocatedDiskId) {
- this.allocatedDiskId = allocatedDiskId;
- }
-
- public String getId() {
- return fragment.getPath().getName() + ":" + fragment.getStartKey() + ":" +
- fragment.getEndKey() + "_" + System.currentTimeMillis();
- }
-
- @Override
- public Schema getSchema() {
- return schema;
- }
-
- @Override
- public void setTarget(Column[] targets) {
- if (inited) {
- throw new IllegalStateException("Should be called before init()");
- }
- this.targets = targets;
- }
-
- public Path getPath() {
- return fragment.getPath();
- }
-
- public int getDiskId() {
- if(fragment.getDiskIds().length <= 0) {
- //LOG.warn("===> No DiskId:" + fragment.getPath() + ":" + fragment.getStartKey());
- return -1;
- } else {
- return fragment.getDiskIds()[0];
- }
- }
-
- public void setSearchCondition(Object expr) {
- if (inited) {
- throw new IllegalStateException("Should be called before init()");
- }
- }
-
- public void setStorageManagerContext(StorageManagerV2.StorgaeManagerContext context) {
- this.smContext = context;
- }
-
- public String toString() {
- return fragment.getPath() + ":" + fragment.getStartKey();
- }
-
- public void scan(int maxBytesPerSchedule) throws IOException {
- long startTime = System.currentTimeMillis();
- try {
- synchronized(firstSchdeuled) {
- if(firstSchdeuled.get()) {
- boolean moreData = initFirstScan(maxBytesPerSchedule);
- firstSchdeuled.set(false);
- firstSchdeuled.notifyAll();
- if(moreData) {
- smContext.requestFileScan(this);
- }
- return;
- }
- }
- boolean moreData = scanNext(maxBytesPerSchedule);
-
- if(moreData) {
- smContext.requestFileScan(this);
- }
- } finally {
- totalScanTime += System.currentTimeMillis() - startTime;
- }
- }
-
- @Override
- public void close() throws IOException {
- if(closed.get()) {
- return;
- }
- long[] readBytes = reportReadBytes();
- smContext.incrementReadBytes(allocatedDiskId, readBytes);
- closed.set(true);
- progress = 1.0f;
- LOG.info(toString() + " closed, totalScanTime=" + totalScanTime);
- }
-
- public boolean isClosed() {
- return closed.get();
- }
-
- public Tuple next() throws IOException {
- synchronized(firstSchdeuled) {
- if(firstSchdeuled.get()) {
- try {
- firstSchdeuled.wait();
- } catch (InterruptedException e) {
- }
- }
- }
- return nextTuple();
- }
-
- @Override
- public float getProgress() {
- return progress;
- }
-
- @Override
- public TableStats getInputStats() {
- return tableStats;
- }
-}