You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/10/01 08:38:29 UTC

git commit: TAJO-1083: StoreTableExec should be block iterative.

Repository: tajo
Updated Branches:
  refs/heads/block_iteration c128eca31 -> 292f67438


TAJO-1083: StoreTableExec should be block iterative.


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/292f6743
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/292f6743
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/292f6743

Branch: refs/heads/block_iteration
Commit: 292f6743845139dfc4ae5d7d672386b25202beed
Parents: c128eca
Author: Hyunsik Choi <hy...@apache.org>
Authored: Sun Sep 28 10:42:20 2014 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Sun Sep 28 10:42:20 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/tajo/SessionVars.java  |   2 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   1 +
 .../org/apache/tajo/engine/eval/EvalNode.java   |   7 +
 .../apache/tajo/engine/planner/Projector.java   |  23 +-
 .../engine/planner/physical/PhysicalExec.java   |   2 +-
 .../engine/planner/physical/SeqScanExec.java    |  24 +++
 .../engine/planner/physical/StoreTableExec.java |  28 +++
 .../tajo/engine/utils/TupleBuilderUtil.java     | 102 +++++++++
 .../org/apache/tajo/master/GlobalEngine.java    |  20 ++
 .../main/java/org/apache/tajo/worker/Task.java  |  14 +-
 .../physical/block/TestBlockIteratorExec.java   | 211 +++++++++++++++++++
 .../TestTajoCli/testHelpSessionVars.result      |   1 +
 .../tajo/tuple/offheap/OffHeapRowBlock.java     |   6 +-
 .../tuple/offheap/OffHeapRowBlockWriter.java    |   2 +-
 .../tajo/tuple/offheap/ResizableLimitSpec.java  |   2 +-
 .../apache/tajo/storage/TestNextFetches.java    |  23 +-
 16 files changed, 449 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index cc875b2..b63d4f4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -99,6 +99,8 @@ public enum SessionVars implements ConfigKey {
       "shuffle output size for partition table write (mb)", DEFAULT),
 
   // for physical Executors
