You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/10/27 06:54:12 UTC

[2/5] TAJO-1123: Use Fragment instead of FileFragment.

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java
new file mode 100644
index 0000000..c925cd8
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java
@@ -0,0 +1,702 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.util.*;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.util.*;
+
+public class FileStorageManager extends StorageManager {
+  private final Log LOG = LogFactory.getLog(FileStorageManager.class);
+
+  protected FileSystem fs;
+  protected Path tableBaseDir;
+  protected boolean blocksMetadataEnabled;
+  private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0));
+
+  /**
+   * Initializes the file-based storage layer: resolves the warehouse
+   * directory and its FileSystem, and probes whether HDFS block-metadata
+   * (volume id) lookups are enabled via
+   * 'dfs.datanode.hdfs-blocks-metadata.enabled'.
+   */
+  @Override
+  protected void storageInit() throws IOException {
+    this.tableBaseDir = TajoConf.getWarehouseDir(conf);
+    this.fs = tableBaseDir.getFileSystem(conf);
+    this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+    // Without block metadata, getSplits() falls back to host-only locality.
+    if (!this.blocksMetadataEnabled)
+      LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
+  }
+
+  /**
+   * Returns a scanner that reads the whole file at the given path.
+   * Convenience overload that looks up the FileStatus itself.
+   */
+  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
+      throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus status = fs.getFileStatus(path);
+    return getFileScanner(meta, schema, path, status);
+  }
+
+  /**
+   * Returns a scanner over a single-fragment view of the file: the fragment
+   * covers byte range [0, status.getLen()).
+   */
+  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status)
+      throws IOException {
+    Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
+    return getScanner(meta, schema, fragment);
+  }
+
+  /** Returns the FileSystem of the warehouse directory. */
+  public FileSystem getFileSystem() {
+    return this.fs;
+  }
+
+  /** Returns the warehouse base directory (${WAREHOUSE_DIR}). */
+  public Path getWarehouseDir() {
+    return this.tableBaseDir;
+  }
+
+  /** Recursively deletes the given path itself. */
+  public void delete(Path tablePath) throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+    fs.delete(tablePath, true);
+  }
+
+  /** Returns whether the given path exists on its file system. */
+  public boolean exists(Path path) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(conf);
+    return fileSystem.exists(path);
+  }
+
+  /**
+   * This method deletes only data contained in the given path, i.e., the
+   * immediate children of the directory; the directory itself is kept.
+   *
+   * @param path The path in which data are deleted.
+   * @throws IOException if listing or deleting any child fails
+   */
+  public void deleteData(Path path) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(conf);
+    FileStatus[] fileLists = fileSystem.listStatus(path);
+    for (FileStatus status : fileLists) {
+      fileSystem.delete(status.getPath(), true);
+    }
+  }
+
+  /** Resolves a table name to its directory under the warehouse dir. */
+  public Path getTablePath(String tableName) {
+    return new Path(tableBaseDir, tableName);
+  }
+
+  /**
+   * Creates a FileAppender for the table's store type, writing to the given
+   * path. The appender class is resolved from the
+   * 'tajo.storage.appender-handler.&lt;type&gt;.class' configuration key and
+   * memoized in APPENDER_HANDLER_CACHE.
+   *
+   * @throws IOException when no appender class is configured for the type
+   */
+  public Appender getAppender(TableMeta meta, Schema schema, Path path)
+      throws IOException {
+    Appender appender;
+
+    Class<? extends FileAppender> appenderClass;
+
+    String handlerName = meta.getStoreType().name().toLowerCase();
+    appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
+    if (appenderClass == null) {
+      appenderClass = conf.getClass(
+          String.format("tajo.storage.appender-handler.%s.class",
+              meta.getStoreType().name().toLowerCase()), null,
+          FileAppender.class);
+      // NOTE(review): a null lookup result is also cached here, so unknown
+      // store types are re-resolved on every call; the null check below
+      // still fires either way.
+      APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
+    }
+
+    if (appenderClass == null) {
+      throw new IOException("Unknown Storage Type: " + meta.getStoreType());
+    }
+
+    appender = newAppenderInstance(appenderClass, conf, meta, schema, path);
+
+    return appender;
+  }
+
+  /**
+   * Reads the table metadata stored in the ".meta" file under the given
+   * table directory.
+   *
+   * @param tablePath table directory expected to contain a ".meta" file
+   * @return the deserialized TableMeta
+   * @throws FileNotFoundException if no ".meta" file exists under tablePath
+   * @throws IOException on any other read failure
+   */
+  public TableMeta getTableMeta(Path tablePath) throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+    Path tableMetaPath = new Path(tablePath, ".meta");
+    if (!fs.exists(tableMetaPath)) {
+      throw new FileNotFoundException(".meta file not found in " + tablePath.toString());
+    }
+
+    FSDataInputStream tableMetaIn = fs.open(tableMetaPath);
+    try {
+      CatalogProtos.TableProto tableProto = (CatalogProtos.TableProto) org.apache.tajo.util.FileUtil.loadProto(
+          tableMetaIn, CatalogProtos.TableProto.getDefaultInstance());
+      return new TableMeta(tableProto);
+    } finally {
+      // The previous version never closed this stream, leaking a file handle
+      // (and an HDFS connection) per call.
+      tableMetaIn.close();
+    }
+  }
+
+  /** Splits the named warehouse table using the FS default block size. */
+  public FileFragment[] split(String tableName) throws IOException {
+    Path tablePath = new Path(tableBaseDir, tableName);
+    return split(tableName, tablePath, fs.getDefaultBlockSize());
+  }
+
+  /** Splits the named warehouse table using the given fragment size. */
+  public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
+    Path tablePath = new Path(tableBaseDir, tableName);
+    return split(tableName, tablePath, fragmentSize);
+  }
+
+  /**
+   * Produces one whole-file fragment per file under tablePath; broadcast
+   * tables are never block-split.
+   */
+  public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+    List<FileFragment> listTablets = new ArrayList<FileFragment>();
+    FileFragment tablet;
+
+    FileStatus[] fileLists = fs.listStatus(tablePath);
+    for (FileStatus file : fileLists) {
+      tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen());
+      listTablets.add(tablet);
+    }
+
+    FileFragment[] tablets = new FileFragment[listTablets.size()];
+    listTablets.toArray(tablets);
+
+    return tablets;
+  }
+
+  /** Splits tablePath using the default block size of its own file system. */
+  public FileFragment[] split(Path tablePath) throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+    return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
+  }
+
+  /** Splits tablePath using the warehouse FS default block size. */
+  public FileFragment[] split(String tableName, Path tablePath) throws IOException {
+    return split(tableName, tablePath, fs.getDefaultBlockSize());
+  }
+
+  /**
+   * Lists the files directly under tablePath and splits each into
+   * FileFragments of at most {@code size} bytes.
+   *
+   * <p>This is the exact algorithm of {@link #splitNG}; delegate to it so the
+   * splitting logic is maintained in one place instead of two verbatim copies.
+   *
+   * @param tableName fragment id recorded on every produced fragment
+   * @param tablePath directory whose immediate children are split
+   * @param size      maximum fragment length in bytes
+   */
+  private FileFragment[] split(String tableName, Path tablePath, long size)
+      throws IOException {
+    // splitNG never reads its TableMeta argument, so null is safe here.
+    return splitNG(conf, tableName, null, tablePath, size);
+  }
+
+  /**
+   * Lists the files directly under tablePath and splits each into
+   * FileFragments of at most {@code size} bytes; the tail shorter than
+   * {@code size} becomes its own fragment.
+   *
+   * @param meta unused by this implementation; kept for signature
+   *             compatibility with callers
+   */
+  public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
+                                       Path tablePath, long size)
+      throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+
+    long defaultBlockSize = size;
+    List<FileFragment> listTablets = new ArrayList<FileFragment>();
+    FileFragment tablet;
+
+    FileStatus[] fileLists = fs.listStatus(tablePath);
+    for (FileStatus file : fileLists) {
+      long remainFileSize = file.getLen();
+      long start = 0;
+      if (remainFileSize > defaultBlockSize) {
+        // Emit full-size fragments, then one fragment for the remainder.
+        while (remainFileSize > defaultBlockSize) {
+          tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
+          listTablets.add(tablet);
+          start += defaultBlockSize;
+          remainFileSize -= defaultBlockSize;
+        }
+        listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
+      } else {
+        listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
+      }
+    }
+
+    FileFragment[] tablets = new FileFragment[listTablets.size()];
+    listTablets.toArray(tablets);
+
+    return tablets;
+  }
+
+  /**
+   * Returns the total number of bytes stored under the given path, or zero
+   * when the path does not exist.
+   */
+  public long calculateSize(Path tablePath) throws IOException {
+    FileSystem fileSystem = tablePath.getFileSystem(conf);
+    if (!fileSystem.exists(tablePath)) {
+      return 0;
+    }
+    return fileSystem.getContentSummary(tablePath).getLength();
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+  // FileInputFormat Area
+  /////////////////////////////////////////////////////////////////////////////
+
+  /** Rejects hidden files: names starting with '_' or '.'. */
+  public static final PathFilter hiddenFileFilter = new PathFilter() {
+    public boolean accept(Path p) {
+      String name = p.getName();
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+
+  /**
+   * Proxy PathFilter that accepts a path only if all filters given in the
+   * constructor do. Used by the listPaths() to apply the built-in
+   * hiddenFileFilter together with a user provided one (if any).
+   */
+  private static class MultiPathFilter implements PathFilter {
+    private List<PathFilter> filters;
+
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    /** Logical AND over all wrapped filters; short-circuits on first reject. */
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (!filter.accept(path)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * List input directories, expanding glob patterns and descending exactly
+   * one level into matched directories. Hidden files ('_'/'.'-prefixed) are
+   * excluded. Subclasses may override to, e.g., select only files matching a
+   * regular expression.
+   *
+   * @param dirs one or more paths or glob patterns
+   * @return list of matched FileStatus objects
+   * @throws IOException if no input paths are given, or (as
+   *         InvalidInputException) if any path is missing or matches nothing
+   */
+  protected List<FileStatus> listStatus(Path... dirs) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    if (dirs.length == 0) {
+      throw new IOException("No input paths specified in job");
+    }
+
+    // Collect all per-path errors so every bad input is reported at once.
+    List<IOException> errors = new ArrayList<IOException>();
+
+    // creates a MultiPathFilter with the hiddenFileFilter and the
+    // user provided one (if any).
+    List<PathFilter> filters = new ArrayList<PathFilter>();
+    filters.add(hiddenFileFilter);
+
+    PathFilter inputFilter = new MultiPathFilter(filters);
+
+    for (int i = 0; i < dirs.length; ++i) {
+      Path p = dirs[i];
+
+      FileSystem fs = p.getFileSystem(conf);
+      FileStatus[] matches = fs.globStatus(p, inputFilter);
+      if (matches == null) {
+        errors.add(new IOException("Input path does not exist: " + p));
+      } else if (matches.length == 0) {
+        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+      } else {
+        for (FileStatus globStat : matches) {
+          if (globStat.isDirectory()) {
+            // Only one directory level is expanded; no recursive descent.
+            for (FileStatus stat : fs.listStatus(globStat.getPath(),
+                inputFilter)) {
+              result.add(stat);
+            }
+          } else {
+            result.add(globStat);
+          }
+        }
+      }
+    }
+
+    if (!errors.isEmpty()) {
+      throw new InvalidInputException(errors);
+    }
+    LOG.info("Total input paths to process : " + result.size());
+    return result;
+  }
+
+  /**
+   * Is the given file splittable? Usually true, but stream-compressed files
+   * are not. The answer is delegated to the scanner implementation that is
+   * registered for the table's store type.
+   *
+   * @param path   the file to check
+   * @param status file status used to obtain the file length
+   * @return whether this file may be split into multiple fragments
+   */
+  protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException {
+    Scanner scanner = getFileScanner(meta, schema, path, status);
+    try {
+      return scanner.isSplittable();
+    } finally {
+      // Close even if isSplittable() throws; the previous version leaked the
+      // scanner on that path.
+      scanner.close();
+    }
+  }
+
+  // Allow the final split to be up to 10% larger than splitSize.
+  private static final double SPLIT_SLOP = 1.1;   // 10% slop
+
+  /**
+   * Finds the index of the block that contains the given byte offset.
+   *
+   * @throws IllegalArgumentException when the offset lies beyond every block
+   */
+  protected int getBlockIndex(BlockLocation[] blkLocations,
+                              long offset) {
+    for (int idx = 0; idx < blkLocations.length; idx++) {
+      long blockStart = blkLocations[idx].getOffset();
+      long blockEnd = blockStart + blkLocations[idx].getLength();
+      // is the offset inside this block?
+      if (blockStart <= offset && offset < blockEnd) {
+        return idx;
+      }
+    }
+    BlockLocation last = blkLocations[blkLocations.length - 1];
+    long fileLength = last.getOffset() + last.getLength() - 1;
+    throw new IllegalArgumentException("Offset " + offset +
+        " is outside of file (0.." +
+        fileLength + ")");
+  }
+
+  /**
+   * A factory that makes the split for this class. It can be overridden
+   * by sub-classes to make sub-types.
+   */
+  protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) {
+    return new FileFragment(fragmentId, file, start, length);
+  }
+
+  /** Factory variant that also records the hosts serving the byte range. */
+  protected FileFragment makeSplit(String fragmentId, Path file, long start, long length,
+                                   String[] hosts) {
+    return new FileFragment(fragmentId, file, start, length, hosts);
+  }
+
+  /** Factory variant covering exactly one HDFS block location. */
+  protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation)
+      throws IOException {
+    return new FileFragment(fragmentId, file, blockLocation);
+  }
+
+  // for Non Splittable. eg, compressed gzip TextFile
+  // Builds a single whole-file fragment and picks as its hosts the nodes
+  // holding the most blocks of the file, to maximize locality for the one
+  // task that must read the entire file.
+  protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length,
+                                      BlockLocation[] blkLocations) throws IOException {
+
+    // Count how many of the file's blocks each host serves.
+    Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
+    for (BlockLocation blockLocation : blkLocations) {
+      for (String host : blockLocation.getHosts()) {
+        if (hostsBlockMap.containsKey(host)) {
+          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
+        } else {
+          hostsBlockMap.put(host, 1);
+        }
+      }
+    }
+
+    // Sort ascending by block count; the best hosts end up at the tail.
+    List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
+    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
+
+      @Override
+      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
+        return v1.getValue().compareTo(v2.getValue());
+      }
+    });
+
+    // NOTE(review): sized by the first block's replica count; assumes the
+    // file has at least one block and at least that many distinct hosts
+    // overall — verify for empty/edge block location arrays.
+    String[] hosts = new String[blkLocations[0].getHosts().length];
+
+    for (int i = 0; i < hosts.length; i++) {
+      // Walk the sorted list from the end, i.e. from the busiest hosts.
+      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
+      hosts[i] = entry.getKey();
+    }
+    return new FileFragment(fragmentId, file, start, length, hosts);
+  }
+
+  /**
+   * Get the minimum split size, as configured by
+   * {@link TajoConf.ConfVars#MINIMUM_SPLIT_SIZE}.
+   *
+   * @return the minimum number of bytes that can be in a split
+   */
+  public long getMinSplitSize() {
+    return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE);
+  }
+
+  /**
+   * Get Disk Ids by Volume Bytes.
+   *
+   * <p>NOTE(review): this decodes a disk index by subtracting the hashCode of
+   * a zero-valued HdfsVolumeId — it assumes HdfsVolumeId.hashCode() encodes
+   * the volume bytes ordinally; verify against the HdfsVolumeId
+   * implementation. Unknown/null volumes map to -1.
+   */
+  private int[] getDiskIds(VolumeId[] volumeIds) {
+    int[] diskIds = new int[volumeIds.length];
+    for (int i = 0; i < volumeIds.length; i++) {
+      int diskId = -1;
+      if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) {
+        diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode();
+      }
+      diskIds[i] = diskId;
+    }
+    return diskIds;
+  }
+
+  /**
+   * Generate the map of host to the set of disk volume ids that host serves
+   * for the given fragments. Hosts whose disk id is unknown (-1) still appear
+   * in the map, with an empty volume set.
+   */
+  private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
+    Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
+    for (FileFragment frag : frags) {
+      String[] hosts = frag.getHosts();
+      int[] diskIds = frag.getDiskIds();
+      for (int i = 0; i < hosts.length; i++) {
+        Set<Integer> volumeList = volumeMap.get(hosts[i]);
+        if (volumeList == null) {
+          volumeList = new HashSet<Integer>();
+          volumeMap.put(hosts[i], volumeList);
+        }
+
+        // Guard the index itself, not just that the array is non-empty: if a
+        // fragment reports fewer disk ids than hosts, the previous condition
+        // (diskIds.length > 0) let diskIds[i] read past the end of the array.
+        if (i < diskIds.length && diskIds[i] > -1) {
+          volumeList.add(diskIds[i]);
+        }
+      }
+    }
+
+    return volumeMap;
+  }
+  /**
+   * Generate the list of files under the input paths and make them into
+   * Fragments (FileSplits).
+   *
+   * <p>On HDFS with block metadata enabled, splittable files yield one
+   * fragment per block (later enriched with volume ids by setVolumeMeta);
+   * otherwise files are split by max(minSplitSize, blockSize) with a 10%
+   * slop, and non-splittable files become a single whole-file fragment.
+   *
+   * @throws IOException if listing inputs or reading block locations fails
+   */
+  public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs)
+      throws IOException {
+    // generate splits'
+
+    List<Fragment> splits = Lists.newArrayList();
+    List<Fragment> volumeSplits = Lists.newArrayList();
+    List<BlockLocation> blockLocations = Lists.newArrayList();
+
+    for (Path p : inputs) {
+      FileSystem fs = p.getFileSystem(conf);
+      ArrayList<FileStatus> files = Lists.newArrayList();
+      if (fs.isFile(p)) {
+        files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
+      } else {
+        files.addAll(listStatus(p));
+      }
+
+      int previousSplitSize = splits.size();
+      for (FileStatus file : files) {
+        Path path = file.getPath();
+        long length = file.getLen();
+        if (length > 0) {
+          // Get locations of blocks of file
+          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+          boolean splittable = isSplittable(meta, schema, path, file);
+          if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
+            // HDFS path: per-block fragments; volumeSplits and blockLocations
+            // are kept index-aligned for setVolumeMeta() below.
+            if (splittable) {
+              for (BlockLocation blockLocation : blkLocations) {
+                volumeSplits.add(makeSplit(tableName, path, blockLocation));
+              }
+              blockLocations.addAll(Arrays.asList(blkLocations));
+
+            } else { // Non splittable
+              long blockSize = blkLocations[0].getLength();
+              if (blockSize >= length) {
+                blockLocations.addAll(Arrays.asList(blkLocations));
+                for (BlockLocation blockLocation : blkLocations) {
+                  volumeSplits.add(makeSplit(tableName, path, blockLocation));
+                }
+              } else {
+                splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
+              }
+            }
+
+          } else {
+            if (splittable) {
+
+              long minSize = Math.max(getMinSplitSize(), 1);
+
+              long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one
+              long splitSize = Math.max(minSize, blockSize);
+              long bytesRemaining = length;
+
+              // for s3
+              while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize,
+                    blkLocations[blkIndex].getHosts()));
+                bytesRemaining -= splitSize;
+              }
+              if (bytesRemaining > 0) {
+                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining,
+                    blkLocations[blkIndex].getHosts()));
+              }
+            } else { // Non splittable
+              splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
+            }
+          }
+        } else {
+          //for zero length files
+          splits.add(makeSplit(tableName, path, 0, length));
+        }
+      }
+      if(LOG.isDebugEnabled()){
+        LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize));
+      }
+    }
+
+    // Combine original fileFragments with new VolumeId information
+    setVolumeMeta(volumeSplits, blockLocations);
+    splits.addAll(volumeSplits);
+    LOG.info("Total # of splits: " + splits.size());
+    return splits;
+  }
+
+  /**
+   * Annotates each per-block split with the disk (volume) ids of its block,
+   * querying HDFS in batches of at most dfs.ls.limit locations.
+   *
+   * <p>Relies on splits and blockLocations being index-aligned (same size,
+   * same order), as built in getSplits(); silently returns with a warning
+   * when they are not.
+   */
+  private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> blockLocations)
+      throws IOException {
+
+    int locationSize = blockLocations.size();
+    int splitSize = splits.size();
+    if (locationSize == 0 || splitSize == 0) return;
+
+    if (locationSize != splitSize) {
+      // splits and locations don't match up
+      LOG.warn("Number of block locations not equal to number of splits: "
+          + "#locations=" + locationSize
+          + " #splits=" + splitSize);
+      return;
+    }
+
+    DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf);
+    int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
+    int blockLocationIdx = 0;
+
+    Iterator<Fragment> iter = splits.iterator();
+    while (locationSize > blockLocationIdx) {
+
+      int subSize = Math.min(locationSize - blockLocationIdx, lsLimit);
+      List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize);
+      //BlockStorageLocation containing additional volume location information for each replica of each block.
+      BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations);
+
+      for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
+        ((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds()));
+        blockLocationIdx++;
+      }
+    }
+    LOG.info("# of splits with volumeId " + splitSize);
+  }
+
+  /**
+   * Aggregates the per-path IOExceptions collected by listStatus(); the
+   * message lists at most the first ten underlying error messages.
+   */
+  private static class InvalidInputException extends IOException {
+    List<IOException> errors;
+    public InvalidInputException(List<IOException> errors) {
+      this.errors = errors;
+    }
+
+    @Override
+    public String getMessage(){
+      // StringBuilder instead of StringBuffer: this is a method-local buffer,
+      // so the synchronized StringBuffer buys nothing.
+      StringBuilder sb = new StringBuilder();
+      int messageLimit = Math.min(errors.size(), 10);
+      for (int i = 0; i < messageLimit ; i ++) {
+        sb.append(errors.get(i).getMessage()).append("\n");
+      }
+
+      if(messageLimit < errors.size())
+        sb.append("skipped .....").append("\n");
+
+      return sb.toString();
+    }
+  }
+
+  /** Splits a table described by its catalog entry; see the varargs overload. */
+  @Override
+  public List<Fragment> getSplits(String tableName, TableDesc table) throws IOException {
+    return getSplits(tableName, table.getMeta(), table.getSchema(), table.getPath());
+  }
+
+  /**
+   * Prepares storage for a new table: managed tables get a directory created
+   * under ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME}; external tables
+   * must already point at an existing LOCATION. Initial TableStats (byte
+   * size, and unknown row count for external tables) are set on the desc.
+   *
+   * @throws IOException when an external LOCATION does not exist
+   */
+  @Override
+  public void createTable(TableDesc tableDesc) throws IOException {
+    if (!tableDesc.isExternal()) {
+      String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName());
+      String databaseName = splitted[0];
+      String simpleTableName = splitted[1];
+
+      // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
+      Path tablePath = StorageUtil.concatPath(tableBaseDir, databaseName, simpleTableName);
+      tableDesc.setPath(tablePath);
+    } else {
+      Preconditions.checkState(tableDesc.getPath() != null, "ERROR: LOCATION must be given.");
+    }
+
+    Path path = tableDesc.getPath();
+
+    FileSystem fs = path.getFileSystem(conf);
+    TableStats stats = new TableStats();
+    if (tableDesc.isExternal()) {
+      if (!fs.exists(path)) {
+        LOG.error(path.toUri() + " does not exist");
+        throw new IOException("ERROR: " + path.toUri() + " does not exist");
+      }
+    } else {
+      fs.mkdirs(path);
+    }
+
+    long totalSize = 0;
+
+    // Size calculation is best-effort; a failure only degrades the stats.
+    try {
+      totalSize = calculateSize(path);
+    } catch (IOException e) {
+      LOG.warn("Cannot calculate the size of the relation", e);
+    }
+
+    stats.setNumBytes(totalSize);
+
+    if (tableDesc.isExternal()) { // if it is an external table, there is no way to know the exact row number without processing.
+      stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
+    }
+
+    tableDesc.setStats(stats);
+  }
+
+  /**
+   * Recursively deletes the table's data directory.
+   *
+   * @throws IOException when the underlying file system fails to delete
+   */
+  @Override
+  public void purgeTable(TableDesc tableDesc) throws IOException {
+    Path path = tableDesc.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    LOG.info("Delete table data dir: " + path);
+    fs.delete(path, true);
+    // The previous version caught the IOException and rethrew it as
+    // java.lang.InternalError — a VM Error that drops the cause and bypasses
+    // the declared 'throws IOException' contract. Let it propagate instead.
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index 84d81d5..ece31dd 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -85,8 +85,8 @@ public class HashShuffleAppenderManager {
         if (!fs.exists(dataFile.getParent())) {
           fs.mkdirs(dataFile.getParent());
         }
-        FileAppender appender = (FileAppender) StorageManager.getStorageManager(
-            tajoConf).getAppender(meta, outSchema, dataFile);
+        FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(
+            tajoConf, null).getAppender(meta, outSchema, dataFile);
         appender.enableStats();
         appender.init();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
index 4122c76..89e59d0 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.storage;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
@@ -26,7 +25,7 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -37,9 +36,9 @@ public class MergeScanner implements Scanner {
   private Configuration conf;
   private TableMeta meta;
   private Schema schema;
-  private List<FileFragment> fragments;
-  private Iterator<FileFragment> iterator;
-  private FileFragment currentFragment;
+  private List<Fragment> fragments;
+  private Iterator<Fragment> iterator;
+  private Fragment currentFragment;
   private Scanner currentScanner;
   private Tuple tuple;
   private boolean projectable = false;
@@ -48,12 +47,12 @@ public class MergeScanner implements Scanner {
   private float progress;
   protected TableStats tableStats;
 
-  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<FileFragment> rawFragmentList)
+  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<Fragment> rawFragmentList)
       throws IOException {
     this(conf, schema, meta, rawFragmentList, schema);
   }
 
-  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<FileFragment> rawFragmentList,
+  public MergeScanner(Configuration conf, Schema schema, TableMeta meta, List<Fragment> rawFragmentList,
                       Schema target)
       throws IOException {
     this.conf = conf;
@@ -61,12 +60,12 @@ public class MergeScanner implements Scanner {
     this.meta = meta;
     this.target = target;
 
-    this.fragments = new ArrayList<FileFragment>();
+    this.fragments = new ArrayList<Fragment>();
 
     long numBytes = 0;
-    for (FileFragment eachFileFragment: rawFragmentList) {
-      numBytes += eachFileFragment.getEndKey();
-      if (eachFileFragment.getEndKey() > 0) {
+    for (Fragment eachFileFragment: rawFragmentList) {
+      if (eachFileFragment.getLength() > 0) {
+        numBytes += eachFileFragment.getLength();
         fragments.add(eachFileFragment);
       }
     }
@@ -128,7 +127,7 @@ public class MergeScanner implements Scanner {
   private Scanner getNextScanner() throws IOException {
     if (iterator.hasNext()) {
       currentFragment = iterator.next();
-      currentScanner = StorageManager.getStorageManager((TajoConf)conf).getScanner(meta, schema,
+      currentScanner = StorageManager.getStorageManager((TajoConf)conf, meta.getStoreType()).getScanner(meta, schema,
           currentFragment, target);
       currentScanner.init();
       return currentScanner;

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
index 4cec67d..19d333e 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
@@ -19,12 +19,12 @@ package org.apache.tajo.storage; /**
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 
 import java.io.IOException;
 
 public class NullScanner extends FileScanner {
-  public NullScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) {
+  public NullScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) {
     super(conf, schema, meta, fragment);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index c8ac3a2..f72a5a1 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -32,7 +32,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.ProtobufDatumFactory;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.BitArray;
 
@@ -63,7 +63,7 @@ public class RawFile {
     private FileInputStream fis;
     private long recordCount;
 
-    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
+    public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
       super(conf, schema, meta, fragment);
     }
 
@@ -81,18 +81,18 @@ public class RawFile {
 
       fis = new FileInputStream(file);
       channel = fis.getChannel();
-      fileLimit = fragment.getStartKey() + fragment.getEndKey(); // fileLimit is less than or equal to fileSize
+      fileLimit = fragment.getStartKey() + fragment.getLength(); // fileLimit is less than or equal to fileSize
 
       if (tableStats != null) {
-        tableStats.setNumBytes(fragment.getEndKey());
+        tableStats.setNumBytes(fragment.getLength());
       }
       if (LOG.isDebugEnabled()) {
         LOG.debug("RawFileScanner open:" + fragment + "," + channel.position() + ", total file size :" + channel.size()
-            + ", fragment size :" + fragment.getEndKey() + ", fileLimit: " + fileLimit);
+            + ", fragment size :" + fragment.getLength() + ", fileLimit: " + fileLimit);
       }
 
-      if (fragment.getEndKey() < 64 * StorageUnit.KB) {
-	      bufferSize = fragment.getEndKey().intValue();
+      if (fragment.getLength() < 64 * StorageUnit.KB) {
+	      bufferSize = (int)fragment.getLength();
       } else {
 	      bufferSize = 64 * StorageUnit.KB;
       }
@@ -138,7 +138,7 @@ public class RawFile {
     }
 
     private boolean fillBuffer() throws IOException {
-      if (numBytesRead >= fragment.getEndKey()) {
+      if (numBytesRead >= fragment.getLength()) {
         eof = true;
         return false;
       }
@@ -150,7 +150,7 @@ public class RawFile {
         return false;
       } else {
         buffer.flip();
-        long realRemaining = fragment.getEndKey() - numBytesRead;
+        long realRemaining = fragment.getLength() - numBytesRead;
         numBytesRead += bytesRead;
         if (realRemaining < bufferSize) {
           int newLimit = currentDataSize + (int) realRemaining;
@@ -397,7 +397,7 @@ public class RawFile {
     @Override
     public void close() throws IOException {
       if (tableStats != null) {
-        tableStats.setReadBytes(fragment.getEndKey());
+        tableStats.setReadBytes(fragment.getLength());
         tableStats.setNumRows(recordCount);
       }
 
@@ -431,14 +431,14 @@ public class RawFile {
         }
 
         if(eof || channel == null) {
-          tableStats.setReadBytes(fragment.getEndKey());
+          tableStats.setReadBytes(fragment.getLength());
           return 1.0f;
         }
 
         if (filePos == 0) {
           return 0.0f;
         } else {
-          return Math.min(1.0f, ((float)filePos / fragment.getEndKey().floatValue()));
+          return Math.min(1.0f, ((float)filePos / (float)fragment.getLength()));
         }
       } catch (IOException e) {
         LOG.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
index db36771..640cae2 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -33,7 +33,7 @@ import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.BitArray;
 
 import java.io.FileNotFoundException;
@@ -66,7 +66,7 @@ public class RowFile {
     private BitArray nullFlags;
     private long bufferStartPos;
 
-    public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment)
+    public RowFileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment)
         throws IOException {
       super(conf, schema, meta, fragment);
 
@@ -75,8 +75,8 @@ public class RowFile {
 
       nullFlags = new BitArray(schema.size());
       tupleHeaderSize = nullFlags.bytesLength() + (2 * Short.SIZE / 8);
-      this.start = fragment.getStartKey();
-      this.end = this.start + fragment.getEndKey();
+      this.start = this.fragment.getStartKey();
+      this.end = this.start + this.fragment.getLength();
     }
 
     public void init() throws IOException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
index 87b4197..23c2406 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -27,9 +27,11 @@ import org.apache.hadoop.fs.*;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -48,14 +50,10 @@ import java.util.concurrent.ConcurrentHashMap;
 /**
  * StorageManager
  */
-public class StorageManager {
+public abstract class StorageManager {
   private final Log LOG = LogFactory.getLog(StorageManager.class);
 
-  protected final TajoConf conf;
-  protected final FileSystem fs;
-  protected final Path tableBaseDir;
-  protected final boolean blocksMetadataEnabled;
-  private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0));
+  protected TajoConf conf;
 
   private static final Map<String, StorageManager> storageManagers = Maps.newHashMap();
 
@@ -78,58 +76,52 @@ public class StorageManager {
   private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
       new ConcurrentHashMap<Class<?>, Constructor<?>>();
 
-  private StorageManager(TajoConf conf) throws IOException {
-    this.conf = conf;
-    this.tableBaseDir = TajoConf.getWarehouseDir(conf);
-    this.fs = tableBaseDir.getFileSystem(conf);
-    this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
-        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
-    if (!this.blocksMetadataEnabled)
-      LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
-  }
+  protected abstract void storageInit() throws IOException ;
+  public abstract void createTable(TableDesc tableDesc) throws IOException;
+  public abstract void purgeTable(TableDesc tableDesc) throws IOException;
+  public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException;
 
-  public static StorageManager getStorageManager(TajoConf conf) throws IOException {
-    return getStorageManager(conf, null);
+  public void init(TajoConf tajoConf) throws IOException {
+    this.conf = tajoConf;
+    storageInit();
   }
 
-  public static synchronized StorageManager getStorageManager (
-      TajoConf conf, Path warehouseDir) throws IOException {
+  public static FileStorageManager getFileStorageManager(TajoConf tajoConf) throws IOException {
+    return getFileStorageManager(tajoConf, null);
+  }
 
+  public static FileStorageManager getFileStorageManager(TajoConf tajoConf, Path warehousePath) throws IOException {
     URI uri;
-    TajoConf localConf = new TajoConf(conf);
-    if (warehouseDir != null) {
-      localConf.setVar(ConfVars.WAREHOUSE_DIR, warehouseDir.toUri().toString());
+    TajoConf copiedConf = new TajoConf(tajoConf);
+    if (warehousePath != null) {
+      copiedConf.setVar(ConfVars.WAREHOUSE_DIR, warehousePath.toUri().toString());
     }
-
-    uri = TajoConf.getWarehouseDir(localConf).toUri();
-
+    uri = TajoConf.getWarehouseDir(copiedConf).toUri();
     String key = "file".equals(uri.getScheme()) ? "file" : uri.toString();
-
-    if(storageManagers.containsKey(key)) {
-      StorageManager sm = storageManagers.get(key);
-      return sm;
-    } else {
-      StorageManager storageManager = new StorageManager(localConf);
-      storageManagers.put(key, storageManager);
-      return storageManager;
-    }
+    return (FileStorageManager) getStorageManager(copiedConf, StoreType.CSV, key);
   }
 
-  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
-      throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    FileStatus status = fs.getFileStatus(path);
-    return getFileScanner(meta, schema, path, status);
+  public static StorageManager getStorageManager(TajoConf tajoConf, StoreType storeType) throws IOException {
+    return getStorageManager(tajoConf, storeType, null);
   }
 
-  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status)
-      throws IOException {
-    FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
-    return getScanner(meta, schema, fragment);
-  }
+  public static synchronized StorageManager getStorageManager (
+      TajoConf conf, StoreType storeType, String managerKey) throws IOException {
+    synchronized (storageManagers) {
+      String storeKey = storeType + managerKey;
+      StorageManager manager = storageManagers.get(storeKey);
+      if (manager == null) {
+        switch (storeType) {
+          default:
+            manager = new FileStorageManager();
+        }
 
-  public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment) throws IOException {
-    return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), schema);
+        manager.init(conf);
+        storageManagers.put(storeKey, manager);
+      }
+
+      return manager;
+    }
   }
 
   public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException {
@@ -140,42 +132,6 @@ public class StorageManager {
     return getScanner(meta, schema, fragment, schema);
   }
 
-  public FileSystem getFileSystem() {
-    return this.fs;
-  }
-
-  public Path getWarehouseDir() {
-    return this.tableBaseDir;
-  }
-
-  public void delete(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    fs.delete(tablePath, true);
-  }
-
-  public boolean exists(Path path) throws IOException {
-    FileSystem fileSystem = path.getFileSystem(conf);
-    return fileSystem.exists(path);
-  }
-
-  /**
-   * This method deletes only data contained in the given path.
-   *
-   * @param path The path in which data are deleted.
-   * @throws IOException
-   */
-  public void deleteData(Path path) throws IOException {
-    FileSystem fileSystem = path.getFileSystem(conf);
-    FileStatus[] fileLists = fileSystem.listStatus(path);
-    for (FileStatus status : fileLists) {
-      fileSystem.delete(status.getPath(), true);
-    }
-  }
-
-  public Path getTablePath(String tableName) {
-    return new Path(tableBaseDir, tableName);
-  }
-
   public Appender getAppender(TableMeta meta, Schema schema, Path path)
       throws IOException {
     Appender appender;
@@ -201,510 +157,11 @@ public class StorageManager {
     return appender;
   }
 
-  public TableMeta getTableMeta(Path tablePath) throws IOException {
-    TableMeta meta;
-
-    FileSystem fs = tablePath.getFileSystem(conf);
-    Path tableMetaPath = new Path(tablePath, ".meta");
-    if (!fs.exists(tableMetaPath)) {
-      throw new FileNotFoundException(".meta file not found in " + tablePath.toString());
-    }
-
-    FSDataInputStream tableMetaIn = fs.open(tableMetaPath);
-
-    CatalogProtos.TableProto tableProto = (CatalogProtos.TableProto) FileUtil.loadProto(tableMetaIn,
-        CatalogProtos.TableProto.getDefaultInstance());
-    meta = new TableMeta(tableProto);
-
-    return meta;
-  }
-
-  public FileFragment[] split(String tableName) throws IOException {
-    Path tablePath = new Path(tableBaseDir, tableName);
-    return split(tableName, tablePath, fs.getDefaultBlockSize());
-  }
-
-  public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
-    Path tablePath = new Path(tableBaseDir, tableName);
-    return split(tableName, tablePath, fragmentSize);
-  }
-
-  public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    List<FileFragment> listTablets = new ArrayList<FileFragment>();
-    FileFragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus file : fileLists) {
-      tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen());
-      listTablets.add(tablet);
-    }
-
-    FileFragment[] tablets = new FileFragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  public FileFragment[] split(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
-  }
-
-  public FileFragment[] split(String tableName, Path tablePath) throws IOException {
-    return split(tableName, tablePath, fs.getDefaultBlockSize());
-  }
-
-  private FileFragment[] split(String tableName, Path tablePath, long size)
-      throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-
-    long defaultBlockSize = size;
-    List<FileFragment> listTablets = new ArrayList<FileFragment>();
-    FileFragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus file : fileLists) {
-      long remainFileSize = file.getLen();
-      long start = 0;
-      if (remainFileSize > defaultBlockSize) {
-        while (remainFileSize > defaultBlockSize) {
-          tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
-          listTablets.add(tablet);
-          start += defaultBlockSize;
-          remainFileSize -= defaultBlockSize;
-        }
-        listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
-      } else {
-        listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
-      }
-    }
-
-    FileFragment[] tablets = new FileFragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
-                                       Path tablePath, long size)
-      throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-
-    long defaultBlockSize = size;
-    List<FileFragment> listTablets = new ArrayList<FileFragment>();
-    FileFragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus file : fileLists) {
-      long remainFileSize = file.getLen();
-      long start = 0;
-      if (remainFileSize > defaultBlockSize) {
-        while (remainFileSize > defaultBlockSize) {
-          tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
-          listTablets.add(tablet);
-          start += defaultBlockSize;
-          remainFileSize -= defaultBlockSize;
-        }
-        listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
-      } else {
-        listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
-      }
-    }
-
-    FileFragment[] tablets = new FileFragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  public long calculateSize(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    long totalSize = 0;
-
-    if (fs.exists(tablePath)) {
-      totalSize = fs.getContentSummary(tablePath).getLength();
-    }
-
-    return totalSize;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // FileInputFormat Area
-  /////////////////////////////////////////////////////////////////////////////
-
-  public static final PathFilter hiddenFileFilter = new PathFilter() {
-    public boolean accept(Path p) {
-      String name = p.getName();
-      return !name.startsWith("_") && !name.startsWith(".");
-    }
-  };
-
-  /**
-   * Proxy PathFilter that accepts a path only if all filters given in the
-   * constructor do. Used by the listPaths() to apply the built-in
-   * hiddenFileFilter together with a user provided one (if any).
-   */
-  private static class MultiPathFilter implements PathFilter {
-    private List<PathFilter> filters;
-
-    public MultiPathFilter(List<PathFilter> filters) {
-      this.filters = filters;
-    }
-
-    public boolean accept(Path path) {
-      for (PathFilter filter : filters) {
-        if (!filter.accept(path)) {
-          return false;
-        }
-      }
-      return true;
-    }
-  }
-
-  /**
-   * List input directories.
-   * Subclasses may override to, e.g., select only files matching a regular
-   * expression.
-   *
-   * @return array of FileStatus objects
-   * @throws IOException if zero items.
-   */
-  protected List<FileStatus> listStatus(Path... dirs) throws IOException {
-    List<FileStatus> result = new ArrayList<FileStatus>();
-    if (dirs.length == 0) {
-      throw new IOException("No input paths specified in job");
-    }
-
-    List<IOException> errors = new ArrayList<IOException>();
-
-    // creates a MultiPathFilter with the hiddenFileFilter and the
-    // user provided one (if any).
-    List<PathFilter> filters = new ArrayList<PathFilter>();
-    filters.add(hiddenFileFilter);
-
-    PathFilter inputFilter = new MultiPathFilter(filters);
-
-    for (int i = 0; i < dirs.length; ++i) {
-      Path p = dirs[i];
-
-      FileSystem fs = p.getFileSystem(conf);
-      FileStatus[] matches = fs.globStatus(p, inputFilter);
-      if (matches == null) {
-        errors.add(new IOException("Input path does not exist: " + p));
-      } else if (matches.length == 0) {
-        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
-      } else {
-        for (FileStatus globStat : matches) {
-          if (globStat.isDirectory()) {
-            for (FileStatus stat : fs.listStatus(globStat.getPath(),
-                inputFilter)) {
-              result.add(stat);
-            }
-          } else {
-            result.add(globStat);
-          }
-        }
-      }
-    }
-
-    if (!errors.isEmpty()) {
-      throw new InvalidInputException(errors);
-    }
-    LOG.info("Total input paths to process : " + result.size());
-    return result;
-  }
-
-  /**
-   * Is the given filename splitable? Usually, true, but if the file is
-   * stream compressed, it will not be.
-   * <p/>
-   * <code>FileInputFormat</code> implementations can override this and return
-   * <code>false</code> to ensure that individual input files are never split-up
-   * so that Mappers process entire files.
-   *
-   *
-   * @param path the file name to check
-   * @param status get the file length
-   * @return is this file isSplittable?
-   */
-  protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException {
-    Scanner scanner = getFileScanner(meta, schema, path, status);
-    boolean split = scanner.isSplittable();
-    scanner.close();
-    return split;
-  }
-
-  private static final double SPLIT_SLOP = 1.1;   // 10% slop
-
-  protected int getBlockIndex(BlockLocation[] blkLocations,
-                              long offset) {
-    for (int i = 0; i < blkLocations.length; i++) {
-      // is the offset inside this block?
-      if ((blkLocations[i].getOffset() <= offset) &&
-          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
-        return i;
-      }
-    }
-    BlockLocation last = blkLocations[blkLocations.length - 1];
-    long fileLength = last.getOffset() + last.getLength() - 1;
-    throw new IllegalArgumentException("Offset " + offset +
-        " is outside of file (0.." +
-        fileLength + ")");
-  }
-
-  /**
-   * A factory that makes the split for this class. It can be overridden
-   * by sub-classes to make sub-types
-   */
-  protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) {
-    return new FileFragment(fragmentId, file, start, length);
-  }
-
-  protected FileFragment makeSplit(String fragmentId, Path file, long start, long length,
-                                   String[] hosts) {
-    return new FileFragment(fragmentId, file, start, length, hosts);
-  }
-
-  protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation)
-      throws IOException {
-    return new FileFragment(fragmentId, file, blockLocation);
-  }
-
-  // for Non Splittable. eg, compressed gzip TextFile
-  protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length,
-                                      BlockLocation[] blkLocations) throws IOException {
-
-    Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
-    for (BlockLocation blockLocation : blkLocations) {
-      for (String host : blockLocation.getHosts()) {
-        if (hostsBlockMap.containsKey(host)) {
-          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
-        } else {
-          hostsBlockMap.put(host, 1);
-        }
-      }
-    }
-
-    List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
-    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
-
-      @Override
-      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
-        return v1.getValue().compareTo(v2.getValue());
-      }
-    });
-
-    String[] hosts = new String[blkLocations[0].getHosts().length];
-
-    for (int i = 0; i < hosts.length; i++) {
-      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
-      hosts[i] = entry.getKey();
-    }
-    return new FileFragment(fragmentId, file, start, length, hosts);
-  }
-
-  /**
-   * Get the minimum split size
-   *
-   * @return the minimum number of bytes that can be in a split
-   */
-  public long getMinSplitSize() {
-    return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE);
-  }
-
-  /**
-   * Get Disk Ids by Volume Bytes
-   */
-  private int[] getDiskIds(VolumeId[] volumeIds) {
-    int[] diskIds = new int[volumeIds.length];
-    for (int i = 0; i < volumeIds.length; i++) {
-      int diskId = -1;
-      if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) {
-        diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode();
-      }
-      diskIds[i] = diskId;
-    }
-    return diskIds;
-  }
-
-  /**
-   * Generate the map of host and make them into Volume Ids.
-   *
-   */
-  private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
-    Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
-    for (FileFragment frag : frags) {
-      String[] hosts = frag.getHosts();
-      int[] diskIds = frag.getDiskIds();
-      for (int i = 0; i < hosts.length; i++) {
-        Set<Integer> volumeList = volumeMap.get(hosts[i]);
-        if (volumeList == null) {
-          volumeList = new HashSet<Integer>();
-          volumeMap.put(hosts[i], volumeList);
-        }
-
-        if (diskIds.length > 0 && diskIds[i] > -1) {
-          volumeList.add(diskIds[i]);
-        }
-      }
-    }
-
-    return volumeMap;
-  }
-  /**
-   * Generate the list of files and make them into FileSplits.
-   *
-   * @throws IOException
-   */
-  public List<FileFragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs)
-      throws IOException {
-    // generate splits'
-
-    List<FileFragment> splits = Lists.newArrayList();
-    List<FileFragment> volumeSplits = Lists.newArrayList();
-    List<BlockLocation> blockLocations = Lists.newArrayList();
-
-    for (Path p : inputs) {
-      FileSystem fs = p.getFileSystem(conf);
-      ArrayList<FileStatus> files = Lists.newArrayList();
-      if (fs.isFile(p)) {
-        files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
-      } else {
-        files.addAll(listStatus(p));
-      }
-
-      int previousSplitSize = splits.size();
-      for (FileStatus file : files) {
-        Path path = file.getPath();
-        long length = file.getLen();
-        if (length > 0) {
-          // Get locations of blocks of file
-          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
-          boolean splittable = isSplittable(meta, schema, path, file);
-          if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
-
-            if (splittable) {
-              for (BlockLocation blockLocation : blkLocations) {
-                volumeSplits.add(makeSplit(tableName, path, blockLocation));
-              }
-              blockLocations.addAll(Arrays.asList(blkLocations));
-
-            } else { // Non splittable
-              long blockSize = blkLocations[0].getLength();
-              if (blockSize >= length) {
-                blockLocations.addAll(Arrays.asList(blkLocations));
-                for (BlockLocation blockLocation : blkLocations) {
-                  volumeSplits.add(makeSplit(tableName, path, blockLocation));
-                }
-              } else {
-                splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
-              }
-            }
-
-          } else {
-            if (splittable) {
-
-              long minSize = Math.max(getMinSplitSize(), 1);
-
-              long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one
-              long splitSize = Math.max(minSize, blockSize);
-              long bytesRemaining = length;
-
-              // for s3
-              while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
-                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-                splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize,
-                    blkLocations[blkIndex].getHosts()));
-                bytesRemaining -= splitSize;
-              }
-              if (bytesRemaining > 0) {
-                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-                splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining,
-                    blkLocations[blkIndex].getHosts()));
-              }
-            } else { // Non splittable
-              splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
-            }
-          }
-        } else {
-          //for zero length files
-          splits.add(makeSplit(tableName, path, 0, length));
-        }
-      }
-      if(LOG.isDebugEnabled()){
-        LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize));
-      }
-    }
-
-    // Combine original fileFragments with new VolumeId information
-    setVolumeMeta(volumeSplits, blockLocations);
-    splits.addAll(volumeSplits);
-    LOG.info("Total # of splits: " + splits.size());
-    return splits;
-  }
-
-  private void setVolumeMeta(List<FileFragment> splits, final List<BlockLocation> blockLocations)
-      throws IOException {
-
-    int locationSize = blockLocations.size();
-    int splitSize = splits.size();
-    if (locationSize == 0 || splitSize == 0) return;
-
-    if (locationSize != splitSize) {
-      // splits and locations don't match up
-      LOG.warn("Number of block locations not equal to number of splits: "
-          + "#locations=" + locationSize
-          + " #splits=" + splitSize);
-      return;
-    }
-
-    DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf);
-    int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
-    int blockLocationIdx = 0;
-
-    Iterator<FileFragment> iter = splits.iterator();
-    while (locationSize > blockLocationIdx) {
-
-      int subSize = Math.min(locationSize - blockLocationIdx, lsLimit);
-      List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize);
-      //BlockStorageLocation containing additional volume location information for each replica of each block.
-      BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations);
-
-      for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
-        iter.next().setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds()));
-        blockLocationIdx++;
-      }
-    }
-    LOG.info("# of splits with volumeId " + splitSize);
-  }
-
-  private static class InvalidInputException extends IOException {
-    List<IOException> errors;
-    public InvalidInputException(List<IOException> errors) {
-      this.errors = errors;
-    }
-
-    @Override
-    public String getMessage(){
-      StringBuffer sb = new StringBuffer();
-      int messageLimit = Math.min(errors.size(), 10);
-      for (int i = 0; i < messageLimit ; i ++) {
-        sb.append(errors.get(i).getMessage()).append("\n");
-      }
-
-      if(messageLimit < errors.size())
-        sb.append("skipped .....").append("\n");
-
-      return sb.toString();
-    }
-  }
-
   private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
       Configuration.class,
       Schema.class,
       TableMeta.class,
