You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2013/08/12 06:38:10 UTC

git commit: TAJO-118: Refactor and Improve text file Scanner. (jinho)

Updated Branches:
  refs/heads/master 1c677cc5c -> cf6bd4b36


TAJO-118: Refactor and Improve text file Scanner. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/cf6bd4b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/cf6bd4b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/cf6bd4b3

Branch: refs/heads/master
Commit: cf6bd4b361b57ffebf1c99ed3e4179f4a155c159
Parents: 1c677cc
Author: jinossy <ji...@gmail.com>
Authored: Mon Aug 12 13:36:04 2013 +0900
Committer: jinossy <ji...@gmail.com>
Committed: Mon Aug 12 13:36:04 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../planner/physical/BSTIndexScanExec.java      |   1 +
 .../engine/planner/physical/PhysicalExec.java   |   2 +
 .../engine/planner/physical/SeqScanExec.java    |  30 ++++-
 .../apache/tajo/engine/query/ResultSetImpl.java |   1 +
 .../org/apache/tajo/master/ClientService.java   |   1 +
 .../org/apache/tajo/TajoTestingCluster.java     |   6 +
 .../planner/physical/TestPhysicalPlanner.java   |  31 +++++-
 .../tajo/worker/TestRangeRetrieverHandler.java  |   4 +-
 .../java/org/apache/tajo/storage/CSVFile.java   | 110 +++++++++----------
 .../org/apache/tajo/storage/FileScanner.java    |   2 +
 .../apache/tajo/storage/TestStorageManager.java |   2 +-
 .../org/apache/tajo/storage/TestStorages.java   |   2 +-
 .../apache/tajo/storage/index/TestBSTIndex.java |   8 ++
 .../index/TestSingleCSVFileBSTIndex.java        |   7 +-
 15 files changed, 140 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e7bea99..d48c15c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,8 @@ Release 0.2.0 - unreleased
 
   IMPROVEMENTS
 