+  EXEC_ENGINE(ConfVars.$EXECUTOR_ENGINE,
+      "executor engine types that queries will use. Types: volcano and block (default is volcano)", DEFAULT),
   EXTSORT_BUFFER_SIZE(ConfVars.$EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE, "sort buffer size for external sort (mb)", DEFAULT),
   HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_HASH_JOIN_SIZE_THRESHOLD, "limited size for hash join (mb)", DEFAULT),
   INNER_HASH_JOIN_SIZE_LIMIT(ConfVars.$EXECUTOR_INNER_HASH_JOIN_SIZE_THRESHOLD,

http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index b5a9b50..b1229eb 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -317,6 +317,7 @@ public class TajoConf extends Configuration {
     $DIST_QUERY_TABLE_PARTITION_VOLUME("tajo.dist-query.table-partition.task-volume-mb", 256),
 
     // for physical Executors
+    $EXECUTOR_ENGINE("tajo.executor.engine", "volcano"), // volcano, and block
     $EXECUTOR_EXTERNAL_SORT_BUFFER_SIZE("tajo.executor.external-sort.buffer-mb", 200L),
     $EXECUTOR_HASH_JOIN_SIZE_THRESHOLD("tajo.executor.join.common.in-memory-hash-threshold-bytes",
         (long)256 * 1048576),

http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNode.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNode.java b/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNode.java
index 754f888..b487001 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNode.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/eval/EvalNode.java
@@ -23,8 +23,10 @@ import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.json.CoreGsonHelper;
+import org.apache.tajo.engine.utils.TupleBuilderUtil;
 import org.apache.tajo.json.GsonObject;
 import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.offheap.RowWriter;
 
 /**
  * An annotated expression which includes actual data domains.
@@ -59,6 +61,11 @@ public abstract class EvalNode implements Cloneable, GsonObject {
 	
 	public abstract <T extends Datum> T eval(Schema schema, Tuple tuple);
 
+  public void eval(Schema schema, Tuple tuple, RowWriter builder) {
+    Datum result = eval(schema, tuple);
+    TupleBuilderUtil.writeEvalResult(builder, result.type(), result);
+  }
+
   @Deprecated
   public abstract  void preOrder(EvalNodeVisitor visitor);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
index d8499d0..0d8bd5f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/Projector.java
@@ -21,7 +21,10 @@ package org.apache.tajo.engine.planner;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.engine.utils.TupleBuilderUtil;
 import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.TupleBuilder;
+import org.apache.tajo.tuple.offheap.RowWriter;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 public class Projector {
@@ -33,7 +36,14 @@ public class Projector {
   private final int targetNum;
   private final EvalNode[] evals;
 
+  private final boolean useJITInSession;
+  private final boolean useJITInOperator;
+
   public Projector(TaskAttemptContext context, Schema inSchema, Schema outSchema, Target [] targets) {
+    this(context, inSchema, outSchema, targets, true);
+  }
+
+  public Projector(TaskAttemptContext context, Schema inSchema, Schema outSchema, Target [] targets, boolean useJIT) {
     this.context = context;
     this.inSchema = inSchema;
     if (targets == null) {
@@ -45,7 +55,10 @@ public class Projector {
     this.targetNum = this.targets.length;
     evals = new EvalNode[targetNum];
 
-    if (context.getQueryContext().getBool(SessionVars.CODEGEN)) {
+    useJITInOperator = useJIT;
+    useJITInSession = context.getQueryContext().getBool(SessionVars.CODEGEN);
+
+    if (useJITInOperator && useJITInSession) {
       EvalNode eval;
       for (int i = 0; i < targetNum; i++) {
         eval = this.targets[i].getEvalTree();
@@ -63,4 +76,12 @@ public class Projector {
       out.put(i, evals[i].eval(inSchema, in));
     }
   }
+
+  public void eval(Tuple in, RowWriter builder) {
+    if (useJITInOperator && useJITInSession) {
+      TupleBuilderUtil.evaluateNative(inSchema, in, builder, evals);
+    } else {
+      TupleBuilderUtil.evaluate(inSchema, in, builder, evals);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
index 859c053..99cf610 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalExec.java
@@ -63,7 +63,7 @@ public abstract class PhysicalExec implements SchemaObject {
 
   public abstract Tuple next() throws IOException;
 
-  public boolean nextFetch(OffHeapRowBlock rowBlock) {
+  public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
     throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented");
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 122d4f3..fd11c7b 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -41,6 +41,11 @@ import org.apache.tajo.engine.utils.TupleUtil;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.tuple.TupleBuilder;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -67,6 +72,8 @@ public class SeqScanExec extends PhysicalExec {
 
   private boolean cacheRead = false;
 
+  private OffHeapRowBlock inRowBlock;
+
   public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm, ScanNode plan,
                      CatalogProtos.FragmentProto [] fragments) throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema());
@@ -94,6 +101,8 @@ public class SeqScanExec extends PhysicalExec {
         && plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) {
       rewriteColumnPartitionedTableSchema();
     }
+
+    inRowBlock = new OffHeapRowBlock(inSchema, 64 * StorageUnit.KB);
   }
 
   /**
@@ -289,6 +298,21 @@ public class SeqScanExec extends PhysicalExec {
     }
   }
 
+  public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
+    boolean noMoreTuple = scanner.nextFetch(inRowBlock);
+    if (!noMoreTuple) {
+      return false;
+    }
+
+    ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+    RowBlockReader reader = inRowBlock.getReader();
+    while (reader.next(zcTuple)) {
+      projector.eval(zcTuple, rowBlock.getWriter());
+    }
+
+    return true;
+  }
+
   @Override
   public void rescan() throws IOException {
     scanner.reset();

http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index 3199b56..f88af15 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -28,9 +28,13 @@ import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.logical.InsertNode;
 import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
+import org.apache.tajo.exception.UnimplementedException;
 import org.apache.tajo.storage.Appender;
 import org.apache.tajo.storage.StorageManagerFactory;
 import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.worker.TaskAttemptContext;
 
@@ -121,6 +125,30 @@ public class StoreTableExec extends UnaryPhysicalExec {
     return null;
   }
 
+  ZeroCopyTuple zcTuple = new ZeroCopyTuple();
+  RowBlockReader reader;
+
+  public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
+    if (child.nextFetch(rowBlock)) {
+      reader = rowBlock.getReader();
+      while (reader.next(zcTuple)) {
+        appender.addTuple(zcTuple);;
+
+        if (maxPerFileSize > 0 && maxPerFileSize <= appender.getEstimatedOutputSize()) {
+          appender.close();
+
+          writtenFileNum++;
+          StatisticsUtil.aggregateTableStat(sumStats, appender.getStats());
+          openNewFile(writtenFileNum);
+        }
+      }
+
+      return true;
+    } else {
+      return false;
+    }
+  }
+
   @Override
   public void rescan() throws IOException {
     // nothing to do

http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleBuilderUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleBuilderUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleBuilderUtil.java
new file mode 100644
index 0000000..dc0f058
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleBuilderUtil.java
@@ -0,0 +1,102 @@
+/*
+ * Lisensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.utils;
+
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.engine.eval.EvalNode;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.tuple.TupleBuilder;
+import org.apache.tajo.tuple.offheap.RowWriter;
+
+public class TupleBuilderUtil {
+
+  public static void evaluate(Schema inSchema, Tuple input, RowWriter builder, EvalNode[] evals) {
+    builder.startRow();
+    for (int i = 0; i < evals.length; i++) {
+      Datum result = evals[i].eval(inSchema, input);
+      writeEvalResult(builder, result.type(), result);
+    }
+    builder.endRow();
+  }
+
+  public static void evaluateNative(Schema inSchema, Tuple input, RowWriter builder, EvalNode[] evals) {
+    builder.startRow();
+    for (int i = 0; i < evals.length; i++) {
+      evals[i].eval(inSchema, input, builder);
+    }
+    builder.endRow();
+  }
+
+  public static void writeEvalResult(RowWriter builder, TajoDataTypes.Type type, Datum datum) {
+    switch (type) {
+    case NULL_TYPE:
+      builder.skipField();
+      break;
+    case BOOLEAN:
+      builder.putBool(datum.asBool());
+      break;
+    case INT1:
+    case INT2:
+      builder.putInt2(datum.asInt2());
+      break;
+    case INT4:
+      builder.putInt4(datum.asInt4());
+      break;
+    case INT8:
+      builder.putInt8(datum.asInt8());
+      break;
+    case FLOAT4:
+      builder.putFloat4(datum.asFloat4());
+      break;
+    case FLOAT8:
+      builder.putFloat8(datum.asFloat8());
+      break;
+    case TIMESTAMP:
+      builder.putTimestamp(datum.asInt8());
+      break;
+    case TIME:
+      builder.putTime(datum.asInt8());
+      break;
+    case DATE:
+      builder.putDate(datum.asInt4());
+      break;
+    case INTERVAL:
+      builder.putInterval((org.apache.tajo.datum.IntervalDatum) datum);
+      break;
+    case CHAR:
+    case TEXT:
+      builder.putText(datum.asTextBytes());
+      break;
+    case BLOB:
+      builder.putBlob(datum.asByteArray());
+      break;
+    case INET4:
+      builder.putInet4(datum.asInt4());
+      break;
+    case PROTOBUF:
+      builder.putProtoDatum((org.apache.tajo.datum.ProtobufDatum) datum);
+      break;
+    default:
+      throw new UnsupportedException("Unknown Type: " + type.name());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 504a792..23c4949 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -115,6 +115,26 @@ public class GlobalEngine extends AbstractService {
     super.stop();
   }
 
+  public SQLAnalyzer getSQLAnalyzer() {
+    return analyzer;
+  }
+
+  public PreLogicalPlanVerifier getPreLogicalPlanVerifier() {
+    return preVerifier;
+  }
+
+  public LogicalPlanner getLogicalPlanner() {
+    return planner;
+  }
+
+  public LogicalOptimizer getLogicalOptimizer() {
+    return optimizer;
+  }
+
+  public LogicalPlanVerifier getLogicalPlanVerifier() {
+    return annotatedPlanVerifier;
+  }
+
   private QueryContext createQueryContext(Session session) {
     QueryContext newQueryContext =  new QueryContext(context.getConf(), session);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index 5127e90..67210ea 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.TajoProtos.TaskAttemptState;
@@ -57,6 +58,8 @@ import org.apache.tajo.storage.RawFile;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.NetUtils;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.handler.codec.http.QueryStringDecoder;
@@ -446,7 +449,16 @@ public class Task {
           createPlan(context, plan);
       this.executor.init();
 
-      while(!killed && !aborted && executor.next() != null) {
+      String engineType = context.getQueryContext().get(SessionVars.EXEC_ENGINE);
+      LOG.info(engineType.toUpperCase() + " Executor Engine is chosen.");
+      if (engineType.equalsIgnoreCase("volcano")) {
+        while (!killed && !aborted && executor.next() != null) {
+        }
+      } else if (engineType.equalsIgnoreCase("block")) {
+        OffHeapRowBlock rowBlock = new OffHeapRowBlock(executor.getSchema(), 64 * StorageUnit.KB);
+        while (!killed && !aborted && executor.nextFetch(rowBlock)) {
+        }
+        rowBlock.release();
       }
     } catch (Throwable e) {
       error = e ;

http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/block/TestBlockIteratorExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/block/TestBlockIteratorExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/block/TestBlockIteratorExec.java
new file mode 100644
index 0000000..f6a1e05
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/block/TestBlockIteratorExec.java
@@ -0,0 +1,211 @@
+/*
+ * Lisensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical.block;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.*;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.*;
+import org.apache.tajo.engine.planner.physical.*;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.GlobalEngine;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.tuple.RowBlockReader;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.List;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+
+public class TestBlockIteratorExec extends QueryTestCaseBase {
+
+  private static SQLAnalyzer analyzer;
+  private static LogicalPlanner planner;
+  private static LogicalOptimizer optimizer;
+  private static PhysicalPlanner physicalPlanner;
+  private static AbstractStorageManager sm;
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    GlobalEngine engine = testingCluster.getMaster().getContext().getGlobalEngine();
+    analyzer = engine.getSQLAnalyzer();
+    planner = engine.getLogicalPlanner();
+    optimizer = engine.getLogicalOptimizer();
+
+    Path path = CommonTestingUtil.getTestDir("target/test-data/TestBlockExecutor");
+    sm = StorageManagerFactory.getStorageManager(conf, path);
+
+    physicalPlanner = new PhysicalPlannerImpl(conf, sm);
+  }
+
+  private static int i = 0;
+  static Path outputPath;
+
+  /**
+   * Build a physical execution plan, which is a tree consisting of a number of physical executors.
+   *
+   * @param sql a SQL statement
+   * @return Physical Execution Plan
+   * @throws PlanningException
+   * @throws IOException
+   */
+  public static PhysicalExec buildPhysicalPlan(String sql) throws PlanningException, IOException {
+    Expr expr = analyzer.parse(sql);
+
+    QueryContext context = LocalTajoTestingUtility.createDummyContext(conf);
+    LogicalPlan plan = planner.createPlan(context, expr);
+    optimizer.optimize(context, plan);
+
+    LogicalNode [] founds = PlannerUtil.findAllNodes(plan.getRootBlock().getRoot(), NodeType.SCAN);
+
+    List<FileFragment> mergedFragments = Lists.newArrayList();
+
+    for (LogicalNode node : founds) {
+      ScanNode scan = (ScanNode) node;
+      TableDesc table = scan.getTableDesc();
+      FileFragment[] frags = StorageManager.splitNG(conf, scan.getCanonicalName(), table.getMeta(), table.getPath(),
+          Integer.MAX_VALUE);
+
+      for (FileFragment f : frags) {
+        mergedFragments.add(f);
+      }
+    }
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testdir_" + (i++));
+
+    TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), mergedFragments.toArray(new FileFragment[mergedFragments.size()]), workDir);
+
+    outputPath = new Path(workDir, "output");
+    ctx.setOutputPath(outputPath);
+    ctx.setEnforcer(new Enforcer());
+
+    return physicalPlanner.createPlan(ctx, plan.getRootBlock().getRoot());
+  }
+
+  @Test
+  public void testSeqScan() throws IOException, PlanningException {
+    PhysicalExec exec = buildPhysicalPlan("select * from lineitem");
+
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(exec.getSchema(), 64 * StorageUnit.KB);
+    rowBlock.setMaxRow(1024);
+
+    exec.init();
+
+    int countForTuple = 0;
+    int countForRowBlock = 0;
+    while(exec.nextFetch(rowBlock)) {
+      ZeroCopyTuple tuple = new ZeroCopyTuple();
+      RowBlockReader reader =  rowBlock.getReader();
+      while (reader.next(tuple)) {
+        countForTuple++;
+      }
+      countForRowBlock += rowBlock.rows();
+    }
+    exec.close();
+    rowBlock.release();
+
+    assertEquals(5, countForTuple);
+    assertEquals(5, countForRowBlock);
+  }
+
+  @Test
+  public void testScanWithProjector() throws IOException, PlanningException {
+    PhysicalExec exec = buildPhysicalPlan("select l_orderkey, l_partkey from lineitem");
+
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(exec.getSchema(), 64 * StorageUnit.KB);
+    rowBlock.setMaxRow(1024);
+
+    exec.init();
+
+    int countForTuple = 0;
+    int countForRowBlock = 0;
+    while(exec.nextFetch(rowBlock)) {
+      ZeroCopyTuple tuple = new ZeroCopyTuple();
+      RowBlockReader reader =  rowBlock.getReader();
+      while (reader.next(tuple)) {
+        countForTuple++;
+      }
+      countForRowBlock += rowBlock.rows();
+    }
+    exec.close();
+    rowBlock.release();
+
+    assertEquals(5, countForTuple);
+    assertEquals(5, countForRowBlock);
+  }
+
+  @Test
+  public void testStoreTableExec() throws IOException, PlanningException {
+    PhysicalExec exec = buildPhysicalPlan("create table t1 using CSV as select * from lineitem");
+
+
+    OffHeapRowBlock rowBlock = new OffHeapRowBlock(exec.getSchema(), 64 * StorageUnit.KB);
+    rowBlock.setMaxRow(1024);
+
+    exec.init();
+
+    int countForTuple = 0;
+    int countForRowBlock = 0;
+    while(exec.nextFetch(rowBlock)) {
+      ZeroCopyTuple tuple = new ZeroCopyTuple();
+      RowBlockReader reader =  rowBlock.getReader();
+      while (reader.next(tuple)) {
+        countForTuple++;
+      }
+      countForRowBlock += rowBlock.rows();
+    }
+    exec.close();
+
+    assertEquals(5, countForTuple);
+    assertEquals(5, countForRowBlock);
+
+    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(meta, exec.getSchema(), outputPath);
+    scanner.init();
+
+    int readTupleCount = 0;
+    while (scanner.nextFetch(rowBlock)) {
+      readTupleCount += rowBlock.rows();
+    }
+    scanner.close();
+
+    assertEquals(5, readTupleCount);
+
+    rowBlock.release();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index e6b12b1..f6edb3d 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -25,6 +25,7 @@ Available Session Variables:
 \set JOIN_PER_SHUFFLE_SIZE [int value] - shuffle output size for join (mb)
 \set GROUPBY_PER_SHUFFLE_SIZE [int value] - shuffle output size for sort (mb)
 \set TABLE_PARTITION_PER_SHUFFLE_SIZE [int value] - shuffle output size for partition table write (mb)
+\set EXEC_ENGINE [text value] - executor engine types that queries will use. Types: volcano and block (default is volcano)
 \set EXTSORT_BUFFER_SIZE [long value] - sort buffer size for external sort (mb)
 \set HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash join (mb)
 \set INNER_HASH_JOIN_SIZE_LIMIT [long value] - limited size for hash inner join (mb)

http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
index 689efb7..ea86a5a 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlock.java
@@ -127,7 +127,11 @@ public class OffHeapRowBlock extends OffHeapMemory implements Deallocatable {
     return rowNum;
   }
 
-  public void setRows(int rowNum) {
+  public void setMaxRow(int rowNum) {
+    this.maxRowNum = rowNum;
+  }
+
+  void setRow(int rowNum) {
     this.rowNum = rowNum;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
index d177e0c..ba59b16 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/OffHeapRowBlockWriter.java
@@ -48,7 +48,7 @@ public class OffHeapRowBlockWriter extends OffHeapRowWriter {
   @Override
   public void endRow() {
     super.endRow();
-    rowBlock.setRows(rowBlock.rows() + 1);
+    rowBlock.setRow(rowBlock.rows() + 1);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
index 14e67b2..8d782eb 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/ResizableLimitSpec.java
@@ -105,7 +105,7 @@ public class ResizableLimitSpec {
   }
 
   public long remain(long currentSize) {
-    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes.");
+    Preconditions.checkArgument(currentSize > 0, "Size must be greater than 0 bytes. But, its size is " + currentSize);
     return limitBytes > Integer.MAX_VALUE ? Integer.MAX_VALUE - currentSize : limitBytes - currentSize;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/292f6743/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
index d1b3afd..e81964b 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
@@ -42,17 +42,14 @@ import org.apache.tajo.storage.rcfile.RCFile;
 import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
 import org.apache.tajo.tuple.RowBlockReader;
 import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
-import org.apache.tajo.tuple.offheap.UnSafeTuple;
 import org.apache.tajo.tuple.offheap.ZeroCopyTuple;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.UnsafeUtil;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import sun.misc.Unsafe;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -173,7 +170,7 @@ public class TestNextFetches {
       int tupleCnt = 0;
 
       OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
-      rowBlock.setRows(1024);
+      rowBlock.setMaxRow(1024);
 
       while (scanner.nextFetch(rowBlock)) {
         tupleCnt += rowBlock.rows();
@@ -233,7 +230,7 @@ public class TestNextFetches {
       int tupleCnt = 0;
 
       OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
-      rowBlock.setRows(1024);
+      rowBlock.setMaxRow(1024);
 
       while (scanner.nextFetch(rowBlock)) {
         tupleCnt += rowBlock.rows();
@@ -294,7 +291,7 @@ public class TestNextFetches {
     int tupleCnt = 0;
 
     OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
-    rowBlock.setRows(1024);
+    rowBlock.setMaxRow(1024);
 
     ZeroCopyTuple tuple = new ZeroCopyTuple();
     while (scanner.nextFetch(rowBlock)) {
@@ -376,7 +373,7 @@ public class TestNextFetches {
     scanner.init();
 
     OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
-    rowBlock.setRows(1024);
+    rowBlock.setMaxRow(1024);
 
     ZeroCopyTuple zcTuple = new ZeroCopyTuple();
     while (scanner.nextFetch(rowBlock)) {
@@ -465,7 +462,7 @@ public class TestNextFetches {
     scanner.init();
 
     OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
-    rowBlock.setRows(1024);
+    rowBlock.setMaxRow(1024);
 
     ZeroCopyTuple retrieved = new ZeroCopyTuple();
 
@@ -548,7 +545,7 @@ public class TestNextFetches {
     scanner.init();
 
     OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
-    rowBlock.setRows(1024);
+    rowBlock.setMaxRow(1024);
 
     ZeroCopyTuple retrieved = new ZeroCopyTuple();
     while (scanner.nextFetch(rowBlock)) {
@@ -624,7 +621,7 @@ public class TestNextFetches {
     scanner.init();
 
     OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
-    rowBlock.setRows(1024);
+    rowBlock.setMaxRow(1024);
 
     ZeroCopyTuple retrieved = new ZeroCopyTuple();
     while (scanner.nextFetch(rowBlock)) {
@@ -703,7 +700,7 @@ public class TestNextFetches {
     assertEquals(key.getClass().getCanonicalName(), LongWritable.class.getCanonicalName());
 
     OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
-    rowBlock.setRows(1024);
+    rowBlock.setMaxRow(1024);
 
     ZeroCopyTuple retrieved = new ZeroCopyTuple();
 
@@ -785,7 +782,7 @@ public class TestNextFetches {
     assertEquals(key.getClass().getCanonicalName(), BytesWritable.class.getCanonicalName());
 
     OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
-    rowBlock.setRows(1024);
+    rowBlock.setMaxRow(1024);
 
     ZeroCopyTuple retrieved = new ZeroCopyTuple();
 
@@ -833,7 +830,7 @@ public class TestNextFetches {
       scanner.init();
 
       OffHeapRowBlock rowBlock = new OffHeapRowBlock(schema, 64 * StorageUnit.KB);
-      rowBlock.setRows(1024);
+      rowBlock.setMaxRow(1024);
 
       ZeroCopyTuple retrieved = new ZeroCopyTuple();