Posted to commits@tajo.apache.org by hj...@apache.org on 2014/10/27 06:54:12 UTC
[2/5] TAJO-1123: Use Fragment instead of FileFragment.
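This part of the patch turns StorageManager into an abstract base class whose instances are cached per store type, and moves all file-system specific logic into the new FileStorageManager. A minimal sketch of how a caller obtains a manager after this change (class and method names are taken from the diff below; the wrapping main() is illustrative only):

    import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
    import org.apache.tajo.conf.TajoConf;
    import org.apache.tajo.storage.FileStorageManager;
    import org.apache.tajo.storage.StorageManager;

    public class StorageManagerLookup {
      public static void main(String[] args) throws Exception {
        TajoConf conf = new TajoConf();

        // File-backed stores now go through the FileStorageManager subclass.
        FileStorageManager fileSm = StorageManager.getFileStorageManager(conf);

        // The generic lookup is keyed by store type; instances are cached
        // under a (store type, warehouse) key inside StorageManager.
        StorageManager sm = StorageManager.getStorageManager(conf, StoreType.CSV);
      }
    }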
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java
new file mode 100644
index 0000000..c925cd8
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java
@@ -0,0 +1,702 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.*;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.util.*;
+
+public class FileStorageManager extends StorageManager {
+ private final Log LOG = LogFactory.getLog(FileStorageManager.class);
+
+ protected FileSystem fs;
+ protected Path tableBaseDir;
+ protected boolean blocksMetadataEnabled;
+ private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0));
+
+ @Override
+ protected void storageInit() throws IOException {
+ this.tableBaseDir = TajoConf.getWarehouseDir(conf);
+ this.fs = tableBaseDir.getFileSystem(conf);
+ this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+ DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+ if (!this.blocksMetadataEnabled)
+ LOG.warn("Block metadata is not supported ('dfs.datanode.hdfs-blocks-metadata.enabled' is disabled).");
+ }
+
+ public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
+ throws IOException {
+ FileSystem fs = path.getFileSystem(conf);
+ FileStatus status = fs.getFileStatus(path);
+ return getFileScanner(meta, schema, path, status);
+ }
+
+ public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status)
+ throws IOException {
+ Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
+ return getScanner(meta, schema, fragment);
+ }
+
+ public FileSystem getFileSystem() {
+ return this.fs;
+ }
+
+ public Path getWarehouseDir() {
+ return this.tableBaseDir;
+ }
+
+ public void delete(Path tablePath) throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+ fs.delete(tablePath, true);
+ }
+
+ public boolean exists(Path path) throws IOException {
+ FileSystem fileSystem = path.getFileSystem(conf);
+ return fileSystem.exists(path);
+ }
+
+ /**
+ * This method deletes only data contained in the given path.
+ *
+ * @param path The path in which data are deleted.
+ * @throws IOException
+ */
+ public void deleteData(Path path) throws IOException {
+ FileSystem fileSystem = path.getFileSystem(conf);
+ FileStatus[] fileLists = fileSystem.listStatus(path);
+ for (FileStatus status : fileLists) {
+ fileSystem.delete(status.getPath(), true);
+ }
+ }
+
+ public Path getTablePath(String tableName) {
+ return new Path(tableBaseDir, tableName);
+ }
+
+ public Appender getAppender(TableMeta meta, Schema schema, Path path)
+ throws IOException {
+ Appender appender;
+
+ Class<? extends FileAppender> appenderClass;
+
+ String handlerName = meta.getStoreType().name().toLowerCase();
+ appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
+ if (appenderClass == null) {
+ appenderClass = conf.getClass(
+ String.format("tajo.storage.appender-handler.%s.class",
+ meta.getStoreType().name().toLowerCase()), null,
+ FileAppender.class);
+ APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
+ }
+
+ if (appenderClass == null) {
+ throw new IOException("Unknown Storage Type: " + meta.getStoreType());
+ }
+
+ appender = newAppenderInstance(appenderClass, conf, meta, schema, path);
+
+ return appender;
+ }
+
+ public TableMeta getTableMeta(Path tablePath) throws IOException {
+ TableMeta meta;
+
+ FileSystem fs = tablePath.getFileSystem(conf);
+ Path tableMetaPath = new Path(tablePath, ".meta");
+ if (!fs.exists(tableMetaPath)) {
+ throw new FileNotFoundException(".meta file not found in " + tablePath.toString());
+ }
+
+ FSDataInputStream tableMetaIn = fs.open(tableMetaPath);
+
+ CatalogProtos.TableProto tableProto = (CatalogProtos.TableProto) org.apache.tajo.util.FileUtil.loadProto(tableMetaIn,
+ CatalogProtos.TableProto.getDefaultInstance());
+ meta = new TableMeta(tableProto);
+
+ return meta;
+ }
+
+ public FileFragment[] split(String tableName) throws IOException {
+ Path tablePath = new Path(tableBaseDir, tableName);
+ return split(tableName, tablePath, fs.getDefaultBlockSize());
+ }
+
+ public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
+ Path tablePath = new Path(tableBaseDir, tableName);
+ return split(tableName, tablePath, fragmentSize);
+ }
+
+ public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+ List<FileFragment> listTablets = new ArrayList<FileFragment>();
+ FileFragment tablet;
+
+ FileStatus[] fileLists = fs.listStatus(tablePath);
+ for (FileStatus file : fileLists) {
+ tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen());
+ listTablets.add(tablet);
+ }
+
+ FileFragment[] tablets = new FileFragment[listTablets.size()];
+ listTablets.toArray(tablets);
+
+ return tablets;
+ }
+
+ public FileFragment[] split(Path tablePath) throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+ return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
+ }
+
+ public FileFragment[] split(String tableName, Path tablePath) throws IOException {
+ return split(tableName, tablePath, fs.getDefaultBlockSize());
+ }
+
+ private FileFragment[] split(String tableName, Path tablePath, long size)
+ throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+
+ long defaultBlockSize = size;
+ List<FileFragment> listTablets = new ArrayList<FileFragment>();
+ FileFragment tablet;
+
+ FileStatus[] fileLists = fs.listStatus(tablePath);
+ for (FileStatus file : fileLists) {
+ long remainFileSize = file.getLen();
+ long start = 0;
+ if (remainFileSize > defaultBlockSize) {
+ while (remainFileSize > defaultBlockSize) {
+ tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
+ listTablets.add(tablet);
+ start += defaultBlockSize;
+ remainFileSize -= defaultBlockSize;
+ }
+ listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
+ } else {
+ listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
+ }
+ }
+
+ FileFragment[] tablets = new FileFragment[listTablets.size()];
+ listTablets.toArray(tablets);
+
+ return tablets;
+ }
+
+ public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
+ Path tablePath, long size)
+ throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+
+ long defaultBlockSize = size;
+ List<FileFragment> listTablets = new ArrayList<FileFragment>();
+ FileFragment tablet;
+
+ FileStatus[] fileLists = fs.listStatus(tablePath);
+ for (FileStatus file : fileLists) {
+ long remainFileSize = file.getLen();
+ long start = 0;
+ if (remainFileSize > defaultBlockSize) {
+ while (remainFileSize > defaultBlockSize) {
+ tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
+ listTablets.add(tablet);
+ start += defaultBlockSize;
+ remainFileSize -= defaultBlockSize;
+ }
+ listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
+ } else {
+ listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
+ }
+ }
+
+ FileFragment[] tablets = new FileFragment[listTablets.size()];
+ listTablets.toArray(tablets);
+
+ return tablets;
+ }
+
+ public long calculateSize(Path tablePath) throws IOException {
+ FileSystem fs = tablePath.getFileSystem(conf);
+ long totalSize = 0;
+
+ if (fs.exists(tablePath)) {
+ totalSize = fs.getContentSummary(tablePath).getLength();
+ }
+
+ return totalSize;
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+ // FileInputFormat Area
+ /////////////////////////////////////////////////////////////////////////////
+
+ public static final PathFilter hiddenFileFilter = new PathFilter() {
+ public boolean accept(Path p) {
+ String name = p.getName();
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
+
+ /**
+ * Proxy PathFilter that accepts a path only if all filters given in the
+ * constructor do. Used by listStatus() to apply the built-in
+ * hiddenFileFilter together with a user-provided one (if any).
+ */
+ private static class MultiPathFilter implements PathFilter {
+ private List<PathFilter> filters;
+
+ public MultiPathFilter(List<PathFilter> filters) {
+ this.filters = filters;
+ }
+
+ public boolean accept(Path path) {
+ for (PathFilter filter : filters) {
+ if (!filter.accept(path)) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+ /**
+ * List input directories.
+ * Subclasses may override to, e.g., select only files matching a regular
+ * expression.
+ *
+ * @return a list of FileStatus objects for the matched input files
+ * @throws IOException if no input paths are given or none of them match.
+ */
+ protected List<FileStatus> listStatus(Path... dirs) throws IOException {
+ List<FileStatus> result = new ArrayList<FileStatus>();
+ if (dirs.length == 0) {
+ throw new IOException("No input paths specified in job");
+ }
+
+ List<IOException> errors = new ArrayList<IOException>();
+
+ // creates a MultiPathFilter with the hiddenFileFilter and the
+ // user provided one (if any).
+ List<PathFilter> filters = new ArrayList<PathFilter>();
+ filters.add(hiddenFileFilter);
+
+ PathFilter inputFilter = new MultiPathFilter(filters);
+
+ for (int i = 0; i < dirs.length; ++i) {
+ Path p = dirs[i];
+
+ FileSystem fs = p.getFileSystem(conf);
+ FileStatus[] matches = fs.globStatus(p, inputFilter);
+ if (matches == null) {
+ errors.add(new IOException("Input path does not exist: " + p));
+ } else if (matches.length == 0) {
+ errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+ } else {
+ for (FileStatus globStat : matches) {
+ if (globStat.isDirectory()) {
+ for (FileStatus stat : fs.listStatus(globStat.getPath(),
+ inputFilter)) {
+ result.add(stat);
+ }
+ } else {
+ result.add(globStat);
+ }
+ }
+ }
+ }
+
+ if (!errors.isEmpty()) {
+ throw new InvalidInputException(errors);
+ }
+ LOG.info("Total input paths to process : " + result.size());
+ return result;
+ }
+
+ /**
+ * Is the given file splittable? Usually true, but if the file is
+ * stream-compressed, it will not be.
+ * <p/>
+ * <code>FileInputFormat</code> implementations can override this and return
+ * <code>false</code> to ensure that individual input files are never split up,
+ * so that tasks process entire files.
+ *
+ * @param path the file name to check
+ * @param status the file status, used to obtain the file length
+ * @return whether this file is splittable
+ */
+ protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException {
+ Scanner scanner = getFileScanner(meta, schema, path, status);
+ boolean split = scanner.isSplittable();
+ scanner.close();
+ return split;
+ }
+
+ private static final double SPLIT_SLOP = 1.1; // 10% slop
+
+ protected int getBlockIndex(BlockLocation[] blkLocations,
+ long offset) {
+ for (int i = 0; i < blkLocations.length; i++) {
+ // is the offset inside this block?
+ if ((blkLocations[i].getOffset() <= offset) &&
+ (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
+ return i;
+ }
+ }
+ BlockLocation last = blkLocations[blkLocations.length - 1];
+ long fileLength = last.getOffset() + last.getLength() - 1;
+ throw new IllegalArgumentException("Offset " + offset +
+ " is outside of file (0.." +
+ fileLength + ")");
+ }
+
+ /**
+ * A factory method that makes the split for this class. It can be overridden
+ * by subclasses to create subtypes.
+ */
+ protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) {
+ return new FileFragment(fragmentId, file, start, length);
+ }
+
+ protected FileFragment makeSplit(String fragmentId, Path file, long start, long length,
+ String[] hosts) {
+ return new FileFragment(fragmentId, file, start, length, hosts);
+ }
+
+ protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation)
+ throws IOException {
+ return new FileFragment(fragmentId, file, blockLocation);
+ }
+
+ // For non-splittable files, e.g., a gzip-compressed text file.
+ protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length,
+ BlockLocation[] blkLocations) throws IOException {
+
+ Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
+ for (BlockLocation blockLocation : blkLocations) {
+ for (String host : blockLocation.getHosts()) {
+ if (hostsBlockMap.containsKey(host)) {
+ hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
+ } else {
+ hostsBlockMap.put(host, 1);
+ }
+ }
+ }
+
+ List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
+ Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
+
+ @Override
+ public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
+ return v1.getValue().compareTo(v2.getValue());
+ }
+ });
+
+ String[] hosts = new String[blkLocations[0].getHosts().length];
+
+ for (int i = 0; i < hosts.length; i++) {
+ Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
+ hosts[i] = entry.getKey();
+ }
+ return new FileFragment(fragmentId, file, start, length, hosts);
+ }
+
+ /**
+ * Get the minimum split size
+ *
+ * @return the minimum number of bytes that can be in a split
+ */
+ public long getMinSplitSize() {
+ return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE);
+ }
+
+ /**
+ * Get disk ids from the given HDFS volume ids.
+ */
+ private int[] getDiskIds(VolumeId[] volumeIds) {
+ int[] diskIds = new int[volumeIds.length];
+ for (int i = 0; i < volumeIds.length; i++) {
+ int diskId = -1;
+ if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) {
+ diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode();
+ }
+ diskIds[i] = diskId;
+ }
+ return diskIds;
+ }
+
+ /**
+ * Build a map from each host to the set of volume (disk) ids of the fragments stored on it.
+ *
+ */
+ private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
+ Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
+ for (FileFragment frag : frags) {
+ String[] hosts = frag.getHosts();
+ int[] diskIds = frag.getDiskIds();
+ for (int i = 0; i < hosts.length; i++) {
+ Set<Integer> volumeList = volumeMap.get(hosts[i]);
+ if (volumeList == null) {
+ volumeList = new HashSet<Integer>();
+ volumeMap.put(hosts[i], volumeList);
+ }
+
+ if (diskIds.length > 0 && diskIds[i] > -1) {
+ volumeList.add(diskIds[i]);
+ }
+ }
+ }
+
+ return volumeMap;
+ }
+ /**
+ * Generate the list of files and make them into fragments (splits).
+ *
+ * @throws IOException
+ */
+ public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs)
+ throws IOException {
+ // generate splits
+
+ List<Fragment> splits = Lists.newArrayList();
+ List<Fragment> volumeSplits = Lists.newArrayList();
+ List<BlockLocation> blockLocations = Lists.newArrayList();
+
+ for (Path p : inputs) {
+ FileSystem fs = p.getFileSystem(conf);
+ ArrayList<FileStatus> files = Lists.newArrayList();
+ if (fs.isFile(p)) {
+ files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
+ } else {
+ files.addAll(listStatus(p));
+ }
+
+ int previousSplitSize = splits.size();
+ for (FileStatus file : files) {
+ Path path = file.getPath();
+ long length = file.getLen();
+ if (length > 0) {
+ // Get locations of blocks of file
+ BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+ boolean splittable = isSplittable(meta, schema, path, file);
+ if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
+
+ if (splittable) {
+ for (BlockLocation blockLocation : blkLocations) {
+ volumeSplits.add(makeSplit(tableName, path, blockLocation));
+ }
+ blockLocations.addAll(Arrays.asList(blkLocations));
+
+ } else { // Non splittable
+ long blockSize = blkLocations[0].getLength();
+ if (blockSize >= length) {
+ blockLocations.addAll(Arrays.asList(blkLocations));
+ for (BlockLocation blockLocation : blkLocations) {
+ volumeSplits.add(makeSplit(tableName, path, blockLocation));
+ }
+ } else {
+ splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
+ }
+ }
+
+ } else {
+ if (splittable) {
+
+ long minSize = Math.max(getMinSplitSize(), 1);
+
+ long blockSize = file.getBlockSize(); // the s3n REST API reports a block size, but returns only one block location
+ long splitSize = Math.max(minSize, blockSize);
+ long bytesRemaining = length;
+
+ // for s3
+ while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+ int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+ splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize,
+ blkLocations[blkIndex].getHosts()));
+ bytesRemaining -= splitSize;
+ }
+ if (bytesRemaining > 0) {
+ int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+ splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining,
+ blkLocations[blkIndex].getHosts()));
+ }
+ } else { // Non splittable
+ splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
+ }
+ }
+ } else {
+ //for zero length files
+ splits.add(makeSplit(tableName, path, 0, length));
+ }
+ }
+ if(LOG.isDebugEnabled()){
+ LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize));
+ }
+ }
+
+ // Combine original fileFragments with new VolumeId information
+ setVolumeMeta(volumeSplits, blockLocations);
+ splits.addAll(volumeSplits);
+ LOG.info("Total # of splits: " + splits.size());
+ return splits;
+ }
+
+ private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> blockLocations)
+ throws IOException {
+
+ int locationSize = blockLocations.size();
+ int splitSize = splits.size();
+ if (locationSize == 0 || splitSize == 0) return;
+
+ if (locationSize != splitSize) {
+ // splits and locations don't match up
+ LOG.warn("Number of block locations not equal to number of splits: "
+ + "#locations=" + locationSize
+ + " #splits=" + splitSize);
+ return;
+ }
+
+ DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf);
+ int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
+ int blockLocationIdx = 0;
+
+ Iterator<Fragment> iter = splits.iterator();
+ while (locationSize > blockLocationIdx) {
+
+ int subSize = Math.min(locationSize - blockLocationIdx, lsLimit);
+ List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize);
+ // BlockStorageLocation contains additional volume location information for each replica of each block.
+ BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations);
+
+ for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
+ ((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds()));
+ blockLocationIdx++;
+ }
+ }
+ LOG.info("# of splits with volumeId " + splitSize);
+ }
+
+ private static class InvalidInputException extends IOException {
+ List<IOException> errors;
+ public InvalidInputException(List<IOException> errors) {
+ this.errors = errors;
+ }
+
+ @Override
+ public String getMessage(){
+ StringBuffer sb = new StringBuffer();
+ int messageLimit = Math.min(errors.size(), 10);
+ for (int i = 0; i < messageLimit ; i ++) {
+ sb.append(errors.get(i).getMessage()).append("\n");
+ }
+
+ if(messageLimit < errors.size())
+ sb.append("skipped .....").append("\n");
+
+ return sb.toString();
+ }
+ }
+
+ @Override
+ public List<Fragment> getSplits(String tableName, TableDesc table) throws IOException {
+ return getSplits(tableName, table.getMeta(), table.getSchema(), table.getPath());
+ }
+
+ @Override
+ public void createTable(TableDesc tableDesc) throws IOException {
+ if (!tableDesc.isExternal()) {
+ String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName());
+ String databaseName = splitted[0];
+ String simpleTableName = splitted[1];
+
+ // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
+ Path tablePath = StorageUtil.concatPath(tableBaseDir, databaseName, simpleTableName);
+ tableDesc.setPath(tablePath);
+ } else {
+ Preconditions.checkState(tableDesc.getPath() != null, "ERROR: LOCATION must be given.");
+ }
+
+ Path path = tableDesc.getPath();
+
+ FileSystem fs = path.getFileSystem(conf);
+ TableStats stats = new TableStats();
+ if (tableDesc.isExternal()) {
+ if (!fs.exists(path)) {
+ LOG.error(path.toUri() + " does not exist");
+ throw new IOException("ERROR: " + path.toUri() + " does not exist");
+ }
+ } else {
+ fs.mkdirs(path);
+ }
+
+ long totalSize = 0;
+
+ try {
+ totalSize = calculateSize(path);
+ } catch (IOException e) {
+ LOG.warn("Cannot calculate the size of the relation", e);
+ }
+
+ stats.setNumBytes(totalSize);
+
+ if (tableDesc.isExternal()) { // if it is an external table, there is no way to know the exact row number without processing.
+ stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
+ }
+
+ tableDesc.setStats(stats);
+ }
+
+ @Override
+ public void purgeTable(TableDesc tableDesc) throws IOException {
+ try {
+ Path path = tableDesc.getPath();
+ FileSystem fs = path.getFileSystem(conf);
+ LOG.info("Delete table data dir: " + path);
+ fs.delete(path, true);
+ } catch (IOException e) {
+ throw new InternalError(e.getMessage());
+ }
+ }
+
+}
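The private split(tableName, tablePath, size) above cuts every file into block-sized fragments plus a remainder. A standalone sketch of just that arithmetic, with an assumed 64 MB block size and a hypothetical 150 MB file (runnable without any Tajo dependency):

    public class SplitArithmetic {
      public static void main(String[] args) {
        long blockSize = 64L * 1024 * 1024;   // assumed default block size
        long remaining = 150L * 1024 * 1024;  // assumed file length
        long start = 0;
        while (remaining > blockSize) {
          System.out.printf("fragment: start=%d length=%d%n", start, blockSize);
          start += blockSize;
          remaining -= blockSize;
        }
        System.out.printf("fragment: start=%d length=%d%n", start, remaining);
        // Prints a 64 MB, a 64 MB, and a 22 MB fragment: the same
        // (start, length) pairs that split() wraps in FileFragment instances.
      }
    }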
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index 84d81d5..ece31dd 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -85,8 +85,8 @@ public class HashShuffleAppenderManager {
if (!fs.exists(dataFile.getParent())) {
fs.mkdirs(dataFile.getParent());
}
- FileAppender appender = (FileAppender) StorageManager.getStorageManager(
- tajoConf).getAppender(meta, outSchema, dataFile);
+ FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(
+ tajoConf, null).getAppender(meta, outSchema, dataFile);
appender.enableStats();
appender.init();
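For callers only the lookup changes; the appender lifecycle stays the same. A hedged sketch, assuming tajoConf, meta, outSchema, and dataFile are in scope as in the hunk above (the addTuple() call per row is assumed from the Appender contract):

    FileAppender appender = (FileAppender) StorageManager
        .getFileStorageManager(tajoConf, null)
        .getAppender(meta, outSchema, dataFile);
    appender.enableStats();
    appender.init();
    // ... appender.addTuple(tuple) for each output row ...
    appender.close();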
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
index 4122c76..89e59d0 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -18,7 +18,6 @@
package org.apache.tajo.storage;
-import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
@@ -26,7 +25,7 @@ import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import java.io.IOException;
import java.util.ArrayList;
@@ -37,9 +36,9 @@ public class MergeScanner implements Scanner {
private Configuration conf;
private TableMeta meta;
private Schema schema;
- private List<FileFragment> fragments;
- private Iterator<FileFragment> iterator;
- private FileFragment currentFragment;
+ private List<Fragment> fragments;
+ private Iterator<Fragment> iterator;
+ private Fragment currentFragment;
private Scanner currentScanner;
private Tuple tuple;
private boolean projectable = false;
@@ -48,12 +47,12 @@ public class MergeScanner implements Scanner {
private float progress;
protected TableStats tableStats;
- public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<FileFragment> rawFragmentList)
+ public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<Fragment> rawFragmentList)
throws IOException {
this(conf, schema, meta, rawFragmentList, schema);
}
- public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<FileFragment> rawFragmentList,
+ public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<Fragment> rawFragmentList,
Schema target)
throws IOException {
this.conf = conf;
@@ -61,12 +60,12 @@ public class MergeScanner implements Scanner {
this.meta = meta;
this.target = target;
- this.fragments = new ArrayList<FileFragment>();
+ this.fragments = new ArrayList<Fragment>();
long numBytes = 0;
- for (FileFragment eachFileFragment: rawFragmentList) {
- numBytes += eachFileFragment.getEndKey();
- if (eachFileFragment.getEndKey() > 0) {
+ for (Fragment eachFileFragment: rawFragmentList) {
+ if (eachFileFragment.getLength() > 0) {
+ numBytes += eachFileFragment.getLength();
fragments.add(eachFileFragment);
}
}
@@ -128,7 +127,7 @@ public class MergeScanner implements Scanner {
private Scanner getNextScanner() throws IOException {
if (iterator.hasNext()) {
currentFragment = iterator.next();
- currentScanner = StorageManager.getStorageManager((TajoConf)conf).getScanner(meta, schema,
+ currentScanner = StorageManager.getStorageManager((TajoConf)conf, meta.getStoreType()).getScanner(meta, schema,
currentFragment, target);
currentScanner.init();
return currentScanner;
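With the widened signature, MergeScanner can be driven by any Fragment list rather than FileFragments only. A minimal usage sketch, assuming the usual Scanner contract of returning null at end of input (the helper method itself is illustrative):

    import java.io.IOException;
    import java.util.List;
    import org.apache.tajo.catalog.Schema;
    import org.apache.tajo.catalog.TableMeta;
    import org.apache.tajo.conf.TajoConf;
    import org.apache.tajo.storage.MergeScanner;
    import org.apache.tajo.storage.Tuple;
    import org.apache.tajo.storage.fragment.Fragment;

    public class MergeScanExample {
      static void scanAll(TajoConf conf, Schema schema, TableMeta meta,
                          List<Fragment> fragments) throws IOException {
        MergeScanner scanner = new MergeScanner(conf, schema, meta, fragments);
        scanner.init();
        Tuple tuple;
        while ((tuple = scanner.next()) != null) {
          // consume the tuple
        }
        scanner.close();
      }
    }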
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
index 4cec67d..19d333e 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
@@ -19,12 +19,12 @@ package org.apache.tajo.storage; /**
import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import java.io.IOException;
public class NullScanner extends FileScanner {
- public NullScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) {
+ public NullScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) {
super(conf, schema, meta, fragment);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index c8ac3a2..f72a5a1 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -32,7 +32,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.BitArray;
@@ -63,7 +63,7 @@ public class RawFile {
private FileInputStream fis;
private long recordCount;
- public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
+ public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
super(conf, schema, meta, fragment);
}
@@ -81,18 +81,18 @@ public class RawFile {
fis = new FileInputStream(file);
channel = fis.getChannel();
- fileLimit = fragment.getStartKey() + fragment.getEndKey(); // fileLimit is less than or equal to fileSize
+ fileLimit = fragment.getStartKey() + fragment.getLength(); // fileLimit is less than or equal to fileSize
if (tableStats != null) {
- tableStats.setNumBytes(fragment.getEndKey());
+ tableStats.setNumBytes(fragment.getLength());
}
if (LOG.isDebugEnabled()) {
LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", total file size :" + channel.size()
- + ", fragment size :" + fragment.getEndKey() + ", fileLimit: " + fileLimit);
+ + ", fragment size :" + fragment.getLength() + ", fileLimit: " + fileLimit);
}
- if (fragment.getEndKey() < 64 * StorageUnit.KB) {
- bufferSize = fragment.getEndKey().intValue();
+ if (fragment.getLength() < 64 * StorageUnit.KB) {
+ bufferSize = (int)fragment.getLength();
} else {
bufferSize = 64 * StorageUnit.KB;
}
@@ -138,7 +138,7 @@ public class RawFile {
}
private boolean fillBuffer() throws IOException {
- if (numBytesRead >= fragment.getEndKey()) {
+ if (numBytesRead >= fragment.getLength()) {
eof = true;
return false;
}
@@ -150,7 +150,7 @@ public class RawFile {
return false;
} else {
buffer.flip();
- long realRemaining = fragment.getEndKey() - numBytesRead;
+ long realRemaining = fragment.getLength() - numBytesRead;
numBytesRead += bytesRead;
if (realRemaining < bufferSize) {
int newLimit = currentDataSize + (int) realRemaining;
@@ -397,7 +397,7 @@ public class RawFile {
@Override
public void close() throws IOException {
if (tableStats != null) {
- tableStats.setReadBytes(fragment.getEndKey());
+ tableStats.setReadBytes(fragment.getLength());
tableStats.setNumRows(recordCount);
}
@@ -431,14 +431,14 @@ public class RawFile {
}
if(eof || channel == null) {
- tableStats.setReadBytes(fragment.getEndKey());
+ tableStats.setReadBytes(fragment.getLength());
return 1.0f;
}
if (filePos == 0) {
return 0.0f;
} else {
- return Math.min(1.0f, ((float)filePos / fragment.getEndKey().floatValue()));
+ return Math.min(1.0f, ((float)filePos / (float)fragment.getLength()));
}
} catch (IOException e) {
LOG.error(e.getMessage(), e);
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
index db36771..640cae2 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -33,7 +33,7 @@ import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.BitArray;
import java.io.FileNotFoundException;
@@ -66,7 +66,7 @@ public class RowFile {
private BitArray nullFlags;
private long bufferStartPos;
- public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
+ public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment)
throws IOException {
super(conf, schema, meta, fragment);
@@ -75,8 +75,8 @@ public class RowFile {
nullFlags = new BitArray(schema.size());
tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8);
- this.start = fragment.getStartKey();
- this.end = this.start + fragment.getEndKey();
+ this.start = this.fragment.getStartKey();
+ this.end = this.start + this.fragment.getLength();
}
public void init() throws IOException {
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
index 87b4197..23c2406 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -27,9 +27,11 @@ import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.storage.fragment.FileFragment;
@@ -48,14 +50,10 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* StorageManager
*/
-public class StorageManager {
+public abstract class StorageManager {
private final Log LOG = LogFactory.getLog(StorageManager.class);
- protected final TajoConf conf;
- protected final FileSystem fs;
- protected final Path tableBaseDir;
- protected final boolean blocksMetadataEnabled;
- private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0));
+ protected TajoConf conf;
private static final Map<String, StorageManager> storageManagers = Maps.newHashMap();
@@ -78,58 +76,52 @@ public class StorageManager {
private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
new ConcurrentHashMap<Class<?>, Constructor<?>>();
- private StorageManager(TajoConf conf) throws IOException {
- this.conf = conf;
- this.tableBaseDir = TajoConf.getWarehouseDir(conf);
- this.fs = tableBaseDir.getFileSystem(conf);
- this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
- DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
- if (!this.blocksMetadataEnabled)
- LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
- }
+ protected abstract void storageInit() throws IOException ;
+ public abstract void createTable(TableDesc tableDesc) throws IOException;
+ public abstract void purgeTable(TableDesc tableDesc) throws IOException;
+ public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException;
- public static StorageManager getStorageManager(TajoConf conf) throws IOException {
- return getStorageManager(conf, null);
+ public void init(TajoConf tajoConf) throws IOException {
+ this.conf = tajoConf;
+ storageInit();
}
- public static synchronized StorageManager getStorageManager (
- TajoConf conf, Path warehouseDir) throws IOException {
+ public static FileStorageManager getFileStorageManager(TajoConf tajoConf) throws IOException {
+ return getFileStorageManager(tajoConf, null);
+ }
+ public static FileStorageManager getFileStorageManager(TajoConf tajoConf, Path warehousePath) throws IOException {
URI uri;
- TajoConf localConf = new TajoConf(conf);
- if (warehouseDir != null) {
- localConf.setVar(ConfVars.WAREHOUSE_DIR, warehouseDir.toUri().toString());
+ TajoConf copiedConf = new TajoConf(tajoConf);
+ if (warehousePath != null) {
+ copiedConf.setVar(ConfVars.WAREHOUSE_DIR, warehousePath.toUri().toString());
}
-
- uri = TajoConf.getWarehouseDir(localConf).toUri();
-
+ uri = TajoConf.getWarehouseDir(copiedConf).toUri();
String key = "file".equals(uri.getScheme()) ? "file" : uri.toString();
-
- if(storageManagers.containsKey(key)) {
- StorageManager sm = storageManagers.get(key);
- return sm;
- } else {
- StorageManager storageManager = new StorageManager(localConf);
- storageManagers.put(key, storageManager);
- return storageManager;
- }
+ return (FileStorageManager) getStorageManager(copiedConf, StoreType.CSV, key);
}
- public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
- throws IOException {
- FileSystem fs = path.getFileSystem(conf);
- FileStatus status = fs.getFileStatus(path);
- return getFileScanner(meta, schema, path, status);
+ public static StorageManager getStorageManager(TajoConf tajoConf, StoreType storeType) throws IOException {
+ return getStorageManager(tajoConf, storeType, null);
}
- public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status)
- throws IOException {
- FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
- return getScanner(meta, schema, fragment);
- }
+ public static synchronized StorageManager getStorageManager (
+ TajoConf conf, StoreType storeType, String managerKey) throws IOException {
+ synchronized (storageManagers) {
+ String storeKey = storeType + managerKey;
+ StorageManager manager = storageManagers.get(storeKey);
+ if (manager == null) {
+ switch (storeType) {
+ default:
+ manager = new FileStorageManager();
+ }
- public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment) throws IOException {
- return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), schema);
+ manager.init(conf);
+ storageManagers.put(storeKey, manager);
+ }
+
+ return manager;
+ }
}
public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
@@ -140,42 +132,6 @@ public class StorageManager {
return getScanner(meta, schema, fragment, schema);
}
- public FileSystem getFileSystem() {
- return this.fs;
- }
-
- public Path getWarehouseDir() {
- return this.tableBaseDir;
- }
-
- public void delete(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- fs.delete(tablePath, true);
- }
-
- public boolean exists(Path path) throws IOException {
- FileSystem fileSystem = path.getFileSystem(conf);
- return fileSystem.exists(path);
- }
-
- /**
- * This method deletes only data contained in the given path.
- *
- * @param path The path in which data are deleted.
- * @throws IOException
- */
- public void deleteData(Path path) throws IOException {
- FileSystem fileSystem = path.getFileSystem(conf);
- FileStatus[] fileLists = fileSystem.listStatus(path);
- for (FileStatus status : fileLists) {
- fileSystem.delete(status.getPath(), true);
- }
- }
-
- public Path getTablePath(String tableName) {
- return new Path(tableBaseDir, tableName);
- }
-
public Appender getAppender(TableMeta meta, Schema schema, Path path)
throws IOException {
Appender appender;
@@ -201,510 +157,11 @@ public class StorageManager {
return appender;
}
- public TableMeta getTableMeta(Path tablePath) throws IOException {
- TableMeta meta;
-
- FileSystem fs = tablePath.getFileSystem(conf);
- Path tableMetaPath = new Path(tablePath, ".meta");
- if (!fs.exists(tableMetaPath)) {
- throw new FileNotFoundException(".meta file not found in " + tablePath.toString());
- }
-
- FSDataInputStream tableMetaIn = fs.open(tableMetaPath);
-
- CatalogProtos.TableProto tableProto = (CatalogProtos.TableProto) FileUtil.loadProto(tableMetaIn,
- CatalogProtos.TableProto.getDefaultInstance());
- meta = new TableMeta(tableProto);
-
- return meta;
- }
-
- public FileFragment[] split(String tableName) throws IOException {
- Path tablePath = new Path(tableBaseDir, tableName);
- return split(tableName, tablePath, fs.getDefaultBlockSize());
- }
-
- public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
- Path tablePath = new Path(tableBaseDir, tableName);
- return split(tableName, tablePath, fragmentSize);
- }
-
- public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- List<FileFragment> listTablets = new ArrayList<FileFragment>();
- FileFragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus file : fileLists) {
- tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen());
- listTablets.add(tablet);
- }
-
- FileFragment[] tablets = new FileFragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- public FileFragment[] split(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
- }
-
- public FileFragment[] split(String tableName, Path tablePath) throws IOException {
- return split(tableName, tablePath, fs.getDefaultBlockSize());
- }
-
- private FileFragment[] split(String tableName, Path tablePath, long size)
- throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
-
- long defaultBlockSize = size;
- List<FileFragment> listTablets = new ArrayList<FileFragment>();
- FileFragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus file : fileLists) {
- long remainFileSize = file.getLen();
- long start = 0;
- if (remainFileSize > defaultBlockSize) {
- while (remainFileSize > defaultBlockSize) {
- tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
- listTablets.add(tablet);
- start += defaultBlockSize;
- remainFileSize -= defaultBlockSize;
- }
- listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
- } else {
- listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
- }
- }
-
- FileFragment[] tablets = new FileFragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
- Path tablePath, long size)
- throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
-
- long defaultBlockSize = size;
- List<FileFragment> listTablets = new ArrayList<FileFragment>();
- FileFragment tablet;
-
- FileStatus[] fileLists = fs.listStatus(tablePath);
- for (FileStatus file : fileLists) {
- long remainFileSize = file.getLen();
- long start = 0;
- if (remainFileSize > defaultBlockSize) {
- while (remainFileSize > defaultBlockSize) {
- tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
- listTablets.add(tablet);
- start += defaultBlockSize;
- remainFileSize -= defaultBlockSize;
- }
- listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
- } else {
- listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
- }
- }
-
- FileFragment[] tablets = new FileFragment[listTablets.size()];
- listTablets.toArray(tablets);
-
- return tablets;
- }
-
- public long calculateSize(Path tablePath) throws IOException {
- FileSystem fs = tablePath.getFileSystem(conf);
- long totalSize = 0;
-
- if (fs.exists(tablePath)) {
- totalSize = fs.getContentSummary(tablePath).getLength();
- }
-
- return totalSize;
- }
-
- /////////////////////////////////////////////////////////////////////////////
- // FileInputFormat Area
- /////////////////////////////////////////////////////////////////////////////
-
- public static final PathFilter hiddenFileFilter = new PathFilter() {
- public boolean accept(Path p) {
- String name = p.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
- };
-
- /**
- * Proxy PathFilter that accepts a path only if all filters given in the
- * constructor do. Used by the listPaths() to apply the built-in
- * hiddenFileFilter together with a user provided one (if any).
- */
- private static class MultiPathFilter implements PathFilter {
- private List<PathFilter> filters;
-
- public MultiPathFilter(List<PathFilter> filters) {
- this.filters = filters;
- }
-
- public boolean accept(Path path) {
- for (PathFilter filter : filters) {
- if (!filter.accept(path)) {
- return false;
- }
- }
- return true;
- }
- }
-
- /**
- * List input directories.
- * Subclasses may override to, e.g., select only files matching a regular
- * expression.
- *
- * @return array of FileStatus objects
- * @throws IOException if zero items.
- */
- protected List<FileStatus> listStatus(Path... dirs) throws IOException {
- List<FileStatus> result = new ArrayList<FileStatus>();
- if (dirs.length == 0) {
- throw new IOException("No input paths specified in job");
- }
-
- List<IOException> errors = new ArrayList<IOException>();
-
- // creates a MultiPathFilter with the hiddenFileFilter and the
- // user provided one (if any).
- List<PathFilter> filters = new ArrayList<PathFilter>();
- filters.add(hiddenFileFilter);
-
- PathFilter inputFilter = new MultiPathFilter(filters);
-
- for (int i = 0; i < dirs.length; ++i) {
- Path p = dirs[i];
-
- FileSystem fs = p.getFileSystem(conf);
- FileStatus[] matches = fs.globStatus(p, inputFilter);
- if (matches == null) {
- errors.add(new IOException("Input path does not exist: " + p));
- } else if (matches.length == 0) {
- errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
- } else {
- for (FileStatus globStat : matches) {
- if (globStat.isDirectory()) {
- for (FileStatus stat : fs.listStatus(globStat.getPath(),
- inputFilter)) {
- result.add(stat);
- }
- } else {
- result.add(globStat);
- }
- }
- }
- }
-
- if (!errors.isEmpty()) {
- throw new InvalidInputException(errors);
- }
- LOG.info("Total input paths to process : " + result.size());
- return result;
- }
-
- /**
- * Is the given filename splitable? Usually, true, but if the file is
- * stream compressed, it will not be.
- * <p/>
- * <code>FileInputFormat</code> implementations can override this and return
- * <code>false</code> to ensure that individual input files are never split-up
- * so that Mappers process entire files.
- *
- *
- * @param path the file name to check
- * @param status get the file length
- * @return is this file isSplittable?
- */
- protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException {
- Scanner scanner = getFileScanner(meta, schema, path, status);
- boolean split = scanner.isSplittable();
- scanner.close();
- return split;
- }
-
- private static final double SPLIT_SLOP = 1.1; // 10% slop
-
- protected int getBlockIndex(BlockLocation[] blkLocations,
- long offset) {
- for (int i = 0; i < blkLocations.length; i++) {
- // is the offset inside this block?
- if ((blkLocations[i].getOffset() <= offset) &&
- (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
- return i;
- }
- }
- BlockLocation last = blkLocations[blkLocations.length - 1];
- long fileLength = last.getOffset() + last.getLength() - 1;
- throw new IllegalArgumentException("Offset " + offset +
- " is outside of file (0.." +
- fileLength + ")");
- }
-
- /**
- * A factory that makes the split for this class. It can be overridden
- * by sub-classes to make sub-types
- */
- protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) {
- return new FileFragment(fragmentId, file, start, length);
- }
-
- protected FileFragment makeSplit(String fragmentId, Path file, long start, long length,
- String[] hosts) {
- return new FileFragment(fragmentId, file, start, length, hosts);
- }
-
- protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation)
- throws IOException {
- return new FileFragment(fragmentId, file, blockLocation);
- }
-
- // for Non Splittable. eg, compressed gzip TextFile
- protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length,
- BlockLocation[] blkLocations) throws IOException {
-
- Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
- for (BlockLocation blockLocation : blkLocations) {
- for (String host : blockLocation.getHosts()) {
- if (hostsBlockMap.containsKey(host)) {
- hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
- } else {
- hostsBlockMap.put(host, 1);
- }
- }
- }
-
- List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
- Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
-
- @Override
- public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
- return v1.getValue().compareTo(v2.getValue());
- }
- });
-
- String[] hosts = new String[blkLocations[0].getHosts().length];
-
- for (int i = 0; i < hosts.length; i++) {
- Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
- hosts[i] = entry.getKey();
- }
- return new FileFragment(fragmentId, file, start, length, hosts);
- }
-
- /**
- * Get the minimum split size
- *
- * @return the minimum number of bytes that can be in a split
- */
- public long getMinSplitSize() {
- return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE);
- }
-
- /**
- * Get Disk Ids by Volume Bytes
- */
- private int[] getDiskIds(VolumeId[] volumeIds) {
- int[] diskIds = new int[volumeIds.length];
- for (int i = 0; i < volumeIds.length; i++) {
- int diskId = -1;
- if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) {
- diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode();
- }
- diskIds[i] = diskId;
- }
- return diskIds;
- }
-
- /**
- * Generate the map of host and make them into Volume Ids.
- *
- */
- private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
- Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
- for (FileFragment frag : frags) {
- String[] hosts = frag.getHosts();
- int[] diskIds = frag.getDiskIds();
- for (int i = 0; i < hosts.length; i++) {
- Set<Integer> volumeList = volumeMap.get(hosts[i]);
- if (volumeList == null) {
- volumeList = new HashSet<Integer>();
- volumeMap.put(hosts[i], volumeList);
- }
-
- if (diskIds.length > 0 && diskIds[i] > -1) {
- volumeList.add(diskIds[i]);
- }
- }
- }
-
- return volumeMap;
- }
- /**
- * Generate the list of files and make them into FileSplits.
- *
- * @throws IOException
- */
- public List<FileFragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs)
- throws IOException {
- // generate splits'
-
- List<FileFragment> splits = Lists.newArrayList();
- List<FileFragment> volumeSplits = Lists.newArrayList();
- List<BlockLocation> blockLocations = Lists.newArrayList();
-
- for (Path p : inputs) {
- FileSystem fs = p.getFileSystem(conf);
- ArrayList<FileStatus> files = Lists.newArrayList();
- if (fs.isFile(p)) {
- files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
- } else {
- files.addAll(listStatus(p));
- }
-
- int previousSplitSize = splits.size();
- for (FileStatus file : files) {
- Path path = file.getPath();
- long length = file.getLen();
- if (length > 0) {
- // Get locations of blocks of file
- BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
- boolean splittable = isSplittable(meta, schema, path, file);
- if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
-
- if (splittable) {
- for (BlockLocation blockLocation : blkLocations) {
- volumeSplits.add(makeSplit(tableName, path, blockLocation));
- }
- blockLocations.addAll(Arrays.asList(blkLocations));
-
- } else { // Non splittable
- long blockSize = blkLocations[0].getLength();
- if (blockSize >= length) {
- blockLocations.addAll(Arrays.asList(blkLocations));
- for (BlockLocation blockLocation : blkLocations) {
- volumeSplits.add(makeSplit(tableName, path, blockLocation));
- }
- } else {
- splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
- }
- }
-
- } else {
- if (splittable) {
-
- long minSize = Math.max(getMinSplitSize(), 1);
-
- long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one
- long splitSize = Math.max(minSize, blockSize);
- long bytesRemaining = length;
-
- // for s3
- while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
- int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
- splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize,
- blkLocations[blkIndex].getHosts()));
- bytesRemaining -= splitSize;
- }
- if (bytesRemaining > 0) {
- int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
- splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining,
- blkLocations[blkIndex].getHosts()));
- }
- } else { // Non splittable
- splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
- }
- }
- } else {
- //for zero length files
- splits.add(makeSplit(tableName, path, 0, length));
- }
- }
- if(LOG.isDebugEnabled()){
- LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize));
- }
- }
-
- // Combine original fileFragments with new VolumeId information
- setVolumeMeta(volumeSplits, blockLocations);
- splits.addAll(volumeSplits);
- LOG.info("Total # of splits: " + splits.size());
- return splits;
- }
-
- private void setVolumeMeta(List<FileFragment> splits, final List<BlockLocation> blockLocations)
- throws IOException {
-
- int locationSize = blockLocations.size();
- int splitSize = splits.size();
- if (locationSize == 0 || splitSize == 0) return;
-
- if (locationSize != splitSize) {
- // splits and locations don't match up
- LOG.warn("Number of block locations not equal to number of splits: "
- + "#locations=" + locationSize
- + " #splits=" + splitSize);
- return;
- }
-
- DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf);
- int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
- int blockLocationIdx = 0;
-
- Iterator<FileFragment> iter = splits.iterator();
- while (locationSize > blockLocationIdx) {
-
- int subSize = Math.min(locationSize - blockLocationIdx, lsLimit);
- List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize);
- //BlockStorageLocation containing additional volume location information for each replica of each block.
- BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations);
-
- for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
- iter.next().setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds()));
- blockLocationIdx++;
- }
- }
- LOG.info("# of splits with volumeId " + splitSize);
- }
-
- private static class InvalidInputException extends IOException {
- List<IOException> errors;
- public InvalidInputException(List<IOException> errors) {
- this.errors = errors;
- }
-
- @Override
- public String getMessage(){
- StringBuffer sb = new StringBuffer();
- int messageLimit = Math.min(errors.size(), 10);
- for (int i = 0; i < messageLimit ; i ++) {
- sb.append(errors.get(i).getMessage()).append("\n");
- }
-
- if(messageLimit < errors.size())
- sb.append("skipped .....").append("\n");
-
- return sb.toString();
- }
- }
-
private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
Configuration.class,
Schema.class,
TableMeta.class,
- FileFragment.class
+ Fragment.class
};
private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
@@ -773,14 +230,11 @@ public class StorageManager {
}
public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
- if (fragment instanceof FileFragment) {
- FileFragment fileFragment = (FileFragment)fragment;
- if (fileFragment.getEndKey() == 0) {
- Scanner scanner = new NullScanner(conf, schema, meta, fileFragment);
- scanner.setTarget(target.toArray());
+ if (fragment.isEmpty()) {
+ Scanner scanner = new NullScanner(conf, schema, meta, fragment);
+ scanner.setTarget(target.toArray());
- return scanner;
- }
+ return scanner;
}
Scanner scanner;
@@ -796,7 +250,7 @@ public class StorageManager {
public static synchronized SeekableScanner getSeekableScanner(
TajoConf conf, TableMeta meta, Schema schema, FileFragment fragment, Schema target) throws IOException {
- return (SeekableScanner)getStorageManager(conf, null).getScanner(meta, schema, fragment, target);
+ return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target);
}
public static synchronized SeekableScanner getSeekableScanner(
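
Note: this is also why the scanner constructors in the hunks below are
widened from FileFragment to Fragment -- scanners are instantiated
reflectively against DEFAULT_SCANNER_PARAMS, so each scanner class must
declare a constructor matching {Configuration, Schema, TableMeta, Fragment}
exactly. A minimal sketch of such a lookup (the method name is illustrative;
the real code caches constructors):

  // Reflectively build a scanner against the widened parameter list.
  static Scanner newScannerInstance(Class<? extends Scanner> clazz, Configuration conf,
      Schema schema, TableMeta meta, Fragment fragment) throws IOException {
    try {
      java.lang.reflect.Constructor<? extends Scanner> c = clazz.getDeclaredConstructor(
          Configuration.class, Schema.class, TableMeta.class, Fragment.class);
      c.setAccessible(true);
      return c.newInstance(conf, schema, meta, fragment);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }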
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
index 1789cc9..4a66678 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -27,6 +27,7 @@ import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.KeyValueSet;
import parquet.hadoop.ParquetOutputFormat;
@@ -111,6 +112,26 @@ public class StorageUtil extends StorageConstants {
return new Path(parent, sb.toString());
}
+ public static KeyValueSet newPhysicalProperties(CatalogProtos.StoreType type) {
+ KeyValueSet options = new KeyValueSet();
+ if (CatalogProtos.StoreType.CSV == type) {
+ options.set(CSVFILE_DELIMITER, DEFAULT_FIELD_DELIMITER);
+ } else if (CatalogProtos.StoreType.RCFILE == type) {
+ options.set(RCFILE_SERDE, DEFAULT_BINARY_SERDE);
+ } else if (CatalogProtos.StoreType.SEQUENCEFILE == type) {
+ options.set(SEQUENCEFILE_SERDE, DEFAULT_TEXT_SERDE);
+ options.set(SEQUENCEFILE_DELIMITER, DEFAULT_FIELD_DELIMITER);
+ } else if (type == CatalogProtos.StoreType.PARQUET) {
+ options.set(ParquetOutputFormat.BLOCK_SIZE, PARQUET_DEFAULT_BLOCK_SIZE);
+ options.set(ParquetOutputFormat.PAGE_SIZE, PARQUET_DEFAULT_PAGE_SIZE);
+ options.set(ParquetOutputFormat.COMPRESSION, PARQUET_DEFAULT_COMPRESSION_CODEC_NAME);
+ options.set(ParquetOutputFormat.ENABLE_DICTIONARY, PARQUET_DEFAULT_IS_DICTIONARY_ENABLED);
+ options.set(ParquetOutputFormat.VALIDATION, PARQUET_DEFAULT_IS_VALIDATION_ENABLED);
+ }
+
+ return options;
+ }
+
static final String fileNamePatternV08 = "part-[0-9]*-[0-9]*";
static final String fileNamePatternV09 = "part-[0-9]*-[0-9]*-[0-9]*";
@@ -124,7 +145,7 @@ public class StorageUtil extends StorageConstants {
* @param path
* @param recursive
* @return The maximum sequence number
- * @throws IOException
+ * @throws java.io.IOException
*/
public static int getMaxFileSequence(FileSystem fs, Path path, boolean recursive) throws IOException {
if (!fs.isDirectory(path)) {
@@ -220,4 +241,8 @@ public class StorageUtil extends StorageConstants {
amt -= ret;
}
}
+
+ public static boolean isFileStorageType(StoreType storageType) {
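+    // every StoreType supported at this point is file-based, hence the
+    // constant result; a non-file storage would return false here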
+ return true;
+ }
}
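
Note: the newPhysicalProperties() helper added above centralizes the
per-format physical defaults. A hedged usage sketch (the override value is
hypothetical):

  // Seed a table's options with the format defaults, then overlay user settings.
  // For PARQUET this pre-populates parquet.block.size, parquet.page.size,
  // parquet.compression, parquet.enable.dictionary and parquet.validation.
  KeyValueSet options = StorageUtil.newPhysicalProperties(CatalogProtos.StoreType.PARQUET);
  options.set(ParquetOutputFormat.COMPRESSION, "SNAPPY");   // hypothetical override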
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
index 816ae25..72472fc 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
@@ -39,6 +39,7 @@ import org.apache.tajo.storage.FileScanner;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -63,7 +64,7 @@ public class AvroScanner extends FileScanner {
*/
public AvroScanner(Configuration conf,
final org.apache.tajo.catalog.Schema schema,
- final TableMeta meta, final FileFragment fragment) {
+ final TableMeta meta, final Fragment fragment) {
super(conf, schema, meta, fragment);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
index 6fe6841..dcd9f0a 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
@@ -120,6 +120,7 @@ public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneab
this.diskIds = diskIds;
}
+ @Override
public String getTableName() {
return this.tableName;
}
@@ -136,10 +137,20 @@ public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneab
return this.startOffset;
}
- public Long getEndKey() {
+ @Override
+ public String getKey() {
+ return this.uri.toString();
+ }
+
+ @Override
+ public long getLength() {
return this.length;
}
+ @Override
+ public boolean isEmpty() {
+ return this.length <= 0;
+ }
/**
*
* The offset range of tablets <b>MUST NOT</b> be overlapped.
@@ -169,7 +180,7 @@ public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneab
FileFragment t = (FileFragment) o;
if (getPath().equals(t.getPath())
&& TUtil.checkEquals(t.getStartKey(), this.getStartKey())
- && TUtil.checkEquals(t.getEndKey(), this.getEndKey())) {
+ && TUtil.checkEquals(t.getLength(), this.getLength())) {
return true;
}
}
@@ -195,7 +206,7 @@ public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneab
public String toString() {
return "\"fragment\": {\"id\": \""+ tableName +"\", \"path\": "
+getPath() + "\", \"start\": " + this.getStartKey() + ",\"length\": "
- + getEndKey() + "}" ;
+ + getLength() + "}" ;
}
public FragmentProto getProto() {
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
index 3f9c160..ac43197 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
@@ -28,4 +28,12 @@ public interface Fragment extends ProtoObject<FragmentProto> {
@Override
public abstract FragmentProto getProto();
+
+ public abstract long getLength();
+
+ public abstract String getKey();
+
+ public abstract String[] getHosts();
+
+ public abstract boolean isEmpty();
}
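
Note: with getLength(), getKey(), getHosts() and isEmpty() lifted into the
interface, engine code can stay on Fragment instead of downcasting to
FileFragment. A small illustrative caller (not part of the patch):

  // Sum the bytes covered by a set of fragments, skipping empty ones.
  static long totalBytes(java.util.List<Fragment> fragments) {
    long sum = 0;
    for (Fragment f : fragments) {
      if (!f.isEmpty()) {
        sum += f.getLength();    // previously ((FileFragment) f).getEndKey()
      }
    }
    return sum;
  }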
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
index 36b89b8..2f8efcf 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
@@ -23,7 +23,7 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.storage.FileScanner;
import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import java.io.IOException;
@@ -42,7 +42,7 @@ public class ParquetScanner extends FileScanner {
* @param fragment
*/
public ParquetScanner(Configuration conf, final Schema schema,
- final TableMeta meta, final FileFragment fragment) {
+ final TableMeta meta, final Fragment fragment) {
super(conf, schema, meta, fragment);
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index e5507ad..0e5c0e9 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -39,6 +39,7 @@ import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.storage.*;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import java.io.Closeable;
import java.io.*;
@@ -1176,12 +1177,12 @@ public class RCFile {
private SerializerDeserializer serde;
public RCFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
- final FileFragment fragment) throws IOException {
+ final Fragment fragment) throws IOException {
super(conf, schema, meta, fragment);
conf.setInt("io.file.buffer.size", 4096); //TODO remove
- startOffset = fragment.getStartKey();
- endOffset = startOffset + fragment.getEndKey();
+ startOffset = this.fragment.getStartKey();
+ endOffset = startOffset + this.fragment.getLength();
start = 0;
}
@@ -1651,7 +1652,7 @@ public class RCFile {
return 0.0f;
} else {
//if scanner read the header, filePos moved to zero
- return Math.min(1.0f, (float)(Math.max(filePos - startOffset, 0)) / (float)(fragment.getEndKey()));
+ return Math.min(1.0f, (float)(Math.max(filePos - startOffset, 0)) / (float)(fragment.getLength()));
}
} catch (IOException e) {
LOG.error(e.getMessage(), e);
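
Note: the progress change above is the same getEndKey() -> getLength()
rename; the formula is unchanged. Isolated as a sketch for clarity:

  // Fraction of the fragment's byte range consumed, clamped to [0, 1].
  // While the header is being read, filePos can sit below startOffset;
  // Math.max clamps that case to zero progress.
  static float progress(long filePos, long startOffset, long fragmentLength) {
    return Math.min(1.0f, (float) Math.max(filePos - startOffset, 0) / (float) fragmentLength);
  }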
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
index b0ef67d..74563ff 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
@@ -32,7 +32,7 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.BytesUtils;
import java.io.IOException;
@@ -71,7 +71,7 @@ public class SequenceFileScanner extends FileScanner {
private Writable EMPTY_KEY;
- public SequenceFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
+ public SequenceFileScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
super(conf, schema, meta, fragment);
}
@@ -96,7 +96,7 @@ public class SequenceFileScanner extends FileScanner {
this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0);
this.start = fragment.getStartKey();
- this.end = start + fragment.getEndKey();
+ this.end = start + fragment.getLength();
if (fragment.getStartKey() > reader.getPosition())
reader.sync(this.start);
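
Note: the start/end bookkeeping above leans on SequenceFile sync markers:
reader.sync(pos) seeks to the first sync point at or after pos, so a split
that begins mid-record starts consuming at a record boundary and adjacent
splits never double-read. A rough sketch of the idiom against Hadoop's
SequenceFile.Reader API (variable names illustrative):

  long start = fragment.getStartKey();
  long end = start + fragment.getLength();   // exclusive upper bound
  if (start > reader.getPosition()) {
    reader.sync(start);                      // align to a record boundary
  }
  while (reader.getPosition() < end && reader.next(key)) {
    // every record that starts before 'end' belongs to this split,
    // even if it straddles the boundary
  }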
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
index 36a589a..6bca48b 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
@@ -30,7 +30,7 @@ import org.apache.tajo.datum.ProtobufDatumFactory;
import org.apache.tajo.storage.FileScanner;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.trevni.ColumnFileReader;
import org.apache.trevni.ColumnValues;
import org.apache.trevni.avro.HadoopInput;
@@ -45,9 +45,9 @@ public class TrevniScanner extends FileScanner {
private int [] projectionMap;
private ColumnValues [] columns;
- public TrevniScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
+ public TrevniScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
super(conf, schema, meta, fragment);
- reader = new ColumnFileReader(new HadoopInput(fragment.getPath(), conf));
+ reader = new ColumnFileReader(new HadoopInput(this.fragment.getPath(), conf));
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 212f374..8f79f4b 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -124,7 +124,7 @@ public class TestCompressionStorages {
meta.putOption("compression.codec", BZip2Codec.class.getCanonicalName());
Path tablePath = new Path(testDir, "SplitCompression");
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
+ Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -156,7 +156,7 @@ public class TestCompressionStorages {
tablets[0] = new FileFragment("SplitCompression", tablePath, 0, randomNum);
tablets[1] = new FileFragment("SplitCompression", tablePath, randomNum, (fileLen - randomNum));
- Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
assertTrue(scanner.isSplittable());
scanner.init();
int tupleCnt = 0;
@@ -166,7 +166,7 @@ public class TestCompressionStorages {
}
scanner.close();
- scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
+ scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
assertTrue(scanner.isSplittable());
scanner.init();
while ((tuple = scanner.next()) != null) {
@@ -191,7 +191,7 @@ public class TestCompressionStorages {
String fileName = "Compression_" + codec.getSimpleName();
Path tablePath = new Path(testDir, fileName);
- Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
+ Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
@@ -221,7 +221,7 @@ public class TestCompressionStorages {
FileFragment[] tablets = new FileFragment[1];
tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
- Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+ Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
if (StoreType.CSV == storeType) {
if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index a355a94..17a8da7 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -32,6 +32,7 @@ import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.s3.InMemoryFileSystemStore;
import org.apache.tajo.storage.s3.SmallBlockS3FileSystem;
import org.junit.Test;
@@ -54,7 +55,7 @@ public class TestFileSystems {
private static String TEST_PATH = "target/test-data/TestFileSystem";
private TajoConf conf = null;
- private StorageManager sm = null;
+ private FileStorageManager sm = null;
private FileSystem fs = null;
Path testDir;
@@ -66,7 +67,7 @@ public class TestFileSystems {
fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
}
this.fs = fs;
- sm = StorageManager.getStorageManager(conf);
+ sm = StorageManager.getFileStorageManager(conf);
testDir = getTestDir(this.fs, TEST_PATH);
}
@@ -118,12 +119,12 @@ public class TestFileSystems {
appender.close();
FileStatus fileStatus = fs.getFileStatus(path);
- List<FileFragment> splits = sm.getSplits("table", meta, schema, path);
+ List<Fragment> splits = sm.getSplits("table", meta, schema, path);
int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
assertEquals(splitSize, splits.size());
- for (FileFragment fragment : splits) {
- assertTrue(fragment.getEndKey() <= fileStatus.getBlockSize());
+ for (Fragment fragment : splits) {
+ assertTrue(fragment.getLength() <= fileStatus.getBlockSize());
}
}
}
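
Note: the assertions above pin down the expected fragmentation: one fragment
per block, and none longer than a block. The arithmetic in isolation
(variable names illustrative):

  // e.g. a 10MB file on 4MB blocks -> ceil(10/4) = 3 fragments,
  // two of 4MB and a 2MB tail.
  int expectedSplits = (int) Math.ceil(fileLen / (double) blockSize);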
http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 51c612c..b70bba9 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
@@ -95,7 +96,7 @@ public class TestMergeScanner {
conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "trevni", "parquet", "avro");
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
fs = testDir.getFileSystem(conf);
- sm = StorageManager.getStorageManager(conf, testDir);
+ sm = StorageManager.getFileStorageManager(conf, testDir);
}
@Test
@@ -115,7 +116,7 @@ public class TestMergeScanner {
}
Path table1Path = new Path(testDir, storeType + "_1.data");
- Appender appender1 = StorageManager.getStorageManager(conf).getAppender(meta, schema, table1Path);
+ Appender appender1 = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, table1Path);
appender1.enableStats();
appender1.init();
int tupleNum = 10000;
@@ -137,7 +138,7 @@ public class TestMergeScanner {
}
Path table2Path = new Path(testDir, storeType + "_2.data");
- Appender appender2 = StorageManager.getStorageManager(conf).getAppender(meta, schema, table2Path);
+ Appender appender2 = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, table2Path);
appender2.enableStats();
appender2.init();
@@ -159,7 +160,7 @@ public class TestMergeScanner {
FileStatus status1 = fs.getFileStatus(table1Path);
FileStatus status2 = fs.getFileStatus(table2Path);
- FileFragment[] fragment = new FileFragment[2];
+ Fragment[] fragment = new Fragment[2];
fragment[0] = new FileFragment("tablet1", table1Path, 0, status1.getLen());
fragment[1] = new FileFragment("tablet1", table2Path, 0, status2.getLen());
@@ -167,7 +168,7 @@ public class TestMergeScanner {
targetSchema.addColumn(schema.getColumn(0));
targetSchema.addColumn(schema.getColumn(2));
- Scanner scanner = new MergeScanner(conf, schema, meta, TUtil.<FileFragment>newList(fragment), targetSchema);
+ Scanner scanner = new MergeScanner(conf, schema, meta, TUtil.newList(fragment), targetSchema);
assertEquals(isProjectableStorage(meta.getStoreType()), scanner.isProjectable());
scanner.init();