+    TAJO-118: Refactor and Improve text file Scanner. (jinho)
+
     TAJO-95: Eliminate the lazy copy approach from the classes wrapping
     protobuf-generated classes. (hyunsik)
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index e3b299d..a0d69e0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -62,6 +62,7 @@ public class BSTIndexScanExec extends PhysicalExec {
 
     this.fileScanner = (SeekableScanner)StorageManager.getScanner(context.getConf(),
         fragment.getMeta(), fragment, outSchema);
+    this.fileScanner.init();
     this.projector = new Projector(inSchema, outSchema, scanNode.getTargets());
     this.evalContexts = projector.renew();
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index 1781304..c67c309 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -29,12 +29,14 @@ public abstract class PhysicalExec implements SchemaObject {
   protected final TaskAttemptContext context;
   protected final Schema inSchema;
   protected final Schema outSchema;
+  protected final int outColumnNum;
 
   public PhysicalExec(final TaskAttemptContext context, final Schema inSchema,
                       final Schema outSchema) {
     this.context = context;
     this.inSchema = inSchema;
     this.outSchema = outSchema;
+    this.outColumnNum = outSchema.getColumnNum();
   }
 
   public final Schema getSchema() {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 9279c01..dc42a71 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -19,14 +19,20 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.TaskAttemptContext;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.engine.eval.EvalContext;
 import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.eval.EvalTreeUtil;
 import org.apache.tajo.engine.planner.Projector;
+import org.apache.tajo.engine.planner.Target;
 import org.apache.tajo.engine.planner.logical.ScanNode;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.util.TUtil;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
 
 public class SeqScanExec extends PhysicalExec {
   private final ScanNode plan;
@@ -56,6 +62,28 @@ public class SeqScanExec extends PhysicalExec {
   }
 
   public void init() throws IOException {
+    Schema projected;
+    if (plan.hasTargets()) {
+      projected = new Schema();
+      Set<Column> columnSet = new HashSet<Column>();
+
+      if (plan.hasQual()) {
+        columnSet.addAll(EvalTreeUtil.findDistinctRefColumns(qual));
+      }
+
+      for (Target t : plan.getTargets()) {
+        columnSet.addAll(EvalTreeUtil.findDistinctRefColumns(t.getEvalTree()));
+      }
+
+      for (Column column : inSchema.getColumns()) {
+        if (columnSet.contains(column)) {
+          projected.addColumn(column);
+        }
+      }
+    } else {
+      projected = outSchema;
+    }
+
     this.projector = new Projector(inSchema, outSchema, plan.getTargets());
     this.evalContexts = projector.renew();
 
@@ -64,7 +92,7 @@ public class SeqScanExec extends PhysicalExec {
           TUtil.newList(fragments));
     } else {
       this.scanner = StorageManager.getScanner(context.getConf(), fragments[0].getMeta(),
-          fragments[0], plan.getOutSchema());
+          fragments[0], projected);
     }
 
     scanner.init();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
index 62a3ceb..db049a0 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/engine/query/ResultSetImpl.java
@@ -76,6 +76,7 @@ public class ResultSetImpl implements ResultSet {
     this.totalRow = meta.getStat() != null ? meta.getStat().getNumRows() : 0;
     Collection<Fragment> frags = getFragmentsNG(meta, path);
     scanner = new MergeScanner(conf, meta, frags);
+    scanner.init();
     init();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java
index 827641a..a6a5f68 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/ClientService.java
@@ -135,6 +135,7 @@ public class ClientService extends AbstractService {
         if (e.getMessage() != null) {
           build.setErrorMessage(ExceptionUtils.getStackTrace(e));
         } else {
+          LOG.error("Internal Error", e);
           build.setErrorMessage("Internal Error");
         }
         return build.build();

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
index 9f620e8..4911b48 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -333,6 +333,12 @@ public class TajoTestingCluster {
     conf.set("yarn.scheduler.capacity.root.queues", "default");
     conf.set("yarn.scheduler.capacity.root.default.capacity", "100");
 
+    // Limit YARN thread counts to avoid thread-related OOM (out of memory) in tests
+    conf.setInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, 2);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT, 2);
+    conf.setInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT, 2);
+    conf.setInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT, 2);
+
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 384);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 3000);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index e329fad..3051204 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -173,7 +173,8 @@ public class TestPhysicalPlanner {
       "select 3 < 4 as ineq, 3.5 * 2 as score", // 12
       "select (1 > 0) and 3 > 1", // 13
       "select deptName, class, sum(score), max(score), min(score) from score", // 14
-      "select deptname, class, sum(score), max(score), min(score) from score group by deptname" // 15
+      "select deptname, class, sum(score), max(score), min(score) from score group by deptname", // 15
+      "select name from employee where empid >= 0", // 16
   };
 
   @Test
@@ -207,6 +208,34 @@ public class TestPhysicalPlanner {
   }
 
   @Test
