Posted to commits@tajo.apache.org by hy...@apache.org on 2013/09/13 05:29:57 UTC
[2/4] TAJO-178: Implements StorageManager for scanning asynchronously. (hyoungjunkim via hyunsik)
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
index 79481fa..9907591 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -18,148 +18,27 @@
package org.apache.tajo.storage;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.net.util.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.TableMetaImpl;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableProto;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.util.Bytes;
-import org.apache.tajo.util.FileUtil;
-import java.io.FileNotFoundException;
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
/**
* StorageManager
*/
-public class StorageManager {
- private final Log LOG = LogFactory.getLog(StorageManager.class);
+public class StorageManager extends AbstractStorageManager {
- private final TajoConf conf;
- private final FileSystem fs;
- private final Path baseDir;
- private final Path tableBaseDir;
- private final boolean blocksMetadataEnabled;
-
- /**
- * Cache of scanner handlers for each storage type.
- */
- private static final Map<String, Class<? extends FileScanner>> SCANNER_HANDLER_CACHE
- = new ConcurrentHashMap<String, Class<? extends FileScanner>>();
-
- /**
- * Cache of appender handlers for each storage type.
- */
- private static final Map<String, Class<? extends FileAppender>> APPENDER_HANDLER_CACHE
- = new ConcurrentHashMap<String, Class<? extends FileAppender>>();
-
- /**
- * Cache of constructors for each class. Pins the classes so they
- * can't be garbage collected until ReflectionUtils can be collected.
- */
- private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
- new ConcurrentHashMap<Class<?>, Constructor<?>>();
-
-
- public StorageManager(TajoConf conf) throws IOException {
- this.conf = conf;
- this.baseDir = new Path(conf.getVar(ConfVars.ROOT_DIR));
- this.tableBaseDir = new Path(this.baseDir, TajoConstants.WAREHOUSE_DIR_NAME);
- this.fs = baseDir.getFileSystem(conf);
- this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
- DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
- if (!this.blocksMetadataEnabled) {
- LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
- }
- }
-
- public static StorageManager get(TajoConf conf) throws IOException {
- return new StorageManager(conf);
- }
-
- public static StorageManager get(TajoConf conf, String dataRoot)
- throws IOException {
- conf.setVar(ConfVars.ROOT_DIR, dataRoot);
- return new StorageManager(conf);
- }
-
- public static StorageManager get(TajoConf conf, Path dataRoot)
- throws IOException {
- conf.setVar(ConfVars.ROOT_DIR, dataRoot.toString());
- return new StorageManager(conf);
- }
-
- public FileSystem getFileSystem() {
- return this.fs;
- }
-
- public Path getBaseDir() {
- return this.baseDir;
- }
-
- public Path getTableBaseDir() {
- return this.tableBaseDir;
- }
-
- public void delete(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- fs.delete(tablePath, true);
+ protected StorageManager(TajoConf conf) throws IOException {
+ super(conf);
}
- public boolean exists(Path path) throws IOException {
- FileSystem fileSystem = path.getFileSystem(conf);
- return fileSystem.exists(path);
- }
-
- /**
- * This method deletes only data contained in the given path.
- *
- * @param path The path in which data are deleted.
- * @throws IOException
- */
- public void deleteData(Path path) throws IOException {
- FileSystem fileSystem = path.getFileSystem(conf);
- FileStatus[] fileLists = fileSystem.listStatus(path);
- for (FileStatus status : fileLists) {
- fileSystem.delete(status.getPath(), true);
- }
- }
-
- public Path getTablePath(String tableName) {
- return new Path(tableBaseDir, tableName);
- }
-
- public static Scanner getScanner(Configuration conf, TableMeta meta, Path path)
- throws IOException {
- FileSystem fs = path.getFileSystem(conf);
- FileStatus status = fs.getFileStatus(path);
- Fragment fragment = new Fragment(path.getName(), path, meta, 0, status.getLen());
- return getScanner(conf, meta, fragment);
- }
-
- public static Scanner getScanner(Configuration conf, TableMeta meta, Fragment fragment)
- throws IOException {
- return getScanner(conf, meta, fragment, meta.getSchema());
- }
-
- public static Scanner getScanner(Configuration conf, TableMeta meta, Fragment fragment,
- Schema target)
- throws IOException {
+ @Override
+ public Scanner getScanner(TableMeta meta, Fragment fragment,
+ Schema target) throws IOException {
Scanner scanner;
- Class<? extends FileScanner> scannerClass;
+ Class<? extends Scanner> scannerClass;
String handlerName = meta.getStoreType().name().toLowerCase();
scannerClass = SCANNER_HANDLER_CACHE.get(handlerName);
@@ -167,7 +46,7 @@ public class StorageManager {
scannerClass = conf.getClass(
String.format("tajo.storage.scanner-handler.%s.class",
meta.getStoreType().name().toLowerCase()), null,
- FileScanner.class);
+ Scanner.class);
SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
}
@@ -182,525 +61,4 @@ public class StorageManager {
return scanner;
}
-
- public static Appender getAppender(Configuration conf, TableMeta meta, Path path)
- throws IOException {
- Appender appender;
-
- Class<? extends FileAppender> appenderClass;
-
- String handlerName = meta.getStoreType().name().toLowerCase();
- appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
- if (appenderClass == null) {
- appenderClass = conf.getClass(
- String.format("tajo.storage.appender-handler.%s.class",
- meta.getStoreType().name().toLowerCase()), null,
- FileAppender.class);
- APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
- }
-
- if (appenderClass == null) {
- throw new IOException("Unknown Storage Type: " + meta.getStoreType());
- }
-
- appender = newAppenderInstance(appenderClass, conf, meta, path);
-
- return appender;
- }
-
-
- public TableMeta getTableMeta(Path tablePath) throws IOException {
- TableMeta meta;
-
- FileSystem fs = tablePath.getFileSystem(conf);
- Path tableMetaPath = new Path(tablePath, ".meta");
- if (!fs.exists(tableMetaPath)) {
- throw new FileNotFoundException(".meta file not found in " + tablePath.toString());
- }
-
- FSDataInputStream tableMetaIn = fs.open(tableMetaPath);
-
- TableProto tableProto = (TableProto) FileUtil.loadProto(tableMetaIn,
- TableProto.getDefaultInstance());
- meta = new TableMetaImpl(tableProto);
-
- return meta;
- }
-
- public Fragment[] split(String tableName) throws IOException {
- Path tablePath = new Path(tableBaseDir, tableName);
- return split(tableName, tablePath, fs.getDefaultBlockSize());
- }
-
- public Fragment[] split(String tableName, long fragmentSize) throws IOException {
- Path tablePath = new Path(tableBaseDir, tableName);
- return split(tableName, tablePath, fragmentSize);
- }
-
- public Fragment[] splitBroadcastTable(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- TableMeta meta = getTableMeta(tablePath);
- List<Fragment> listTablets = new ArrayList<Fragment>();
- Fragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus file : fileLists) {
- tablet = new Fragment(tablePath.getName(), file.getPath(), meta, 0, file.getLen());
- listTablets.add(tablet);
- }
-
- Fragment[] tablets = new Fragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- public Fragment[] split(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
- }
-
- public Fragment[] split(String tableName, Path tablePath) throws IOException {
- return split(tableName, tablePath, fs.getDefaultBlockSize());
- }
-
- private Fragment[] split(String tableName, Path tablePath, long size)
- throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
-
- TableMeta meta = getTableMeta(tablePath);
- long defaultBlockSize = size;
- List<Fragment> listTablets = new ArrayList<Fragment>();
- Fragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus file : fileLists) {
- long remainFileSize = file.getLen();
- long start = 0;
- if (remainFileSize > defaultBlockSize) {
- while (remainFileSize > defaultBlockSize) {
- tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize);
- listTablets.add(tablet);
- start += defaultBlockSize;
- remainFileSize -= defaultBlockSize;
- }
- listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize));
- } else {
- listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize));
- }
- }
-
- Fragment[] tablets = new Fragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- public static Fragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
- Path tablePath, long size)
- throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
-
- long defaultBlockSize = size;
- List<Fragment> listTablets = new ArrayList<Fragment>();
- Fragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus file : fileLists) {
- long remainFileSize = file.getLen();
- long start = 0;
- if (remainFileSize > defaultBlockSize) {
- while (remainFileSize > defaultBlockSize) {
- tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize);
- listTablets.add(tablet);
- start += defaultBlockSize;
- remainFileSize -= defaultBlockSize;
- }
- listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize));
- } else {
- listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize));
- }
- }
-
- Fragment[] tablets = new Fragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- public long calculateSize(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- long totalSize = 0;
-
- if (fs.exists(tablePath)) {
- for (FileStatus status : fs.listStatus(tablePath)) {
- totalSize += status.getLen();
- }
- }
-
- return totalSize;
- }
-
- /////////////////////////////////////////////////////////////////////////////
- // FileInputFormat Area
- /////////////////////////////////////////////////////////////////////////////
-
- private static final PathFilter hiddenFileFilter = new PathFilter() {
- public boolean accept(Path p) {
- String name = p.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- };
-
- /**
- * Proxy PathFilter that accepts a path only if all filters given in the
- * constructor do. Used by the listPaths() to apply the built-in
- * hiddenFileFilter together with a user provided one (if any).
- */
- private static class MultiPathFilter implements PathFilter {
- private List<PathFilter> filters;
-
- public MultiPathFilter(List<PathFilter> filters) {
- this.filters = filters;
- }
-
- public boolean accept(Path path) {
- for (PathFilter filter : filters) {
- if (!filter.accept(path)) {
- return false;
- }
- }
- return true;
- }
- }
-
- /**
- * List input directories.
- * Subclasses may override to, e.g., select only files matching a regular
- * expression.
- *
- * @return array of FileStatus objects
- * @throws IOException if zero items.
- */
- protected List<FileStatus> listStatus(Path path) throws IOException {
- List<FileStatus> result = new ArrayList<FileStatus>();
- Path[] dirs = new Path[]{path};
- if (dirs.length == 0) {
- throw new IOException("No input paths specified in job");
- }
-
- List<IOException> errors = new ArrayList<IOException>();
-
- // creates a MultiPathFilter with the hiddenFileFilter and the
- // user provided one (if any).
- List<PathFilter> filters = new ArrayList<PathFilter>();
- filters.add(hiddenFileFilter);
-
- PathFilter inputFilter = new MultiPathFilter(filters);
-
- for (int i = 0; i < dirs.length; ++i) {
- Path p = dirs[i];
-
- FileSystem fs = p.getFileSystem(conf);
- FileStatus[] matches = fs.globStatus(p, inputFilter);
- if (matches == null) {
- errors.add(new IOException("Input path does not exist: " + p));
- } else if (matches.length == 0) {
- errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
- } else {
- for (FileStatus globStat : matches) {
- if (globStat.isDirectory()) {
- for (FileStatus stat : fs.listStatus(globStat.getPath(),
- inputFilter)) {
- result.add(stat);
- }
- } else {
- result.add(globStat);
- }
- }
- }
- }
-
- if (!errors.isEmpty()) {
- throw new InvalidInputException(errors);
- }
- LOG.info("Total input paths to process : " + result.size());
- return result;
- }
-
- /**
- * Get the lower bound on split size imposed by the format.
- *
- * @return the number of bytes of the minimal split for this format
- */
- protected long getFormatMinSplitSize() {
- return 1;
- }
-
- /**
- * Is the given filename splitable? Usually, true, but if the file is
- * stream compressed, it will not be.
- * <p/>
- * <code>FileInputFormat</code> implementations can override this and return
- * <code>false</code> to ensure that individual input files are never split-up
- * so that Mappers process entire files.
- *
- * @param filename the file name to check
- * @return is this file isSplittable?
- */
- protected boolean isSplittable(TableMeta meta, Path filename) throws IOException {
- Scanner scanner = getScanner(conf, meta, filename);
- return scanner.isSplittable();
- }
-
- @Deprecated
- protected long computeSplitSize(long blockSize, long minSize,
- long maxSize) {
- return Math.max(minSize, Math.min(maxSize, blockSize));
- }
-
- @Deprecated
- private static final double SPLIT_SLOP = 1.1; // 10% slop
-
- @Deprecated
- protected int getBlockIndex(BlockLocation[] blkLocations,
- long offset) {
- for (int i = 0; i < blkLocations.length; i++) {
- // is the offset inside this block?
- if ((blkLocations[i].getOffset() <= offset) &&
- (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
- return i;
- }
- }
- BlockLocation last = blkLocations[blkLocations.length - 1];
- long fileLength = last.getOffset() + last.getLength() - 1;
- throw new IllegalArgumentException("Offset " + offset +
- " is outside of file (0.." +
- fileLength + ")");
- }
-
- /**
- * A factory that makes the split for this class. It can be overridden
- * by sub-classes to make sub-types
- */
- protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) {
- return new Fragment(fragmentId, file, meta, start, length);
- }
-
- protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
- int[] diskIds) throws IOException {
- return new Fragment(fragmentId, file, meta, blockLocation, diskIds);
- }
-
- // for Non Splittable. eg, compressed gzip TextFile
- protected Fragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
- BlockLocation[] blkLocations) throws IOException {
-
- Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
- for (BlockLocation blockLocation : blkLocations) {
- for (String host : blockLocation.getHosts()) {
- if (hostsBlockMap.containsKey(host)) {
- hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
- } else {
- hostsBlockMap.put(host, 1);
- }
- }
- }
-
- List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
- Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
-
- @Override
- public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
- return v1.getValue().compareTo(v2.getValue());
- }
- });
-
- String[] hosts = new String[blkLocations[0].getHosts().length];
- int[] hostsBlockCount = new int[blkLocations[0].getHosts().length];
-
- for (int i = 0; i < hosts.length; i++) {
- Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
- hosts[i] = entry.getKey();
- hostsBlockCount[i] = entry.getValue();
- }
- return new Fragment(fragmentId, file, meta, start, length, hosts, hostsBlockCount);
- }
-
- /**
- * Get the maximum split size.
- *
- * @return the maximum number of bytes a split can include
- */
- @Deprecated
- public static long getMaxSplitSize() {
- // TODO - to be configurable
- return 536870912L;
- }
-
- /**
- * Get the minimum split size
- *
- * @return the minimum number of bytes that can be in a split
- */
- @Deprecated
- public static long getMinSplitSize() {
- // TODO - to be configurable
- return 67108864L;
- }
-
- /**
- * Get Disk Ids by Volume Bytes
- */
- private int[] getDiskIds(VolumeId[] volumeIds) {
- int[] diskIds = new int[volumeIds.length];
- for (int i = 0; i < volumeIds.length; i++) {
- int diskId = -1;
- if (volumeIds[i] != null && volumeIds[i].isValid()) {
- String volumeIdString = volumeIds[i].toString();
- byte[] volumeIdBytes = Base64.decodeBase64(volumeIdString);
-
- if (volumeIdBytes.length == 4) {
- diskId = Bytes.toInt(volumeIdBytes);
- } else if (volumeIdBytes.length == 1) {
- diskId = (int) volumeIdBytes[0]; // support hadoop-2.0.2
- }
- }
- diskIds[i] = diskId;
- }
- return diskIds;
- }
-
- /**
- * Generate the map of host and make them into Volume Ids.
- *
- */
- private Map<String, Set<Integer>> getVolumeMap(List<Fragment> frags) {
- Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
- for (Fragment frag : frags) {
- String[] hosts = frag.getHosts();
- int[] diskIds = frag.getDiskIds();
- for (int i = 0; i < hosts.length; i++) {
- Set<Integer> volumeList = volumeMap.get(hosts[i]);
- if (volumeList == null) {
- volumeList = new HashSet<Integer>();
- volumeMap.put(hosts[i], volumeList);
- }
-
- if (diskIds.length > 0 && diskIds[i] > -1) {
- volumeList.add(diskIds[i]);
- }
- }
- }
-
- return volumeMap;
- }
- /**
- * Generate the list of files and make them into FileSplits.
- *
- * @throws IOException
- */
- public List<Fragment> getSplits(String tableName, TableMeta meta, Path inputPath) throws IOException {
- // generate splits'
-
- List<Fragment> splits = new ArrayList<Fragment>();
- List<FileStatus> files = listStatus(inputPath);
- FileSystem fs = inputPath.getFileSystem(conf);
- for (FileStatus file : files) {
- Path path = file.getPath();
- long length = file.getLen();
- if (length > 0) {
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
- boolean splittable = isSplittable(meta, path);
- if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
- // supported disk volume
- BlockStorageLocation[] blockStorageLocations = ((DistributedFileSystem) fs)
- .getFileBlockStorageLocations(Arrays.asList(blkLocations));
- if (splittable) {
- for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
- splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
- .getVolumeIds())));
- }
- } else { // Non splittable
- splits.add(makeNonSplit(tableName, meta, path, 0, length, blockStorageLocations));
- }
-
- } else {
- if (splittable) {
- for (BlockLocation blockLocation : blkLocations) {
- splits.add(makeSplit(tableName, meta, path, blockLocation, null));
- }
- } else { // Non splittable
- splits.add(makeNonSplit(tableName, meta, path, 0, length, blkLocations));
- }
- }
- } else {
- //for zero length files
- splits.add(makeSplit(tableName, meta, path, 0, length));
- }
- }
-
- LOG.info("Total # of splits: " + splits.size());
- return splits;
- }
-
- private class InvalidInputException extends IOException {
- public InvalidInputException(
- List<IOException> errors) {
- }
- }
-
- private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
- Configuration.class,
- TableMeta.class,
- Fragment.class
- };
-
- private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
- Configuration.class,
- TableMeta.class,
- Path.class
- };
-
- /**
- * create a scanner instance.
- */
- public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, TableMeta meta,
- Fragment fragment) {
- T result;
- try {
- Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
- if (meth == null) {
- meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
- meth.setAccessible(true);
- CONSTRUCTOR_CACHE.put(theClass, meth);
- }
- result = meth.newInstance(new Object[]{conf, meta, fragment});
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- return result;
- }
-
- /**
- * create a scanner instance.
- */
- public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TableMeta meta,
- Path path) {
- T result;
- try {
- Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
- if (meth == null) {
- meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
- meth.setAccessible(true);
- CONSTRUCTOR_CACHE.put(theClass, meth);
- }
- result = meth.newInstance(new Object[]{conf, meta, path});
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- return result;
- }
}
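Editorial note, not part of the patch: the getScanner() retained above resolves a Scanner implementation per store type through a configuration key. A minimal Java restatement of just that lookup, with the class name ScannerHandlerLookupSketch invented for illustration; Configuration.getClass returns null when no handler is registered, which the surrounding code treats as an unknown storage type (as the appender path above does).

  import org.apache.hadoop.conf.Configuration;
  import org.apache.tajo.storage.Scanner;

  public class ScannerHandlerLookupSketch {
    // Resolve the Scanner class registered for a store type, e.g. "csv".
    public static Class<? extends Scanner> resolve(Configuration conf, String storeType) {
      String key = String.format("tajo.storage.scanner-handler.%s.class",
          storeType.toLowerCase());
      // null default: an unregistered store type yields null here.
      return conf.getClass(key, null, Scanner.class);
    }
  }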
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
new file mode 100644
index 0000000..8b7c2ca
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.v2.StorageManagerV2;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+public class StorageManagerFactory {
+ private static Map<String, AbstractStorageManager> storageManagers =
+ new HashMap<String, AbstractStorageManager>();
+
+ public static AbstractStorageManager getStorageManager(TajoConf conf) throws IOException {
+ return getStorageManager(conf, null);
+ }
+
+ public static synchronized AbstractStorageManager getStorageManager(
+ TajoConf conf, Path dataRoot) throws IOException {
+ return getStorageManager(conf, dataRoot, conf.getBoolean("tajo.storage.manager.v2", false));
+ }
+
+ private static synchronized AbstractStorageManager getStorageManager(
+ TajoConf conf, Path dataRoot, boolean v2) throws IOException {
+ if(dataRoot != null) {
+ conf.setVar(TajoConf.ConfVars.ROOT_DIR, dataRoot.toString());
+ }
+
+ URI uri;
+ if(dataRoot == null) {
+ uri = FileSystem.get(conf).getUri();
+ } else {
+ uri = dataRoot.toUri();
+ }
+ String key = "file".equals(uri.getScheme()) ? "file" : uri.getScheme() + uri.getHost() + uri.getPort();
+
+ if(v2) {
+ key += "_v2";
+ }
+
+ if(storageManagers.containsKey(key)) {
+ return storageManagers.get(key);
+ } else {
+ AbstractStorageManager storageManager = null;
+
+ if(v2) {
+ storageManager = new StorageManagerV2(conf);
+ } else {
+ storageManager = new StorageManager(conf);
+ }
+
+ storageManagers.put(key, storageManager);
+
+ return storageManager;
+ }
+ }
+
+ public static synchronized SeekableScanner getSeekableScanner(
+ TajoConf conf, TableMeta meta, Fragment fragment, Schema schema) throws IOException {
+ return (SeekableScanner)getStorageManager(conf, null, false).getScanner(meta, fragment, schema);
+ }
+
+ public static synchronized SeekableScanner getSeekableScanner(
+ TajoConf conf, TableMeta meta, Path path) throws IOException {
+
+ FileSystem fs = path.getFileSystem(conf);
+ FileStatus status = fs.getFileStatus(path);
+ Fragment fragment = new Fragment(path.getName(), path, meta, 0, status.getLen());
+
+ return getSeekableScanner(conf, meta, fragment, fragment.getSchema());
+ }
+}
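A brief usage sketch of the factory above (not in the patch; only the property name "tajo.storage.manager.v2" and the factory methods come from the code shown, everything else is assumed for illustration):

  import org.apache.tajo.conf.TajoConf;
  import org.apache.tajo.storage.AbstractStorageManager;
  import org.apache.tajo.storage.StorageManagerFactory;

  public class StorageManagerFactoryUsage {
    public static void main(String[] args) throws Exception {
      TajoConf conf = new TajoConf();
      // Opt in to the asynchronous v2 manager introduced by TAJO-178.
      conf.setBoolean("tajo.storage.manager.v2", true);

      // The factory caches one manager per filesystem key
      // (scheme + host + port, with a "_v2" suffix for the v2 manager),
      // so repeated calls return the same instance.
      AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf);
      System.out.println(sm.getClass().getName()); // expect ...storage.v2.StorageManagerV2
    }
  }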
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
new file mode 100644
index 0000000..f34fa84
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.*;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.SeekableScanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.storage.json.StorageGsonHelper;
+
+public class CSVFileScanner extends FileScannerV2 {
+ public static final String DELIMITER = "csvfile.delimiter";
+ public static final String DELIMITER_DEFAULT = "|";
+ public static final byte LF = '\n';
+ private static final Log LOG = LogFactory.getLog(CSVFileScanner.class);
+
+ private final static int DEFAULT_BUFFER_SIZE = 256 * 1024;
+ private int bufSize;
+ private char delimiter;
+ private FSDataInputStream fis;
+ private InputStream is; // decompressed stream
+ private CompressionCodecFactory factory;
+ private CompressionCodec codec;
+ private Decompressor decompressor;
+ private Seekable filePosition;
+ private boolean splittable = true;
+ private long startOffset, length;
+ private byte[] buf = null;
+ private String[] tuples = null;
+ private long[] tupleOffsets = null;
+ private int currentIdx = 0, validIdx = 0;
+ private byte[] tail = null;
+ private long pageStart = -1;
+ private long prevTailLen = -1;
+ private int[] targetColumnIndexes;
+ private boolean eof = false;
+
+ public CSVFileScanner(Configuration conf, final TableMeta meta,
+ final Fragment fragment) throws IOException {
+ super(conf, meta, fragment);
+ factory = new CompressionCodecFactory(conf);
+ codec = factory.getCodec(fragment.getPath());
+ if (isCompress() && !(codec instanceof SplittableCompressionCodec)) {
+ splittable = false;
+ }
+ }
+
+ @Override
+ public void init() throws IOException {
+ // Buffer size, Delimiter
+ this.bufSize = DEFAULT_BUFFER_SIZE;
+ String delim = fragment.getMeta().getOption(DELIMITER, DELIMITER_DEFAULT);
+ this.delimiter = delim.charAt(0);
+
+ super.init();
+ }
+
+ @Override
+ protected void initFirstScan() throws IOException {
+ if(!firstSchdeuled) {
+ return;
+ }
+ firstSchdeuled = false;
+
+ // Fragment information
+ fis = fs.open(fragment.getPath(), 128 * 1024);
+ startOffset = fragment.getStartOffset();
+ length = fragment.getLength();
+
+ if (startOffset > 0) {
+ startOffset--; // prev line feed
+ }
+
+ if (codec != null) {
+ decompressor = CodecPool.getDecompressor(codec);
+ if (codec instanceof SplittableCompressionCodec) {
+ SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
+ fis, decompressor, startOffset, startOffset + length,
+ SplittableCompressionCodec.READ_MODE.BYBLOCK);
+
+ startOffset = cIn.getAdjustedStart();
+ length = cIn.getAdjustedEnd() - startOffset;
+ filePosition = cIn;
+ is = cIn;
+ } else {
+ is = new DataInputStream(codec.createInputStream(fis, decompressor));
+ }
+ } else {
+ fis.seek(startOffset);
+ filePosition = fis;
+ is = fis;
+ }
+
+ tuples = new String[0];
+ if (targets == null) {
+ targets = schema.toArray();
+ }
+
+ targetColumnIndexes = new int[targets.length];
+ for (int i = 0; i < targets.length; i++) {
+ targetColumnIndexes[i] = schema.getColumnIdByName(targets[i].getColumnName());
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + length +
+ "," + fs.getFileStatus(fragment.getPath()).getLen());
+ }
+
+ if (startOffset != 0) {
+ int rbyte;
+ while ((rbyte = is.read()) != LF) {
+ if(rbyte == -1) break;
+ }
+ }
+
+ if (fragmentable() < 1) {
+ close();
+ return;
+ }
+ page();
+ }
+
+ private long fragmentable() throws IOException {
+ return startOffset + length - getFilePosition();
+ }
+
+ @Override
+ protected long getFilePosition() throws IOException {
+ long retVal;
+ if (filePosition != null) {
+ retVal = filePosition.getPos();
+ } else {
+ retVal = fis.getPos();
+ }
+ return retVal;
+ }
+
+ private void page() throws IOException {
+ // Index initialization
+ currentIdx = 0;
+
+ // Buffer size set
+ if (isSplittable() && fragmentable() < DEFAULT_BUFFER_SIZE) {
+ bufSize = (int) fragmentable();
+ }
+
+ if (this.tail == null || this.tail.length == 0) {
+ this.pageStart = getFilePosition();
+ this.prevTailLen = 0;
+ } else {
+ this.pageStart = getFilePosition() - this.tail.length;
+ this.prevTailLen = this.tail.length;
+ }
+
+ // Read
+ int rbyte;
+ buf = new byte[bufSize];
+ rbyte = is.read(buf);
+
+ if (rbyte < 0) {
+ eof = true; // EOF
+ return;
+ }
+
+ if (prevTailLen == 0) {
+ tail = new byte[0];
+ tuples = StringUtils.split(new String(buf, 0, rbyte), (char) LF);
+ } else {
+ tuples = StringUtils.split(new String(tail)
+ + new String(buf, 0, rbyte), (char) LF);
+ tail = null;
+ }
+
+ // Check tail
+ if ((char) buf[rbyte - 1] != LF) {
+ if (fragmentable() < 1 || rbyte != bufSize) {
+ int cnt = 0;
+ byte[] temp = new byte[DEFAULT_BUFFER_SIZE];
+ // Read bytes up to the next line feed, stopping safely at EOF
+ int b;
+ while ((b = is.read()) != -1 && b != LF) {
+ temp[cnt++] = (byte) b;
+ }
+
+ // Replace tuple
+ tuples[tuples.length - 1] = tuples[tuples.length - 1]
+ + new String(temp, 0, cnt);
+ validIdx = tuples.length;
+ } else {
+ tail = tuples[tuples.length - 1].getBytes();
+ validIdx = tuples.length - 1;
+ }
+ } else {
+ tail = new byte[0];
+ validIdx = tuples.length;
+ }
+
+ if(!isCompress()) makeTupleOffset();
+ }
+
+ private void makeTupleOffset() {
+ long curTupleOffset = 0;
+ this.tupleOffsets = new long[this.validIdx];
+ for (int i = 0; i < this.validIdx; i++) {
+ this.tupleOffsets[i] = curTupleOffset + this.pageStart;
+ curTupleOffset += this.tuples[i].getBytes().length + 1; // tuple bytes + one byte for the line feed
+ }
+ }
+
+ protected Tuple getNextTuple() throws IOException {
+ try {
+ if (currentIdx == validIdx) {
+ if (isSplittable() && fragmentable() < 1) {
+ close();
+ return null;
+ } else {
+ page();
+ }
+
+ if(eof){
+ close();
+ return null;
+ }
+ }
+
+ long offset = -1;
+ if(!isCompress()){
+ offset = this.tupleOffsets[currentIdx];
+ }
+
+ String[] cells = StringUtils.splitPreserveAllTokens(tuples[currentIdx++], delimiter);
+
+ int targetLen = targets.length;
+
+ VTuple tuple = new VTuple(columnNum);
+ Column field;
+ tuple.setOffset(offset);
+ for (int i = 0; i < targetLen; i++) {
+ field = targets[i];
+ int tid = targetColumnIndexes[i];
+ if (cells.length <= tid) {
+ tuple.put(tid, DatumFactory.createNullDatum());
+ } else {
+ String cell = cells[tid].trim();
+
+ if (cell.equals("")) {
+ tuple.put(tid, DatumFactory.createNullDatum());
+ } else {
+ switch (field.getDataType().getType()) {
+ case BOOLEAN:
+ tuple.put(tid, DatumFactory.createBool(cell));
+ break;
+ case BIT:
+ tuple.put(tid, DatumFactory.createBit(Base64.decodeBase64(cell)[0]));
+ break;
+ case CHAR:
+ tuple.put(tid, DatumFactory.createChar(cell)); // cell is already trimmed above
+ break;
+ case BLOB:
+ tuple.put(tid, DatumFactory.createBlob(Base64.decodeBase64(cell)));
+ break;
+ case INT2:
+ tuple.put(tid, DatumFactory.createInt2(cell));
+ break;
+ case INT4:
+ tuple.put(tid, DatumFactory.createInt4(cell));
+ break;
+ case INT8:
+ tuple.put(tid, DatumFactory.createInt8(cell));
+ break;
+ case FLOAT4:
+ tuple.put(tid, DatumFactory.createFloat4(cell));
+ break;
+ case FLOAT8:
+ tuple.put(tid, DatumFactory.createFloat8(cell));
+ break;
+ case TEXT:
+ tuple.put(tid, DatumFactory.createText(cell));
+ break;
+ case INET4:
+ tuple.put(tid, DatumFactory.createInet4(cell));
+ break;
+ case ARRAY:
+ Datum data = StorageGsonHelper.getInstance().fromJson(cell,
+ Datum.class);
+ tuple.put(tid, data);
+ break;
+ }
+ }
+ }
+ }
+ return tuple;
+ } catch (Throwable t) {
+ LOG.error(t.getMessage(), t);
+ }
+ return null;
+ }
+
+ private boolean isCompress() {
+ return codec != null;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ super.reset();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if(closed.get()) {
+ return;
+ }
+ try {
+ is.close();
+ } finally {
+ if (decompressor != null) {
+ CodecPool.returnDecompressor(decompressor);
+ decompressor = null;
+ }
+ tuples = null;
+ super.close();
+ }
+ }
+
+ @Override
+ public boolean isProjectable() {
+ return true;
+ }
+
+ @Override
+ public boolean isSelectable() {
+ return false;
+ }
+
+ @Override
+ public void setSearchCondition(Object expr) {
+ }
+
+ @Override
+ public boolean isSplittable(){
+ return splittable;
+ }
+
+}
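The subtlest part of CSVFileScanner is the tail handling in page(): a buffer that ends mid-line keeps the partial last line in tail and prepends it to the next read, so no tuple is ever split across buffer boundaries. A self-contained sketch of just that carry-over (class name and sample data are illustrative only):

  import org.apache.commons.lang.StringUtils;

  public class TailCarrySketch {
    public static void main(String[] args) {
      String read1 = "a|1\nb|2\nc|"; // first buffer ends mid-tuple
      String read2 = "3\nd|4\n";     // next buffer completes it

      String[] lines = StringUtils.split(read1, '\n');
      // Unless the buffer ended with LF, the last element is incomplete.
      String tail = read1.endsWith("\n") ? "" : lines[lines.length - 1];
      int validIdx = tail.isEmpty() ? lines.length : lines.length - 1;
      for (int i = 0; i < validIdx; i++) {
        System.out.println(lines[i]);            // a|1, b|2
      }
      // Prepend the carried tail to the next read, as page() does.
      for (String t : StringUtils.split(tail + read2, '\n')) {
        System.out.println(t);                   // c|3, d|4
      }
    }
  }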
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
new file mode 100644
index 0000000..7802c91
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DiskDeviceInfo {
+ private int id;
+ private String name;
+
+ private List<DiskMountInfo> mountInfos = new ArrayList<DiskMountInfo>();
+
+ public DiskDeviceInfo(int id) {
+ this.id = id;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return id + "," + name;
+ }
+
+ public void addMountPath(DiskMountInfo diskMountInfo) {
+ mountInfos.add(diskMountInfo);
+ }
+
+ public List<DiskMountInfo> getMountInfos() {
+ return mountInfos;
+ }
+
+ public void setMountInfos(List<DiskMountInfo> mountInfos) {
+ this.mountInfos = mountInfos;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
new file mode 100644
index 0000000..d55a6db
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DiskFileScanScheduler extends Thread {
+ private static final Log LOG = LogFactory.getLog(DiskFileScanScheduler.class);
+
+ private Queue<FileScannerV2> requestQueue = new LinkedList<FileScannerV2>();
+
+ Queue<FileScannerV2> fetchingScanners = new LinkedList<FileScannerV2>();
+
+ private int scanConcurrency;
+
+ private AtomicInteger numOfRunningScanners = new AtomicInteger(0);
+
+ private Object requestQueueMonitor = new Object(); // monitor guarding requestQueue
+
+ private StorageManagerV2.StorgaeManagerContext smContext;
+
+ private DiskDeviceInfo diskDeviceInfo;
+
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+
+ private long totalScanCount = 0;
+
+ private FetchWaitingThread fetchWaitingThread;
+
+ public DiskFileScanScheduler(
+ StorageManagerV2.StorgaeManagerContext smContext,
+ DiskDeviceInfo diskDeviceInfo) {
+ super("DiskFileScanner:" + diskDeviceInfo);
+ this.smContext = smContext;
+ this.diskDeviceInfo = diskDeviceInfo;
+ initScannerPool();
+ this.fetchWaitingThread = new FetchWaitingThread();
+ this.fetchWaitingThread.start();
+ }
+
+ public int getDiskId() {
+ return diskDeviceInfo.getId();
+ }
+
+ public void run() {
+ synchronized (requestQueueMonitor) {
+ while(!stopped.get()) {
+ if(isAllScannerRunning()) {
+ try {
+ requestQueueMonitor.wait(2000);
+ continue;
+ } catch (InterruptedException e) {
+ break;
+ }
+ } else {
+ FileScannerV2 fileScanner = requestQueue.poll();
+ if(fileScanner == null) {
+ try {
+ requestQueueMonitor.wait(2000);
+ continue;
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ if(fileScanner.isFetchProcessing()) {
+ fetchingScanners.add(fileScanner);
+ synchronized(fetchingScanners) {
+ fetchingScanners.notifyAll();
+ }
+ } else {
+ numOfRunningScanners.incrementAndGet();
+ FileScanRunner fileScanRunner = new FileScanRunner(
+ DiskFileScanScheduler.this, smContext,
+ fileScanner, requestQueueMonitor,
+ numOfRunningScanners);
+ totalScanCount++;
+ fileScanRunner.start();
+ }
+ }
+ }
+ }
+ }
+
+ protected void requestScanFile(FileScannerV2 fileScannerV2) {
+ synchronized (requestQueueMonitor) {
+ requestQueue.offer(fileScannerV2);
+ requestQueueMonitor.notifyAll();
+ }
+ }
+
+ public class FetchWaitingThread extends Thread {
+ public void run() {
+ while(!stopped.get()) {
+ FileScannerV2 scanner = null;
+ synchronized(fetchingScanners) {
+ scanner = fetchingScanners.poll();
+ if(scanner == null) {
+ try {
+ fetchingScanners.wait();
+ continue;
+ } catch (InterruptedException e) {
+ break;
+ }
+ }
+ }
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ break;
+ }
+ synchronized(requestQueueMonitor) {
+ requestQueue.offer(scanner);
+ requestQueueMonitor.notifyAll();
+ }
+ }
+ }
+ }
+
+ private void initScannerPool() {
+ // TODO implement a heuristic eventually; currently set by property
+ scanConcurrency = smContext.getConf().getInt("tajo.storage.manager.concurrency.perDisk", 1);
+ }
+
+ public int getTotalQueueSize() {
+ return requestQueue.size();
+ }
+
+ boolean isAllScannerRunning() {
+ return numOfRunningScanners.get() >= scanConcurrency;
+ }
+
+ public long getTotalScanCount() {
+ return totalScanCount;
+ }
+
+ public void stopScan() {
+ stopped.set(true);
+ if (fetchWaitingThread != null) {
+ fetchWaitingThread.interrupt();
+ }
+
+ this.interrupt();
+ }
+}
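DiskFileScanScheduler coordinates request producers and its single scheduler thread with a plain shared monitor rather than a BlockingQueue, using a bounded wait so the loop can periodically re-check its stop flag. A reduced sketch of the same pattern (names invented here):

  import java.util.LinkedList;
  import java.util.Queue;

  public class MonitorQueueSketch {
    private final Object monitor = new Object();
    private final Queue<Runnable> queue = new LinkedList<Runnable>();

    // Producer side: enqueue under the monitor and wake the consumer,
    // mirroring requestScanFile() above.
    public void offer(Runnable task) {
      synchronized (monitor) {
        queue.offer(task);
        monitor.notifyAll();
      }
    }

    // Consumer side: poll, and when empty wait with a timeout (the
    // scheduler uses wait(2000)) so a stop flag can be re-checked.
    public Runnable take() throws InterruptedException {
      synchronized (monitor) {
        Runnable task;
        while ((task = queue.poll()) == null) {
          monitor.wait(2000);
        }
        return task;
      }
    }
  }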
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
new file mode 100644
index 0000000..d71154c
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+public class DiskInfo {
+ private int id;
+ private String partitionName;
+ private String mountPath;
+
+ private long capacity;
+ private long used;
+
+ public DiskInfo(int id, String partitionName) {
+ this.id = id;
+ this.partitionName = partitionName;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ public String getPartitionName() {
+ return partitionName;
+ }
+
+ public void setPartitionName(String partitionName) {
+ this.partitionName = partitionName;
+ }
+
+ public String getMountPath() {
+ return mountPath;
+ }
+
+ public void setMountPath(String mountPath) {
+ this.mountPath = mountPath;
+ }
+
+ public long getCapacity() {
+ return capacity;
+ }
+
+ public void setCapacity(long capacity) {
+ this.capacity = capacity;
+ }
+
+ public long getUsed() {
+ return used;
+ }
+
+ public void setUsed(long used) {
+ this.used = used;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
new file mode 100644
index 0000000..d9b0dd2
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+public class DiskMountInfo implements Comparable<DiskMountInfo> {
+ private String mountPath;
+
+ private long capacity;
+ private long used;
+
+ private int deviceId;
+
+ public DiskMountInfo(int deviceId, String mountPath) {
+ this.deviceId = deviceId;
+ this.mountPath = mountPath;
+ }
+
+ public String getMountPath() {
+ return mountPath;
+ }
+
+ public void setMountPath(String mountPath) {
+ this.mountPath = mountPath;
+ }
+
+ public long getCapacity() {
+ return capacity;
+ }
+
+ public void setCapacity(long capacity) {
+ this.capacity = capacity;
+ }
+
+ public long getUsed() {
+ return used;
+ }
+
+ public void setUsed(long used) {
+ this.used = used;
+ }
+
+ public int getDeviceId() {
+ return deviceId;
+ }
+
+ @Override
+ public int compareTo(DiskMountInfo other) {
+ String path1 = mountPath;
+ String path2 = other.mountPath;
+
+ int path1Depth = "/".equals(path1) ? 0 : path1.split("/", -1).length - 1;
+ int path2Depth = "/".equals(path2) ? 0 : path2.split("/", -1).length - 1;
+
+ if(path1Depth > path2Depth) {
+ return -1;
+ } else if(path1Depth < path2Depth) {
+ return 1;
+ } else {
+ int path1Length = path1.length();
+ int path2Length = path2.length();
+
+ if(path1Length < path2Length) {
+ return 1;
+ } else if(path1Length > path2Length) {
+ return -1;
+ } else {
+ return path1.compareTo(path2);
+ }
+ }
+ }
+}
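compareTo() above sorts deeper, then longer, mount paths first. That ordering is what lets a simple prefix scan find the most specific mount point for a file path; a hedged illustration (assumes the sketch lives in org.apache.tajo.storage.v2 so DiskMountInfo is visible):

  import java.util.Arrays;
  import java.util.Collections;
  import java.util.List;

  public class MountOrderSketch {
    public static void main(String[] args) {
      List<DiskMountInfo> mounts = Arrays.asList(
          new DiskMountInfo(0, "/"),
          new DiskMountInfo(1, "/data"),
          new DiskMountInfo(2, "/data/disk1"));
      Collections.sort(mounts); // natural order: /data/disk1, /data, /

      String file = "/data/disk1/tajo/warehouse/t1/part-000";
      for (DiskMountInfo mount : mounts) {
        if (file.startsWith(mount.getMountPath())) {
          // First prefix hit is the most specific mount: /data/disk1
          System.out.println("mount: " + mount.getMountPath());
          break;
        }
      }
    }
  }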
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
new file mode 100644
index 0000000..2daf0f5
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class DiskUtil {
+
+ public enum OSType {
+ OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC
+ }
+
+ static private OSType getOSType() {
+ String osName = System.getProperty("os.name");
+ if (osName.contains("Windows")
+ && (osName.contains("XP") || osName.contains("2003")
+ || osName.contains("Vista")
+ || osName.contains("Windows_7")
+ || osName.contains("Windows 7") || osName
+ .contains("Windows7"))) {
+ return OSType.OS_TYPE_WINXP;
+ } else if (osName.contains("SunOS") || osName.contains("Solaris")) {
+ return OSType.OS_TYPE_SOLARIS;
+ } else if (osName.contains("Mac")) {
+ return OSType.OS_TYPE_MAC;
+ } else {
+ return OSType.OS_TYPE_UNIX;
+ }
+ }
+
+ public static List<DiskDeviceInfo> getDiskDeviceInfos() throws IOException {
+ List<DiskDeviceInfo> deviceInfos;
+
+ if(getOSType() == OSType.OS_TYPE_UNIX) {
+ deviceInfos = getUnixDiskDeviceInfos();
+ setDeviceMountInfo(deviceInfos);
+ } else {
+ deviceInfos = getDefaultDiskDeviceInfos();
+ }
+
+ return deviceInfos;
+ }
+
+ private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() {
+ List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
+
+ File file = new File("/proc/partitions");
+ if(!file.exists()) {
+ System.out.println("No partition file:" + file.getAbsolutePath());
+ return getDefaultDiskDeviceInfos();
+ }
+
+ BufferedReader reader = null;
+ try {
+ reader = new BufferedReader(new InputStreamReader(new FileInputStream("/proc/partitions")));
+ String line = null;
+
+ int count = 0;
+ Set<String> deviceNames = new TreeSet<String>();
+ while((line = reader.readLine()) != null) {
+ if(count > 0 && !line.trim().isEmpty()) {
+ String[] tokens = line.trim().split(" +");
+ if(tokens.length == 4) {
+ String deviceName = getDiskDeviceName(tokens[3]);
+ deviceNames.add(deviceName);
+ }
+ }
+ count++;
+ }
+
+ int id = 0;
+ for(String eachDeviceName: deviceNames) {
+ DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++);
+ diskDeviceInfo.setName(eachDeviceName);
+
+ // TODO set additional info
+ // /sys/block/sda/queue
+ infos.add(diskDeviceInfo);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ if(reader != null) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+
+ return infos;
+ }
+
+ private static String getDiskDeviceName(String partitionName) {
+ byte[] bytes = partitionName.getBytes();
+
+ byte[] result = new byte[bytes.length];
+ int length = 0;
+ for(int i = 0; i < bytes.length; i++, length++) {
+ if(bytes[i] >= '0' && bytes[i] <= '9') {
+ break;
+ } else {
+ result[i] = bytes[i];
+ }
+ }
+
+ return new String(result, 0, length);
+ }
+
+ private static List<DiskDeviceInfo> getDefaultDiskDeviceInfos() {
+ DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(0);
+ diskDeviceInfo.setName("default");
+
+ List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
+
+ infos.add(diskDeviceInfo);
+
+ return infos;
+ }
+
+
+ private static void setDeviceMountInfo(List<DiskDeviceInfo> deviceInfos) throws IOException {
+ Map<String, DiskDeviceInfo> deviceMap = new HashMap<String, DiskDeviceInfo>();
+ for(DiskDeviceInfo eachDevice: deviceInfos) {
+ deviceMap.put(eachDevice.getName(), eachDevice);
+ }
+
+ BufferedReader mountOutput = null;
+ try {
+ Process mountProcess = Runtime.getRuntime().exec("mount");
+ mountOutput = new BufferedReader(new InputStreamReader(
+ mountProcess.getInputStream()));
+ while (true) {
+ String line = mountOutput.readLine();
+ if (line == null) {
+ break;
+ }
+
+ int indexStart = line.indexOf(" on /");
+ if (indexStart < 0) {
+ continue; // not a mount entry line
+ }
+ int indexEnd = line.indexOf(" ", indexStart + 4);
+
+ String deviceName = line.substring(0, indexStart).trim();
+ String[] deviceNameTokens = deviceName.split("/");
+ if(deviceNameTokens.length == 3) {
+ if("dev".equals(deviceNameTokens[1])) {
+ String realDeviceName = getDiskDeviceName(deviceNameTokens[2]);
+ String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath();
+
+ DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName);
+ if(diskDeviceInfo != null) {
+ diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath));
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw e;
+ } finally {
+ if (mountOutput != null) {
+ mountOutput.close();
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ System.out.println("/dev/sde1".split("/").length);
+ for(String eachToken: "/dev/sde1".split("/")) {
+ System.out.println(eachToken);
+ }
+ }
+}
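getDiskDeviceName() above normalizes a partition name by cutting at the first digit, so every partition of a physical device ("sda1", "sda2") collapses to one device name ("sda") and hence one disk id. A standalone restatement of that rule:

  public class PartitionNameSketch {
    static String deviceName(String partition) {
      int end = 0;
      while (end < partition.length() && !Character.isDigit(partition.charAt(end))) {
        end++;
      }
      return partition.substring(0, end);
    }

    public static void main(String[] args) {
      System.out.println(deviceName("sda1")); // sda
      System.out.println(deviceName("sdb"));  // sdb
    }
  }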
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
new file mode 100644
index 0000000..10f12be
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FileScanRunner extends Thread {
+ private static final Log LOG = LogFactory.getLog(FileScanRunner.class);
+
+ StorageManagerV2.StorgaeManagerContext smContext;
+ FileScannerV2 fileScanner;
+ Object requestQueueMonitor;
+ AtomicInteger numOfRunningScanners;
+ DiskFileScanScheduler diskFileScanScheduler;
+
+ int maxReadBytes;
+
+ public FileScanRunner(DiskFileScanScheduler diskFileScanScheduler,
+ StorageManagerV2.StorageManagerContext smContext,
+ FileScannerV2 fileScanner, Object requestQueueMonitor,
+ AtomicInteger numOfRunningScanners) {
+ super("FileScanRunner:" + fileScanner.getId());
+ this.diskFileScanScheduler = diskFileScanScheduler;
+ this.fileScanner = fileScanner;
+ this.smContext = smContext;
+ this.requestQueueMonitor = requestQueueMonitor;
+ this.numOfRunningScanners = numOfRunningScanners;
+
+ this.maxReadBytes = smContext.getMaxReadBytesPerScheduleSlot();
+ }
+
+ @Override
+ public void run() {
+ try {
+ fileScanner.scan(maxReadBytes);
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ } finally {
+ synchronized(requestQueueMonitor) {
+ numOfRunningScanners.decrementAndGet();
+ requestQueueMonitor.notifyAll();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
new file mode 100644
index 0000000..44c48a5
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
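+/**
+ * Base class for scanners driven by {@link StorageManagerV2}. Instead of
+ * reading on demand, a scanner registers itself with the storage manager,
+ * which grants it scan slots per disk. Each slot fills an internal tuple
+ * pool that next() drains, so disk reads and tuple consumption proceed
+ * asynchronously.
+ */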
+public abstract class FileScannerV2 implements Scanner {
+ private static final Log LOG = LogFactory.getLog(FileScannerV2.class);
+
+ protected AtomicBoolean fetchProcessing = new AtomicBoolean(false);
+
+ protected AtomicBoolean closed = new AtomicBoolean(false);
+
+ protected FileSystem fs;
+
+ protected boolean inited = false;
+ protected final Configuration conf;
+ protected final TableMeta meta;
+ protected final Schema schema;
+ protected final Fragment fragment;
+ protected final int columnNum;
+ protected Column[] targets;
+
+ protected StorageManagerV2.StorageManagerContext smContext;
+
+ protected boolean firstScheduled = true;
+
+ protected Queue<Tuple> tuplePool;
+
+ AtomicInteger tuplePoolMemory = new AtomicInteger();
+
+ protected abstract Tuple getNextTuple() throws IOException;
+
+ protected abstract void initFirstScan() throws IOException;
+
+ protected abstract long getFilePosition() throws IOException;
+
+ public FileScannerV2(final Configuration conf,
+ final TableMeta meta,
+ final Fragment fragment) throws IOException {
+ this.conf = conf;
+ this.meta = meta;
+ this.schema = meta.getSchema();
+ this.fragment = fragment;
+ this.columnNum = this.schema.getColumnNum();
+
+ this.fs = fragment.getPath().getFileSystem(conf);
+
+ tuplePool = new ConcurrentLinkedQueue<Tuple>();
+ }
+
+ public void init() throws IOException {
+ closed.set(false);
+ fetchProcessing.set(false);
+ firstScheduled = true;
+ if(tuplePool == null) {
+ tuplePool = new ConcurrentLinkedQueue<Tuple>();
+ }
+ tuplePool.clear();
+
+ if(!inited) {
+ smContext.requestFileScan(this);
+ }
+ inited = true;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ close();
+ inited = false;
+
+ init();
+ }
+
+ public String getId() {
+ return fragment.getPath().toString() + ":" + fragment.getStartOffset() + ":" +
+ fragment.getLength() + "_" + System.currentTimeMillis();
+ }
+
+ @Override
+ public Schema getSchema() {
+ return schema;
+ }
+
+ @Override
+ public void setTarget(Column[] targets) {
+ if (inited) {
+ throw new IllegalStateException("Should be called before init()");
+ }
+ this.targets = targets;
+ }
+
+ public Path getPath() {
+ return fragment.getPath();
+ }
+
+ public int getDiskId() {
+ if(fragment.getDiskIds().length <= 0) {
+ return -1;
+ } else {
+ return fragment.getDiskIds()[0];
+ }
+ }
+
+ public void setSearchCondition(Object expr) {
+ if (inited) {
+ throw new IllegalStateException("Should be called before init()");
+ }
+ // The search condition is ignored; push-down is not supported here.
+ }
+
+ public void setStorageManagerContext(StorageManagerV2.StorageManagerContext context) {
+ this.smContext = context;
+ }
+
+ public boolean isFetchProcessing() {
+ // Treat the scanner as still fetching while more than 16MB of tuples
+ // remain buffered in the pool.
+ return tuplePoolMemory.get() > 16 * 1024 * 1024;
+ }
+
+ long lastScanScheduleTime;
+
+ public String toString() {
+ return fragment.getPath() + ":" + fragment.getStartOffset();
+ }
+
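+ /**
+ * Runs one scan slot: fills the tuple pool until the file position has
+ * advanced by maxBytesPerSchedule or the fragment is exhausted, waking
+ * any consumer blocked in next() every 1000 records. An open scanner
+ * re-queues itself for the next slot when this one ends.
+ */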
+ public void scan(int maxBytesPerSchedule) throws IOException {
+ if(firstScheduled) {
+ initFirstScan();
+ firstScheduled = false;
+ }
+ long scanStartPos = getFilePosition();
+ int recordCount = 0;
+ while(true) {
+ Tuple tuple = getNextTuple();
+ if(tuple == null) {
+ break;
+ }
+ tuplePoolMemory.addAndGet(tuple.size());
+ tuplePool.offer(tuple);
+ recordCount++;
+ if(recordCount % 1000 == 0) {
+ if(getFilePosition() - scanStartPos >= maxBytesPerSchedule) {
+ break;
+ } else {
+ synchronized(tuplePool) {
+ tuplePool.notifyAll();
+ }
+ }
+ }
+ }
+ if(tuplePool != null) {
+ synchronized(tuplePool) {
+ tuplePool.notifyAll();
+ }
+ }
+ if(!isClosed()) {
+ smContext.requestFileScan(this);
+ }
+ }
+
+ public void waitScanStart() {
+ //for test
+ synchronized(fetchProcessing) {
+ try {
+ fetchProcessing.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if(closed.get()) {
+ return;
+ }
+ closed.set(true);
+
+ synchronized(tuplePool) {
+ tuplePool.notifyAll();
+ }
+ LOG.info(toString() + " closed");
+ }
+
+ public boolean isClosed() {
+ return closed.get();
+ }
+
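+ /**
+ * Consumer side of the tuple pool: returns a buffered tuple when one is
+ * available, blocks until the scan thread produces more, and returns null
+ * once the scanner is closed and the pool is drained.
+ */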
+ public Tuple next() throws IOException {
+ if(isClosed() && tuplePool == null) {
+ return null;
+ }
+ while(true) {
+ Tuple tuple = tuplePool.poll();
+ if(tuple == null) {
+ if(isClosed()) {
+ tuplePool.clear();
+ tuplePool = null;
+ return null;
+ }
+ synchronized(tuplePool) {
+ try {
+ tuplePool.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ } else {
+ tuplePoolMemory.addAndGet(-tuple.size());
+ return tuple;
+ }
+ }
+
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
new file mode 100644
index 0000000..11c3291
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.rcfile.BytesRefArrayWritable;
+import org.apache.tajo.storage.rcfile.ColumnProjectionUtils;
+import org.apache.tajo.storage.rcfile.RCFile;
+import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
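+/**
+ * RCFile scanner for the v2 storage manager. Reads rows between the
+ * fragment's start offset and end offset and materializes only the columns
+ * selected via setTarget().
+ */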
+public class RCFileScanner extends FileScannerV2 {
+ private static final Log LOG = LogFactory.getLog(RCFileScanner.class);
+
+ private RCFile.Reader in;
+ private long start;
+ private long end;
+ private boolean more = true;
+ private LongWritable key;
+ private BytesRefArrayWritable column;
+ private Integer [] projectionMap;
+
+ public RCFileScanner(final Configuration conf,
+ final TableMeta meta,
+ final Fragment fragment) throws IOException {
+ super(conf, meta, fragment);
+
+ this.start = fragment.getStartOffset();
+ this.end = start + fragment.getLength();
+ key = new LongWritable();
+ column = new BytesRefArrayWritable();
+ }
+
+ @Override
+ protected Tuple getNextTuple() throws IOException {
+ more = next(key);
+
+ if (more) {
+ column.clear();
+ in.getCurrentRow(column);
+ return makeTuple();
+ } else {
+ close();
+ return null;
+ }
+ }
+
+ private Tuple makeTuple() throws IOException {
+ column.resetValid(schema.getColumnNum());
+ Tuple tuple = new VTuple(schema.getColumnNum());
+ int tid; // target column id
+ for (int i = 0; i < projectionMap.length; i++) {
+ tid = projectionMap[i];
+ // if the column is byte[0], it represents a NULL value.
+ if (column.get(tid).getLength() == 0) {
+ tuple.put(tid, DatumFactory.createNullDatum());
+ } else {
+ switch (targets[i].getDataType().getType()) {
+ case BOOLEAN:
+ tuple.put(tid,
+ DatumFactory.createBool(column.get(tid).getBytesCopy()[0]));
+ break;
+ case BIT:
+ tuple.put(tid,
+ DatumFactory.createBit(column.get(tid).getBytesCopy()[0]));
+ break;
+ case CHAR:
+ byte[] buf = column.get(tid).getBytesCopy();
+ tuple.put(tid,
+ DatumFactory.createChar(buf));
+ break;
+
+ case INT2:
+ tuple.put(tid,
+ DatumFactory.createInt2(Bytes.toShort(
+ column.get(tid).getBytesCopy())));
+ break;
+ case INT4:
+ tuple.put(tid,
+ DatumFactory.createInt4(Bytes.toInt(
+ column.get(tid).getBytesCopy())));
+ break;
+
+ case INT8:
+ tuple.put(tid,
+ DatumFactory.createInt8(Bytes.toLong(
+ column.get(tid).getBytesCopy())));
+ break;
+
+ case FLOAT4:
+ tuple.put(tid,
+ DatumFactory.createFloat4(Bytes.toFloat(
+ column.get(tid).getBytesCopy())));
+ break;
+
+ case FLOAT8:
+ tuple.put(tid,
+ DatumFactory.createFloat8(Bytes.toDouble(
+ column.get(tid).getBytesCopy())));
+ break;
+
+ case INET4:
+ tuple.put(tid,
+ DatumFactory.createInet4(column.get(tid).getBytesCopy()));
+ break;
+
+ case TEXT:
+ tuple.put(tid,
+ DatumFactory.createText(
+ column.get(tid).getBytesCopy()));
+ break;
+
+ case BLOB:
+ tuple.put(tid,
+ DatumFactory.createBlob(column.get(tid).getBytesCopy()));
+ break;
+
+ default:
+ throw new IOException("Unsupported data type: " + targets[i].getDataType().getType());
+ }
+ }
+ }
+
+ return tuple;
+ }
+
+ @Override
+ public void init() throws IOException {
+ if (targets == null) {
+ targets = schema.toArray();
+ }
+
+ prepareProjection(targets);
+
+ super.init();
+ }
+
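+ /**
+ * Maps each target column to its ordinal id in the table schema and
+ * registers the id list so the RCFile reader can skip unread columns.
+ */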
+ private void prepareProjection(Column[] targets) {
+ projectionMap = new Integer[targets.length];
+ int tid;
+ for (int i = 0; i < targets.length; i++) {
+ tid = schema.getColumnIdByName(targets[i].getColumnName());
+ projectionMap[i] = tid;
+ }
+ ArrayList<Integer> projectionIdList = new ArrayList<Integer>(TUtil.newList(projectionMap));
+ ColumnProjectionUtils.setReadColumnIDs(conf, projectionIdList);
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if(in != null) {
+ in.close();
+ in = null;
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to close RCFile reader: " + e.getMessage(), e);
+ }
+
+ if(column != null) {
+ column.clear();
+ column = null;
+ }
+ super.close();
+ }
+
+ private boolean next(LongWritable key) throws IOException {
+ if (!more) {
+ return false;
+ }
+
+ more = in.next(key);
+ if (!more) {
+ return false;
+ }
+
+ long lastSeenSyncPos = in.lastSeenSyncPos();
+ if (lastSeenSyncPos >= end) {
+ more = false;
+ return more;
+ }
+ return more;
+ }
+
+ @Override
+ protected void initFirstScan() throws IOException {
+ if(!firstScheduled) {
+ return;
+ }
+ this.in = new RCFile.Reader(fs, fragment.getPath(), conf);
+
+ if (start > in.getPosition()) {
+ in.sync(start); // sync to start
+ }
+ this.start = in.getPosition();
+ more = start < end;
+ firstScheduled = false;
+ }
+
+ @Override
+ protected long getFilePosition() throws IOException {
+ return in.getPosition();
+ }
+
+ @Override
+ public void reset() throws IOException {
+ super.reset();
+ }
+
+ @Override
+ public boolean isProjectable() {
+ return true;
+ }
+
+ @Override
+ public boolean isSelectable() {
+ return false;
+ }
+
+ @Override
+ public boolean isSplittable(){
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
new file mode 100644
index 0000000..eca590f
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.storage.v2.StorageManagerV2.StorageManagerContext;
+
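+/**
+ * Routes scan requests from the shared scan queue to per-disk
+ * {@link DiskFileScanScheduler}s. The target disk comes from the fragment's
+ * disk id when known; otherwise it is resolved from the file's mount point,
+ * then the least-loaded disk queue, then a random disk as a last resort.
+ */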
+public class ScanScheduler extends Thread {
+ private static final Log LOG = LogFactory.getLog(ScanScheduler.class);
+
+ private Object scanQueueLock;
+ private StorageManagerContext context;
+
+ private Map<String, FileScannerV2> requestMap = new HashMap<String, FileScannerV2>();
+
+ private Map<Integer, DiskFileScanScheduler> diskFileScannerMap = new HashMap<Integer, DiskFileScanScheduler>();
+
+ private Map<Integer, DiskDeviceInfo> diskDeviceInfoMap = new HashMap<Integer, DiskDeviceInfo>();
+
+ private SortedSet<DiskMountInfo> diskMountInfos = new TreeSet<DiskMountInfo>();
+
+ private AtomicBoolean stopped = new AtomicBoolean(false);
+
+ private Random rand = new Random(System.currentTimeMillis());
+
+ public ScanScheduler(StorageManagerContext context) {
+ this.context = context;
+ this.scanQueueLock = context.getScanQueueLock();
+
+ try {
+ List<DiskDeviceInfo> deviceInfos = DiskUtil.getDiskDeviceInfos();
+ for(DiskDeviceInfo eachInfo: deviceInfos) {
+ LOG.info("Create DiskScanQueue:" + eachInfo.getName());
+ diskDeviceInfoMap.put(eachInfo.getId(), eachInfo);
+
+ diskMountInfos.addAll(eachInfo.getMountInfos());
+ }
+
+ initFileScanners();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+
+ public void run() {
+ synchronized(scanQueueLock) {
+ while(!stopped.get()) {
+ FileScannerV2 fileScannerV2 = context.getScanQueue().poll();
+ if(fileScannerV2 == null) {
+ try {
+ scanQueueLock.wait();
+ } catch (InterruptedException e) {
+ break;
+ }
+ } else {
+ int diskId = fileScannerV2.getDiskId();
+
+ if(diskId < 0 || diskId >= diskDeviceInfoMap.size()) {
+ diskId = findDiskPartitionPath(fileScannerV2.getPath().toString());
+ if(diskId < 0) {
+ diskId = findMinQueueDisk();
+ if(diskId < 0) {
+ diskId = rand.nextInt(diskDeviceInfoMap.size());
+ }
+ }
+ }
+
+ synchronized(diskFileScannerMap) {
+ requestMap.put(fileScannerV2.getId(), fileScannerV2);
+ DiskFileScanScheduler diskScheduler = diskFileScannerMap.get(diskId);
+ diskScheduler.requestScanFile(fileScannerV2);
+ }
+ }
+ }
+ }
+ }
+
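+ /**
+ * Returns the id of the disk whose scheduler has the fewest queued scan
+ * requests, or -1 if no disk schedulers exist.
+ */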
+ private int findMinQueueDisk() {
+ int minValue = Integer.MAX_VALUE;
+ int minId = -1;
+ synchronized(diskFileScannerMap) {
+ for(DiskFileScanScheduler eachDiskScanner: diskFileScannerMap.values()) {
+ int queueSize = eachDiskScanner.getTotalQueueSize();
+ if(queueSize <= minValue) {
+ minValue = queueSize;
+ minId = eachDiskScanner.getDiskId();
+ }
+ }
+ }
+
+ return minId;
+ }
+
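+ /**
+ * Resolves a file path to the device id of the mount point that contains
+ * it, or -1 if no mount point matches.
+ */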
+ private int findDiskPartitionPath(String fullPath) {
+ for (DiskMountInfo eachMountInfo : diskMountInfos) {
+ if (fullPath.startsWith(eachMountInfo.getMountPath())) {
+ return eachMountInfo.getDeviceId();
+ }
+ }
+
+ return -1;
+ }
+
+ private void initFileScanners() {
+ for(Integer eachId: diskDeviceInfoMap.keySet()) {
+ DiskFileScanScheduler scanner = new DiskFileScanScheduler(context, diskDeviceInfoMap.get(eachId));
+ scanner.start();
+
+ diskFileScannerMap.put(eachId, scanner);
+ }
+ }
+
+ public void stopScheduler() {
+ stopped.set(true);
+ for(DiskFileScanScheduler eachDiskScanner: diskFileScannerMap.values()) {
+ eachDiskScanner.stopScan();
+ }
+ this.interrupt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
new file mode 100644
index 0000000..1ba6048
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.Scanner;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
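+/**
+ * Asynchronous storage manager. Scanners obtained from getScanner() do not
+ * read on demand; they enqueue themselves on a shared scan queue, which the
+ * ScanScheduler drains and dispatches to per-disk schedulers.
+ */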
+public class StorageManagerV2 extends AbstractStorageManager {
+ private final Log LOG = LogFactory.getLog(StorageManagerV2.class);
+
+ private Queue<FileScannerV2> scanQueue = new LinkedList<FileScannerV2>();
+
+ private Object scanQueueLock = new Object();
+
+ private Object scanDataLock = new Object();
+
+ private ScanScheduler scanScheduler;
+
+ private StorageManagerContext context;
+
+ public StorageManagerV2(TajoConf conf) throws IOException {
+ super(conf);
+ context = new StorageManagerContext();
+ scanScheduler = new ScanScheduler(context);
+ scanScheduler.start();
+ LOG.info("StorageManager v2 started...");
+ }
+
+ @Override
+ public Scanner getScanner(TableMeta meta, Fragment fragment,
+ Schema target) throws IOException {
+ Scanner scanner;
+
+ Class<? extends Scanner> scannerClass;
+
+ String handlerName = meta.getStoreType().name().toLowerCase();
+ String handlerNameKey = handlerName + "_v2";
+
+ scannerClass = SCANNER_HANDLER_CACHE.get(handlerNameKey);
+ if (scannerClass == null) {
+ scannerClass = conf.getClass(
+ String.format("tajo.storage.scanner-handler.v2.%s.class",
+ meta.getStoreType().name().toLowerCase()), null,
+ Scanner.class);
+ if (scannerClass == null) {
+ throw new IOException("Unknown Storage Type: " + meta.getStoreType());
+ }
+ // Check for null before caching: caching a null class would fail and
+ // mask the unknown-storage-type error above.
+ SCANNER_HANDLER_CACHE.put(handlerNameKey, scannerClass);
+ }
+
+ scanner = newScannerInstance(scannerClass, conf, meta, fragment);
+ if (scanner.isProjectable()) {
+ scanner.setTarget(target.toArray());
+ }
+
+ if(scanner instanceof FileScannerV2) {
+ ((FileScannerV2)scanner).setStorageManagerContext(context);
+ }
+ return scanner;
+ }
+
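+ /**
+ * Enqueues a scanner on the shared scan queue and wakes the ScanScheduler.
+ */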
+ public void requestFileScan(FileScannerV2 fileScanner) {
+ synchronized(scanQueueLock) {
+ scanQueue.offer(fileScanner);
+
+ scanQueueLock.notifyAll();
+ }
+ }
+
+ public StorageManagerContext getContext() {
+ return context;
+ }
+
+ public class StorageManagerContext {
+ public Object getScanQueueLock() {
+ return scanQueueLock;
+ }
+
+ public Object getScanDataLock() {
+ return scanDataLock;
+ }
+
+ public Queue<FileScannerV2> getScanQueue() {
+ return scanQueue;
+ }
+
+ public int getMaxReadBytesPerScheduleSlot() {
+ // Maximum bytes a scanner may read within one schedule slot (default 8MB).
+ return conf.getInt("tajo.storage.manager.maxReadBytes", 8 * 1024 * 1024);
+ }
+
+ public void requestFileScan(FileScannerV2 fileScanner) {
+ StorageManagerV2.this.requestFileScan(fileScanner);
+ }
+
+ public TajoConf getConf() {
+ return conf;
+ }
+ }
+
+ public void stop() {
+ if(scanScheduler != null) {
+ scanScheduler.stopScheduler();
+ }
+ }
+}