Posted to commits@tajo.apache.org by hj...@apache.org on 2014/12/03 06:30:43 UTC

[28/30] tajo git commit: TAJO-1122: Refactor the tajo-storage project structure.

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/DiskUtil.java
deleted file mode 100644
index 2d68870..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/DiskUtil.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.server.common.Util;
-
-import java.io.*;
-import java.net.URI;
-import java.util.*;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
-
-public class DiskUtil {
-
-  static String UNIX_DISK_DEVICE_PATH = "/proc/partitions";
-
-  public enum OSType {
-		OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC
-	}
-
-	static private OSType getOSType() {
-		String osName = System.getProperty("os.name");
-		if (osName.contains("Windows")
-				&& (osName.contains("XP") || osName.contains("2003")
-						|| osName.contains("Vista")
-						|| osName.contains("Windows_7")
-						|| osName.contains("Windows 7") || osName
-							.contains("Windows7"))) {
-			return OSType.OS_TYPE_WINXP;
-		} else if (osName.contains("SunOS") || osName.contains("Solaris")) {
-			return OSType.OS_TYPE_SOLARIS;
-		} else if (osName.contains("Mac")) {
-			return OSType.OS_TYPE_MAC;
-		} else {
-			return OSType.OS_TYPE_UNIX;
-		}
-	}
-	
-	public static List<DiskDeviceInfo> getDiskDeviceInfos() throws IOException {
-		List<DiskDeviceInfo> deviceInfos;
-		
-		if(getOSType() == OSType.OS_TYPE_UNIX) {
-			deviceInfos = getUnixDiskDeviceInfos();
-			setDeviceMountInfo(deviceInfos);
-		} else {
-			deviceInfos = getDefaultDiskDeviceInfos();
-		}
-		
-		return deviceInfos;
-	}
-
-	private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() {
-		List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
-		
-		File file = new File(UNIX_DISK_DEVICE_PATH);
-		if(!file.exists()) {
-			System.out.println("No partition file: " + file.getAbsolutePath());
-			return getDefaultDiskDeviceInfos();
-		}
-		
-		BufferedReader reader = null;
-		try {
-			reader = new BufferedReader(new InputStreamReader(new FileInputStream(UNIX_DISK_DEVICE_PATH)));
-			String line = null;
-			
-			int count = 0;
-			Set<String> deviceNames = new TreeSet<String>();
-			while((line = reader.readLine()) != null) {
-				if(count > 0 && !line.trim().isEmpty()) {
-					String[] tokens = line.trim().split(" +");
-					if(tokens.length == 4) {
-						String deviceName = getDiskDeviceName(tokens[3]);
-						deviceNames.add(deviceName);
-					}
-				}
-				count++;
-			}
-			
-			int id = 0;
-			for(String eachDeviceName: deviceNames) {
-				DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++);
-				diskDeviceInfo.setName(eachDeviceName);
-				
-				//TODO set additional info
-				// /sys/block/sda/queue
-				infos.add(diskDeviceInfo);
-			}
-		} catch (Exception e) {
-			e.printStackTrace();
-		} finally {
-			if(reader != null) {
-				try {
-					reader.close();
-				} catch (IOException e) {
-				}
-			}
-		}
-		
-		return infos;
-	}
-	
-	private static String getDiskDeviceName(String partitionName) {
-		byte[] bytes = partitionName.getBytes();
-		
-		byte[] result = new byte[bytes.length];
-		int length = 0;
-		for(int i = 0; i < bytes.length; i++, length++) {
-			if(bytes[i] >= '0' && bytes[i] <= '9') {
-				break;
-			} else {
-				result[i] = bytes[i];
-			}
-		}
-		
-		return new String(result, 0, length);
-	}
-	
-	public static List<DiskDeviceInfo> getDefaultDiskDeviceInfos() {
-		DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(0);
-		diskDeviceInfo.setName("default");
-		
-		List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
-		
-		infos.add(diskDeviceInfo);
-		
-		return infos;
-	}
-	
-	
-	private static void setDeviceMountInfo(List<DiskDeviceInfo> deviceInfos) throws IOException {
-		Map<String, DiskDeviceInfo> deviceMap = new HashMap<String, DiskDeviceInfo>();
-		for(DiskDeviceInfo eachDevice: deviceInfos) {
-			deviceMap.put(eachDevice.getName(), eachDevice);
-		}
-		
-		BufferedReader mountOutput = null;
-		try {
-			Process mountProcess = Runtime.getRuntime().exec("mount");
-			mountOutput = new BufferedReader(new InputStreamReader(
-					mountProcess.getInputStream()));
-			while (true) {
-				String line = mountOutput.readLine();
-				if (line == null) {
-					break;
-				}
-
-				int indexStart = line.indexOf(" on /");
-				int indexEnd = line.indexOf(" ", indexStart + 4);
-
-				String deviceName = line.substring(0, indexStart).trim();
-				String[] deviceNameTokens = deviceName.split("/");
-				if(deviceNameTokens.length == 3) {
-					if("dev".equals(deviceNameTokens[1])) {
-						String realDeviceName = getDiskDeviceName(deviceNameTokens[2]);
-						String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath();
-						
-						DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName);
-						if(diskDeviceInfo != null) {
-							diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath));
-						}
-					}
-				}
-			}
-		} catch (IOException e) {
-			throw e;
-		} finally {
-			if (mountOutput != null) {
-				mountOutput.close();
-			}
-		}
-	}
-
-  public static int getDataNodeStorageSize(){
-    return getStorageDirs().size();
-  }
-
-  public static List<URI> getStorageDirs(){
-    Configuration conf = new HdfsConfiguration();
-    Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
-    return Util.stringCollectionAsURIs(dirNames);
-  }
-
-	public static void main(String[] args) throws Exception {
-		System.out.println("/dev/sde1".split("/").length);
-		for(String eachToken: "/dev/sde1".split("/")) {
-			System.out.println(eachToken);
-		}
- 	}
-}

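The DiskUtil class removed above derives a physical device name from each /proc/partitions entry by cutting the partition name at its first digit, so "sda1" maps to the device "sda". A minimal standalone sketch of that logic; the class and method names are illustrative, not part of this patch:

// Sketch of the DiskUtil.getDiskDeviceName() logic: strip the trailing
// partition number so "sda1" and "sda2" both map to the device "sda".
public class DeviceNameSketch {
  static String deviceName(String partitionName) {
    int end = 0;
    while (end < partitionName.length() && !Character.isDigit(partitionName.charAt(end))) {
      end++;
    }
    return partitionName.substring(0, end);
  }

  public static void main(String[] args) {
    System.out.println(deviceName("sda1"));  // prints "sda"
    System.out.println(deviceName("sde10")); // prints "sde"
  }
}
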
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
deleted file mode 100644
index 7df4584..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FieldSerializerDeserializer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import io.netty.buffer.ByteBuf;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.datum.Datum;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-
-public interface FieldSerializerDeserializer {
-
-  public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException;
-
-  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException;
-
-}

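FieldSerializerDeserializer, removed above, is the per-field codec contract: serialize() writes one column value to an output stream and returns the bytes written, while deserialize() reads one value back from a Netty ByteBuf. A hypothetical text-only implementation sketch; the TextFieldSerDe name is illustrative, and DatumFactory.createText(...) is assumed to be the usual Tajo datum factory call:

// Hypothetical sketch of the removed interface's contract: every field is
// treated as UTF-8 text; a real implementation would switch on col.getDataType().
import io.netty.buffer.ByteBuf;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class TextFieldSerDe implements FieldSerializerDeserializer {

  @Override
  public int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars)
      throws IOException {
    // Write the null marker for null fields, otherwise the field's text form.
    byte[] bytes = datum.isNull() ? nullChars : datum.asChars().getBytes(StandardCharsets.UTF_8);
    out.write(bytes);
    return bytes.length;
  }

  @Override
  public Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException {
    // Read the whole remaining field and rebuild a text datum from it.
    byte[] bytes = new byte[buf.readableBytes()];
    buf.readBytes(bytes);
    return DatumFactory.createText(new String(bytes, StandardCharsets.UTF_8));
  }
}
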
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
deleted file mode 100644
index 117d3da..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.conf.TajoConf;
-
-import java.io.IOException;
-
-public abstract class FileAppender implements Appender {
-  private static final Log LOG = LogFactory.getLog(FileAppender.class);
-
-  protected boolean inited = false;
-
-  protected final Configuration conf;
-  protected final TableMeta meta;
-  protected final Schema schema;
-  protected final Path workDir;
-  protected final QueryUnitAttemptId taskAttemptId;
-
-  protected boolean enabledStats;
-  protected Path path;
-
-  public FileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema,
-                      TableMeta meta, Path workDir) {
-    this.conf = conf;
-    this.meta = meta;
-    this.schema = schema;
-    this.workDir = workDir;
-    this.taskAttemptId = taskAttemptId;
-
-    try {
-      if (taskAttemptId != null) {
-        this.path = StorageManager.getFileStorageManager((TajoConf) conf).getAppenderFilePath(taskAttemptId, workDir);
-      } else {
-        this.path = workDir;
-      }
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  public void init() throws IOException {
-    if (inited) {
-     throw new IllegalStateException("FileAppender is already initialized.");
-    }
-    inited = true;
-  }
-
-  public void enableStats() {
-    if (inited) {
-      throw new IllegalStateException("Should enable this option before init()");
-    }
-
-    this.enabledStats = true;
-  }
-
-  public long getEstimatedOutputSize() throws IOException {
-    return getOffset();
-  }
-
-  public abstract long getOffset() throws IOException;
-}

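FileAppender, removed above, enforces a small lifecycle: enableStats() must be called before init(), and init() may run only once; both violations raise IllegalStateException. A minimal caller sketch under those assumptions, written against any concrete FileAppender subclass:

import org.apache.tajo.storage.FileAppender;
import org.apache.tajo.storage.Tuple;
import java.io.IOException;

// Sketch of the lifecycle contract enforced by the removed class.
public class AppenderLifecycleSketch {
  static void writeAll(FileAppender appender, Iterable<Tuple> tuples) throws IOException {
    appender.enableStats(); // must precede init(), else IllegalStateException
    appender.init();        // a second init() would also throw
    try {
      for (Tuple t : tuples) {
        appender.addTuple(t);
      }
      appender.flush();
    } finally {
      appender.close();
    }
  }
}
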
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
deleted file mode 100644
index 038f0f4..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.Column;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.ColumnStats;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-
-import java.io.IOException;
-
-public abstract class FileScanner implements Scanner {
-  private static final Log LOG = LogFactory.getLog(FileScanner.class);
-
-  protected boolean inited = false;
-  protected final Configuration conf;
-  protected final TableMeta meta;
-  protected final Schema schema;
-  protected final FileFragment fragment;
-  protected final int columnNum;
-
-  protected Column [] targets;
-
-  protected float progress;
-
-  protected TableStats tableStats;
-
-  public FileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) {
-    this.conf = conf;
-    this.meta = meta;
-    this.schema = schema;
-    this.fragment = (FileFragment)fragment;
-    this.tableStats = new TableStats();
-    this.columnNum = this.schema.size();
-  }
-
-  public void init() throws IOException {
-    inited = true;
-    progress = 0.0f;
-
-    if (fragment != null) {
-      tableStats.setNumBytes(fragment.getLength());
-      tableStats.setNumBlocks(1);
-    }
-
-    if (schema != null) {
-      for(Column eachColumn: schema.getColumns()) {
-        ColumnStats columnStats = new ColumnStats(eachColumn);
-        tableStats.addColumnStat(columnStats);
-      }
-    }
-  }
-
-  @Override
-  public Schema getSchema() {
-    return schema;
-  }
-
-  @Override
-  public void setTarget(Column[] targets) {
-    if (inited) {
-      throw new IllegalStateException("Should be called before init()");
-    }
-    this.targets = targets;
-  }
-
-  public void setSearchCondition(Object expr) {
-    if (inited) {
-      throw new IllegalStateException("Should be called before init()");
-    }
-  }
-
-  public static FileSystem getFileSystem(TajoConf tajoConf, Path path) throws IOException {
-    String tajoUser = tajoConf.getVar(TajoConf.ConfVars.USERNAME);
-    FileSystem fs;
-    if(tajoUser != null) {
-      try {
-        fs = FileSystem.get(path.toUri(), tajoConf, tajoUser);
-      } catch (InterruptedException e) {
-        LOG.warn("InterruptedException occurred while initializing FileSystem with user [" + tajoUser + "]");
-        fs = FileSystem.get(path.toUri(), tajoConf);
-      }
-    } else {
-      fs = FileSystem.get(path.toUri(), tajoConf);
-    }
-
-    return fs;
-  }
-
-  @Override
-  public float getProgress() {
-    return progress;
-  }
-
-  @Override
-  public TableStats getInputStats() {
-    return tableStats;
-  }
-}

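FileScanner, removed above, requires projection targets to be set before init(); as used elsewhere in Tajo's storage API (and assumed here), Scanner#next() returns null once input is exhausted. A usage sketch under those assumptions:

import org.apache.tajo.catalog.Column;
import org.apache.tajo.storage.FileScanner;
import java.io.IOException;

// Sketch of the scan lifecycle: project, init, pull tuples, close.
public class ScannerUsageSketch {
  static long countRows(FileScanner scanner, Column[] targets) throws IOException {
    scanner.setTarget(targets); // calling this after init() throws IllegalStateException
    scanner.init();
    long rows = 0;
    try {
      while (scanner.next() != null) { // assumed: next() returns null at end of input
        rows++;
      }
    } finally {
      scanner.close();
    }
    return rows;
  }
}
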
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java
deleted file mode 100644
index 442ed5e..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileStorageManager.java
+++ /dev/null
@@ -1,832 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.tajo.*;
-import org.apache.tajo.catalog.*;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.plan.logical.ScanNode;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-import org.apache.tajo.util.*;
-
-import java.io.IOException;
-import java.text.NumberFormat;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class FileStorageManager extends StorageManager {
-  private final Log LOG = LogFactory.getLog(FileStorageManager.class);
-
-  static final String OUTPUT_FILE_PREFIX="part-";
-  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY =
-      new ThreadLocal<NumberFormat>() {
-        @Override
-        public NumberFormat initialValue() {
-          NumberFormat fmt = NumberFormat.getInstance();
-          fmt.setGroupingUsed(false);
-          fmt.setMinimumIntegerDigits(2);
-          return fmt;
-        }
-      };
-  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_TASK =
-      new ThreadLocal<NumberFormat>() {
-        @Override
-        public NumberFormat initialValue() {
-          NumberFormat fmt = NumberFormat.getInstance();
-          fmt.setGroupingUsed(false);
-          fmt.setMinimumIntegerDigits(6);
-          return fmt;
-        }
-      };
-
-  static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ =
-      new ThreadLocal<NumberFormat>() {
-        @Override
-        public NumberFormat initialValue() {
-          NumberFormat fmt = NumberFormat.getInstance();
-          fmt.setGroupingUsed(false);
-          fmt.setMinimumIntegerDigits(3);
-          return fmt;
-        }
-      };
-
-  protected FileSystem fs;
-  protected Path tableBaseDir;
-  protected boolean blocksMetadataEnabled;
-  private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0));
-
-  public FileStorageManager(StoreType storeType) {
-    super(storeType);
-  }
-
-  @Override
-  protected void storageInit() throws IOException {
-    this.tableBaseDir = TajoConf.getWarehouseDir(conf);
-    this.fs = tableBaseDir.getFileSystem(conf);
-    this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
-        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
-    if (!this.blocksMetadataEnabled)
-      LOG.warn("Block metadata is not supported ('dfs.datanode.hdfs-blocks-metadata.enabled').");
-  }
-
-  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path)
-      throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    FileStatus status = fs.getFileStatus(path);
-    return getFileScanner(meta, schema, path, status);
-  }
-
-  public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status)
-      throws IOException {
-    Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen());
-    return getScanner(meta, schema, fragment);
-  }
-
-  public FileSystem getFileSystem() {
-    return this.fs;
-  }
-
-  public Path getWarehouseDir() {
-    return this.tableBaseDir;
-  }
-
-  public void delete(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    fs.delete(tablePath, true);
-  }
-
-  public boolean exists(Path path) throws IOException {
-    FileSystem fileSystem = path.getFileSystem(conf);
-    return fileSystem.exists(path);
-  }
-
-  /**
-   * This method deletes only data contained in the given path.
-   *
-   * @param path The path in which data are deleted.
-   * @throws IOException
-   */
-  public void deleteData(Path path) throws IOException {
-    FileSystem fileSystem = path.getFileSystem(conf);
-    FileStatus[] fileLists = fileSystem.listStatus(path);
-    for (FileStatus status : fileLists) {
-      fileSystem.delete(status.getPath(), true);
-    }
-  }
-
-  public Path getTablePath(String tableName) {
-    return new Path(tableBaseDir, tableName);
-  }
-
-  @VisibleForTesting
-  public Appender getAppender(TableMeta meta, Schema schema, Path filePath)
-      throws IOException {
-    return getAppender(null, null, meta, schema, filePath);
-  }
-
-  public FileFragment[] split(String tableName) throws IOException {
-    Path tablePath = new Path(tableBaseDir, tableName);
-    return split(tableName, tablePath, fs.getDefaultBlockSize());
-  }
-
-  public FileFragment[] split(String tableName, long fragmentSize) throws IOException {
-    Path tablePath = new Path(tableBaseDir, tableName);
-    return split(tableName, tablePath, fragmentSize);
-  }
-
-  public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    List<FileFragment> listTablets = new ArrayList<FileFragment>();
-    FileFragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus file : fileLists) {
-      tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen());
-      listTablets.add(tablet);
-    }
-
-    FileFragment[] tablets = new FileFragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  public FileFragment[] split(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
-  }
-
-  public FileFragment[] split(String tableName, Path tablePath) throws IOException {
-    return split(tableName, tablePath, fs.getDefaultBlockSize());
-  }
-
-  private FileFragment[] split(String tableName, Path tablePath, long size)
-      throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-
-    long defaultBlockSize = size;
-    List<FileFragment> listTablets = new ArrayList<FileFragment>();
-    FileFragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus file : fileLists) {
-      long remainFileSize = file.getLen();
-      long start = 0;
-      if (remainFileSize > defaultBlockSize) {
-        while (remainFileSize > defaultBlockSize) {
-          tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
-          listTablets.add(tablet);
-          start += defaultBlockSize;
-          remainFileSize -= defaultBlockSize;
-        }
-        listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
-      } else {
-        listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
-      }
-    }
-
-    FileFragment[] tablets = new FileFragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
-                                       Path tablePath, long size)
-      throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-
-    long defaultBlockSize = size;
-    List<FileFragment> listTablets = new ArrayList<FileFragment>();
-    FileFragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus file : fileLists) {
-      long remainFileSize = file.getLen();
-      long start = 0;
-      if (remainFileSize > defaultBlockSize) {
-        while (remainFileSize > defaultBlockSize) {
-          tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize);
-          listTablets.add(tablet);
-          start += defaultBlockSize;
-          remainFileSize -= defaultBlockSize;
-        }
-        listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize));
-      } else {
-        listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize));
-      }
-    }
-
-    FileFragment[] tablets = new FileFragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  public long calculateSize(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    long totalSize = 0;
-
-    if (fs.exists(tablePath)) {
-      totalSize = fs.getContentSummary(tablePath).getLength();
-    }
-
-    return totalSize;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // FileInputFormat Area
-  /////////////////////////////////////////////////////////////////////////////
-
-  public static final PathFilter hiddenFileFilter = new PathFilter() {
-    public boolean accept(Path p) {
-      String name = p.getName();
-      return !name.startsWith("_") && !name.startsWith(".");
-    }
-  };
-
-  public Path getAppenderFilePath(QueryUnitAttemptId taskAttemptId, Path workDir) {
-    if (taskAttemptId == null) {
-      // For testcase
-      return workDir;
-    }
-    // The final result of a task will be written to a file named part-ss-nnnnnn-sss, where ss is
-    // the subquery id associated with this task, nnnnnn is the task id, and sss is a sequence number.
-    Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME,
-        OUTPUT_FILE_PREFIX +
-            OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskAttemptId.getQueryUnitId().getExecutionBlockId().getId()) + "-" +
-            OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getQueryUnitId().getId()) + "-" +
-            OUTPUT_FILE_FORMAT_SEQ.get().format(0));
-    LOG.info("Output File Path: " + outFilePath);
-
-    return outFilePath;
-  }
-
-  /**
-   * Proxy PathFilter that accepts a path only if all filters given in the
-   * constructor do. Used by the listPaths() to apply the built-in
-   * hiddenFileFilter together with a user provided one (if any).
-   */
-  private static class MultiPathFilter implements PathFilter {
-    private List<PathFilter> filters;
-
-    public MultiPathFilter(List<PathFilter> filters) {
-      this.filters = filters;
-    }
-
-    public boolean accept(Path path) {
-      for (PathFilter filter : filters) {
-        if (!filter.accept(path)) {
-          return false;
-        }
-      }
-      return true;
-    }
-  }
-
-  /**
-   * List input directories.
-   * Subclasses may override to, e.g., select only files matching a regular
-   * expression.
-   *
-   * @return array of FileStatus objects
-   * @throws IOException if zero items.
-   */
-  protected List<FileStatus> listStatus(Path... dirs) throws IOException {
-    List<FileStatus> result = new ArrayList<FileStatus>();
-    if (dirs.length == 0) {
-      throw new IOException("No input paths specified in job");
-    }
-
-    List<IOException> errors = new ArrayList<IOException>();
-
-    // creates a MultiPathFilter with the hiddenFileFilter and the
-    // user provided one (if any).
-    List<PathFilter> filters = new ArrayList<PathFilter>();
-    filters.add(hiddenFileFilter);
-
-    PathFilter inputFilter = new MultiPathFilter(filters);
-
-    for (int i = 0; i < dirs.length; ++i) {
-      Path p = dirs[i];
-
-      FileSystem fs = p.getFileSystem(conf);
-      FileStatus[] matches = fs.globStatus(p, inputFilter);
-      if (matches == null) {
-        errors.add(new IOException("Input path does not exist: " + p));
-      } else if (matches.length == 0) {
-        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
-      } else {
-        for (FileStatus globStat : matches) {
-          if (globStat.isDirectory()) {
-            for (FileStatus stat : fs.listStatus(globStat.getPath(),
-                inputFilter)) {
-              result.add(stat);
-            }
-          } else {
-            result.add(globStat);
-          }
-        }
-      }
-    }
-
-    if (!errors.isEmpty()) {
-      throw new InvalidInputException(errors);
-    }
-    LOG.info("Total input paths to process : " + result.size());
-    return result;
-  }
-
-  /**
-   * Is the given filename splittable? Usually true, but if the file is
-   * stream compressed, it will not be.
-   * <p/>
-   * <code>FileInputFormat</code> implementations can override this and return
-   * <code>false</code> to ensure that individual input files are never split-up
-   * so that Mappers process entire files.
-   *
-   *
-   * @param path the file name to check
-   * @param status the file status, used to get the file length
-   * @return whether this file is splittable
-   */
-  protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException {
-    Scanner scanner = getFileScanner(meta, schema, path, status);
-    boolean split = scanner.isSplittable();
-    scanner.close();
-    return split;
-  }
-
-  private static final double SPLIT_SLOP = 1.1;   // 10% slop
-
-  protected int getBlockIndex(BlockLocation[] blkLocations,
-                              long offset) {
-    for (int i = 0; i < blkLocations.length; i++) {
-      // is the offset inside this block?
-      if ((blkLocations[i].getOffset() <= offset) &&
-          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
-        return i;
-      }
-    }
-    BlockLocation last = blkLocations[blkLocations.length - 1];
-    long fileLength = last.getOffset() + last.getLength() - 1;
-    throw new IllegalArgumentException("Offset " + offset +
-        " is outside of file (0.." +
-        fileLength + ")");
-  }
-
-  /**
-   * A factory that makes the split for this class. It can be overridden
-   * by sub-classes to make sub-types
-   */
-  protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) {
-    return new FileFragment(fragmentId, file, start, length);
-  }
-
-  protected FileFragment makeSplit(String fragmentId, Path file, long start, long length,
-                                   String[] hosts) {
-    return new FileFragment(fragmentId, file, start, length, hosts);
-  }
-
-  protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation)
-      throws IOException {
-    return new FileFragment(fragmentId, file, blockLocation);
-  }
-
-  // for non-splittable files, e.g. a gzip-compressed TextFile
-  protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length,
-                                      BlockLocation[] blkLocations) throws IOException {
-
-    Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
-    for (BlockLocation blockLocation : blkLocations) {
-      for (String host : blockLocation.getHosts()) {
-        if (hostsBlockMap.containsKey(host)) {
-          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
-        } else {
-          hostsBlockMap.put(host, 1);
-        }
-      }
-    }
-
-    List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
-    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
-
-      @Override
-      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
-        return v1.getValue().compareTo(v2.getValue());
-      }
-    });
-
-    String[] hosts = new String[blkLocations[0].getHosts().length];
-
-    for (int i = 0; i < hosts.length; i++) {
-      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
-      hosts[i] = entry.getKey();
-    }
-    return new FileFragment(fragmentId, file, start, length, hosts);
-  }
-
-  /**
-   * Get the minimum split size
-   *
-   * @return the minimum number of bytes that can be in a split
-   */
-  public long getMinSplitSize() {
-    return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE);
-  }
-
-  /**
-   * Get Disk Ids by Volume Bytes
-   */
-  private int[] getDiskIds(VolumeId[] volumeIds) {
-    int[] diskIds = new int[volumeIds.length];
-    for (int i = 0; i < volumeIds.length; i++) {
-      int diskId = -1;
-      if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) {
-        diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode();
-      }
-      diskIds[i] = diskId;
-    }
-    return diskIds;
-  }
-
-  /**
-   * Generate the map of host and make them into Volume Ids.
-   *
-   */
-  private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) {
-    Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
-    for (FileFragment frag : frags) {
-      String[] hosts = frag.getHosts();
-      int[] diskIds = frag.getDiskIds();
-      for (int i = 0; i < hosts.length; i++) {
-        Set<Integer> volumeList = volumeMap.get(hosts[i]);
-        if (volumeList == null) {
-          volumeList = new HashSet<Integer>();
-          volumeMap.put(hosts[i], volumeList);
-        }
-
-        if (diskIds.length > 0 && diskIds[i] > -1) {
-          volumeList.add(diskIds[i]);
-        }
-      }
-    }
-
-    return volumeMap;
-  }
-  /**
-   * Generate the list of files and make them into FileSplits.
-   *
-   * @throws IOException
-   */
-  public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs)
-      throws IOException {
-    // generate splits
-
-    List<Fragment> splits = Lists.newArrayList();
-    List<Fragment> volumeSplits = Lists.newArrayList();
-    List<BlockLocation> blockLocations = Lists.newArrayList();
-
-    for (Path p : inputs) {
-      FileSystem fs = p.getFileSystem(conf);
-      ArrayList<FileStatus> files = Lists.newArrayList();
-      if (fs.isFile(p)) {
-        files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
-      } else {
-        files.addAll(listStatus(p));
-      }
-
-      int previousSplitSize = splits.size();
-      for (FileStatus file : files) {
-        Path path = file.getPath();
-        long length = file.getLen();
-        if (length > 0) {
-          // Get locations of blocks of file
-          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
-          boolean splittable = isSplittable(meta, schema, path, file);
-          if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
-
-            if (splittable) {
-              for (BlockLocation blockLocation : blkLocations) {
-                volumeSplits.add(makeSplit(tableName, path, blockLocation));
-              }
-              blockLocations.addAll(Arrays.asList(blkLocations));
-
-            } else { // Non splittable
-              long blockSize = blkLocations[0].getLength();
-              if (blockSize >= length) {
-                blockLocations.addAll(Arrays.asList(blkLocations));
-                for (BlockLocation blockLocation : blkLocations) {
-                  volumeSplits.add(makeSplit(tableName, path, blockLocation));
-                }
-              } else {
-                splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
-              }
-            }
-
-          } else {
-            if (splittable) {
-
-              long minSize = Math.max(getMinSplitSize(), 1);
-
-              long blockSize = file.getBlockSize(); // the s3n REST API reports a block size, but returns only one block location
-              long splitSize = Math.max(minSize, blockSize);
-              long bytesRemaining = length;
-
-              // for s3
-              while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
-                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-                splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize,
-                    blkLocations[blkIndex].getHosts()));
-                bytesRemaining -= splitSize;
-              }
-              if (bytesRemaining > 0) {
-                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
-                splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining,
-                    blkLocations[blkIndex].getHosts()));
-              }
-            } else { // Non splittable
-              splits.add(makeNonSplit(tableName, path, 0, length, blkLocations));
-            }
-          }
-        } else {
-          //for zero length files
-          splits.add(makeSplit(tableName, path, 0, length));
-        }
-      }
-      if(LOG.isDebugEnabled()){
-        LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize));
-      }
-    }
-
-    // Combine original fileFragments with new VolumeId information
-    setVolumeMeta(volumeSplits, blockLocations);
-    splits.addAll(volumeSplits);
-    LOG.info("Total # of splits: " + splits.size());
-    return splits;
-  }
-
-  private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> blockLocations)
-      throws IOException {
-
-    int locationSize = blockLocations.size();
-    int splitSize = splits.size();
-    if (locationSize == 0 || splitSize == 0) return;
-
-    if (locationSize != splitSize) {
-      // splits and locations don't match up
-      LOG.warn("Number of block locations not equal to number of splits: "
-          + "#locations=" + locationSize
-          + " #splits=" + splitSize);
-      return;
-    }
-
-    DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf);
-    int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
-    int blockLocationIdx = 0;
-
-    Iterator<Fragment> iter = splits.iterator();
-    while (locationSize > blockLocationIdx) {
-
-      int subSize = Math.min(locationSize - blockLocationIdx, lsLimit);
-      List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize);
-      // A BlockStorageLocation contains additional volume location information for each replica of each block.
-      BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations);
-
-      for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
-        ((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds()));
-        blockLocationIdx++;
-      }
-    }
-    LOG.info("# of splits with volumeId " + splitSize);
-  }
-
-  private static class InvalidInputException extends IOException {
-    List<IOException> errors;
-    public InvalidInputException(List<IOException> errors) {
-      this.errors = errors;
-    }
-
-    @Override
-    public String getMessage(){
-      StringBuffer sb = new StringBuffer();
-      int messageLimit = Math.min(errors.size(), 10);
-      for (int i = 0; i < messageLimit ; i ++) {
-        sb.append(errors.get(i).getMessage()).append("\n");
-      }
-
-      if(messageLimit < errors.size())
-        sb.append("skipped .....").append("\n");
-
-      return sb.toString();
-    }
-  }
-
-  @Override
-  public List<Fragment> getSplits(String tableName, TableDesc table, ScanNode scanNode) throws IOException {
-    return getSplits(tableName, table.getMeta(), table.getSchema(), new Path(table.getPath()));
-  }
-
-  @Override
-  public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException {
-    if (!tableDesc.isExternal()) {
-      String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName());
-      String databaseName = splitted[0];
-      String simpleTableName = splitted[1];
-
-      // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} )
-      Path tablePath = StorageUtil.concatPath(tableBaseDir, databaseName, simpleTableName);
-      tableDesc.setPath(tablePath.toUri());
-    } else {
-      Preconditions.checkState(tableDesc.getPath() != null, "ERROR: LOCATION must be given.");
-    }
-
-    Path path = new Path(tableDesc.getPath());
-
-    FileSystem fs = path.getFileSystem(conf);
-    TableStats stats = new TableStats();
-    if (tableDesc.isExternal()) {
-      if (!fs.exists(path)) {
-        LOG.error(path.toUri() + " does not exist");
-        throw new IOException("ERROR: " + path.toUri() + " does not exist");
-      }
-    } else {
-      fs.mkdirs(path);
-    }
-
-    long totalSize = 0;
-
-    try {
-      totalSize = calculateSize(path);
-    } catch (IOException e) {
-      LOG.warn("Cannot calculate the size of the relation", e);
-    }
-
-    stats.setNumBytes(totalSize);
-
-    if (tableDesc.isExternal()) { // if it is an external table, there is no way to know the exact row number without processing.
-      stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER);
-    }
-
-    tableDesc.setStats(stats);
-  }
-
-  @Override
-  public void purgeTable(TableDesc tableDesc) throws IOException {
-    try {
-      Path path = new Path(tableDesc.getPath());
-      FileSystem fs = path.getFileSystem(conf);
-      LOG.info("Delete table data dir: " + path);
-      fs.delete(path, true);
-    } catch (IOException e) {
-      throw new InternalError(e.getMessage());
-    }
-  }
-
-  @Override
-  public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) throws IOException {
-    // List table data files which are not empty.
-    // If the table is partitioned, return the list of files that share the same partition key.
-    Path tablePath = new Path(tableDesc.getPath());
-    FileSystem fs = tablePath.getFileSystem(conf);
-
-    List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>();
-    if (fs.exists(tablePath)) {
-      getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numFragments,
-          new AtomicInteger(0));
-    }
-
-    List<Fragment> fragments = new ArrayList<Fragment>();
-
-    // In the case of a partitioned table, return only data files that share the same partition key.
-    int numPartitionColumns = 0;
-    if (tableDesc.hasPartition()) {
-      numPartitionColumns = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size();
-    }
-    String[] previousPartitionPathNames = null;
-    for (FileStatus eachFile: nonZeroLengthFiles) {
-      FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), null);
-
-      if (numPartitionColumns > 0) {
-        // finding partition key;
-        Path filePath = fileFragment.getPath();
-        Path parentPath = filePath;
-        String[] parentPathNames = new String[numPartitionColumns];
-        for (int i = 0; i < numPartitionColumns; i++) {
-          parentPath = parentPath.getParent();
-          parentPathNames[numPartitionColumns - i - 1] = parentPath.getName();
-        }
-
-        // If current partitionKey == previousPartitionKey, add to result.
-        if (previousPartitionPathNames == null) {
-          fragments.add(fileFragment);
-        } else if (previousPartitionPathNames != null && Arrays.equals(previousPartitionPathNames, parentPathNames)) {
-          fragments.add(fileFragment);
-        } else {
-          break;
-        }
-        previousPartitionPathNames = parentPathNames;
-      } else {
-        fragments.add(fileFragment);
-      }
-    }
-
-    return fragments;
-  }
-
-  private void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result,
-                                                int startFileIndex, int numResultFiles,
-                                                AtomicInteger currentFileIndex) throws IOException {
-    if (fs.isDirectory(path)) {
-      FileStatus[] files = fs.listStatus(path, FileStorageManager.hiddenFileFilter);
-      if (files != null && files.length > 0) {
-        for (FileStatus eachFile : files) {
-          if (result.size() >= numResultFiles) {
-            return;
-          }
-          if (eachFile.isDirectory()) {
-            getNonZeroLengthDataFiles(fs, eachFile.getPath(), result, startFileIndex, numResultFiles,
-                currentFileIndex);
-          } else if (eachFile.isFile() && eachFile.getLen() > 0) {
-            if (currentFileIndex.get() >= startFileIndex) {
-              result.add(eachFile);
-            }
-            currentFileIndex.incrementAndGet();
-          }
-        }
-      }
-    } else {
-      FileStatus fileStatus = fs.getFileStatus(path);
-      if (fileStatus != null && fileStatus.getLen() > 0) {
-        if (currentFileIndex.get() >= startFileIndex) {
-          result.add(fileStatus);
-        }
-        currentFileIndex.incrementAndGet();
-        if (result.size() >= numResultFiles) {
-          return;
-        }
-      }
-    }
-  }
-
-  @Override
-  public StorageProperty getStorageProperty() {
-    StorageProperty storageProperty = new StorageProperty();
-    storageProperty.setSortedInsert(false);
-    if (storeType == StoreType.RAW) {
-      storageProperty.setSupportsInsertInto(false);
-    } else {
-      storageProperty.setSupportsInsertInto(true);
-    }
-
-    return storageProperty;
-  }
-
-  @Override
-  public void closeStorageManager() {
-  }
-
-  @Override
-  public void beforeInsertOrCATS(LogicalNode node) throws IOException {
-  }
-
-  @Override
-  public void rollbackOutputCommit(LogicalNode node) throws IOException {
-  }
-
-  @Override
-  public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc,
-                                          Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange)
-      throws IOException {
-    return null;
-  }
-}

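The core of the removed FileStorageManager's split() methods is simple chunking arithmetic: a file is cut into fragments of at most one block size, with the remainder as the final fragment (getSplits() additionally allows a 10% SPLIT_SLOP overshoot before cutting a new split). A standalone sketch of that arithmetic using plain (start, length) pairs in place of FileFragment objects:

import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the fragment-splitting arithmetic in split():
// full block-size fragments followed by one fragment for the remainder.
public class SplitMathSketch {
  static List<long[]> split(long length, long blockSize) {
    List<long[]> fragments = new ArrayList<>(); // each entry: {start, length}
    long start = 0;
    long remaining = length;
    while (remaining > blockSize) {
      fragments.add(new long[]{start, blockSize});
      start += blockSize;
      remaining -= blockSize;
    }
    fragments.add(new long[]{start, remaining}); // tail, or the whole file if small
    return fragments;
  }

  public static void main(String[] args) {
    // A 300 MB file with a 128 MB block size yields fragments of 128, 128, and 44 MB.
    for (long[] f : split(300L << 20, 128L << 20)) {
      System.out.println("start=" + f[0] + " length=" + f[1]);
    }
  }
}
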
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
deleted file mode 100644
index 8b7e2e0..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * 
- */
-package org.apache.tajo.storage;
-
-import com.google.common.base.Preconditions;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.exception.UnsupportedException;
-
-/**
- * An instance of FrameTuple is an immutable tuple.
- * It contains two tuples and pretends to be one instance of Tuple for
- * join qual evaluations.
- */
-public class FrameTuple implements Tuple, Cloneable {
-  private int size;
-  private int leftSize;
-  
-  private Tuple left;
-  private Tuple right;
-  
-  public FrameTuple() {}
-  
-  public FrameTuple(Tuple left, Tuple right) {
-    set(left, right);
-  }
-  
-  public void set(Tuple left, Tuple right) {
-    this.size = left.size() + right.size();
-    this.left = left;
-    this.leftSize = left.size();
-    this.right = right;
-  }
-
-  @Override
-  public int size() {
-    return size;
-  }
-
-  @Override
-  public boolean contains(int fieldId) {
-    Preconditions.checkArgument(fieldId < size, 
-        "Out of field access: " + fieldId);
-    
-    if (fieldId < leftSize) {
-      return left.contains(fieldId);
-    } else {
-      return right.contains(fieldId - leftSize);
-    }
-  }
-
-  @Override
-  public boolean isNull(int fieldid) {
-    return get(fieldid).isNull();
-  }
-
-  @Override
-  public boolean isNotNull(int fieldid) {
-    return !isNull(fieldid);
-  }
-
-  @Override
-  public void clear() {
-    throw new UnsupportedException();
-  }
-
-  @Override
-  public void put(int fieldId, Datum value) {
-    throw new UnsupportedException();
-  }
-
-  @Override
-  public void put(int fieldId, Datum[] values) {
-    throw new UnsupportedException();
-  }
-
-  @Override
-  public void put(int fieldId, Tuple tuple) {
-    throw new UnsupportedException();
-  }
-
-  @Override
-  public void setOffset(long offset) {
-    throw new UnsupportedException();
-  }
-  
-  @Override
-  public long getOffset() {
-    throw new UnsupportedException();
-  }
-
-  @Override
-  public void put(Datum [] values) {
-    throw new UnsupportedException();
-  }
-
-  @Override
-  public Datum get(int fieldId) {
-    Preconditions.checkArgument(fieldId < size, 
-        "Out of field access: " + fieldId);
-    
-    if (fieldId < leftSize) {
-      return left.get(fieldId);
-    } else {
-      return right.get(fieldId - leftSize);
-    }
-  }
-
-  @Override
-  public boolean getBool(int fieldId) {
-    return get(fieldId).asBool();
-  }
-
-  @Override
-  public byte getByte(int fieldId) {
-    return get(fieldId).asByte();
-  }
-
-  @Override
-  public char getChar(int fieldId) {
-    return get(fieldId).asChar();
-  }
-
-  @Override
-  public byte [] getBytes(int fieldId) {
-    return get(fieldId).asByteArray();
-  }
-
-  @Override
-  public short getInt2(int fieldId) {
-    return get(fieldId).asInt2();
-  }
-
-  @Override
-  public int getInt4(int fieldId) {
-    return get(fieldId).asInt4();
-  }
-
-  @Override
-  public long getInt8(int fieldId) {
-    return get(fieldId).asInt8();
-  }
-
-  @Override
-  public float getFloat4(int fieldId) {
-    return get(fieldId).asFloat4();
-  }
-
-  @Override
-  public double getFloat8(int fieldId) {
-    return get(fieldId).asFloat8();
-  }
-
-  @Override
-  public String getText(int fieldId) {
-    return get(fieldId).asChars();
-  }
-
-  @Override
-  public ProtobufDatum getProtobufDatum(int fieldId) {
-    return (ProtobufDatum) get(fieldId);
-  }
-
-  @Override
-  public IntervalDatum getInterval(int fieldId) {
-    return (IntervalDatum) get(fieldId);
-  }
-
-  @Override
-  public char [] getUnicodeChars(int fieldId) {
-    return get(fieldId).asUnicodeChars();
-  }
-
-  @Override
-  public Tuple clone() throws CloneNotSupportedException {
-    FrameTuple frameTuple = (FrameTuple) super.clone();
-    frameTuple.set(this.left.clone(), this.right.clone());
-    return frameTuple;
-  }
-
-  @Override
-  public Datum[] getValues(){
-    throw new UnsupportedException();
-  }
-
-  public String toString() {
-    boolean first = true;
-    StringBuilder str = new StringBuilder();
-    str.append("(");
-    for(int i=0; i < size(); i++) {      
-      if(contains(i)) {
-        if(first) {
-          first = false;
-        } else {
-          str.append(", ");
-        }
-        str.append(i)
-        .append("=>")
-        .append(get(i));
-      }
-    }
-    str.append(")");
-    return str.toString();
-  }
-}

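FrameTuple, removed above, presents two tuples as one for join evaluation: field ids below left.size() resolve to the left tuple, and the rest resolve to the right tuple shifted down by leftSize. A small sketch of that mapping, assuming VTuple as the backing Tuple implementation:

import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;

// Sketch of the index mapping: ids [0, leftSize) read from the left tuple,
// ids [leftSize, size) read from the right tuple at (id - leftSize).
public class FrameTupleSketch {
  public static void main(String[] args) {
    Tuple left = new VTuple(2);
    left.put(0, DatumFactory.createInt4(1));
    left.put(1, DatumFactory.createInt4(2));

    Tuple right = new VTuple(1);
    right.put(0, DatumFactory.createInt4(3));

    FrameTuple joined = new FrameTuple(left, right);
    System.out.println(joined.getInt4(2)); // prints 3: right tuple, index 2 - leftSize = 0
  }
}
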
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
deleted file mode 100644
index 40cad32..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.util.Pair;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class HashShuffleAppender implements Appender {
-  private static Log LOG = LogFactory.getLog(HashShuffleAppender.class);
-
-  private FileAppender appender;
-  private AtomicBoolean closed = new AtomicBoolean(false);
-  private int partId;
-
-  private TableStats tableStats;
-
-  //<taskId,<page start offset,<task start, task end>>>
-  private Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
-
-  //page start offset, length
-  private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
-
-  private Pair<Long, Integer> currentPage;
-
-  private int pageSize; //MB
-
-  private int rowNumInPage;
-
-  private int totalRows;
-
-  private long offset;
-
-  private ExecutionBlockId ebId;
-
-  public HashShuffleAppender(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) {
-    this.ebId = ebId;
-    this.partId = partId;
-    this.appender = appender;
-    this.pageSize = pageSize;
-  }
-
-  @Override
-  public void init() throws IOException {
-    currentPage = new Pair(0L, 0);
-    taskTupleIndexes = new HashMap<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>();
-    rowNumInPage = 0;
-  }
-
-  /**
-   * Write multiple tuples. Each tuple is written by the FileAppender responsible for the specified partition.
-   * After writing, if the current page has grown past pageSize, a new page offset is recorded.
-   * @param taskId
-   * @param tuples
-   * @return written bytes
-   * @throws IOException
-   */
-  public int addTuples(QueryUnitAttemptId taskId, List<Tuple> tuples) throws IOException {
-    synchronized(appender) {
-      if (closed.get()) {
-        return 0;
-      }
-      long currentPos = appender.getOffset();
-
-      for (Tuple eachTuple: tuples) {
-        appender.addTuple(eachTuple);
-      }
-      long posAfterWritten = appender.getOffset();
-
-      int writtenBytes = (int)(posAfterWritten - currentPos);
-
-      int nextRowNum = rowNumInPage + tuples.size();
-      List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId);
-      if (taskIndexes == null) {
-        taskIndexes = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
-        taskTupleIndexes.put(taskId, taskIndexes);
-      }
-      taskIndexes.add(new Pair<Long, Pair<Integer, Integer>>(
-          currentPage.getFirst(), new Pair<Integer, Integer>(rowNumInPage, nextRowNum)));
-      rowNumInPage = nextRowNum;
-
-      if (posAfterWritten - currentPage.getFirst() > pageSize) {
-        nextPage(posAfterWritten);
-        rowNumInPage = 0;
-      }
-
-      totalRows += tuples.size();
-      return writtenBytes;
-    }
-  }
-
-  public long getOffset() throws IOException {
-    if (closed.get()) {
-      return offset;
-    } else {
-      return appender.getOffset();
-    }
-  }
-
-  // Closes the current page by recording its length, then starts a new page at pos.
-  private void nextPage(long pos) {
-    currentPage.setSecond((int) (pos - currentPage.getFirst()));
-    pages.add(currentPage);
-    currentPage = new Pair<Long, Integer>(pos, 0);
-  }
-
-  @Override
-  public void addTuple(Tuple t) throws IOException {
-    throw new IOException("Not support addTuple, use addTuples()");
-  }
-
-  @Override
-  public void flush() throws IOException {
-    synchronized(appender) {
-      if (closed.get()) {
-        return;
-      }
-      appender.flush();
-    }
-  }
-
-  @Override
-  public long getEstimatedOutputSize() throws IOException {
-    // Widen before multiplying to avoid int overflow for outputs over 2GB.
-    return (long) pageSize * pages.size();
-  }
-
-  @Override
-  public void close() throws IOException {
-    synchronized(appender) {
-      if (closed.get()) {
-        return;
-      }
-      appender.flush();
-      offset = appender.getOffset();
-      if (offset > currentPage.getFirst()) {
-        nextPage(offset);
-      }
-      appender.close();
-      if (LOG.isDebugEnabled()) {
-        if (!pages.isEmpty()) {
-          LOG.debug(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()
-              + ", lastPage=" + pages.get(pages.size() - 1));
-        } else {
-          LOG.debug(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size());
-        }
-      }
-      closed.set(true);
-      tableStats = appender.getStats();
-    }
-  }
-
-  @Override
-  public void enableStats() {
-  }
-
-  @Override
-  public TableStats getStats() {
-    synchronized(appender) {
-      return appender.getStats();
-    }
-  }
-
-  public List<Pair<Long, Integer>> getPages() {
-    return pages;
-  }
-
-  public Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() {
-    return taskTupleIndexes;
-  }
-
-  public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() {
-    List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
-
-    for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values()) {
-      merged.addAll(eachFailureIndex);
-    }
-
-    return merged;
-  }
-
-  public void taskFinished(QueryUnitAttemptId taskId) {
-    taskTupleIndexes.remove(taskId);
-  }
-}
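
For reference, a minimal usage sketch of the HashShuffleAppender API removed above. The ebId, taskId, tuples, and fileAppender values are assumed to be supplied by the surrounding worker context, the page size shown is illustrative, and the imports are those of the file itself:

    HashShuffleAppender appender = new HashShuffleAppender(
        ebId, 0 /* partId */, 32 * 1024 * 1024 /* pageSize in bytes */, fileAppender);
    appender.init();

    // All writes go through addTuples(); addTuple() deliberately throws.
    int writtenBytes = appender.addTuples(taskId, tuples);

    appender.close();
    // After close(), page boundaries and per-task tuple ranges can be collected.
    List<Pair<Long, Integer>> pages = appender.getPages();
    List<Pair<Long, Pair<Integer, Integer>>> indexes = appender.getMergedTupleIndexes();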

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
deleted file mode 100644
index 33a9233..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.util.Pair;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class HashShuffleAppenderManager {
-  private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class);
-
-  private Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap =
-      new ConcurrentHashMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>>();
-  private TajoConf systemConf;
-  private FileSystem defaultFS;
-  private FileSystem localFS;
-  private LocalDirAllocator lDirAllocator;
-  private int pageSize;
-
-  public HashShuffleAppenderManager(TajoConf systemConf) throws IOException {
-    this.systemConf = systemConf;
-
-    // initialize LocalDirAllocator
-    lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
-
-    // initialize DFS and LocalFileSystems
-    defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
-    localFS = FileSystem.getLocal(systemConf);
-    pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024;
-  }
-
-  public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId,
-                              TableMeta meta, Schema outSchema) throws IOException {
-    synchronized (appenderMap) {
-      Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
-
-      if (partitionAppenderMap == null) {
-        partitionAppenderMap = new ConcurrentHashMap<Integer, PartitionAppenderMeta>();
-        appenderMap.put(ebId, partitionAppenderMap);
-      }
-
-      PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId);
-      if (partitionAppenderMeta == null) {
-        Path dataFile = getDataFile(ebId, partId);
-        FileSystem fs = dataFile.getFileSystem(systemConf);
-        if (fs.exists(dataFile)) {
-          FileStatus status = fs.getFileStatus(dataFile);
-          LOG.info("File " + dataFile + " already exists, size=" + status.getLen());
-        }
-
-        if (!fs.exists(dataFile.getParent())) {
-          fs.mkdirs(dataFile.getParent());
-        }
-        FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(
-            tajoConf, null).getAppender(meta, outSchema, dataFile);
-        appender.enableStats();
-        appender.init();
-
-        partitionAppenderMeta = new PartitionAppenderMeta();
-        partitionAppenderMeta.partId = partId;
-        partitionAppenderMeta.dataFile = dataFile;
-        partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender);
-        partitionAppenderMeta.appender.init();
-        partitionAppenderMap.put(partId, partitionAppenderMeta);
-
-        LOG.info("Create Hash shuffle file(partId=" + partId + "): " + dataFile);
-      }
-
-      return partitionAppenderMeta.appender;
-    }
-  }
-
-  public static int getPartParentId(int partId, TajoConf tajoConf) {
-    return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);
-  }
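-
-  // For illustration (HASH_SHUFFLE_PARENT_DIRS = 10 is an assumed value):
-  // partId 57 maps to parent dir 57 % 10 = 7, so its shuffle file lands under
-  // .../hash-shuffle/7/57 (see getDataFile below).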
-
-  private Path getDataFile(ExecutionBlockId ebId, int partId) throws IOException {
-    try {
-      // the base dir for this execution block's output
-      String executionBlockBaseDir = ebId.getQueryId().toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle";
-      Path baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf));
-      //LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")");
-
-      // If an EB has many partitions, too many shuffle files would land in a single
-      // directory, so partitions are sharded across parent directories.
-      return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-      throw new IOException(e);
-    }
-  }
-
-  public List<HashShuffleIntermediate> close(ExecutionBlockId ebId) throws IOException {
-    Map<Integer, PartitionAppenderMeta> partitionAppenderMap = null;
-    synchronized (appenderMap) {
-      partitionAppenderMap = appenderMap.remove(ebId);
-    }
-
-    if (partitionAppenderMap == null) {
-      LOG.info("Close HashShuffleAppender:" + ebId + ", not a hash shuffle");
-      return null;
-    }
-
-    // Send the intermediate data descriptors to the QueryMaster.
-    List<HashShuffleIntermediate> intermEntries = new ArrayList<HashShuffleIntermediate>();
-    for (PartitionAppenderMeta eachMeta : partitionAppenderMap.values()) {
-      try {
-        eachMeta.appender.close();
-        HashShuffleIntermediate intermediate =
-            new HashShuffleIntermediate(eachMeta.partId, eachMeta.appender.getOffset(),
-                eachMeta.appender.getPages(),
-                eachMeta.appender.getMergedTupleIndexes());
-        intermEntries.add(intermediate);
-      } catch (IOException e) {
-        LOG.error(e.getMessage(), e);
-        throw e;
-      }
-    }
-
-    LOG.info("Close HashShuffleAppender:" + ebId + ", intermediates=" + intermEntries.size());
-
-    return intermEntries;
-  }
-
-  public void finalizeTask(QueryUnitAttemptId taskId) {
-    synchronized (appenderMap) {
-      Map<Integer, PartitionAppenderMeta> partitionAppenderMap =
-        appenderMap.get(taskId.getQueryUnitId().getExecutionBlockId());
-      if (partitionAppenderMap == null) {
-        return;
-      }
-
-      for (PartitionAppenderMeta eachAppender: partitionAppenderMap.values()) {
-        eachAppender.appender.taskFinished(taskId);
-      }
-    }
-  }
-
-  public static class HashShuffleIntermediate {
-    private int partId;
-
-    private long volume;
-
-    //[<page start offset,<task start, task end>>]
-    private Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes;
-
-    //[<page start offset, length>]
-    private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
-
-    public HashShuffleIntermediate(int partId, long volume,
-                                   List<Pair<Long, Integer>> pages,
-                                   Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes) {
-      this.partId = partId;
-      this.volume = volume;
-      this.failureTskTupleIndexes = failureTskTupleIndexes;
-      this.pages = pages;
-    }
-
-    public int getPartId() {
-      return partId;
-    }
-
-    public long getVolume() {
-      return volume;
-    }
-
-    public Collection<Pair<Long, Pair<Integer, Integer>>> getFailureTskTupleIndexes() {
-      return failureTskTupleIndexes;
-    }
-
-    public List<Pair<Long, Integer>> getPages() {
-      return pages;
-    }
-  }
-
-  static class PartitionAppenderMeta {
-    int partId;
-    HashShuffleAppender appender;
-    Path dataFile;
-
-    public int getPartId() {
-      return partId;
-    }
-
-    public HashShuffleAppender getAppender() {
-      return appender;
-    }
-
-    public Path getDataFile() {
-      return dataFile;
-    }
-  }
-}
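
For reference, a sketch of how the manager removed above is typically driven. The tajoConf, ebId, partId, meta, and outSchema values are assumed to come from the worker, and the imports are those of the file itself:

    HashShuffleAppenderManager manager = new HashShuffleAppenderManager(tajoConf);

    // One appender exists per (execution block, partition); repeated calls
    // return the cached instance.
    HashShuffleAppender appender = manager.getAppender(tajoConf, ebId, partId, meta, outSchema);

    // ... tasks append tuples via appender.addTuples(taskId, tuples) ...

    // Closing the execution block flushes and closes every partition appender
    // and returns the intermediate descriptors reported to the QueryMaster.
    List<HashShuffleAppenderManager.HashShuffleIntermediate> intermediates = manager.close(ebId);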

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
deleted file mode 100644
index bfbe478..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/LazyTuple.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.IntervalDatum;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.datum.ProtobufDatum;
-import org.apache.tajo.exception.UnsupportedException;
-
-import java.util.Arrays;
-
-public class LazyTuple implements Tuple, Cloneable {
-  private long offset;
-  private Datum[] values;
-  private byte[][] textBytes;
-  private Schema schema;
-  private byte[] nullBytes;
-  private SerializerDeserializer serializeDeserialize;
-
-  public LazyTuple(Schema schema, byte[][] textBytes, long offset) {
-    this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer());
-  }
-
-  public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) {
-    this.schema = schema;
-    this.textBytes = textBytes;
-    this.values = new Datum[schema.size()];
-    this.offset = offset;
-    this.nullBytes = nullBytes;
-    this.serializeDeserialize = serde;
-  }
-
-  public LazyTuple(LazyTuple tuple) {
-    this.values = tuple.getValues();
-    this.offset = tuple.offset;
-    this.schema = tuple.schema;
-    this.textBytes = new byte[size()][];
-    this.nullBytes = tuple.nullBytes;
-    this.serializeDeserialize = tuple.serializeDeserialize;
-  }
-
-  @Override
-  public int size() {
-    return values.length;
-  }
-
-  @Override
-  public boolean contains(int fieldId) {
-    return textBytes[fieldId] != null || values[fieldId] != null;
-  }
-
-  @Override
-  public boolean isNull(int fieldId) {
-    return get(fieldId).isNull();
-  }
-
-  @Override
-  public boolean isNotNull(int fieldId) {
-    return !isNull(fieldId);
-  }
-
-  @Override
-  public void clear() {
-    for (int i = 0; i < values.length; i++) {
-      values[i] = null;
-      textBytes[i] = null;
-    }
-  }
-
-  //////////////////////////////////////////////////////
-  // Setter
-  //////////////////////////////////////////////////////
-  @Override
-  public void put(int fieldId, Datum value) {
-    values[fieldId] = value;
-    textBytes[fieldId] = null;
-  }
-
-  @Override
-  public void put(int fieldId, Datum[] values) {
-    for (int i = fieldId, j = 0; j < values.length; i++, j++) {
-      this.values[i] = values[j];
-      this.textBytes[i] = null; // invalidate only the overwritten fields
-    }
-  }
-
-  @Override
-  public void put(int fieldId, Tuple tuple) {
-    for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) {
-      values[i] = tuple.get(j);
-      textBytes[i] = null;
-    }
-  }
-
-  @Override
-  public void put(Datum[] values) {
-    System.arraycopy(values, 0, this.values, 0, size());
-    this.textBytes = new byte[size()][]; // drop all cached raw bytes
-  }
-
-  //////////////////////////////////////////////////////
-  // Getter
-  //////////////////////////////////////////////////////
-  @Override
-  public Datum get(int fieldId) {
-    if (values[fieldId] != null) {
-      return values[fieldId];
-    } else if (textBytes.length <= fieldId) {
-      // the split produced fewer fields than the schema has columns
-      // (e.g. 3 columns, separator ',', row text "a,")
-      values[fieldId] = NullDatum.get();
-    } else if (textBytes[fieldId] != null) {
-      try {
-        values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId),
-            textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes);
-      } catch (Exception e) {
-        values[fieldId] = NullDatum.get();
-      }
-      textBytes[fieldId] = null;
-    } else {
-      // non-projected field: neither raw bytes nor a value exist, so null is returned
-    }
-    return values[fieldId];
-  }
-
-  @Override
-  public void setOffset(long offset) {
-    this.offset = offset;
-  }
-
-  @Override
-  public long getOffset() {
-    return this.offset;
-  }
-
-  @Override
-  public boolean getBool(int fieldId) {
-    return get(fieldId).asBool();
-  }
-
-  @Override
-  public byte getByte(int fieldId) {
-    return get(fieldId).asByte();
-  }
-
-  @Override
-  public char getChar(int fieldId) {
-    return get(fieldId).asChar();
-  }
-
-  @Override
-  public byte [] getBytes(int fieldId) {
-    return get(fieldId).asByteArray();
-  }
-
-  @Override
-  public short getInt2(int fieldId) {
-    return get(fieldId).asInt2();
-  }
-
-  @Override
-  public int getInt4(int fieldId) {
-    return get(fieldId).asInt4();
-  }
-
-  @Override
-  public long getInt8(int fieldId) {
-    return get(fieldId).asInt8();
-  }
-
-  @Override
-  public float getFloat4(int fieldId) {
-    return get(fieldId).asFloat4();
-  }
-
-  @Override
-  public double getFloat8(int fieldId) {
-    return get(fieldId).asFloat8();
-  }
-
-  @Override
-  public String getText(int fieldId) {
-    return get(fieldId).asChars();
-  }
-
-  @Override
-  public ProtobufDatum getProtobufDatum(int fieldId) {
-    throw new UnsupportedException();
-  }
-
-  @Override
-  public IntervalDatum getInterval(int fieldId) {
-    return (IntervalDatum) get(fieldId);
-  }
-
-  @Override
-  public char[] getUnicodeChars(int fieldId) {
-    return get(fieldId).asUnicodeChars();
-  }
-
-  @Override
-  public String toString() {
-    boolean first = true;
-    StringBuilder str = new StringBuilder();
-    str.append("(");
-    Datum d;
-    for (int i = 0; i < values.length; i++) {
-      d = get(i);
-      if (d != null) {
-        if (first) {
-          first = false;
-        } else {
-          str.append(", ");
-        }
-        str.append(i)
-            .append("=>")
-            .append(d);
-      }
-    }
-    str.append(")");
-    return str.toString();
-  }
-
-  @Override
-  public int hashCode() {
-    return Arrays.hashCode(values);
-  }
-
-  @Override
-  public Datum[] getValues() {
-    Datum[] datums = new Datum[values.length];
-    for (int i = 0; i < values.length; i++) {
-      datums[i] = get(i);
-    }
-    return datums;
-  }
-
-  @Override
-  public Tuple clone() throws CloneNotSupportedException {
-    LazyTuple lazyTuple = (LazyTuple) super.clone();
-
-    lazyTuple.values = getValues(); //shallow copy
-    lazyTuple.textBytes = new byte[size()][];
-    return lazyTuple;
-  }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (obj instanceof Tuple) {
-      Tuple other = (Tuple) obj;
-      return Arrays.equals(getValues(), other.getValues());
-    }
-    return false;
-  }
-}
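
For reference, a sketch of the lazy-deserialization behavior of the class removed above. The column names and row bytes are illustrative; imports are those of the file itself plus org.apache.tajo.common.TajoDataTypes.Type:

    Schema schema = new Schema();
    schema.addColumn("id", Type.INT4);
    schema.addColumn("name", Type.TEXT);

    byte[][] raw = new byte[][] { "1".getBytes(), "tajo".getBytes() };
    LazyTuple tuple = new LazyTuple(schema, raw, 0L);

    // Nothing is deserialized at construction time. The first access of each
    // field parses its raw bytes through TextSerializerDeserializer and caches
    // the resulting Datum, so repeated reads pay the parsing cost only once.
    int id = tuple.getInt4(0);       // parses "1" on first access
    String name = tuple.getText(1);  // parses "tajo" on first access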