+  public final void testCreateScanWithFilterPlan() throws IOException, PlanningException {
+    Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(),
+        employee.getPath(), Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanWithFilterPlan");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, TUtil
+        .newQueryUnitAttemptId(),
+        new Fragment[] { frags[0] }, workDir);
+    Expr expr = analyzer.parse(QUERIES[16]);
+    LogicalPlan plan = planner.createPlan(expr);
+    LogicalNode rootNode =plan.getRootBlock().getRoot();
+    LogicalOptimizer.optimize(plan);
+
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    Tuple tuple;
+    int i = 0;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      assertTrue(tuple.contains(0));
+      i++;
+    }
+    exec.close();
+    assertEquals(100, i);
+  }
+
+  @Test
   public final void testGroupByPlan() throws IOException, PlanningException {
     Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
index 711d63d..109b3ab 100644
--- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
+++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java
@@ -163,7 +163,7 @@ public class TestRangeRetrieverHandler {
     reader.open();
     SeekableScanner scanner = (SeekableScanner)
         sm.getScanner(conf, employeeMeta, StorageUtil.concatPath(testDir, "output", "output"));
-
+    scanner.init();
     int cnt = 0;
     while(scanner.next() != null) {
       cnt++;
@@ -273,7 +273,7 @@ public class TestRangeRetrieverHandler {
     reader.open();
     SeekableScanner scanner = (SeekableScanner) StorageManager.getScanner(
         conf, meta, StorageUtil.concatPath(testDir, "output", "output"));
-
+    scanner.init();
     int cnt = 0;
     while(scanner.next() != null) {
       cnt++;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index 498d0df..0cba890 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.storage;
 
 import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -36,7 +37,7 @@ import org.apache.tajo.storage.json.StorageGsonHelper;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.Arrays;
 
 public class CSVFile {
   public static final String DELIMITER = "csvfile.delimiter";
@@ -49,7 +50,6 @@ public class CSVFile {
     private final FileSystem fs;
     private FSDataOutputStream fos;
     private String delimiter;
-
     private TableStatistics stats = null;
 
     public CSVAppender(Configuration conf, final TableMeta meta,
@@ -58,7 +58,6 @@ public class CSVFile {
       this.fs = path.getFileSystem(conf);
       this.meta = meta;
       this.schema = meta.getSchema();
-
       this.delimiter = this.meta.getOption(DELIMITER, DELIMITER_DEFAULT);
     }
 
@@ -198,13 +197,12 @@ public class CSVFile {
     public CSVScanner(Configuration conf, final TableMeta meta,
         final Fragment fragment) throws IOException {
       super(conf, meta, fragment);
-      init(fragment);
     }
 
     private static final byte LF = '\n';
-    private final static long DEFAULT_BUFFER_SIZE = 65536;
+    private final static long DEFAULT_BUFFER_SIZE = 256 * 1024;
     private long bufSize;
-    private String delimiter;
+    private char delimiter;
     private FileSystem fs;
     private FSDataInputStream fis;
     private long startOffset, length, startPos;
@@ -215,17 +213,15 @@ public class CSVFile {
     private byte[] tail = null;
     private long pageStart = -1;
     private long prevTailLen = -1;
-    private HashMap<Long, Integer> curTupleOffsetMap = null;
+    private int[] targetColumnIndexes;
 
-    private void init(final Fragment fragment) throws IOException {
+    @Override
+    public void init() throws IOException {
 
       // Buffer size, Delimiter
       this.bufSize = DEFAULT_BUFFER_SIZE;
-      this.delimiter = fragment.getMeta().getOption(DELIMITER,
-          DELIMITER_DEFAULT);
-      if (this.delimiter.equals("|")) {
-        this.delimiter = "\\|";
-      }
+      String delim  = fragment.getMeta().getOption(DELIMITER, DELIMITER_DEFAULT);
+      this.delimiter = delim.charAt(0);
 
       // Fragment information
       this.fs = fragment.getPath().getFileSystem(this.conf);
@@ -234,6 +230,16 @@ public class CSVFile {
       this.length = fragment.getLength();
       tuples = new String[0];
 
+      if (targets == null) {
+        targets = schema.toArray();
+      }
+
+      targetColumnIndexes = new int[targets.length];
+      for (int i = 0; i < targets.length; i++) {
+        targetColumnIndexes[i] = schema.getColumnIdByName(targets[i].getColumnName());
+      }
+      super.init();
+
       if (startOffset != 0) {
         fis.seek(startOffset - 1);
         while (fis.readByte() != LF) {
@@ -275,11 +281,11 @@ public class CSVFile {
         buf = new byte[(int) bufSize];
         rbyte = fis.read(buf);
         tail = new byte[0];
-        tuples = new String(buf,0,rbyte).split("\n");
+        tuples = StringUtils.split(new String(buf, 0, rbyte), (char)LF);
       } else {
         buf = new byte[(int) bufSize];
         rbyte = fis.read(buf);
-        tuples = (new String(tail) + new String(buf,0,rbyte)).split("\n");
+        tuples = StringUtils.split(new String(tail) + new String(buf, 0, rbyte), (char)LF);
       }
 
       // Check tail
@@ -293,8 +299,7 @@ public class CSVFile {
           }
 
           // Replace tuple
-          tuples[tuples.length - 1] = new String(tuples[tuples.length - 1]
-              + new String(temp,0,cnt));
+          tuples[tuples.length - 1] = tuples[tuples.length - 1] + new String(temp, 0, cnt);
           validIdx = tuples.length;
         } else {
           tail = tuples[tuples.length - 1].getBytes();
@@ -309,26 +314,15 @@ public class CSVFile {
 
     private void makeTupleOffset() {
       long curTupleOffset = 0;
-      this.tupleOffsets = null;
       this.tupleOffsets = new long[this.validIdx];
-      
-      this.curTupleOffsetMap = null;
-      this.curTupleOffsetMap = new HashMap<Long, Integer>();
-      
       for (int i = 0; i < this.validIdx; i++) {
         this.tupleOffsets[i] = curTupleOffset + this.pageStart;
-        this.curTupleOffsetMap.put(tupleOffsets[i], i);
-        curTupleOffset += (this.tuples[i]  + "\n").getBytes().length;
+        curTupleOffset += this.tuples[i].getBytes().length + 1;//tuple byte +  1byte line feed
       }
       
     }
 
     @Override
-    public void init() throws IOException {
-      super.init();
-    }
-
-    @Override
     public Tuple next() throws IOException {
       try {
         if (currentIdx == validIdx) {
@@ -340,58 +334,55 @@ public class CSVFile {
           }
         }
         long offset = this.tupleOffsets[currentIdx];
-        String[] cells = tuples[currentIdx++].split(delimiter);
-        VTuple tuple = new VTuple(schema.getColumnNum());
+        String[] cells = StringUtils.splitPreserveAllTokens(tuples[currentIdx++], delimiter);
+        int targetLen = targets.length;
+        VTuple tuple = new VTuple(columnNum);
         Column field;
         tuple.setOffset(offset);
-        for (int i = 0; i < schema.getColumnNum(); i++) {
-          field = schema.getColumn(i);
-          if (cells.length <= i) {
-            tuple.put(i, DatumFactory.createNullDatum());
+        for (int i = 0; i < targetLen; i++) {
+          field = targets[i];
+          int tid = targetColumnIndexes[i];
+          if (cells.length <= tid) {
+            tuple.put(tid, DatumFactory.createNullDatum());
           } else {
-            String cell = cells[i].trim();
+            String cell = cells[tid].trim();
 
             if (cell.equals("")) {
-              tuple.put(i, DatumFactory.createNullDatum());
+              tuple.put(tid, DatumFactory.createNullDatum());
             } else {
               switch (field.getDataType().getType()) {
               case BOOLEAN:
-                tuple.put(i, DatumFactory.createBool(cell));
+                tuple.put(tid, DatumFactory.createBool(cell));
                 break;
               case BIT:
-                tuple.put(i,
-                    DatumFactory.createBit(Base64.decodeBase64(cell)[0]));
+                tuple.put(tid, DatumFactory.createBit(Base64.decodeBase64(cell)[0]));
                 break;
               case CHAR:
-                tuple.put(i, DatumFactory.createChar(cell.charAt(0)));
+                tuple.put(tid, DatumFactory.createChar(cell.charAt(0)));
                 break;
               case BLOB:
-                tuple.put(i,
-                    DatumFactory.createBlob(Base64.decodeBase64(cell)));
+                tuple.put(tid, DatumFactory.createBlob(Base64.decodeBase64(cell)));
                 break;
               case INT2:
-                tuple.put(i, DatumFactory.createInt2(cell));
+                tuple.put(tid, DatumFactory.createInt2(cell));
                 break;
               case INT4:
-                tuple.put(i, DatumFactory.createInt4(cell));
+                tuple.put(tid, DatumFactory.createInt4(cell));
                 break;
               case INT8:
-                tuple.put(i, DatumFactory.createInt8(cell));
+                tuple.put(tid, DatumFactory.createInt8(cell));
                 break;
               case FLOAT4:
-                tuple.put(i, DatumFactory.createFloat4(cell));
+                tuple.put(tid, DatumFactory.createFloat4(cell));
                 break;
               case FLOAT8:
                 tuple.put(i, DatumFactory.createFloat8(cell));
                 break;
-//              case STRING:
-//                tuple.put(i, DatumFactory.createText(cell));
-//                break;
               case TEXT:
-                tuple.put(i, DatumFactory.createText(cell));
+                tuple.put(tid, DatumFactory.createText(cell));
                 break;
               case INET4:
-                tuple.put(i, DatumFactory.createInet4(cell));
+                tuple.put(tid, DatumFactory.createInet4(cell));
                 break;
               case ARRAY:
                 Datum data = StorageGsonHelper.getInstance().fromJson(cell,
@@ -412,7 +403,7 @@ public class CSVFile {
 
     @Override
     public void reset() throws IOException {
-      init(fragment);
+      init();
     }
 
     @Override
@@ -422,11 +413,7 @@ public class CSVFile {
 
     @Override
     public boolean isProjectable() {
-      return false;
-    }
-
-    @Override
-    public void setTarget(Column[] targets) {
+      return true;
     }
 
     @Override
@@ -440,8 +427,9 @@ public class CSVFile {
 
     @Override
     public void seek(long offset) throws IOException {
-      if (this.curTupleOffsetMap.containsKey(offset)) {
-        this.currentIdx = this.curTupleOffsetMap.get(offset);
+      int tupleIndex = Arrays.binarySearch(this.tupleOffsets, offset);
+      if (tupleIndex > -1) {
+        this.currentIdx = tupleIndex;
       } else if (offset >= this.pageStart + this.bufSize 
           + this.prevTailLen - this.tail.length || offset <= this.pageStart) {
         fis.seek(offset);
@@ -455,7 +443,7 @@ public class CSVFile {
         throw new IOException("invalid offset " +
            " < pageStart : " +  this.pageStart + " , " + 
            "  pagelength : " + this.bufSize + " , " + 
-           "  tail lenght : " + this.tail.length + 
+           "  tail lenght : " + this.tail.length +
            "  input offset : " + offset + " >");
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
index 4b8acc7..005879a 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
@@ -31,6 +31,7 @@ public abstract class FileScanner implements Scanner {
   protected final TableMeta meta;
   protected final Schema schema;
   protected final Fragment fragment;
+  protected final int columnNum;
 
   protected Column [] targets;
   
@@ -39,6 +40,7 @@ public abstract class FileScanner implements Scanner {
     this.meta = meta;
     this.schema = meta.getSchema();
     this.fragment = fragment;
+    this.columnNum = this.schema.getColumnNum();
   }
 
   public void init() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
index 7aed05d..4c4462f 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
@@ -83,7 +83,7 @@ public class TestStorageManager {
 		appender.close();
 
 		Scanner scanner = StorageManager.getScanner(conf, meta, path);
-
+    scanner.init();
 		int i=0;
 		while(scanner.next() != null) {
 			i++;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index b792d8b..b8c7021 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -170,7 +170,7 @@ public class TestStorages {
     int tupleCnt = 0;
     Tuple tuple;
     while ((tuple = scanner.next()) != null) {
-      if (storeType == StoreType.RCFILE || storeType == StoreType.TREVNI) {
+      if (storeType == StoreType.RCFILE || storeType == StoreType.TREVNI || storeType == StoreType.CSV) {
         assertNull(tuple.get(0));
       }
       assertEquals(DatumFactory.createInt8(tupleCnt + 2), tuple.getLong(1));

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index e66182a..63f2a2d 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -109,6 +109,7 @@ public class TestBSTIndex {
     creater.open();
 
     SeekableScanner scanner  = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+    scanner.init();
     Tuple keyTuple;
     long offset;
     while (true) {
@@ -130,6 +131,7 @@ public class TestBSTIndex {
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindValueInCSV.idx"), keySchema, comp);
     reader.open();
     scanner  = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+    scanner.init();
     for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
       tuple.put(0, DatumFactory.createInt8(i));
       tuple.put(1, DatumFactory.createFloat8(i));
@@ -204,6 +206,7 @@ public class TestBSTIndex {
         keySchema, comp);
     reader.open();
     SeekableScanner scanner  = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+    scanner.init();
     for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
       tuple.put(0, DatumFactory.createInt8(i));
       tuple.put(1, DatumFactory.createFloat8(i));
@@ -263,6 +266,7 @@ public class TestBSTIndex {
     creater.open();
 
     SeekableScanner scanner  = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+    scanner.init();
     Tuple keyTuple;
     long offset;
     while (true) {
@@ -330,6 +334,7 @@ public class TestBSTIndex {
     creater.open();
     
     SeekableScanner scanner  = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+    scanner.init();
     Tuple keyTuple;
     long offset;
     while (true) {
@@ -351,6 +356,7 @@ public class TestBSTIndex {
         keySchema, comp);
     reader.open();
     scanner  = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+    scanner.init();
     Tuple result;
     for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
       keyTuple = new VTuple(2);
@@ -414,6 +420,7 @@ public class TestBSTIndex {
     creater.open();
 
     SeekableScanner scanner  = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+    scanner.init();
     Tuple keyTuple;
     long offset;
     while (true) {
@@ -435,6 +442,7 @@ public class TestBSTIndex {
         keySchema, comp);
     reader.open();
     scanner  = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+    scanner.init();
     Tuple result;
     for(int i = 1 ; i < TUPLE_NUM -1 ; i+=2) {
       keyTuple = new VTuple(2);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/cf6bd4b3/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index 054e8fa..2708403 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -93,8 +93,7 @@ public class TestSingleCSVFileBSTIndex {
 
     FileStatus status = fs.getFileStatus(tablePath);
     long fileLen = status.getLen();
-    Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0,
-        fileLen, null);
+    Fragment tablet = new Fragment("table1_1", status.getPath(), meta, 0, fileLen, null);
 
     SortSpec[] sortKeys = new SortSpec[2];
     sortKeys[0] = new SortSpec(schema.getColumn("long"), true, false);
@@ -113,6 +112,7 @@ public class TestSingleCSVFileBSTIndex {
     creater.open();
 
     SeekableScanner fileScanner = new CSVScanner(conf, meta, tablet);
+    fileScanner.init();
     Tuple keyTuple;
     long offset;
     while (true) {
@@ -136,6 +136,7 @@ public class TestSingleCSVFileBSTIndex {
         "FindValueInCSV.idx"), keySchema, comp);
     reader.open();
     fileScanner = new CSVScanner(conf, meta, tablet);
+    fileScanner.init();
     for (int i = 0; i < TUPLE_NUM - 1; i++) {
       tuple.put(0, DatumFactory.createInt8(i));
       tuple.put(1, DatumFactory.createFloat8(i));
@@ -200,6 +201,7 @@ public class TestSingleCSVFileBSTIndex {
     creater.open();
     
     SeekableScanner fileScanner  = new CSVScanner(conf, meta, tablet);
+    fileScanner.init();
     Tuple keyTuple;
     long offset;
     while (true) {
@@ -220,6 +222,7 @@ public class TestSingleCSVFileBSTIndex {
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp);
     reader.open();
     fileScanner  = new CSVScanner(conf, meta, tablet);
+    fileScanner.init();
     Tuple result;
     for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
       keyTuple = new VTuple(2);