Posted to commits@tajo.apache.org by ji...@apache.org on 2014/12/12 09:22:30 UTC

[16/45] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
new file mode 100644
index 0000000..060bf16
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java
@@ -0,0 +1,882 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.tajo.OverridableConf;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.plan.logical.LogicalNode;
+import org.apache.tajo.plan.logical.ScanNode;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+import org.apache.tajo.util.Bytes;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class FileStorageManager extends StorageManager {
+  private final Log LOG = LogFactory.getLog(FileStorageManager.class);
+
+  static final String OUTPUT_FILE_PREFIX="part-";
+  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
+      new ThreadLocal<NumberFormat>() {
+        @Override
+        public NumberFormat initialValue() {
+          NumberFormat fmt = NumberFormat.getInstance();
+          fmt.setGroupingUsed(false);
+          fmt.setMinimumIntegerDigits(2);
+          return fmt;
+        }
+      };
+  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK =
+      new ThreadLocal<NumberFormat>() {
+        @Override
+        public NumberFormat initialValue() {
+          NumberFormat fmt = NumberFormat.getInstance();
+          fmt.setGroupingUsed(false);
+          fmt.setMinimumIntegerDigits(6);
+          return fmt;
+        }
+      };
+
+  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ =
+      new ThreadLocal<NumberFormat>() {
+        @Override
+        public NumberFormat initialValue() {
+          NumberFormat fmt = NumberFormat.getInstance();
+          fmt.setGroupingUsed(false);
+          fmt.setMinimumIntegerDigits(3);
+          return fmt;
+        }
+      };
+
+  protected FileSystem fs;
+  protected Path tableBaseDir;
+  protected boolean blocksMetadataEnabled;
+  private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0));
+
+  public FileStorageManager(StoreType storeType) {
+    super(storeType);
+  }
+
+  @Override
+  protected void storageInit() throws IOException {
+    this.tableBaseDir = TajoConf.getWarehouseDir(conf);
+    this.fs = tableBaseDir.getFileSystem(conf);
+    this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
+        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
+    if (!this.blocksMetadataEnabled)
+      LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
+  }
+
+  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
+      throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus status = fs.getFileStatus(path);
+    return getFileScanner(meta, schema, path, status);
+  }
+
+  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status)
+      throws IOException {
+    Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
+    return getScanner(meta, schema, fragment);
+  }
+
+  public FileSystem getFileSystem() {
+    return this.fs;
+  }
+
+  public Path getWarehouseDir() {
+    return this.tableBaseDir;
+  }
+
+  public void delete(Path tablePath) throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+    fs.delete(tablePath, true);
+  }
+
+  public boolean exists(Path path) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(conf);
+    return fileSystem.exists(path);
+  }
+
+  /**
+   * This method deletes only data contained in the given path.
+   *
+   * @param path The path in which data are deleted.
+   * @throws IOException
+   */
+  public void deleteData(Path path) throws IOException {
+    FileSystem fileSystem = path.getFileSystem(conf);
+    FileStatus[] fileLists = fileSystem.listStatus(path);
+    for (FileStatus status : fileLists) {
+      fileSystem.delete(status.getPath(), true);
+    }
+  }
+
+  public Path getTablePath(String tableName) {
+    return new Path(tableBaseDir, tableName);
+  }
+
+  @VisibleForTesting
+  public Appender getAppender(TableMeta meta, Schema schema, Path filePath)
+      throws IOException {
+    return getAppender(null, null, meta, schema, filePath);
+  }
+
+  public FileFragment[] split(String tableName) throws IOException {
+    Path tablePath = new Path(tableBaseDir, tableName);
+    return split(tableName, tablePath, fs.getDefaultBlockSize());
+  }
+
+  public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
+    Path tablePath = new Path(tableBaseDir, tableName);
+    return split(tableName, tablePath, fragmentSize);
+  }
+
+  public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+    List<FileFragment> listTablets = new ArrayList<FileFragment>();
+    FileFragment tablet;
+
+    FileStatus[] fileLists = fs.listStatus(tablePath);
+    for (FileStatus file : fileLists) {
+      tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen());
+      listTablets.add(tablet);
+    }
+
+    FileFragment[] tablets = new FileFragment[listTablets.size()];
+    listTablets.toArray(tablets);
+
+    return tablets;
+  }
+
+  public FileFragment[] split(Path tablePath) throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+    return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
+  }
+
+  public FileFragment[] split(String tableName, Path tablePath) throws IOException {
+    return split(tableName, tablePath, fs.getDefaultBlockSize());
+  }
+
+  private FileFragment[] split(String tableName, Path tablePath, long size)
+      throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+
+    long defaultBlockSize = size;
+    List<FileFragment> listTablets = new ArrayList<FileFragment>();
+    FileFragment tablet;
+
+    FileStatus[] fileLists = fs.listStatus(tablePath);
+    for (FileStatus file : fileLists) {
+      long remainFileSize = file.getLen();
+      long start = 0;
+      if (remainFileSize > defaultBlockSize) {
+        while (remainFileSize > defaultBlockSize) {
+          tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
+          listTablets.add(tablet);
+          start += defaultBlockSize;
+          remainFileSize -= defaultBlockSize;
+        }
+        listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
+      } else {
+        listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
+      }
+    }
+
+    FileFragment[] tablets = new FileFragment[listTablets.size()];
+    listTablets.toArray(tablets);
+
+    return tablets;
+  }
+
+  public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
+                                       Path tablePath, long size)
+      throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+
+    long defaultBlockSize = size;
+    List<FileFragment> listTablets = new ArrayList<FileFragment>();
+    FileFragment tablet;
+
+    FileStatus[] fileLists = fs.listStatus(tablePath);
+    for (FileStatus file : fileLists) {
+      long remainFileSize = file.getLen();
+      long start = 0;
+      if (remainFileSize > defaultBlockSize) {
+        while (remainFileSize > defaultBlockSize) {
+          tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
+          listTablets.add(tablet);
+          start += defaultBlockSize;
+          remainFileSize -= defaultBlockSize;
+        }
+        listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
+      } else {
+        listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
+      }
+    }
+
+    FileFragment[] tablets = new FileFragment[listTablets.size()];
+    listTablets.toArray(tablets);
+
+    return tablets;
+  }
+
+  public long calculateSize(Path tablePath) throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+    long totalSize = 0;
+
+    if (fs.exists(tablePath)) {
+      totalSize = fs.getContentSummary(tablePath).getLength();
+    }
+
+    return totalSize;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+  // FileInputFormat Area
+  /////////////////////////////////////////////////////////////////////////////
+  public Path getAppenderFilePath(QueryUnitAttemptId taskAttemptId, Path workDir) {
+    if (taskAttemptId == null) {
+      // For testcase
+      return workDir;
+    }
+    // The final result of a task will be written to a file named part-ss-nnnnnn-mmm,
+    // where ss is the subquery id associated with this task, nnnnnn is the task id,
+    // and mmm is a sequence number for the task's output files, starting at 0.
+    Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME,
+        OUTPUT_FILE_PREFIX +
+            OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskAttemptId.getQueryUnitId().getExecutionBlockId().getId()) + "-" +
+            OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getQueryUnitId().getId()) + "-" +
+            OUTPUT_FILE_FORMAT_SEQ.get().format(0));
+    LOG.info("Output File Path: " + outFilePath);
+
+    return outFilePath;
+  }
+
+  /**
+   * Proxy PathFilter that accepts a path only if all filters given in the
+   * constructor do. Used by listStatus() to apply the built-in
+   * hiddenFileFilter together with a user provided one (if any).
+   */
+  private static class MultiPathFilter implements PathFilter {
+    private List<PathFilter> filters;
+
+    public MultiPathFilter(List<PathFilter> filters) {
+      this.filters = filters;
+    }
+
+    public boolean accept(Path path) {
+      for (PathFilter filter : filters) {
+        if (!filter.accept(path)) {
+          return false;
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * List input directories.
+   * Subclasses may override to, e.g., select only files matching a regular
+   * expression.
+   *
+   * @return array of FileStatus objects
+   * @throws IOException if no input paths are specified or an input path is invalid.
+   */
+  protected List<FileStatus> listStatus(Path... dirs) throws IOException {
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    if (dirs.length == 0) {
+      throw new IOException("No input paths specified in job");
+    }
+
+    List<IOException> errors = new ArrayList<IOException>();
+
+    // creates a MultiPathFilter with the hiddenFileFilter and the
+    // user provided one (if any).
+    List<PathFilter> filters = new ArrayList<PathFilter>();
+    filters.add(hiddenFileFilter);
+
+    PathFilter inputFilter = new MultiPathFilter(filters);
+
+    for (int i = 0; i < dirs.length; ++i) {
+      Path p = dirs[i];
+
+      FileSystem fs = p.getFileSystem(conf);
+      FileStatus[] matches = fs.globStatus(p, inputFilter);
+      if (matches == null) {
+        errors.add(new IOException("Input path does not exist: " + p));
+      } else if (matches.length == 0) {
+        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
+      } else {
+        for (FileStatus globStat : matches) {
+          if (globStat.isDirectory()) {
+            for (FileStatus stat : fs.listStatus(globStat.getPath(),
+                inputFilter)) {
+              result.add(stat);
+            }
+          } else {
+            result.add(globStat);
+          }
+        }
+      }
+    }
+
+    if (!errors.isEmpty()) {
+      throw new InvalidInputException(errors);
+    }
+    LOG.info("Total input paths to process : " + result.size());
+    return result;
+  }
+
+  /**
+   * Is the given file splittable? Usually true, but if the file is
+   * stream compressed, it will not be.
+   * <p/>
+   * <code>FileInputFormat</code> implementations can override this and return
+   * <code>false</code> to ensure that individual input files are never split up,
+   * so that tasks process entire files.
+   *
+   * @param path the file path to check
+   * @param status the file status, used to get the file length
+   * @return whether this file is splittable
+   */
+  protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException {
+    Scanner scanner = getFileScanner(meta, schema, path, status);
+    boolean split = scanner.isSplittable();
+    scanner.close();
+    return split;
+  }
+
+  private static final double SPLIT_SLOP = 1.1;   // 10% slop
+
+  protected int getBlockIndex(BlockLocation[] blkLocations,
+                              long offset) {
+    for (int i = 0; i < blkLocations.length; i++) {
+      // is the offset inside this block?
+      if ((blkLocations[i].getOffset() <= offset) &&
+          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
+        return i;
+      }
+    }
+    BlockLocation last = blkLocations[blkLocations.length - 1];
+    long fileLength = last.getOffset() + last.getLength() - 1;
+    throw new IllegalArgumentException("Offset " + offset +
+        " is outside of file (0.." +
+        fileLength + ")");
+  }
+
+  /**
+   * A factory that makes the split for this class. It can be overridden
+   * by sub-classes to make sub-types
+   */
+  protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) {
+    return new FileFragment(fragmentId, file, start, length);
+  }
+
+  protected FileFragment makeSplit(String fragmentId, Path file, long start, long length,
+                                   String[] hosts) {
+    return new FileFragment(fragmentId, file, start, length, hosts);
+  }
+
+  protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation)
+      throws IOException {
+    return new FileFragment(fragmentId, file, blockLocation);
+  }
+
+  // For non-splittable files, e.g., a gzip-compressed text file.
+  protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length,
+                                      BlockLocation[] blkLocations) throws IOException {
+
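+    // Count how many of the file's blocks each host stores, so the fragment
+    // can be placed on the hosts holding the most local data.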
+    Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
+    for (BlockLocation blockLocation : blkLocations) {
+      for (String host : blockLocation.getHosts()) {
+        if (hostsBlockMap.containsKey(host)) {
+          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
+        } else {
+          hostsBlockMap.put(host, 1);
+        }
+      }
+    }
+
+    List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
+    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
+
+      @Override
+      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
+        return v1.getValue().compareTo(v2.getValue());
+      }
+    });
+
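+    // entries is sorted by ascending block count, so walk it from the tail
+    // to pick the hosts that hold the most blocks.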
+    String[] hosts = new String[blkLocations[0].getHosts().length];
+
+    for (int i = 0; i < hosts.length; i++) {
+      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
+      hosts[i] = entry.getKey();
+    }
+    return new FileFragment(fragmentId, file, start, length, hosts);
+  }
+
+  /**
+   * Get the minimum split size
+   *
+   * @return the minimum number of bytes that can be in a split
+   */
+  public long getMinSplitSize() {
+    return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE);
+  }
+
+  /**
+   * Translates each VolumeId into a 0-based disk id, or -1 when the volume is unknown.
+   */
+  private int[] getDiskIds(VolumeId[] volumeIds) {
+    int[] diskIds = new int[volumeIds.length];
+    for (int i = 0; i < volumeIds.length; i++) {
+      int diskId = -1;
+      if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) {
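+        // HdfsVolumeId's hash encodes the volume index, so subtracting the
+        // hash of volume 0 yields a 0-based disk index; -1 means unknown.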
+        diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode();
+      }
+      diskIds[i] = diskId;
+    }
+    return diskIds;
+  }
+
+  /**
+   * Builds a map from each host to the set of volume (disk) ids it serves
+   * for the given fragments.
+   */
+  private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
+    Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
+    for (FileFragment frag : frags) {
+      String[] hosts = frag.getHosts();
+      int[] diskIds = frag.getDiskIds();
+      for (int i = 0; i < hosts.length; i++) {
+        Set<Integer> volumeList = volumeMap.get(hosts[i]);
+        if (volumeList == null) {
+          volumeList = new HashSet<Integer>();
+          volumeMap.put(hosts[i], volumeList);
+        }
+
+        if (diskIds.length > 0 && diskIds[i] > -1) {
+          volumeList.add(diskIds[i]);
+        }
+      }
+    }
+
+    return volumeMap;
+  }
+  /**
+   * Generates the list of input files and makes them into fragments (splits).
+   *
+   * @throws IOException
+   */
+  public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs)
+      throws IOException {
+    // generate splits
+
+    List<Fragment> splits = Lists.newArrayList();
+    List<Fragment> volumeSplits = Lists.newArrayList();
+    List<BlockLocation> blockLocations = Lists.newArrayList();
+
+    for (Path p : inputs) {
+      FileSystem fs = p.getFileSystem(conf);
+
+      ArrayList<FileStatus> files = Lists.newArrayList();
+      if (fs.isFile(p)) {
+        files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
+      } else {
+        files.addAll(listStatus(p));
+      }
+
+      int previousSplitSize = splits.size();
+      for (FileStatus file : files) {
+        Path path = file.getPath();
+        long length = file.getLen();
+        if (length > 0) {
+          // Get locations of blocks of file
+          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+          boolean splittable = isSplittable(meta, schema, path, file);
+          if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
+
+            if (splittable) {
+              for (BlockLocation blockLocation : blkLocations) {
+                volumeSplits.add(makeSplit(tableName, path, blockLocation));
+              }
+              blockLocations.addAll(Arrays.asList(blkLocations));
+
+            } else { // Non splittable
+              long blockSize = blkLocations[0].getLength();
+              if (blockSize >= length) {
+                blockLocations.addAll(Arrays.asList(blkLocations));
+                for (BlockLocation blockLocation : blkLocations) {
+                  volumeSplits.add(makeSplit(tableName, path, blockLocation));
+                }
+              } else {
+                splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
+              }
+            }
+
+          } else {
+            if (splittable) {
+
+              long minSize = Math.max(getMinSplitSize(), 1);
+
+              long blockSize = file.getBlockSize(); // on s3n the FileStatus reports a block size, but only one block location
+              long splitSize = Math.max(minSize, blockSize);
+              long bytesRemaining = length;
+
+              // split into splitSize chunks (also covers S3, where only one block location is reported)
+              while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize,
+                    blkLocations[blkIndex].getHosts()));
+                bytesRemaining -= splitSize;
+              }
+              if (bytesRemaining > 0) {
+                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining,
+                    blkLocations[blkIndex].getHosts()));
+              }
+            } else { // Non splittable
+              splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
+            }
+          }
+        } else {
+          //for zero length files
+          splits.add(makeSplit(tableName, path, 0, length));
+        }
+      }
+      if(LOG.isDebugEnabled()){
+        LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize));
+      }
+    }
+
+    // Combine original fileFragments with new VolumeId information
+    setVolumeMeta(volumeSplits, blockLocations);
+    splits.addAll(volumeSplits);
+    LOG.info("Total # of splits: " + splits.size());
+    return splits;
+  }
+
+  private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> blockLocations)
+      throws IOException {
+
+    int locationSize = blockLocations.size();
+    int splitSize = splits.size();
+    if (locationSize == 0 || splitSize == 0) return;
+
+    if (locationSize != splitSize) {
+      // splits and locations don't match up
+      LOG.warn("Number of block locations not equal to number of splits: "
+          + "#locations=" + locationSize
+          + " #splits=" + splitSize);
+      return;
+    }
+
+    DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf);
+    int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
+    int blockLocationIdx = 0;
+
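+    // Query volume ids in batches of at most lsLimit block locations per call,
+    // assigning the returned disk ids to the corresponding splits in order.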
+    Iterator<Fragment> iter = splits.iterator();
+    while (locationSize > blockLocationIdx) {
+
+      int subSize = Math.min(locationSize - blockLocationIdx, lsLimit);
+      List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize);
+      // BlockStorageLocation carries additional volume location information for each replica of each block.
+      BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations);
+
+      for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
+        ((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds()));
+        blockLocationIdx++;
+      }
+    }
+    LOG.info("# of splits with volumeId " + splitSize);
+  }
+
+  private static class InvalidInputException extends IOException {
+    List<IOException> errors;
+    public InvalidInputException(List<IOException> errors) {
+      this.errors = errors;
+    }
+
+    @Override
+    public String getMessage(){
+      StringBuffer sb = new StringBuffer();
+      int messageLimit = Math.min(errors.size(), 10);
+      for (int i = 0; i < messageLimit ; i ++) {
+        sb.append(errors.get(i).getMessage()).append("\n");
+      }
+
+      if(messageLimit < errors.size())
+        sb.append("skipped .....").append("\n");
+
+      return sb.toString();
+    }
+  }
+
+  @Override
+  public List<Fragment> getSplits(String tableName, TableDesc table, ScanNode scanNode) throws IOException {
+    return getSplits(tableName, table.getMeta(), table.getSchema(), new Path(table.getPath()));
+  }
+
+  @Override
+  public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException {
+    if (!tableDesc.isExternal()) {
+      String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName());
+      String databaseName = splitted[0];
+      String simpleTableName = splitted[1];
+
+      // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
+      Path tablePath = StorageUtil.concatPath(tableBaseDir, databaseName, simpleTableName);
+      tableDesc.setPath(tablePath.toUri());
+    } else {
+      Preconditions.checkState(tableDesc.getPath() != null, "ERROR: LOCATION must be given.");
+    }
+
+    Path path = new Path(tableDesc.getPath());
+
+    FileSystem fs = path.getFileSystem(conf);
+    TableStats stats = new TableStats();
+    if (tableDesc.isExternal()) {
+      if (!fs.exists(path)) {
+        LOG.error(path.toUri() + " does not exist");
+        throw new IOException("ERROR: " + path.toUri() + " does not exist");
+      }
+    } else {
+      fs.mkdirs(path);
+    }
+
+    long totalSize = 0;
+
+    try {
+      totalSize = calculateSize(path);
+    } catch (IOException e) {
+      LOG.warn("Cannot calculate the size of the relation", e);
+    }
+
+    stats.setNumBytes(totalSize);
+
+    if (tableDesc.isExternal()) { // if it is an external table, there is no way to know the exact row number without processing.
+      stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
+    }
+
+    tableDesc.setStats(stats);
+  }
+
+  @Override
+  public void purgeTable(TableDesc tableDesc) throws IOException {
+    try {
+      Path path = new Path(tableDesc.getPath());
+      FileSystem fs = path.getFileSystem(conf);
+      LOG.info("Delete table data dir: " + path);
+      fs.delete(path, true);
+    } catch (IOException e) {
+      throw new InternalError(e.getMessage());
+    }
+  }
+
+  @Override
+  public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numResultFragments) throws IOException {
+    // List the table's data files that are not empty.
+    // If the table is partitioned, return only files that share the same partition key.
+    Path tablePath = new Path(tableDesc.getPath());
+    FileSystem fs = tablePath.getFileSystem(conf);
+
+    // For a partitioned table, all returned files must share one partition key.
+    int partitionDepth = 0;
+    if (tableDesc.hasPartition()) {
+      partitionDepth = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size();
+    }
+
+    List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>();
+    if (fs.exists(tablePath)) {
+      getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numResultFragments,
+          new AtomicInteger(0), tableDesc.hasPartition(), 0, partitionDepth);
+    }
+
+    List<Fragment> fragments = new ArrayList<Fragment>();
+
+    String[] previousPartitionPathNames = null;
+    for (FileStatus eachFile: nonZeroLengthFiles) {
+      FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), null);
+
+      if (partitionDepth > 0) {
+        // extract the partition key from the file's ancestor directory names
+        Path filePath = fileFragment.getPath();
+        Path parentPath = filePath;
+        String[] parentPathNames = new String[partitionDepth];
+        for (int i = 0; i < partitionDepth; i++) {
+          parentPath = parentPath.getParent();
+          parentPathNames[partitionDepth - i - 1] = parentPath.getName();
+        }
+
+        // Add to the result while the partition key equals the previous one;
+        // stop at the first file from a different partition.
+        if (previousPartitionPathNames == null ||
+            Arrays.equals(previousPartitionPathNames, parentPathNames)) {
+          fragments.add(fileFragment);
+        } else {
+          break;
+        }
+        previousPartitionPathNames = parentPathNames;
+      } else {
+        fragments.add(fileFragment);
+      }
+    }
+
+    return fragments;
+  }
+
+  /**
+   * Recursively collects non-empty data files under the given path.
+   *
+   * @param fs The file system
+   * @param path The table path
+   * @param result The final result files to be used
+   * @param startFileIndex Index of the first file to include in the result
+   * @param numResultFiles Maximum number of files to collect
+   * @param currentFileIndex Running counter of visited data files
+   * @param partitioned A flag to indicate if this table is partitioned
+   * @param currentDepth Current visiting depth of partition directories
+   * @param maxDepth The partition depth of this table
+   * @throws IOException
+   */
+  private void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result,
+                                                int startFileIndex, int numResultFiles,
+                                                AtomicInteger currentFileIndex, boolean partitioned,
+                                                int currentDepth, int maxDepth) throws IOException {
+    // Intermediate directory
+    if (fs.isDirectory(path)) {
+
+      FileStatus[] files = fs.listStatus(path, StorageManager.hiddenFileFilter);
+
+      if (files != null && files.length > 0) {
+
+        for (FileStatus eachFile : files) {
+
+          // stop once enough files have been found
+          if (result.size() >= numResultFiles) {
+            return;
+          }
+          if (eachFile.isDirectory()) {
+
+            getNonZeroLengthDataFiles(
+                fs,
+                eachFile.getPath(),
+                result,
+                startFileIndex,
+                numResultFiles,
+                currentFileIndex,
+                partitioned,
+                currentDepth + 1, // increment a visiting depth
+                maxDepth);
+
+            // For a partitioned table, ignore files located in intermediate directories;
+            // a file is guaranteed to be in a leaf directory when currentDepth == maxDepth.
+          } else if (eachFile.isFile() && eachFile.getLen() > 0 && (!partitioned || currentDepth == maxDepth)) {
+            if (currentFileIndex.get() >= startFileIndex) {
+              result.add(eachFile);
+            }
+            currentFileIndex.incrementAndGet();
+          }
+        }
+      }
+
+      // The given path is a single file rather than a directory
+    } else {
+      FileStatus fileStatus = fs.getFileStatus(path);
+      if (fileStatus != null && fileStatus.getLen() > 0) {
+        if (currentFileIndex.get() >= startFileIndex) {
+          result.add(fileStatus);
+        }
+        currentFileIndex.incrementAndGet();
+        if (result.size() >= numResultFiles) {
+          return;
+        }
+      }
+    }
+  }
+
+  @Override
+  public StorageProperty getStorageProperty() {
+    StorageProperty storageProperty = new StorageProperty();
+    storageProperty.setSortedInsert(false);
+    if (storeType == StoreType.RAW) {
+      storageProperty.setSupportsInsertInto(false);
+    } else {
+      storageProperty.setSupportsInsertInto(true);
+    }
+
+    return storageProperty;
+  }
+
+  @Override
+  public void closeStorageManager() {
+  }
+
+  @Override
+  public void beforeInsertOrCATS(LogicalNode node) throws IOException {
+  }
+
+  @Override
+  public void rollbackOutputCommit(LogicalNode node) throws IOException {
+  }
+
+  @Override
+  public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
+                                          Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange)
+      throws IOException {
+    return null;
+  }
+
+  /**
+   * Returns Scanner instance.
+   *
+   * @param conf The system property
+   * @param meta The table meta
+   * @param schema The input schema
+   * @param path The data file path
+   * @return Scanner instance
+   * @throws java.io.IOException
+   */
+  public static synchronized SeekableScanner getSeekableScanner(
+      TajoConf conf, TableMeta meta, Schema schema, Path path) throws IOException {
+
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus status = fs.getFileStatus(path);
+    FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
+
+    return getSeekableScanner(conf, meta, schema, fragment, schema);
+  }
+}
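
For context, a minimal sketch of driving the split path above (the warehouse
setup and the "lineitem" table name are illustrative assumptions, not part of
this patch):

  import org.apache.tajo.conf.TajoConf;
  import org.apache.tajo.storage.FileStorageManager;
  import org.apache.tajo.storage.StorageManager;
  import org.apache.tajo.storage.fragment.FileFragment;

  public class SplitSketch {
    public static void main(String[] args) throws Exception {
      TajoConf conf = new TajoConf();
      // Mirrors the lookup used in HashShuffleAppenderManager below.
      FileStorageManager sm = (FileStorageManager)
          StorageManager.getFileStorageManager(conf, null);

      // Splits every file under the table directory into block-sized fragments.
      FileFragment[] fragments = sm.split("lineitem");
      for (FileFragment fragment : fragments) {
        System.out.println(fragment);  // path, start offset, and length
      }
    }
  }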

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
new file mode 100644
index 0000000..33b2750
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.util.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class HashShuffleAppender implements Appender {
+  private static Log LOG = LogFactory.getLog(HashShuffleAppender.class);
+
+  private FileAppender appender;
+  private AtomicBoolean closed = new AtomicBoolean(false);
+  private int partId;
+
+  private TableStats tableStats;
+
+  //<taskId,<page start offset,<task start, task end>>>
+  private Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
+
+  //page start offset, length
+  private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+
+  private Pair<Long, Integer> currentPage;
+
+  private int pageSize; // in bytes (configured in MB and converted by HashShuffleAppenderManager)
+
+  private int rowNumInPage;
+
+  private int totalRows;
+
+  private long offset;
+
+  private ExecutionBlockId ebId;
+
+  public HashShuffleAppender(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) {
+    this.ebId = ebId;
+    this.partId = partId;
+    this.appender = appender;
+    this.pageSize = pageSize;
+  }
+
+  @Override
+  public void init() throws IOException {
+    currentPage = new Pair(0L, 0);
+    taskTupleIndexes = new HashMap<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>();
+    rowNumInPage = 0;
+  }
+
+  /**
+   * Writes multiple tuples. Each tuple is written by the FileAppender responsible
+   * for the specified partition. If the current page exceeds pageSize after
+   * writing, a new page offset is recorded.
+   * @param taskId
+   * @param tuples
+   * @return written bytes
+   * @throws java.io.IOException
+   */
+  public int addTuples(QueryUnitAttemptId taskId, List<Tuple> tuples) throws IOException {
+    synchronized(appender) {
+      if (closed.get()) {
+        return 0;
+      }
+      long currentPos = appender.getOffset();
+
+      for (Tuple eachTuple: tuples) {
+        appender.addTuple(eachTuple);
+      }
+      long posAfterWritten = appender.getOffset();
+
+      int writtenBytes = (int)(posAfterWritten - currentPos);
+
+      int nextRowNum = rowNumInPage + tuples.size();
+      List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId);
+      if (taskIndexes == null) {
+        taskIndexes = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
+        taskTupleIndexes.put(taskId, taskIndexes);
+      }
+      taskIndexes.add(
+          new Pair<Long, Pair<Integer, Integer>>(currentPage.getFirst(), new Pair(rowNumInPage, nextRowNum)));
+      rowNumInPage = nextRowNum;
+
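+      // Start a new page once the bytes written since the current page's
+      // start offset exceed pageSize.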
+      if (posAfterWritten - currentPage.getFirst() > pageSize) {
+        nextPage(posAfterWritten);
+        rowNumInPage = 0;
+      }
+
+      totalRows += tuples.size();
+      return writtenBytes;
+    }
+  }
+
+  public long getOffset() throws IOException {
+    if (closed.get()) {
+      return offset;
+    } else {
+      return appender.getOffset();
+    }
+  }
+
+  private void nextPage(long pos) {
+    currentPage.setSecond((int) (pos - currentPage.getFirst()));
+    pages.add(currentPage);
+    currentPage = new Pair(pos, 0);
+  }
+
+  @Override
+  public void addTuple(Tuple t) throws IOException {
+    throw new IOException("Not support addTuple, use addTuples()");
+  }
+
+  @Override
+  public void flush() throws IOException {
+    synchronized(appender) {
+      if (closed.get()) {
+        return;
+      }
+      appender.flush();
+    }
+  }
+
+  @Override
+  public long getEstimatedOutputSize() throws IOException {
+    return pageSize * pages.size();
+  }
+
+  @Override
+  public void close() throws IOException {
+    synchronized(appender) {
+      if (closed.get()) {
+        return;
+      }
+      appender.flush();
+      offset = appender.getOffset();
+      if (offset > currentPage.getFirst()) {
+        nextPage(offset);
+      }
+      appender.close();
+      if (LOG.isDebugEnabled()) {
+        if (!pages.isEmpty()) {
+          LOG.debug(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()
+              + ", lastPage=" + pages.get(pages.size() - 1));
+        } else {
+          LOG.debug(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size());
+        }
+      }
+      closed.set(true);
+      tableStats = appender.getStats();
+    }
+  }
+
+  @Override
+  public void enableStats() {
+  }
+
+  @Override
+  public TableStats getStats() {
+    synchronized(appender) {
+      return appender.getStats();
+    }
+  }
+
+  public List<Pair<Long, Integer>> getPages() {
+    return pages;
+  }
+
+  public Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() {
+    return taskTupleIndexes;
+  }
+
+  public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() {
+    List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
+
+    for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values()) {
+      merged.addAll(eachFailureIndex);
+    }
+
+    return merged;
+  }
+
+  public void taskFinished(QueryUnitAttemptId taskId) {
+    taskTupleIndexes.remove(taskId);
+  }
+}
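
To illustrate the page bookkeeping above: after close(), getPages() returns
each page as a <start offset, length> pair into the single partition file. A
rough sketch of consuming that index ("appender" is assumed to be a
HashShuffleAppender that has been initialized, written to, and closed):

  import org.apache.tajo.storage.HashShuffleAppender;
  import org.apache.tajo.util.Pair;

  public class PageIndexSketch {
    static void printPages(HashShuffleAppender appender) {
      for (Pair<Long, Integer> page : appender.getPages()) {
        // A fetcher can serve the byte range [offset, offset + length).
        System.out.println("page @" + page.getFirst() + " len=" + page.getSecond());
      }
    }
  }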

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
new file mode 100644
index 0000000..636ae0f
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.util.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HashShuffleAppenderManager {
+  private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class);
+
+  private Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap =
+      new ConcurrentHashMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>>();
+  private TajoConf systemConf;
+  private FileSystem defaultFS;
+  private FileSystem localFS;
+  private LocalDirAllocator lDirAllocator;
+  private int pageSize;
+
+  public HashShuffleAppenderManager(TajoConf systemConf) throws IOException {
+    this.systemConf = systemConf;
+
+    // initialize LocalDirAllocator
+    lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+
+    // initialize DFS and LocalFileSystems
+    defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
+    localFS = FileSystem.getLocal(systemConf);
+    pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024;
+  }
+
+  public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId,
+                              TableMeta meta, Schema outSchema) throws IOException {
+    synchronized (appenderMap) {
+      Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
+
+      if (partitionAppenderMap == null) {
+        partitionAppenderMap = new ConcurrentHashMap<Integer, PartitionAppenderMeta>();
+        appenderMap.put(ebId, partitionAppenderMap);
+      }
+
+      PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId);
+      if (partitionAppenderMeta == null) {
+        Path dataFile = getDataFile(ebId, partId);
+        FileSystem fs = dataFile.getFileSystem(systemConf);
+        if (fs.exists(dataFile)) {
+          FileStatus status = fs.getFileStatus(dataFile);
+          LOG.info("File " + dataFile + " already exists, size=" + status.getLen());
+        }
+
+        if (!fs.exists(dataFile.getParent())) {
+          fs.mkdirs(dataFile.getParent());
+        }
+        FileAppender appender = (FileAppender)((FileStorageManager)StorageManager.getFileStorageManager(
+            tajoConf, null)).getAppender(meta, outSchema, dataFile);
+        appender.enableStats();
+        appender.init();
+
+        partitionAppenderMeta = new PartitionAppenderMeta();
+        partitionAppenderMeta.partId = partId;
+        partitionAppenderMeta.dataFile = dataFile;
+        partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender);
+        partitionAppenderMeta.appender.init();
+        partitionAppenderMap.put(partId, partitionAppenderMeta);
+
+        LOG.info("Create Hash shuffle file(partId=" + partId + "): " + dataFile);
+      }
+
+      return partitionAppenderMeta.appender;
+    }
+  }
+
+  public static int getPartParentId(int partId, TajoConf tajoConf) {
+    return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);
+  }
+
+  private Path getDataFile(ExecutionBlockId ebId, int partId) throws IOException {
+    try {
+      // the base dir for an output dir
+      String executionBlockBaseDir = ebId.getQueryId().toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle";
+      Path baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf));
+      //LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")");
+
+      // If an execution block has many partitions, too many shuffle files would
+      // pile up in a single directory, so spread them across parent directories.
+      return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      throw new IOException(e);
+    }
+  }
+
+  public List<HashShuffleIntermediate> close(ExecutionBlockId ebId) throws IOException {
+    Map<Integer, PartitionAppenderMeta> partitionAppenderMap = null;
+    synchronized (appenderMap) {
+      partitionAppenderMap = appenderMap.remove(ebId);
+    }
+
+    if (partitionAppenderMap == null) {
+      LOG.info("Close HashShuffleAppender:" + ebId + ", not a hash shuffle");
+      return null;
+    }
+
+    // Send Intermediate data to QueryMaster.
+    List<HashShuffleIntermediate> intermEntries = new ArrayList<HashShuffleIntermediate>();
+    for (PartitionAppenderMeta eachMeta : partitionAppenderMap.values()) {
+      try {
+        eachMeta.appender.close();
+        HashShuffleIntermediate intermediate =
+            new HashShuffleIntermediate(eachMeta.partId, eachMeta.appender.getOffset(),
+                eachMeta.appender.getPages(),
+                eachMeta.appender.getMergedTupleIndexes());
+        intermEntries.add(intermediate);
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        throw e;
+      }
+    }
+
+    LOG.info("Close HashShuffleAppender:" + ebId + ", intermediates=" + intermEntries.size());
+
+    return intermEntries;
+  }
+
+  public void finalizeTask(QueryUnitAttemptId taskId) {
+    synchronized (appenderMap) {
+      Map<Integer, PartitionAppenderMeta> partitionAppenderMap =
+        appenderMap.get(taskId.getQueryUnitId().getExecutionBlockId());
+      if (partitionAppenderMap == null) {
+        return;
+      }
+
+      for (PartitionAppenderMeta eachAppender: partitionAppenderMap.values()) {
+        eachAppender.appender.taskFinished(taskId);
+      }
+    }
+  }
+
+  public static class HashShuffleIntermediate {
+    private int partId;
+
+    private long volume;
+
+    //[<page start offset,<task start, task end>>]
+    private Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes;
+
+    //[<page start offset, length>]
+    private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+
+    public HashShuffleIntermediate(int partId, long volume,
+                                   List<Pair<Long, Integer>> pages,
+                                   Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes) {
+      this.partId = partId;
+      this.volume = volume;
+      this.failureTskTupleIndexes = failureTskTupleIndexes;
+      this.pages = pages;
+    }
+
+    public int getPartId() {
+      return partId;
+    }
+
+    public long getVolume() {
+      return volume;
+    }
+
+    public Collection<Pair<Long, Pair<Integer, Integer>>> getFailureTskTupleIndexes() {
+      return failureTskTupleIndexes;
+    }
+
+    public List<Pair<Long, Integer>> getPages() {
+      return pages;
+    }
+  }
+
+  static class PartitionAppenderMeta {
+    int partId;
+    HashShuffleAppender appender;
+    Path dataFile;
+
+    public int getPartId() {
+      return partId;
+    }
+
+    public HashShuffleAppender getAppender() {
+      return appender;
+    }
+
+    public Path getDataFile() {
+      return dataFile;
+    }
+  }
+}
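
The intended call sequence, as a rough sketch (tajoConf, ebId, partId, meta,
outSchema, taskId, and tuples are assumed to come from the running query):

  HashShuffleAppenderManager manager = new HashShuffleAppenderManager(tajoConf);

  // One appender per (execution block, partition); later calls reuse it.
  HashShuffleAppender appender =
      manager.getAppender(tajoConf, ebId, partId, meta, outSchema);
  appender.addTuples(taskId, tuples);

  // A successfully finished task drops its tuple indexes, so only failed
  // tasks' ranges remain in the merged failure index.
  manager.finalizeTask(taskId);

  // Closing the execution block collects each partition's file length and
  // page index for reporting to the QueryMaster.
  List<HashShuffleAppenderManager.HashShuffleIntermediate> intermediates =
      manager.close(ebId);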

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/LineReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/LineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/LineReader.java
new file mode 100644
index 0000000..0f31baf
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/LineReader.java
@@ -0,0 +1,559 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+
+/**
+ * A class that provides a line reader from an input stream.
+ * Depending on the constructor used, lines will either be terminated by:
+ * <ul>
+ * <li>one of the following: '\n' (LF) , '\r' (CR),
+ * or '\r\n' (CR+LF).</li>
+ * <li><em>or</em>, a custom byte sequence delimiter</li>
+ * </ul>
+ * In both cases, EOF also terminates an otherwise unterminated
+ * line.
+ */
+
+public class LineReader implements Closeable {
+  private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+  private int bufferSize = DEFAULT_BUFFER_SIZE;
+  private InputStream in;
+  private byte[] buffer;
+  // the number of bytes of real data in the buffer
+  private int bufferLength = 0;
+  // the current position in the buffer
+  private int bufferPosn = 0;
+
+  private static final byte CR = '\r';
+  private static final byte LF = '\n';
+
+  // The line delimiter
+  private final byte[] recordDelimiterBytes;
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * default buffer-size (64k).
+   *
+   * @param in The input stream
+   * @throws java.io.IOException
+   */
+  public LineReader(InputStream in) {
+    this(in, DEFAULT_BUFFER_SIZE);
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * given buffer-size.
+   *
+   * @param in         The input stream
+   * @param bufferSize Size of the read buffer
+   * @throws java.io.IOException
+   */
+  public LineReader(InputStream in, int bufferSize) {
+    this.in = in;
+    this.bufferSize = bufferSize;
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = null;
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * <code>io.file.buffer.size</code> specified in the given
+   * <code>Configuration</code>.
+   *
+   * @param in   input stream
+   * @param conf configuration
+   * @throws java.io.IOException
+   */
+  public LineReader(InputStream in, Configuration conf) throws IOException {
+    this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * default buffer-size, and using a custom delimiter of array of
+   * bytes.
+   *
+   * @param in                   The input stream
+   * @param recordDelimiterBytes The delimiter
+   */
+  public LineReader(InputStream in, byte[] recordDelimiterBytes) {
+    this.in = in;
+    this.bufferSize = DEFAULT_BUFFER_SIZE;
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = recordDelimiterBytes;
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * given buffer-size, and using a custom delimiter of array of
+   * bytes.
+   *
+   * @param in                   The input stream
+   * @param bufferSize           Size of the read buffer
+   * @param recordDelimiterBytes The delimiter
+   * @throws java.io.IOException
+   */
+  public LineReader(InputStream in, int bufferSize,
+                    byte[] recordDelimiterBytes) {
+    this.in = in;
+    this.bufferSize = bufferSize;
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = recordDelimiterBytes;
+  }
+
+  /**
+   * Create a line reader that reads from the given stream using the
+   * <code>io.file.buffer.size</code> specified in the given
+   * <code>Configuration</code>, and using a custom delimiter of array of
+   * bytes.
+   *
+   * @param in                   input stream
+   * @param conf                 configuration
+   * @param recordDelimiterBytes The delimiter
+   * @throws java.io.IOException
+   */
+  public LineReader(InputStream in, Configuration conf,
+                    byte[] recordDelimiterBytes) throws IOException {
+    this.in = in;
+    this.bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+    this.buffer = new byte[this.bufferSize];
+    this.recordDelimiterBytes = recordDelimiterBytes;
+  }
+
+
+  /**
+   * Close the underlying stream.
+   *
+   * @throws java.io.IOException
+   */
+  public void close() throws IOException {
+    in.close();
+  }
+
+  public void reset() {
+    bufferLength = 0;
+    bufferPosn = 0;
+  }
+
+  /**
+   * Read one line from the InputStream into the given Text.
+   *
+   * @param str               the object to store the given line (without newline)
+   * @param maxLineLength     the maximum number of bytes to store into str;
+   *                          the rest of the line is silently discarded.
+   * @param maxBytesToConsume the maximum number of bytes to consume
+   *                          in this call.  This is only a hint, because if the line crosses
+   *                          this threshold, we allow it to happen.  It can overshoot
+   *                          potentially by as much as one buffer length.
+   * @return the number of bytes read including the (longest) newline
+   *         found.
+   * @throws java.io.IOException if the underlying stream throws
+   */
+  public int readLine(Text str, int maxLineLength,
+                      int maxBytesToConsume) throws IOException {
+    if (this.recordDelimiterBytes != null) {
+      return readCustomLine(str, maxLineLength, maxBytesToConsume);
+    } else {
+      return readDefaultLine(str, maxLineLength, maxBytesToConsume);
+    }
+  }
+
+  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+      throws IOException {
+    return in.read(buffer);
+  }
+  /**
+   * Read a line terminated by one of CR, LF, or CRLF.
+   */
+  private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    /* We're reading data from in, but the head of the stream may be
+     * already buffered in buffer, so we have several cases:
+     * 1. No newline characters are in the buffer, so we need to copy
+     *    everything and read another buffer from the stream.
+     * 2. An unambiguously terminated line is in buffer, so we just
+     *    copy to str.
+     * 3. Ambiguously terminated line is in buffer, i.e. buffer ends
+     *    in CR.  In this case we copy everything up to CR to str, but
+     *    we also need to see what follows CR: if it's LF, then we
+     *    need to consume LF as well, so the next call to readLine will read
+     *    from after that.
+     * We use a flag prevCharCR to signal if previous character was CR
+     * and, if it happens to be at the end of the buffer, delay
+     * consuming it until we have a chance to look at the char that
+     * follows.
+     */
+    str.clear();
+    int txtLength = 0; //tracks str.getLength(), as an optimization
+    int newlineLength = 0; //length of terminating newline
+    boolean prevCharCR = false; //true if prev char was CR
+    long bytesConsumed = 0;
+    do {
+      int startPosn = bufferPosn; //starting from where we left off the last time
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        if (prevCharCR) {
+          ++bytesConsumed; //account for CR from previous read
+        }
+        bufferLength = fillBuffer(in, buffer, prevCharCR);
+        if (bufferLength <= 0) {
+          break; // EOF
+        }
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+        if (buffer[bufferPosn] == LF) {
+          newlineLength = (prevCharCR) ? 2 : 1;
+          ++bufferPosn; // at next invocation proceed from following byte
+          break;
+        }
+        if (prevCharCR) { //CR + notLF, we are at notLF
+          newlineLength = 1;
+          break;
+        }
+        prevCharCR = (buffer[bufferPosn] == CR);
+      }
+      int readLength = bufferPosn - startPosn;
+      if (prevCharCR && newlineLength == 0) {
+        --readLength; //CR at the end of the buffer
+      }
+      bytesConsumed += readLength;
+      int appendLength = readLength - newlineLength;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+      if (appendLength > 0) {
+        str.append(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before newline: " + bytesConsumed);
+    }
+    return (int) bytesConsumed;
+  }
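+
+  /* A worked example of the accounting above: for the input bytes
+   * "ab\r\ncd", the first call appends "ab" to str and returns 4 (two
+   * content bytes plus the two-byte CRLF); a second call appends "cd"
+   * and returns 2, since EOF is reached before any newline.
+   */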
+
+  /**
+   * Read a line terminated by one of CR, LF, or CRLF, appending the line's
+   * bytes to the given output stream and recording the line length in
+   * <code>offsets</code>.
+   */
+  public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets,
+                             int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    /* The buffering cases here are the same as those described in
+     * readDefaultLine(Text, int, int) above. */
+
+    int txtLength = 0; //tracks str.getLength(), as an optimization
+    int newlineLength = 0; //length of terminating newline
+    boolean prevCharCR = false; //true if prev char was CR
+    long bytesConsumed = 0;
+    do {
+      int startPosn = bufferPosn; //starting from where we left off the last time
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        if (prevCharCR) {
+          ++bytesConsumed; //account for CR from previous read
+        }
+        bufferLength = fillBuffer(in, buffer, prevCharCR);
+        if (bufferLength <= 0) {
+          break; // EOF
+        }
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline
+        if (buffer[bufferPosn] == LF) {
+          newlineLength = (prevCharCR) ? 2 : 1;
+          ++bufferPosn; // at next invocation proceed from following byte
+          break;
+        }
+        if (prevCharCR) { //CR + notLF, we are at notLF
+          newlineLength = 1;
+          break;
+        }
+        prevCharCR = (buffer[bufferPosn] == CR);
+      }
+      int readLength = bufferPosn - startPosn;
+      if (prevCharCR && newlineLength == 0) {
+        --readLength; //CR at the end of the buffer
+      }
+      bytesConsumed += readLength;
+      int appendLength = readLength - newlineLength;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+      if (appendLength > 0) {
+        str.write(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+    } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume);
+
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before newline: " + bytesConsumed);
+    }
+
+    if (bytesConsumed > 0) offsets.add(txtLength);
+    return (int) bytesConsumed;
+  }
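+
+  /* A minimal driving sketch for this variant (assuming
+   * NonSyncByteArrayOutputStream has a no-arg constructor): line bytes
+   * accumulate in a single buffer and each call appends that line's byte
+   * length to offsets, so callers can slice lines out later without
+   * per-line allocations.
+   *
+   *   NonSyncByteArrayOutputStream out = new NonSyncByteArrayOutputStream();
+   *   ArrayList<Integer> lineLengths = new ArrayList<Integer>();
+   *   while (reader.readDefaultLine(out, lineLengths, Integer.MAX_VALUE,
+   *       Integer.MAX_VALUE) > 0) {
+   *     // out grows by one line per iteration; lineLengths records its length
+   *   }
+   */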
+
+  /**
+   * Read a line terminated by a custom delimiter.
+   */
+  private int readCustomLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+   /* We're reading data from inputStream, but the head of the stream may
+    * already be captured in the previous buffer, so we have several cases:
+    *
+    * 1. The buffer tail does not contain any character sequence which
+    *    matches the head of the delimiter. We count it as an
+    *    ambiguous byte count of 0.
+    *
+    * 2. The buffer tail contains X characters that form a sequence
+    *    matching the head of the delimiter. We count the ambiguous
+    *    byte count as X.
+    *
+    *    // ***  e.g.: a segment of the input file is as follows
+    *
+    *    " record 1792: I found this bug very interesting and
+    *     I have completely read about it. record 1793: This bug
+    *     can be solved easily record 1794: This ."
+    *
+    *    delimiter = "record";
+    *
+    *    Suppose the string at the end of the buffer is
+    *    "I found this bug very interesting and I have completely re",
+    *    so the next buffer is "ad about it. record 179       ....".
+    *
+    *     The matching characters in the input
+    *     buffer tail and delimiter head = "re".
+    *     Therefore, ambiguous byte count = 2. ****   //
+    *
+    *     2.1 If the following bytes are the remaining characters of
+    *         the delimiter, then we have to capture only up to the starting
+    *         position of the delimiter. That means we need not include the
+    *         ambiguous characters in str.
+    *
+    *     2.2 If the following bytes are not the remaining characters of
+    *         the delimiter (as mentioned in the example),
+    *         then we have to include the ambiguous characters in str.
+    */
+    str.clear();
+    int txtLength = 0; // tracks str.getLength(), as an optimization
+    long bytesConsumed = 0;
+    int delPosn = 0;
+    int ambiguousByteCount = 0; // To capture the ambiguous characters count
+    do {
+      int startPosn = bufferPosn; // Start from previous end position
+      if (bufferPosn >= bufferLength) {
+        startPosn = bufferPosn = 0;
+        bufferLength = fillBuffer(in, buffer, ambiguousByteCount > 0);
+        if (bufferLength <= 0) {
+          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+          break; // EOF
+        }
+      }
+      for (; bufferPosn < bufferLength; ++bufferPosn) {
+        if (buffer[bufferPosn] == recordDelimiterBytes[delPosn]) {
+          delPosn++;
+          if (delPosn >= recordDelimiterBytes.length) {
+            bufferPosn++;
+            break;
+          }
+        } else if (delPosn != 0) {
+          bufferPosn--;
+          delPosn = 0;
+        }
+      }
+      int readLength = bufferPosn - startPosn;
+      bytesConsumed += readLength;
+      int appendLength = readLength - delPosn;
+      if (appendLength > maxLineLength - txtLength) {
+        appendLength = maxLineLength - txtLength;
+      }
+      if (appendLength > 0) {
+        if (ambiguousByteCount > 0) {
+          str.append(recordDelimiterBytes, 0, ambiguousByteCount);
+          // appending the ambiguous characters (see case 2.2)
+          bytesConsumed += ambiguousByteCount;
+          ambiguousByteCount = 0;
+        }
+        str.append(buffer, startPosn, appendLength);
+        txtLength += appendLength;
+      }
+      if (bufferPosn >= bufferLength) {
+        if (delPosn > 0 && delPosn < recordDelimiterBytes.length) {
+          ambiguousByteCount = delPosn;
+          bytesConsumed -= ambiguousByteCount; //to be consumed in the next call
+        }
+      }
+    } while (delPosn < recordDelimiterBytes.length
+        && bytesConsumed < maxBytesToConsume);
+    if (bytesConsumed > (long) Integer.MAX_VALUE) {
+      throw new IOException("Too many bytes before delimiter: " + bytesConsumed);
+    }
+    return (int) bytesConsumed;
+  }
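+
+  /* A minimal custom-delimiter sketch, assuming an already-open stream:
+   * only the construction differs from the default case, and the
+   * delimiter is stripped from each returned record.
+   *
+   *   byte[] delim = "||".getBytes();
+   *   LineReader reader = new LineReader(stream, 64 * 1024, delim);
+   *   Text record = new Text();
+   *   while (reader.readLine(record) > 0) {
+   *     // one "||"-terminated record per iteration
+   *   }
+   */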
+
+  /**
+   * Read from the InputStream into the given Text.
+   *
+   * @param str           the object to store the given line
+   * @param maxLineLength the maximum number of bytes to store into str.
+   * @return the number of bytes read including the newline
+   * @throws java.io.IOException if the underlying stream throws
+   */
+  public int readLine(Text str, int maxLineLength) throws IOException {
+    return readLine(str, maxLineLength, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Read from the InputStream into the given Text.
+   *
+   * @param str the object to store the given line
+   * @return the number of bytes read including the newline
+   * @throws java.io.IOException if the underlying stream throws
+   */
+  public int readLine(Text str) throws IOException {
+    return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
+  }
+}