-      FileFragment.class
+      Fragment.class
   };
 
   private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
@@ -773,14 +230,11 @@ public class StorageManager {
   }
 
   public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
-    if (fragment instanceof FileFragment) {
-      FileFragment fileFragment = (FileFragment)fragment;
-      if (fileFragment.getEndKey() == 0) {
-        Scanner scanner = new NullScanner(conf, schema, meta, fileFragment);
-        scanner.setTarget(target.toArray());
+    if (fragment.isEmpty()) {
+      Scanner scanner = new NullScanner(conf, schema, meta, fragment);
+      scanner.setTarget(target.toArray());
 
-        return scanner;
-      }
+      return scanner;
     }
 
     Scanner scanner;
@@ -796,7 +250,7 @@ public class StorageManager {
 
   public static synchronized SeekableScanner getSeekableScanner(
       TajoConf conf, TableMeta meta, Schema schema, FileFragment fragment, Schema target) throws IOException {
-    return (SeekableScanner)getStorageManager(conf, null).getScanner(meta, schema, fragment, target);
+    return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target);
   }
 
   public static synchronized SeekableScanner getSeekableScanner(

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
index 1789cc9..4a66678 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -27,6 +27,7 @@ import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.KeyValueSet;
 import parquet.hadoop.ParquetOutputFormat;
@@ -111,6 +112,26 @@ public class StorageUtil extends StorageConstants {
     return new Path(parent, sb.toString());
   }
 
+  public static KeyValueSet newPhysicalProperties(CatalogProtos.StoreType type) {
+    KeyValueSet options = new KeyValueSet();
+    if (CatalogProtos.StoreType.CSV == type) {
+      options.set(CSVFILE_DELIMITER, DEFAULT_FIELD_DELIMITER);
+    } else if (CatalogProtos.StoreType.RCFILE == type) {
+      options.set(RCFILE_SERDE, DEFAULT_BINARY_SERDE);
+    } else if (CatalogProtos.StoreType.SEQUENCEFILE == type) {
+      options.set(SEQUENCEFILE_SERDE, DEFAULT_TEXT_SERDE);
+      options.set(SEQUENCEFILE_DELIMITER, DEFAULT_FIELD_DELIMITER);
+    } else if (type == CatalogProtos.StoreType.PARQUET) {
+      options.set(ParquetOutputFormat.BLOCK_SIZE, PARQUET_DEFAULT_BLOCK_SIZE);
+      options.set(ParquetOutputFormat.PAGE_SIZE, PARQUET_DEFAULT_PAGE_SIZE);
+      options.set(ParquetOutputFormat.COMPRESSION, PARQUET_DEFAULT_COMPRESSION_CODEC_NAME);
+      options.set(ParquetOutputFormat.ENABLE_DICTIONARY, PARQUET_DEFAULT_IS_DICTIONARY_ENABLED);
+      options.set(ParquetOutputFormat.VALIDATION, PARQUET_DEFAULT_IS_VALIDATION_ENABLED);
+    }
+
+    return options;
+  }
+
   static final String fileNamePatternV08 = "part-[0-9]*-[0-9]*";
   static final String fileNamePatternV09 = "part-[0-9]*-[0-9]*-[0-9]*";
 
@@ -124,7 +145,7 @@ public class StorageUtil extends StorageConstants {
    * @param path
    * @param recursive
    * @return The maximum sequence number
-   * @throws IOException
+   * @throws java.io.IOException
    */
   public static int getMaxFileSequence(FileSystem fs, Path path, boolean recursive) throws IOException {
     if (!fs.isDirectory(path)) {
@@ -220,4 +241,8 @@ public class StorageUtil extends StorageConstants {
       amt -= ret;
     }
   }
+
+  public static boolean isFileStorageType(StoreType storageType) {
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
index 816ae25..72472fc 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
@@ -39,6 +39,7 @@ import org.apache.tajo.storage.FileScanner;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -63,7 +64,7 @@ public class AvroScanner extends FileScanner {
    */
   public AvroScanner(Configuration conf,
                      final org.apache.tajo.catalog.Schema schema,
-                     final TableMeta meta, final FileFragment fragment) {
+                     final TableMeta meta, final Fragment fragment) {
     super(conf, schema, meta, fragment);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
index 6fe6841..dcd9f0a 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java
@@ -120,6 +120,7 @@ public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneab
     this.diskIds = diskIds;
   }
 
+  @Override
   public String getTableName() {
     return this.tableName;
   }
@@ -136,10 +137,20 @@ public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneab
     return this.startOffset;
   }
 
-  public Long getEndKey() {
+  @Override
+  public String getKey() {
+    return this.uri.toString();
+  }
+
+  @Override
+  public long getLength() {
     return this.length;
   }
 
+  @Override
+  public boolean isEmpty() {
+    return this.length <= 0;
+  }
   /**
    * 
    * The offset range of tablets <b>MUST NOT</b> be overlapped.
@@ -169,7 +180,7 @@ public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneab
       FileFragment t = (FileFragment) o;
       if (getPath().equals(t.getPath())
           && TUtil.checkEquals(t.getStartKey(), this.getStartKey())
-          && TUtil.checkEquals(t.getEndKey(), this.getEndKey())) {
+          && TUtil.checkEquals(t.getLength(), this.getLength())) {
         return true;
       }
     }
@@ -195,7 +206,7 @@ public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneab
   public String toString() {
     return "\"fragment\": {\"id\": \""+ tableName +"\", \"path\": "
     		+getPath() + "\", \"start\": " + this.getStartKey() + ",\"length\": "
-        + getEndKey() + "}" ;
+        + getLength() + "}" ;
   }
 
   public FragmentProto getProto() {

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
index 3f9c160..ac43197 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java
@@ -28,4 +28,12 @@ public interface Fragment extends ProtoObject<FragmentProto> {
 
   @Override
   public abstract FragmentProto getProto();
+
+  public abstract long getLength();
+
+  public abstract String getKey();
+
+  public String[] getHosts();
+
+  public abstract boolean isEmpty();
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
index 36b89b8..2f8efcf 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
@@ -23,7 +23,7 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.storage.FileScanner;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 
 import java.io.IOException;
 
@@ -42,7 +42,7 @@ public class ParquetScanner extends FileScanner {
    * @param fragment
    */
   public ParquetScanner(Configuration conf, final Schema schema,
-                        final TableMeta meta, final FileFragment fragment) {
+                        final TableMeta meta, final Fragment fragment) {
     super(conf, schema, meta, fragment);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index e5507ad..0e5c0e9 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -39,6 +39,7 @@ import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 
 import java.io.Closeable;
 import java.io.*;
@@ -1176,12 +1177,12 @@ public class RCFile {
     private SerializerDeserializer serde;
 
     public RCFileScanner(Configuration conf, final Schema schema, final TableMeta meta,
-                         final FileFragment fragment) throws IOException {
+                         final Fragment fragment) throws IOException {
       super(conf, schema, meta, fragment);
       conf.setInt("io.file.buffer.size", 4096); //TODO remove
 
-      startOffset = fragment.getStartKey();
-      endOffset = startOffset + fragment.getEndKey();
+      startOffset = this.fragment.getStartKey();
+      endOffset = startOffset + this.fragment.getLength();
       start = 0;
     }
 
@@ -1651,7 +1652,7 @@ public class RCFile {
           return 0.0f;
         } else {
           //if scanner read the header, filePos moved to zero
-          return Math.min(1.0f, (float)(Math.max(filePos - startOffset, 0)) / (float)(fragment.getEndKey()));
+          return Math.min(1.0f, (float)(Math.max(filePos - startOffset, 0)) / (float)(fragment.getLength()));
         }
       } catch (IOException e) {
         LOG.error(e.getMessage(), e);

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
index b0ef67d..74563ff 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
@@ -32,7 +32,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.BytesUtils;
 
 import java.io.IOException;
@@ -71,7 +71,7 @@ public class SequenceFileScanner extends FileScanner {
 
   private Writable EMPTY_KEY;
 
-  public SequenceFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
+  public SequenceFileScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
     super(conf, schema, meta, fragment);
   }
 
@@ -96,7 +96,7 @@ public class SequenceFileScanner extends FileScanner {
     this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0);
 
     this.start = fragment.getStartKey();
-    this.end = start + fragment.getEndKey();
+    this.end = start + fragment.getLength();
 
     if (fragment.getStartKey() > reader.getPosition())
       reader.sync(this.start);

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
index 36a589a..6bca48b 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/trevni/TrevniScanner.java
@@ -30,7 +30,7 @@ import org.apache.tajo.datum.ProtobufDatumFactory;
 import org.apache.tajo.storage.FileScanner;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
-import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.trevni.ColumnFileReader;
 import org.apache.trevni.ColumnValues;
 import org.apache.trevni.avro.HadoopInput;
@@ -45,9 +45,9 @@ public class TrevniScanner extends FileScanner {
   private int [] projectionMap;
   private ColumnValues [] columns;
 
-  public TrevniScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException {
+  public TrevniScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
     super(conf, schema, meta, fragment);
-    reader = new ColumnFileReader(new HadoopInput(fragment.getPath(), conf));
+    reader = new ColumnFileReader(new HadoopInput(this.fragment.getPath(), conf));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 212f374..8f79f4b 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -124,7 +124,7 @@ public class TestCompressionStorages {
     meta.putOption("compression.codec", BZip2Codec.class.getCanonicalName());
 
     Path tablePath = new Path(testDir, "SplitCompression");
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
 
@@ -156,7 +156,7 @@ public class TestCompressionStorages {
     tablets[0] = new FileFragment("SplitCompression", tablePath, 0, randomNum);
     tablets[1] = new FileFragment("SplitCompression", tablePath, randomNum, (fileLen - randomNum));
 
-    Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
     assertTrue(scanner.isSplittable());
     scanner.init();
     int tupleCnt = 0;
@@ -166,7 +166,7 @@ public class TestCompressionStorages {
     }
     scanner.close();
 
-    scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
+    scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
     assertTrue(scanner.isSplittable());
     scanner.init();
     while ((tuple = scanner.next()) != null) {
@@ -191,7 +191,7 @@ public class TestCompressionStorages {
 
     String fileName = "Compression_" + codec.getSimpleName();
     Path tablePath = new Path(testDir, fileName);
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.enableStats();
 
     appender.init();
@@ -221,7 +221,7 @@ public class TestCompressionStorages {
     FileFragment[] tablets = new FileFragment[1];
     tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
 
-    Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
 
     if (StoreType.CSV == storeType) {
       if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index a355a94..17a8da7 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -32,6 +32,7 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.s3.InMemoryFileSystemStore;
 import org.apache.tajo.storage.s3.SmallBlockS3FileSystem;
 import org.junit.Test;
@@ -54,7 +55,7 @@ public class TestFileSystems {
 
   private static String TEST_PATH = "target/test-data/TestFileSystem";
   private TajoConf conf = null;
-  private StorageManager sm = null;
+  private FileStorageManager sm = null;
   private FileSystem fs = null;
   Path testDir;
 
@@ -66,7 +67,7 @@ public class TestFileSystems {
       fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
     }
     this.fs = fs;
-    sm = StorageManager.getStorageManager(conf);
+    sm = StorageManager.getFileStorageManager(conf);
     testDir = getTestDir(this.fs, TEST_PATH);
   }
 
@@ -118,12 +119,12 @@ public class TestFileSystems {
     appender.close();
     FileStatus fileStatus = fs.getFileStatus(path);
 
-    List<FileFragment> splits = sm.getSplits("table", meta, schema, path);
+    List<Fragment> splits = sm.getSplits("table", meta, schema, path);
     int splitSize = (int) Math.ceil(fileStatus.getLen() / (double) fileStatus.getBlockSize());
     assertEquals(splitSize, splits.size());
 
-    for (FileFragment fragment : splits) {
-      assertTrue(fragment.getEndKey() <= fileStatus.getBlockSize());
+    for (Fragment fragment : splits) {
+      assertTrue(fragment.getLength() <= fileStatus.getBlockSize());
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/cc46aeac/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 51c612c..b70bba9 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
@@ -95,7 +96,7 @@ public class TestMergeScanner {
     conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "trevni", "parquet", "avro");
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = StorageManager.getFileStorageManager(conf, testDir);
   }
 
   @Test
@@ -115,7 +116,7 @@ public class TestMergeScanner {
     }
 
     Path table1Path = new Path(testDir, storeType + "_1.data");
-    Appender appender1 = StorageManager.getStorageManager(conf).getAppender(meta, schema, table1Path);
+    Appender appender1 = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, table1Path);
     appender1.enableStats();
     appender1.init();
     int tupleNum = 10000;
@@ -137,7 +138,7 @@ public class TestMergeScanner {
     }
 
     Path table2Path = new Path(testDir, storeType + "_2.data");
-    Appender appender2 = StorageManager.getStorageManager(conf).getAppender(meta, schema, table2Path);
+    Appender appender2 = StorageManager.getFileStorageManager(conf).getAppender(meta, schema, table2Path);
     appender2.enableStats();
     appender2.init();
 
@@ -159,7 +160,7 @@ public class TestMergeScanner {
 
     FileStatus status1 = fs.getFileStatus(table1Path);
     FileStatus status2 = fs.getFileStatus(table2Path);
-    FileFragment[] fragment = new FileFragment[2];
+    Fragment[] fragment = new Fragment[2];
     fragment[0] = new FileFragment("tablet1", table1Path, 0, status1.getLen());
     fragment[1] = new FileFragment("tablet1", table2Path, 0, status2.getLen());
 
@@ -167,7 +168,7 @@ public class TestMergeScanner {
     targetSchema.addColumn(schema.getColumn(0));
     targetSchema.addColumn(schema.getColumn(2));
 
-    Scanner scanner = new MergeScanner(conf, schema, meta, TUtil.<FileFragment>newList(fragment), targetSchema);
+    Scanner scanner = new MergeScanner(conf, schema, meta, TUtil.newList(fragment), targetSchema);
     assertEquals(isProjectableStorage(meta.getStoreType()), scanner.isProjectable());
 
     scanner.init();