You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hj...@apache.org on 2014/09/22 02:21:47 UTC

git commit: TAJO-910: Simple query (non-forwarded query) should be supported against partition tables.

Repository: tajo
Updated Branches:
  refs/heads/master cda6c897a -> 7510f886e


TAJO-910: Simple query (non-forwarded query) should be supported against partition tables.

Closes #138


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/7510f886
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/7510f886
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/7510f886

Branch: refs/heads/master
Commit: 7510f886e7f310dede3f9460ca40ad4bdd46e4cf
Parents: cda6c89
Author: HyoungJun Kim <ba...@babokim-MacBook-Pro.local>
Authored: Mon Sep 22 09:21:04 2014 +0900
Committer: HyoungJun Kim <ba...@babokim-MacBook-Pro.local>
Committed: Mon Sep 22 09:21:04 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../java/org/apache/tajo/client/TajoClient.java | 117 +++++++++++--
 .../org/apache/tajo/jdbc/FetchResultSet.java    |  91 ++++++++++
 .../org/apache/tajo/jdbc/TajoResultSetBase.java |   2 +-
 tajo-client/src/main/proto/ClientProtos.proto   |  29 +++-
 .../main/proto/TajoMasterClientProtocol.proto   |   4 +-
 .../main/java/org/apache/tajo/SessionVars.java  |   3 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   5 +-
 .../java/org/apache/tajo/benchmark/TPCH.java    |  32 +++-
 .../apache/tajo/engine/planner/PlannerUtil.java | 138 +++++++++++++++-
 .../org/apache/tajo/master/GlobalEngine.java    |  31 ++--
 .../master/NonForwardQueryResultScanner.java    | 165 +++++++++++++++++++
 .../tajo/master/TajoMasterClientService.java    |  69 +++++++-
 .../org/apache/tajo/master/session/Session.java |  54 ++++++
 .../tajo/master/session/SessionManager.java     |  13 +-
 .../apache/tajo/worker/TaskAttemptContext.java  |   2 -
 .../org/apache/tajo/TajoTestingCluster.java     |  45 +++--
 .../tajo/engine/planner/TestPlannerUtil.java    |  62 +++++++
 .../tajo/engine/query/TestNullValues.java       |  32 +++-
 .../tajo/engine/query/TestSelectQuery.java      |  15 ++
 .../org/apache/tajo/jdbc/TestResultSet.java     |  10 +-
 .../apache/tajo/master/TestGlobalPlanner.java   |  68 +++++++-
 .../queries/TestSelectQuery/customer_ddl.sql    |   9 +
 .../TestSelectQuery/insert_into_customer.sql    |  11 ++
 ...testSimpleQueryWithLimitPartitionedTable.sql |   1 +
 ...tSimpleQueryWithLimitPartitionedTable.result |  12 ++
 .../TestTajoCli/testHelpSessionVars.result      |   1 +
 27 files changed, 936 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b5e4842..21bdd8c 100644
--- a/CHANGES
+++ b/CHANGES
@@ -31,6 +31,9 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-910: Simple query (non-forwarded query) should be supported against 
+    partition tables. (Hyoungjun Kim)
+
     TAJO-1035: Add default TAJO_PULLSERVER_HEAPSIZE. (Hyoungjun Kim)
 
     TAJO-1049: Remove the parallel degree limit up to the maximum cluster 

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index ab3d874..3d61cce 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.TajoProtos.QueryState;
 import org.apache.tajo.annotation.Nullable;
@@ -33,7 +34,10 @@ import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.partition.PartitionDesc;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.PartitionMethodProto;
 import org.apache.tajo.cli.InvalidClientSessionException;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
@@ -43,12 +47,14 @@ import org.apache.tajo.ipc.QueryMasterClientProtocol;
 import org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
 import org.apache.tajo.ipc.TajoMasterClientProtocol;
 import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
+import org.apache.tajo.jdbc.FetchResultSet;
 import org.apache.tajo.jdbc.SQLStates;
 import org.apache.tajo.jdbc.TajoMemoryResultSet;
 import org.apache.tajo.jdbc.TajoResultSet;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto;
 import org.apache.tajo.util.HAServiceUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.NetUtils;
@@ -63,7 +69,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.SerializedResultSet;
+import static org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
 
 @ThreadSafe
 public class TajoClient implements Closeable {
@@ -204,6 +210,25 @@ public class TajoClient implements Closeable {
     }
   }
 
+  public void closeNonForwardQuery(final QueryId queryId) {
+    NettyClientBase tmClient = null;
+    try {
+      tmClient = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
+      TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
+      checkSessionAndGet(tmClient);
+
+      QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+
+      builder.setSessionId(getSessionId());
+      builder.setQueryId(queryId.getProto());
+      tajoMasterService.closeNonForwardQuery(null, builder.build());
+    } catch (Exception e) {
+      LOG.warn("Fail to close a TajoMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e);
+    } finally {
+      connPool.closeConnection(tmClient);
+    }
+  }
+
   private void checkSessionAndGet(NettyClientBase client) throws ServiceException {
     if (sessionId == null) {
       TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
@@ -518,23 +543,21 @@ public class TajoClient implements Closeable {
 
   public static ResultSet createResultSet(TajoClient client, SubmitQueryResponse response) throws IOException {
     if (response.hasTableDesc()) {
-      TajoConf conf = new TajoConf(client.getConf());
-      conf.setVar(ConfVars.USERNAME, response.getUserName());
-      if (response.hasMaxRowNum()) {
-        return new TajoResultSet(
-            client,
-            QueryIdFactory.NULL_QUERY_ID,
-            conf,
-            new TableDesc(response.getTableDesc()),
-            response.getMaxRowNum());
-      } else {
-        return new TajoResultSet(
-            client,
-            QueryIdFactory.NULL_QUERY_ID,
-            conf,
-            new TableDesc(response.getTableDesc()));
+      // non-forward query
+      // select * from table1 [limit 10]
+      int fetchRowNum = client.getConf().getIntVar(ConfVars.$RESULT_SET_FETCH_ROWNUM);
+      if (response.hasSessionVariables()) {
+        for (KeyValueProto eachKeyValue: response.getSessionVariables().getKeyvalList()) {
+          if (eachKeyValue.getKey().equals(SessionVars.FETCH_ROWNUM.keyname())) {
+            fetchRowNum = Integer.parseInt(eachKeyValue.getValue());
+          }
+        }
       }
+      TableDesc tableDesc = new TableDesc(response.getTableDesc());
+      return new FetchResultSet(client, tableDesc.getLogicalSchema(), new QueryId(response.getQueryId()), fetchRowNum);
     } else {
+      // simple eval query
+      // select substr('abc', 1, 2)
       SerializedResultSet serializedResultSet = response.getResultSet();
       return new TajoMemoryResultSet(
           new Schema(serializedResultSet.getSchema()),
@@ -606,6 +629,44 @@ public class TajoClient implements Closeable {
     }
   }
 
+  public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws ServiceException {
+    try {
+      ServerCallable<SerializedResultSet> callable =
+          new ServerCallable<SerializedResultSet>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
+        public SerializedResultSet call(NettyClientBase client) throws ServiceException {
+          checkSessionAndGet(client);
+          TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+          GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
+          builder.setSessionId(sessionId);
+          builder.setQueryId(queryId.getProto());
+          builder.setFetchRowNum(fetchRowNum);
+          try {
+            GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
+            if (response.getResultCode() == ResultCode.ERROR) {
+              throw new ServiceException(response.getErrorTrace());
+            }
+
+            return response.getResultSet();
+          } catch (ServiceException e) {
+            abort();
+            throw e;
+          } catch (Throwable t) {
+            throw new ServiceException(t.getMessage(), t);
+          }
+        }
+      };
+
+      SerializedResultSet serializedResultSet = callable.withRetries();
+
+      return new TajoMemoryResultSet(
+          new Schema(serializedResultSet.getSchema()),
+          serializedResultSet.getSerializedTuplesList(),
+          serializedResultSet.getSerializedTuplesCount());
+    } catch (Exception e) {
+      throw new ServiceException(e.getMessage(), e);
+    }
+  }
+
   public boolean updateQuery(final String sql) throws ServiceException {
     return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
         TajoMasterClientProtocol.class, false, true) {
@@ -747,6 +808,25 @@ public class TajoClient implements Closeable {
   public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
                                        final TableMeta meta)
       throws SQLException, ServiceException {
+    return createExternalTable(tableName, schema, path, meta, null);
+  }
+
+  /**
+   * Create an external table.
+   *
+   * @param tableName The table name to be created. This name is case sensitive. This name can be qualified or not.
+   *                  If the table name is not qualified, the current database in the session will be used.
+   * @param schema The schema
+   * @param path The external table location
+   * @param meta Table meta
+   * @param partitionMethodDesc Table partition description
+   * @return the created table description.
+   * @throws SQLException
+   * @throws ServiceException
+   */
+  public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
+                                       final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
+      throws SQLException, ServiceException {
     return new ServerCallable<TableDesc>(connPool, getTajoMasterAddr(),
         TajoMasterClientProtocol.class, false, true) {
       public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
@@ -760,6 +840,9 @@ public class TajoClient implements Closeable {
         builder.setSchema(schema.getProto());
         builder.setMeta(meta.getProto());
         builder.setPath(path.toUri().toString());
+        if (partitionMethodDesc != null) {
+          builder.setPartition(partitionMethodDesc.getProto());
+        }
         TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
         if (res.getResultCode() == ResultCode.OK) {
           return CatalogUtil.newTableDesc(res.getTableDesc());
@@ -915,7 +998,7 @@ public class TajoClient implements Closeable {
 
       checkSessionAndGet(tmClient);
 
-      KillQueryRequest.Builder builder = KillQueryRequest.newBuilder();
+      QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
       builder.setSessionId(sessionId);
       builder.setQueryId(queryId.getProto());
       tajoMasterService.killQuery(null, builder.build());

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
new file mode 100644
index 0000000..7ebce91
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc;
+
+import org.apache.tajo.QueryId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.IOException;
+import java.sql.SQLException;
+
+public class FetchResultSet extends TajoResultSetBase {
+  private TajoClient tajoClient;
+  private QueryId queryId;
+  private int fetchRowNum;
+  private TajoMemoryResultSet currentResultSet;
+  private boolean finished = false;
+
+  public FetchResultSet(TajoClient tajoClient, Schema schema, QueryId queryId, int fetchRowNum) {
+    this.tajoClient = tajoClient;
+    this.queryId = queryId;
+    this.fetchRowNum = fetchRowNum;
+    this.totalRow = Integer.MAX_VALUE;
+    this.schema = schema;
+  }
+
+  @Override
+  protected Tuple nextTuple() throws IOException {
+    if (finished) {
+      return null;
+    }
+
+    try {
+      Tuple tuple = null;
+      if (currentResultSet != null) {
+        currentResultSet.next();
+        tuple = currentResultSet.cur;
+      }
+      if (currentResultSet == null || tuple == null) {
+        if (currentResultSet != null) {
+          currentResultSet.close();
+          currentResultSet = null;
+        }
+        currentResultSet = tajoClient.fetchNextQueryResult(queryId, fetchRowNum);
+        if (currentResultSet == null) {
+          finished = true;
+          return null;
+        }
+
+        currentResultSet.next();
+        tuple = currentResultSet.cur;
+      }
+      if (tuple == null) {
+        if (currentResultSet != null) {
+          currentResultSet.close();
+          currentResultSet = null;
+        }
+        finished = true;
+      }
+      return tuple;
+    } catch (Throwable t) {
+      throw new IOException(t.getMessage(), t);
+    }
+  }
+
+  @Override
+  public void close() throws SQLException {
+    if (currentResultSet != null) {
+      currentResultSet.close();
+      currentResultSet = null;
+    }
+    tajoClient.closeNonForwardQuery(queryId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
index d189c78..78d8bde 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
@@ -764,7 +764,7 @@ public abstract class TajoResultSetBase implements ResultSet {
         return true;
       }
     } catch (IOException e) {
-      throw new SQLException(e.getMessage());
+      throw new SQLException(e.getMessage(), e);
     }
     return false;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-client/src/main/proto/ClientProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto
index 0359685..23ae6dd 100644
--- a/tajo-client/src/main/proto/ClientProtos.proto
+++ b/tajo-client/src/main/proto/ClientProtos.proto
@@ -86,7 +86,7 @@ message GetQueryResultResponse {
   required string tajoUserName = 3;
 }
 
-message KillQueryRequest {
+message QueryIdRequest {
   optional SessionIdProto sessionId = 1;
   required QueryIdProto queryId = 2;
 }
@@ -115,6 +115,12 @@ message GetQueryStatusRequest {
   required QueryIdProto queryId = 2;
 }
 
+message SerializedResultSet {
+  optional SchemaProto schema = 1;
+  optional int32 bytesNum = 2;
+  repeated bytes serializedTuples = 3;
+}
+
 message SubmitQueryResponse {
   required ResultCode resultCode = 1;
   required QueryIdProto queryId = 2;
@@ -124,18 +130,14 @@ message SubmitQueryResponse {
   optional string queryMasterHost = 5;
   optional int32 queryMasterPort = 6;
 
-  message SerializedResultSet {
-    optional SchemaProto schema = 1;
-    optional int32 bytesNum = 2;
-    repeated bytes serializedTuples = 3;
-  }
-
   optional SerializedResultSet resultSet = 7;
   optional TableDescProto tableDesc = 8;
   optional int32 maxRowNum = 9;
 
   optional string errorMessage = 10;
   optional string errorTrace = 11;
+
+  optional KeyValueSetProto sessionVariables = 12;
 }
 
 message GetQueryStatusResponse {
@@ -152,6 +154,19 @@ message GetQueryStatusResponse {
   optional int32 queryMasterPort = 12;
 }
 
+message GetQueryResultDataRequest {
+  required SessionIdProto sessionId = 1;
+  required QueryIdProto queryId = 2;
+  required int32 fetchRowNum = 3;
+}
+
+message GetQueryResultDataResponse {
+  required ResultCode resultCode = 1;
+  required SerializedResultSet resultSet = 2;
+  optional string errorMessage = 3;
+  optional string errorTrace = 4;
+}
+
 message GetClusterInfoRequest {
   optional SessionIdProto sessionId = 1;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-client/src/main/proto/TajoMasterClientProtocol.proto
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto
index 9495fb1..1afc069 100644
--- a/tajo-client/src/main/proto/TajoMasterClientProtocol.proto
+++ b/tajo-client/src/main/proto/TajoMasterClientProtocol.proto
@@ -42,13 +42,15 @@ service TajoMasterClientProtocolService {
   rpc submitQuery(QueryRequest) returns (SubmitQueryResponse);
   rpc updateQuery(QueryRequest) returns (UpdateQueryResponse);
   rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse);
+  rpc getQueryResultData(GetQueryResultDataRequest) returns (GetQueryResultDataResponse);
 
   // Query And Resource Management APIs
   rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse);
   rpc getRunningQueryList(GetQueryListRequest) returns (GetQueryListResponse);
   rpc getFinishedQueryList(GetQueryListRequest) returns (GetQueryListResponse);
-  rpc killQuery(KillQueryRequest) returns (BoolProto);
+  rpc killQuery(QueryIdRequest) returns (BoolProto);
   rpc getClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
+  rpc closeNonForwardQuery(QueryIdRequest) returns (BoolProto);
 
   // Database Management APIs
   rpc createDatabase(SessionedStringProto) returns (BoolProto);

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index c32fd43..cc875b2 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -115,6 +115,9 @@ public enum SessionVars implements ConfigKey {
   ARITHABORT(ConfVars.$BEHAVIOR_ARITHMETIC_ABORT,
       "If true, a running query will be terminated when an overflow or divide-by-zero occurs.", DEFAULT),
 
+  // ResultSet ----------------------------------------------------------------
+  FETCH_ROWNUM(ConfVars.$RESULT_SET_FETCH_ROWNUM, "Sets the number of rows at a time from Master", DEFAULT),
+
   //-------------------------------------------------------------------------------
   // Only for Unit Testing
   //-------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index a089b54..b5a9b50 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -356,7 +356,10 @@ public class TajoConf extends Configuration {
     $TEST_MIN_TASK_NUM("tajo.test.min-task-num", -1),
 
     // Behavior Control ---------------------------------------------------------
-    $BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false);
+    $BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false),
+
+    // ResultSet ---------------------------------------------------------
+    $RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200)
     ;
 
     public final String varname;

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
index abd9e7f..71d930f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
@@ -26,7 +26,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.storage.StorageConstants;
 
@@ -40,6 +43,7 @@ public class TPCH extends BenchmarkSet {
 
   public static final String LINEITEM = "lineitem";
   public static final String CUSTOMER = "customer";
+  public static final String CUSTOMER_PARTS = "customer_parts";
   public static final String NATION = "nation";
   public static final String PART = "part";
   public static final String REGION = "region";
@@ -54,6 +58,7 @@ public class TPCH extends BenchmarkSet {
   static {
     tableVolumes.put(LINEITEM, 759863287L);
     tableVolumes.put(CUSTOMER, 24346144L);
+    tableVolumes.put(CUSTOMER_PARTS, 707L);
     tableVolumes.put(NATION, 2224L);
     tableVolumes.put(PART, 24135125L);
     tableVolumes.put(REGION, 389L);
@@ -98,6 +103,16 @@ public class TPCH extends BenchmarkSet {
         .addColumn("c_comment", Type.TEXT); // 7
     schemas.put(CUSTOMER, customer);
 
+    Schema customerParts = new Schema()
+        .addColumn("c_custkey", Type.INT4) // 0
+        .addColumn("c_name", Type.TEXT) // 1
+        .addColumn("c_address", Type.TEXT) // 2
+        .addColumn("c_phone", Type.TEXT) // 3
+        .addColumn("c_acctbal", Type.FLOAT8) // 4
+        .addColumn("c_mktsegment", Type.TEXT) // 5
+        .addColumn("c_comment", Type.TEXT); // 6
+    schemas.put(CUSTOMER_PARTS, customerParts);
+
     Schema nation = new Schema()
         .addColumn("n_nationkey", Type.INT4) // 0
         .addColumn("n_name", Type.TEXT) // 1
@@ -177,6 +192,7 @@ public class TPCH extends BenchmarkSet {
   public void loadTables() throws ServiceException {
     loadTable(LINEITEM);
     loadTable(CUSTOMER);
+    loadTable(CUSTOMER_PARTS);
     loadTable(NATION);
     loadTable(PART);
     loadTable(REGION);
@@ -187,12 +203,24 @@ public class TPCH extends BenchmarkSet {
 
   }
 
-  private void loadTable(String tableName) throws ServiceException {
+  public void loadTable(String tableName) throws ServiceException {
     TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
     meta.putOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
 
+    PartitionMethodDesc partitionMethodDesc = null;
+    if (tableName.equals(CUSTOMER_PARTS)) {
+      Schema expressionSchema = new Schema();
+      expressionSchema.addColumn("c_nationkey", TajoDataTypes.Type.INT4);
+      partitionMethodDesc = new PartitionMethodDesc(
+          tajo.getCurrentDatabase(),
+          CUSTOMER_PARTS,
+          CatalogProtos.PartitionType.COLUMN,
+          "c_nationkey",
+          expressionSchema);
+    }
     try {
-      tajo.createExternalTable(tableName, getSchema(tableName), new Path(dataDir, tableName), meta);
+      tajo.createExternalTable(tableName, getSchema(tableName),
+          new Path(dataDir, tableName), meta, partitionMethodDesc);
     } catch (SQLException s) {
       throw new ServiceException(s);
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
index 3390758..c4cc254 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PlannerUtil.java
@@ -21,21 +21,31 @@ package org.apache.tajo.engine.planner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.tajo.algebra.*;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SortSpec;
+import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.engine.eval.*;
 import org.apache.tajo.engine.exception.InvalidQueryException;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.utils.SchemaUtil;
 import org.apache.tajo.storage.TupleComparator;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.TUtil;
 
+import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class PlannerUtil {
 
@@ -70,24 +80,43 @@ public class PlannerUtil {
     // one block, without where clause, no group-by, no-sort, no-join
     boolean isOneQueryBlock = plan.getQueryBlocks().size() == 1;
     boolean simpleOperator = rootNode.getChild().getType() == NodeType.LIMIT
-        || rootNode.getChild().getType() == NodeType.SCAN;
+        || rootNode.getChild().getType() == NodeType.SCAN || rootNode.getChild().getType() == NodeType.PARTITIONS_SCAN;
     boolean noOrderBy = !plan.getRootBlock().hasNode(NodeType.SORT);
     boolean noGroupBy = !plan.getRootBlock().hasNode(NodeType.GROUP_BY);
     boolean noWhere = !plan.getRootBlock().hasNode(NodeType.SELECTION);
     boolean noJoin = !plan.getRootBlock().hasNode(NodeType.JOIN);
-    boolean singleRelation = plan.getRootBlock().hasNode(NodeType.SCAN)
-        && PlannerUtil.getRelationLineage(plan.getRootBlock().getRoot()).length == 1;
+    boolean singleRelation =
+        (plan.getRootBlock().hasNode(NodeType.SCAN) || plan.getRootBlock().hasNode(NodeType.PARTITIONS_SCAN)) &&
+        PlannerUtil.getRelationLineage(plan.getRootBlock().getRoot()).length == 1;
 
     boolean noComplexComputation = false;
     if (singleRelation) {
       ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
-      if (!scanNode.getTableDesc().hasPartition() && scanNode.hasTargets()
-          && scanNode.getTargets().length == scanNode.getInSchema().size()) {
+      if (scanNode == null) {
+        scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN);
+      }
+      if (scanNode.hasTargets()) {
+        // If the number of columns in the select clause differs from the table schema,
+        // this query is not a simple query.
+        if (scanNode.getTableDesc().hasPartition()) {
+          // In the case of a partitioned table, the actual number of columns is ScanNode.InSchema + partition columns
+          int numPartitionColumns = scanNode.getTableDesc().getPartitionMethod().getExpressionSchema().size();
+          if (scanNode.getTargets().length != scanNode.getInSchema().size() + numPartitionColumns) {
+            return false;
+          }
+        } else {
+          if (scanNode.getTargets().length != scanNode.getInSchema().size()) {
+            return false;
+          }
+        }
         noComplexComputation = true;
         for (int i = 0; i < scanNode.getTargets().length; i++) {
-          noComplexComputation = noComplexComputation && scanNode.getTargets()[i].getEvalTree().getType() == EvalType.FIELD;
+          noComplexComputation =
+              noComplexComputation && scanNode.getTargets()[i].getEvalTree().getType() == EvalType.FIELD;
           if (noComplexComputation) {
-            noComplexComputation = noComplexComputation && scanNode.getTargets()[i].getNamedColumn().equals(scanNode.getInSchema().getColumn(i));
+            noComplexComputation = noComplexComputation &&
+                scanNode.getTargets()[i].getNamedColumn().equals(
+                    scanNode.getTableDesc().getLogicalSchema().getColumn(i));
           }
           if (!noComplexComputation) {
             return noComplexComputation;
@@ -97,7 +126,8 @@ public class PlannerUtil {
     }
 
     return !checkIfDDLPlan(rootNode) &&
-        (simpleOperator && noComplexComputation  && isOneQueryBlock && noOrderBy && noGroupBy && noWhere && noJoin && singleRelation);
+        (simpleOperator && noComplexComputation && isOneQueryBlock &&
+            noOrderBy && noGroupBy && noWhere && noJoin && singleRelation);
   }
 
   /**
@@ -762,4 +792,96 @@ public class PlannerUtil {
 
     return explains.toString();
   }
+
+  /**
+   * Lists non-empty data files under the table path and wraps each one in a whole-file fragment.
+   * <p>
+   * For a partitioned table the result is cut at the first file whose partition key differs
+   * from the key of the first listed file. The partition key of a file is taken from the names
+   * of its nearest ancestor directories, one per partition column. Every fragment returned for
+   * a partitioned table therefore belongs to a single partition.
+   *
+   * @param tajoConf configuration used to resolve the file system of the table path
+   * @param tableDesc descriptor of the table whose data files are listed
+   * @param fileIndex number of non-empty files, in listing order, to skip before collecting
+   * @param numResultFiles maximum number of files to collect
+   * @return the collected files as fragment protos; empty when the table path does not exist
+   *         or no file qualifies
+   * @throws IOException if the file system cannot be accessed
+   */
+  public static FragmentProto[] getNonZeroLengthDataFiles(TajoConf tajoConf,TableDesc tableDesc,
+                                                          int fileIndex, int numResultFiles) throws IOException {
+    Path tablePath = tableDesc.getPath();
+    FileSystem fs = tablePath.getFileSystem(tajoConf);
+
+    List<FileStatus> dataFiles = new ArrayList<FileStatus>();
+    if (fs.exists(tablePath)) {
+      getNonZeroLengthDataFiles(fs, tablePath, dataFiles, fileIndex, numResultFiles, new AtomicInteger(0));
+    }
+
+    // Zero means the table is not partitioned and every listed file is returned.
+    int partitionDepth = tableDesc.hasPartition()
+        ? tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size()
+        : 0;
+
+    List<FileFragment> fragments = new ArrayList<FileFragment>();
+    String[] acceptedPartitionKey = null;
+    for (FileStatus dataFile : dataFiles) {
+      FileFragment fragment =
+          new FileFragment(tableDesc.getName(), dataFile.getPath(), 0, dataFile.getLen(), null);
+
+      if (partitionDepth > 0) {
+        // Walk up from the file; the closest directory is the last partition column.
+        String[] partitionKey = new String[partitionDepth];
+        Path ancestor = fragment.getPath();
+        for (int level = partitionDepth - 1; level >= 0; level--) {
+          ancestor = ancestor.getParent();
+          partitionKey[level] = ancestor.getName();
+        }
+
+        // Stop at the first file that belongs to a different partition.
+        if (acceptedPartitionKey != null && !Arrays.equals(acceptedPartitionKey, partitionKey)) {
+          break;
+        }
+        acceptedPartitionKey = partitionKey;
+      }
+
+      fragments.add(fragment);
+    }
+    return FragmentConvertor.toFragmentProtoArray(fragments.toArray(new FileFragment[]{}));
+  }
+
+  /**
+   * Depth-first helper that appends non-empty files found under {@code path} to {@code result}.
+   * <p>
+   * {@code currentFileIndex} counts every non-empty file visited so far, whether or not it was
+   * collected. A file is collected only once that count has reached {@code startFileIndex}.
+   * While iterating a directory the walk stops as soon as {@code result} holds
+   * {@code numResultFiles} entries.
+   *
+   * @param fs file system that owns {@code path}
+   * @param path directory to walk, or a single file to examine
+   * @param result accumulator for the collected file statuses
+   * @param startFileIndex number of non-empty files to skip before collecting
+   * @param numResultFiles size of {@code result} at which directory iteration stops
+   * @param currentFileIndex running count of non-empty files seen, shared across the recursion
+   * @throws IOException if the file system cannot be accessed
+   */
+  private static void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result,
+                                         int startFileIndex, int numResultFiles,
+                                         AtomicInteger currentFileIndex) throws IOException {
+    if (!fs.isDirectory(path)) {
+      // The path itself is a file: count it, and collect it if it is past the skip offset.
+      FileStatus status = fs.getFileStatus(path);
+      if (status != null && status.getLen() > 0) {
+        if (currentFileIndex.getAndIncrement() >= startFileIndex) {
+          result.add(status);
+        }
+      }
+      return;
+    }
+
+    FileStatus[] children = fs.listStatus(path);
+    if (children == null) {
+      return;
+    }
+
+    for (FileStatus child : children) {
+      if (result.size() >= numResultFiles) {
+        return;
+      }
+
+      if (child.isDirectory()) {
+        getNonZeroLengthDataFiles(fs, child.getPath(), result, startFileIndex, numResultFiles,
+            currentFileIndex);
+        continue;
+      }
+
+      if (child.isFile() && child.getLen() > 0) {
+        if (currentFileIndex.getAndIncrement() >= startFileIndex) {
+          result.add(child);
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 2c62d42..504a792 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -27,10 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.SessionVars;
-import org.apache.tajo.TajoConstants;
+import org.apache.tajo.*;
 import org.apache.tajo.algebra.AlterTablespaceSetType;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.algebra.JsonHelper;
@@ -39,6 +36,7 @@ import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.exception.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
@@ -50,6 +48,7 @@ import org.apache.tajo.engine.parser.SQLAnalyzer;
 import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.planner.physical.EvalExprExec;
+import org.apache.tajo.engine.planner.physical.SeqScanExec;
 import org.apache.tajo.engine.planner.physical.StoreTableExec;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.ipc.ClientProtos;
@@ -71,7 +70,7 @@ import java.util.List;
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
 import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto;
 import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
-import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse.SerializedResultSet;
+import static org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
 
 public class GlobalEngine extends AbstractService {
   /** Class Logger */
@@ -217,17 +216,27 @@ public class GlobalEngine extends AbstractService {
       // Simple query indicates a form of 'select * from tb_name [LIMIT X];'.
     } else if (PlannerUtil.checkIfSimpleQuery(plan)) {
       ScanNode scanNode = plan.getRootBlock().getNode(NodeType.SCAN);
+      if (scanNode == null) {
+        scanNode = plan.getRootBlock().getNode(NodeType.PARTITIONS_SCAN);
+      }
       TableDesc desc = scanNode.getTableDesc();
+      int maxRow = Integer.MAX_VALUE;
       if (plan.getRootBlock().hasNode(NodeType.LIMIT)) {
         LimitNode limitNode = plan.getRootBlock().getNode(NodeType.LIMIT);
-        responseBuilder.setMaxRowNum((int) limitNode.getFetchFirstNum());
-      } else {
-        if (desc.getStats().getNumBytes() > 0 && desc.getStats().getNumRows() == 0) {
-          responseBuilder.setMaxRowNum(Integer.MAX_VALUE);
-        }
+        maxRow = (int) limitNode.getFetchFirstNum();
       }
+      QueryId queryId = QueryIdFactory.newQueryId(context.getResourceManager().getSeedQueryId());
+
+      NonForwardQueryResultScanner queryResultScanner =
+          new NonForwardQueryResultScanner(context.getConf(), session.getSessionId(), queryId, scanNode, desc, maxRow);
+
+      queryResultScanner.init();
+      session.addNonForwardQueryResultScanner(queryResultScanner);
+
+      responseBuilder.setQueryId(queryId.getProto());
+      responseBuilder.setMaxRowNum(maxRow);
       responseBuilder.setTableDesc(desc.getProto());
-      responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto());
+      responseBuilder.setSessionVariables(session.getProto().getVariables());
       responseBuilder.setResultCode(ClientProtos.ResultCode.OK);
 
       // NonFromQuery indicates a form of 'select a, x+y;'

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
new file mode 100644
index 0000000..f8c51fd
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master;
+
+import com.google.protobuf.ByteString;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.QueryUnitId;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.planner.PlannerUtil;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.engine.planner.physical.SeqScanExec;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.RowStoreUtil;
+import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
+import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Cursor over the rows of a simple (non-forwarded) query, read directly from the table's
+ * data files.
+ * <p>
+ * Files are scanned in batches of at most {@link #MAX_FILE_NUM_PER_SCAN}; each batch is
+ * obtained from {@link PlannerUtil#getNonZeroLengthDataFiles} and read with a fresh
+ * {@link SeqScanExec}. Rows are handed out through {@link #getNextRows(int)} until the data
+ * is exhausted or {@code maxRow} rows have been returned in total.
+ * <p>
+ * This class performs no synchronization of its own.
+ */
+public class NonForwardQueryResultScanner {
+  /** Upper bound on the number of data files requested for one {@link SeqScanExec}. */
+  private static final int MAX_FILE_NUM_PER_SCAN = 100;
+
+  private final QueryId queryId;
+  private final String sessionId;
+  private final TableDesc tableDesc;
+  private final RowStoreEncoder rowEncoder;
+  /** Total number of rows this scanner may return over its lifetime. */
+  private final int maxRow;
+  private final TajoConf tajoConf;
+  private final ScanNode scanNode;
+
+  /** Executor for the current file batch; null before init(), when no file is left, or once closed. */
+  private SeqScanExec scanExec;
+  private TaskAttemptContext taskContext;
+  /** Number of rows returned so far across all getNextRows() calls. */
+  private int currentNumRows;
+  /** Offset, in non-empty files, at which the next batch starts. */
+  private int currentFileIndex = 0;
+
+  /**
+   * Creates a scanner; no file is opened until {@link #init()} is called.
+   *
+   * @param tajoConf configuration used for file listing and scanning
+   * @param sessionId id of the session that owns this scanner
+   * @param queryId id under which this scanner is registered
+   * @param scanNode scan node of the query; it is cloned for every file batch
+   * @param tableDesc descriptor of the scanned table; its logical schema is used to encode rows
+   * @param maxRow total number of rows that may be returned
+   * @throws IOException declared for callers; not raised by the statements in this constructor
+   */
+  public NonForwardQueryResultScanner(TajoConf tajoConf, String sessionId,
+                                      QueryId queryId,
+                                      ScanNode scanNode,
+                                      TableDesc tableDesc,
+                                      int maxRow) throws IOException {
+    this.tajoConf = tajoConf;
+    this.sessionId = sessionId;
+    this.queryId = queryId;
+    this.scanNode = scanNode;
+    this.tableDesc = tableDesc;
+    this.maxRow = maxRow;
+
+    this.rowEncoder =  RowStoreUtil.createEncoder(tableDesc.getLogicalSchema());
+  }
+
+  /**
+   * Opens the first batch of data files. If the table has no non-empty file the scanner
+   * stays without an executor and {@link #getNextRows(int)} returns an empty list.
+   *
+   * @throws IOException if listing the files or initializing the scan fails
+   */
+  public void init() throws IOException {
+    initSeqScanExec();
+  }
+
+  /**
+   * Opens a {@link SeqScanExec} over the next batch of files and advances the file offset.
+   * Leaves {@code scanExec} untouched when no file is left.
+   */
+  private void initSeqScanExec() throws IOException {
+    FragmentProto[] fragments = PlannerUtil.getNonZeroLengthDataFiles(tajoConf, tableDesc,
+        currentFileIndex, MAX_FILE_NUM_PER_SCAN);
+    if (fragments != null && fragments.length > 0) {
+      this.taskContext = new TaskAttemptContext(
+          new QueryContext(tajoConf), null,
+          new QueryUnitAttemptId(new QueryUnitId(new ExecutionBlockId(queryId, 1), 0), 0),
+          fragments, null);
+
+      try {
+        // scanNode must be cloned because SeqScanExec changes the targets in the case of a partitioned table.
+        scanExec = new SeqScanExec(taskContext,
+            StorageManagerFactory.getStorageManager(tajoConf), (ScanNode)scanNode.clone(), fragments);
+      } catch (CloneNotSupportedException e) {
+        throw new IOException(e.getMessage(), e);
+      }
+      scanExec.init();
+      currentFileIndex += fragments.length;
+    }
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
+  public String getSessionId() {
+    return sessionId;
+  }
+
+  public void setScanExec(SeqScanExec scanExec) {
+    this.scanExec = scanExec;
+  }
+
+  public TableDesc getTableDesc() {
+    return tableDesc;
+  }
+
+  /**
+   * Closes the current executor, if any. Calling this more than once is harmless.
+   *
+   * @throws Exception if closing the executor fails
+   */
+  public void close() throws Exception {
+    closeScanExec();
+  }
+
+  /** Closes and forgets the current executor, if there is one. */
+  private void closeScanExec() throws IOException {
+    if (scanExec != null) {
+      scanExec.close();
+      scanExec = null;
+    }
+  }
+
+  /**
+   * Returns the next rows of the result, encoded with the table's logical schema.
+   * <p>
+   * Reading stops when {@code fetchRowNum} rows have been collected in this call, when the
+   * total number of rows returned by this scanner reaches {@code maxRow}, or when the data
+   * is exhausted. In the last two cases the executor is closed and every later call returns
+   * an empty list.
+   *
+   * @param fetchRowNum number of rows requested for this call
+   * @return the encoded rows; empty when nothing is left to return
+   * @throws IOException if reading from the data files fails
+   */
+  public List<ByteString> getNextRows(int fetchRowNum) throws IOException {
+    List<ByteString> rows = new ArrayList<ByteString>();
+    if (scanExec == null) {
+      return rows;
+    }
+    // The row limit may already be met before any row is read (e.g. a limit of zero).
+    if (currentNumRows >= maxRow) {
+      closeScanExec();
+      return rows;
+    }
+    int rowCount = 0;
+
+    while (true) {
+      Tuple tuple = scanExec.next();
+      // The current batch is drained: move on to the following batches, skipping any batch
+      // that yields no tuple, until a tuple is found or no file is left.
+      while (tuple == null) {
+        closeScanExec();
+        initSeqScanExec();
+        if (scanExec == null) {
+          break;
+        }
+        tuple = scanExec.next();
+      }
+      if (tuple == null) {
+        break;
+      }
+
+      rows.add(ByteString.copyFrom((rowEncoder.toBytes(tuple))));
+      rowCount++;
+      currentNumRows++;
+
+      // Check the overall limit first: if it were checked after the fetch-size test, a call
+      // that hits both at once would leave the executor open and the next call would return
+      // rows beyond the limit.
+      if (currentNumRows >= maxRow) {
+        closeScanExec();
+        break;
+      }
+
+      if (rowCount >= fetchRowNum) {
+        break;
+      }
+    }
+
+    return rows;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 738b643..65bde27 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.master;
 
+import com.google.protobuf.ByteString;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.lang.exception.ExceptionUtils;
@@ -56,6 +57,7 @@ import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringProto;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.util.ProtoUtil;
+import org.apache.tajo.util.StringUtils;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -452,6 +454,7 @@ public class TajoMasterClientService extends AbstractService {
           if (queryInProgress == null) {
             queryInProgress = context.getQueryJobManager().getFinishedQuery(queryId);
           }
+
           if (queryInProgress != null) {
             QueryInfo queryInfo = queryInProgress.getQueryInfo();
             builder.setResultCode(ResultCode.OK);
@@ -468,14 +471,70 @@ public class TajoMasterClientService extends AbstractService {
               builder.setFinishTime(System.currentTimeMillis());
             }
           } else {
-            builder.setResultCode(ResultCode.ERROR);
-            builder.setErrorMessage("No such query: " + queryId.toString());
+            Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+            if (session.getNonForwardQueryResultScanner(queryId) != null) {
+              builder.setResultCode(ResultCode.OK);
+              builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED);
+            } else {
+              builder.setResultCode(ResultCode.ERROR);
+              builder.setErrorMessage("No such query: " + queryId.toString());
+            }
           }
         }
         return builder.build();
 
       } catch (Throwable t) {
-        throw new  ServiceException(t);
+        throw new ServiceException(t);
+      }
+    }
+
+    /**
+     * Returns the next chunk of rows of a non-forwarded query to the client.
+     * <p>
+     * The scanner registered in the caller's session under the requested query id supplies
+     * up to the requested number of rows. Any failure, including a missing scanner, is
+     * logged and reported through an ERROR result code carrying the message and stack trace,
+     * rather than by throwing.
+     */
+    @Override
+    public GetQueryResultDataResponse getQueryResultData(RpcController controller, GetQueryResultDataRequest request)
+        throws ServiceException {
+      GetQueryResultDataResponse.Builder responseBuilder = GetQueryResultDataResponse.newBuilder();
+
+      try {
+        String sessionId = request.getSessionId().getId();
+        context.getSessionManager().touch(sessionId);
+        Session session = context.getSessionManager().getSession(sessionId);
+
+        QueryId queryId = new QueryId(request.getQueryId());
+        NonForwardQueryResultScanner scanner = session.getNonForwardQueryResultScanner(queryId);
+        if (scanner == null) {
+          // Caught below and converted into an ERROR response.
+          throw new ServiceException("No NonForwardQueryResultScanner for " + queryId);
+        }
+
+        List<ByteString> rows = scanner.getNextRows(request.getFetchRowNum());
+        SerializedResultSet resultSet = SerializedResultSet.newBuilder()
+            .setSchema(scanner.getTableDesc().getLogicalSchema().getProto())
+            .addAllSerializedTuples(rows)
+            .build();
+
+        responseBuilder.setResultSet(resultSet);
+        responseBuilder.setResultCode(ResultCode.OK);
+
+        LOG.info("Send result to client for " +
+            sessionId + "," + queryId + ", " + rows.size() + " rows");
+
+      } catch (Throwable t) {
+        LOG.error(t.getMessage(), t);
+        String errorMessage = t.getMessage();
+        if (errorMessage == null) {
+          errorMessage = t.getClass().getName();
+        }
+        responseBuilder.setResultCode(ResultCode.ERROR);
+        responseBuilder.setErrorMessage(errorMessage);
+        responseBuilder.setErrorTrace(org.apache.hadoop.util.StringUtils.stringifyException(t));
+      }
+      return responseBuilder.build();
+    }
+
+    /**
+     * Closes the result scanner of a non-forwarded query held by the caller's session.
+     * The session is touched first; any failure is rethrown wrapped in a ServiceException.
+     *
+     * @return BOOL_TRUE when the session call completes without throwing
+     */
+    @Override
+    public BoolProto closeNonForwardQuery(RpcController controller, QueryIdRequest request) throws ServiceException {
+      try {
+        context.getSessionManager().touch(request.getSessionId().getId());
+        Session session = context.getSessionManager().getSession(request.getSessionId().getId());
+        QueryId queryId = new QueryId(request.getQueryId());
+
+        // Removes the scanner registered for queryId from the session and closes it.
+        session.closeNonForwardQueryResultScanner(queryId);
+        return BOOL_TRUE;
+      } catch (Throwable t) {
+        throw new ServiceException(t);
       }
     }
 
@@ -483,7 +542,7 @@ public class TajoMasterClientService extends AbstractService {
      * It is invoked by TajoContainerProxy.
      */
     @Override
-    public BoolProto killQuery(RpcController controller, KillQueryRequest request) throws ServiceException {
+    public BoolProto killQuery(RpcController controller, QueryIdRequest request) throws ServiceException {
       try {
         context.getSessionManager().touch(request.getSessionId().getId());
         QueryId queryId = new QueryId(request.getQueryId());
@@ -492,7 +551,7 @@ public class TajoMasterClientService extends AbstractService {
             new QueryInfo(queryId)));
         return BOOL_TRUE;
       } catch (Throwable t) {
-        t.printStackTrace();
+        LOG.error(t.getMessage(), t);
         throw new ServiceException(t);
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
index 1f21e2a..5f44ecb 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
@@ -19,7 +19,11 @@
 package org.apache.tajo.master.session;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.QueryId;
 import org.apache.tajo.SessionVars;
+import org.apache.tajo.master.NonForwardQueryResultScanner;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.common.ProtoObject;
 
@@ -29,10 +33,13 @@ import java.util.Map;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto;
 
 public class Session implements SessionConstants, ProtoObject<SessionProto>, Cloneable {
+  private static final Log LOG = LogFactory.getLog(Session.class);
+
   private final String sessionId;
   private final String userName;
   private String currentDatabase;
   private final Map<String, String> sessionVariables;
+  private final Map<QueryId, NonForwardQueryResultScanner> nonForwardQueryMap = new HashMap<QueryId, NonForwardQueryResultScanner>();
 
   // transient status
   private volatile long lastAccessTime;
@@ -139,4 +146,51 @@ public class Session implements SessionConstants, ProtoObject<SessionProto>, Clo
     newSession.sessionVariables.putAll(getAllVariables());
     return newSession;
   }
+
+  /**
+   * Looks up the result scanner registered in this session for the given query.
+   *
+   * @param queryId id of the non-forwarded query
+   * @return the registered scanner, or null if none is registered under that id
+   */
+  public NonForwardQueryResultScanner getNonForwardQueryResultScanner(QueryId queryId) {
+    final NonForwardQueryResultScanner scanner;
+    synchronized (nonForwardQueryMap) {
+      scanner = nonForwardQueryMap.get(queryId);
+    }
+    return scanner;
+  }
+
+  /**
+   * Registers a result scanner in this session under the scanner's own query id,
+   * replacing any scanner previously registered under the same id.
+   *
+   * @param resultScanner the scanner to register
+   */
+  public void addNonForwardQueryResultScanner(NonForwardQueryResultScanner resultScanner) {
+    final QueryId key = resultScanner.getQueryId();
+    synchronized (nonForwardQueryMap) {
+      nonForwardQueryMap.put(key, resultScanner);
+    }
+  }
+
+  /**
+   * Unregisters the result scanner of the given query and closes it.
+   * Does nothing when no scanner is registered under that id. A failure while closing is
+   * logged and not propagated.
+   *
+   * @param queryId id of the non-forwarded query whose scanner is released
+   */
+  public void closeNonForwardQueryResultScanner(QueryId queryId) {
+    final NonForwardQueryResultScanner removed;
+    synchronized (nonForwardQueryMap) {
+      removed = nonForwardQueryMap.remove(queryId);
+    }
+
+    if (removed == null) {
+      return;
+    }
+
+    // The scanner is closed after the map lock has been released.
+    try {
+      removed.close();
+    } catch (Exception e) {
+      LOG.error("NonForwardQueryResultScanne close error: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Closes every non-forward query result scanner registered in this session and empties
+   * the registry.
+   * <p>
+   * The scanners are snapshotted and the registry cleared while holding the map lock; the
+   * scanners themselves are closed after the lock is released, in the same way as
+   * {@link #closeNonForwardQueryResultScanner(QueryId)}, so that closing them does not block
+   * other callers of the registry. A failure to close one scanner is logged and does not
+   * prevent the remaining scanners from being closed.
+   *
+   * @throws RuntimeException wrapping any other unexpected failure
+   */
+  public void close() {
+    try {
+      NonForwardQueryResultScanner[] scanners;
+      synchronized (nonForwardQueryMap) {
+        scanners = nonForwardQueryMap.values().toArray(
+            new NonForwardQueryResultScanner[nonForwardQueryMap.size()]);
+        nonForwardQueryMap.clear();
+      }
+
+      for (NonForwardQueryResultScanner eachQueryScanner: scanners) {
+        try {
+          eachQueryScanner.close();
+        } catch (Exception e) {
+          LOG.error("Error while closing NonForwardQueryResultScanner: " +
+              eachQueryScanner.getSessionId() + ", " + e.getMessage(), e);
+        }
+      }
+    } catch (Throwable t) {
+      LOG.error(t.getMessage(), t);
+      throw new RuntimeException(t.getMessage(), t);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
index 24df9d8..d701d03 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/SessionManager.java
@@ -74,12 +74,15 @@ public class SessionManager extends CompositeService implements EventHandler<Ses
     return sessionId;
   }
 
-  public void removeSession(String sessionId) {
+  public Session removeSession(String sessionId) {
     if (sessions.containsKey(sessionId)) {
-      sessions.remove(sessionId);
       LOG.info("Session " + sessionId + " is removed.");
+      Session session = sessions.remove(sessionId);
+      session.close();
+      return session;
     } else {
       LOG.error("No such session id: " + sessionId);
+      return null;
     }
   }
 
@@ -132,8 +135,10 @@ public class SessionManager extends CompositeService implements EventHandler<Ses
     }
 
     if (event.getType() == SessionEventType.EXPIRE) {
-      Session session = sessions.remove(event.getSessionId());
-      LOG.info("[Expired] Session username=" + session.getUserName() + ",sessionid=" + event.getSessionId());
+      Session session = removeSession(event.getSessionId());
+      if (session != null) {
+        LOG.info("[Expired] Session username=" + session.getUserName() + ",sessionid=" + event.getSessionId());
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
index 422ec2b..96208ac 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java
@@ -118,8 +118,6 @@ public class TaskAttemptContext {
     if (workerContext != null) {
       this.hashShuffleAppenderManager = workerContext.getHashShuffleAppenderManager();
     } else {
-      // For TestCase
-      LOG.warn("WorkerContext is null, so create HashShuffleAppenderManager created per a Task.");
       try {
         this.hashShuffleAppenderManager = new HashShuffleAppenderManager(queryContext.getConf());
       } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index c74a4ec..aec11f6 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -573,6 +573,39 @@ public class TajoTestingCluster {
     LOG.info("Minicluster is down");
   }
 
+  /**
+   * Creates a client for the shared TPC-H testing cluster, polling once per second until
+   * the master reports that it is running. The caller is responsible for closing the client.
+   *
+   * @return a new client built from the testing cluster's configuration
+   * @throws Exception if the wait is interrupted or the client cannot be created
+   */
+  public static TajoClient newTajoClient() throws Exception {
+    TajoTestingCluster cluster = TpchTestBase.getInstance().getTestingCluster();
+    while (!cluster.getMaster().isMasterRunning()) {
+      Thread.sleep(1000);
+    }
+    return new TajoClient(cluster.getConfiguration());
+  }
+
+  /**
+   * Creates the given tables on the shared testing cluster and runs a query against them
+   * with the supplied client. The client is not closed here.
+   *
+   * @param names table names
+   * @param schemas schema of each table, parallel to {@code names}
+   * @param tableOption options applied to every table
+   * @param tables data of each table, parallel to {@code names}
+   * @param query the query to execute
+   * @param client the client used to execute the query
+   * @return the result set of the query
+   * @throws Exception if table creation or query execution fails
+   */
+  public static ResultSet run(String[] names,
+                              Schema[] schemas,
+                              KeyValueSet tableOption,
+                              String[][] tables,
+                              String query,
+                              TajoClient client) throws Exception {
+    TajoTestingCluster cluster = TpchTestBase.getInstance().getTestingCluster();
+
+    FileSystem fs = cluster.getDefaultFileSystem();
+    Path warehouseDir = cluster.getMaster().getStorageManager().getWarehouseDir();
+    fs.mkdirs(warehouseDir);
+
+    int tableIdx = 0;
+    for (String tableName : names) {
+      createTable(tableName, schemas[tableIdx], tableOption, tables[tableIdx]);
+      tableIdx++;
+    }
+
+    Thread.sleep(1000);
+    return client.executeQueryAndGetResult(query);
+  }
+
   public static ResultSet run(String[] names,
                               Schema[] schemas,
                               KeyValueSet tableOption,
@@ -588,17 +621,9 @@ public class TajoTestingCluster {
     }
     TajoConf conf = util.getConfiguration();
     TajoClient client = new TajoClient(conf);
+
     try {
-      FileSystem fs = util.getDefaultFileSystem();
-      Path rootDir = util.getMaster().
-          getStorageManager().getWarehouseDir();
-      fs.mkdirs(rootDir);
-      for (int i = 0; i < names.length; i++) {
-        createTable(names[i], schemas[i], tableOption, tables[i]);
-      }
-      Thread.sleep(1000);
-      ResultSet res = client.executeQueryAndGetResult(query);
-      return res;
+      return run(names, schemas, tableOption, tables, query, client);
     } finally {
       client.close();
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
index b370be7..756dadc 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestPlannerUtil.java
@@ -18,11 +18,14 @@
 
 package org.apache.tajo.engine.planner;
 
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.fs.*;
 import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
 import org.apache.tajo.catalog.proto.CatalogProtos.FunctionType;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
@@ -34,13 +37,18 @@ import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
@@ -318,4 +326,58 @@ public class TestPlannerUtil {
     assertTrue(innerComparator.compare(t1, t2) < 0);
     assertTrue(innerComparator.compare(t2, t1) > 0);
   }
+
+  /**
+   * Pages through the non-empty files under the TestSelectQuery resource directory in six
+   * requests (five equally sized pages plus the remainder) and checks that every page
+   * matches the corresponding slice of a recursive file listing.
+   */
+  @Test
+  public void testGetNonZeroLengthDataFiles() throws Exception {
+    Path tablePath = new Path(ClassLoader.getSystemResource("queries").toString() + "/TestSelectQuery");
+
+    TableDesc tableDesc = new TableDesc();
+    tableDesc.setName("Test");
+    tableDesc.setPath(tablePath);
+
+    FileSystem fs = tablePath.getFileSystem(util.getConfiguration());
+
+    // Reference listing: every non-empty regular file below the table path.
+    List<Path> expectedFiles = new ArrayList<Path>();
+    for (RemoteIterator<LocatedFileStatus> it = fs.listFiles(tablePath, true); it.hasNext(); ) {
+      LocatedFileStatus status = it.next();
+      if (status.isFile() && status.getLen() > 0) {
+        expectedFiles.add(status.getPath());
+      }
+    }
+
+    final int numFullPages = 5;
+    int pageSize = expectedFiles.size() / numFullPages;
+
+    int totalFetched = 0;
+    for (int page = 0; page <= numFullPages; page++) {
+      int offset = page * pageSize;
+
+      FragmentProto[] fragments =
+          PlannerUtil.getNonZeroLengthDataFiles(util.getConfiguration(), tableDesc, offset, pageSize);
+      assertNotNull(fragments);
+      totalFetched += fragments.length;
+
+      // The request after the last full page only sees the leftover files.
+      int expectedSize = page < numFullPages
+          ? pageSize
+          : expectedFiles.size() - (pageSize * numFullPages);
+
+      comparePath(expectedFiles, fragments, offset, expectedSize);
+    }
+
+    assertEquals(expectedFiles.size(), totalFetched);
+  }
+
+  /**
+   * Asserts that {@code fragments} has exactly {@code expectedSize} entries and that they
+   * refer, in order, to {@code expectedFiles[startIndex .. startIndex + expectedSize)}.
+   */
+  private void comparePath(List<Path> expectedFiles, FragmentProto[] fragments,
+                           int startIndex, int expectedSize) throws Exception {
+    assertEquals(expectedSize, fragments.length);
+
+    for (int offset = 0; offset < expectedSize; offset++) {
+      FileFragment actual =
+          FragmentConvertor.convert(util.getConfiguration(), StoreType.CSV, fragments[offset]);
+      assertEquals(expectedFiles.get(startIndex + offset), actual.getPath());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestNullValues.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
index de75ca7..a492d6e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestNullValues.java
@@ -21,6 +21,7 @@ package org.apache.tajo.engine.query;
 import org.apache.tajo.IntegrationTest;
 import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
@@ -165,9 +166,11 @@ public class TestNullValues {
   @Test
   public final void testResultSetNullSimpleQuery() throws Exception {
     String tableName = "nulltable5";
-    ResultSet res = runNullTableQuery(tableName, "select col1, col2, col3, col4 from " + tableName);
+    ResultSet res = null;
 
+    TajoClient client = TajoTestingCluster.newTajoClient();
     try {
+      res = runNullTableQuery(tableName, "select col1, col2, col3, col4 from " + tableName, client);
       int numRows = 0;
 
       String expected =
@@ -193,7 +196,11 @@ public class TestNullValues {
       assertEquals(4, numRows);
       assertEquals(expected, result);
     } finally {
-      res.close();
+      if (res != null) {
+        res.close();
+      }
+
+      client.close();
     }
   }
 
@@ -207,9 +214,11 @@ public class TestNullValues {
         "col4 " +
         "from " + tableName;
 
-    ResultSet res = runNullTableQuery(tableName, query);
+    TajoClient client = TajoTestingCluster.newTajoClient();
+    ResultSet res = null;
 
     try {
+      res = runNullTableQuery(tableName, query, client);
       int numRows = 0;
       String expected =
           "null|99999|a|a|1.0|1.0|true\n" +
@@ -234,11 +243,15 @@ public class TestNullValues {
       assertEquals(4, numRows);
       assertEquals(expected, result);
     } finally {
-      res.close();
+      if (res != null) {
+        res.close();
+      }
+
+      client.close();
     }
   }
 
-  private ResultSet runNullTableQuery(String tableName, String query) throws Exception {
+  private ResultSet runNullTableQuery(String tableName, String query, TajoClient client) throws Exception {
     String [] table = new String[] {tableName};
     Schema schema = new Schema();
     schema.addColumn("col1", Type.INT4);
@@ -256,10 +269,11 @@ public class TestNullValues {
     tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
     tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N");
 
-    ResultSet res = TajoTestingCluster
-        .run(table, schemas, tableOptions, new String[][]{data}, query);
-
-    return res;
+    if (client == null) {
+      return TajoTestingCluster.run(table, schemas, tableOptions, new String[][]{data}, query);
+    } else {
+      return TajoTestingCluster.run(table, schemas, tableOptions, new String[][]{data}, query, client);
+    }
   }
 
   private void assertResultSetNull(ResultSet res, int numRows, boolean useName, int[] nullIndex) throws SQLException {

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
index 17ce37b..41e2f3e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java
@@ -81,6 +81,21 @@ public class TestSelectQuery extends QueryTestCaseBase {
   }
 
   @Test
+  public final void testSimpleQueryWithLimitPartitionedTable() throws Exception {
+    // select * from customer_parts limit 10;
+    executeDDL("customer_ddl.sql", null);
+    for (int i = 0; i < 5; i++) {
+      executeFile("insert_into_customer.sql").close();
+    }
+
+    ResultSet res = executeQuery();
+    assertResultSet(res);
+    cleanupQuery(res);
+
+    executeString("DROP TABLE customer_parts PURGE").close();
+  }
+
+  @Test
   public final void testExplainSelect() throws Exception {
     // explain select l_orderkey, l_partkey from lineitem;
     ResultSet res = executeQuery();

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
index 38908db..2c06b88 100644
--- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
+++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestResultSet.java
@@ -32,6 +32,7 @@ import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.DatumFactory;
@@ -139,6 +140,7 @@ public class TestResultSet {
     TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
 
     ResultSet res = null;
+    TajoClient client = TajoTestingCluster.newTajoClient();
     try {
       String tableName = "datetimetable";
       String query = "select col1, col2, col3 from " + tableName;
@@ -156,7 +158,7 @@ public class TestResultSet {
       tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
 
       res = TajoTestingCluster
-          .run(table, schemas, tableOptions, new String[][]{data}, query);
+          .run(table, schemas, tableOptions, new String[][]{data}, query, client);
 
       assertTrue(res.next());
 
@@ -212,7 +214,11 @@ public class TestResultSet {
     } finally {
       TajoConf.setCurrentTimeZone(tajoCurrentTimeZone);
       TimeZone.setDefault(systemCurrentTimeZone);
-      res.close();
+      if (res != null) {
+        res.close();
+      }
+
+      client.close();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
index b6ac551..a92b751 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java
@@ -25,6 +25,7 @@ import org.apache.tajo.TajoTestingCluster;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.benchmark.TPCH;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes;
@@ -32,10 +33,7 @@ import org.apache.tajo.engine.eval.BinaryEval;
 import org.apache.tajo.engine.eval.EvalType;
 import org.apache.tajo.engine.eval.FieldEval;
 import org.apache.tajo.engine.parser.SQLAnalyzer;
-import org.apache.tajo.engine.planner.LogicalOptimizer;
-import org.apache.tajo.engine.planner.LogicalPlan;
-import org.apache.tajo.engine.planner.LogicalPlanner;
-import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.*;
 import org.apache.tajo.engine.planner.global.DataChannel;
 import org.apache.tajo.engine.planner.global.ExecutionBlock;
 import org.apache.tajo.engine.planner.global.GlobalPlanner;
@@ -55,6 +53,8 @@ import java.util.Map;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class TestGlobalPlanner {
   private static Log LOG = LogFactory.getLog(TestGlobalPlanner.class);
@@ -80,10 +80,10 @@ public class TestGlobalPlanner {
 
     // TPC-H Schema for Complex Queries
     String [] tables = {
-        "part", "supplier", "partsupp", "nation", "region", "lineitem", "orders", "customer"
+        "part", "supplier", "partsupp", "nation", "region", "lineitem", "orders", "customer", "customer_parts"
     };
     int [] volumes = {
-        100, 200, 50, 5, 5, 800, 300, 100
+        100, 200, 50, 5, 5, 800, 300, 100, 707
     };
     tpch = new TPCH();
     tpch.loadSchemas();
@@ -96,6 +96,19 @@ public class TestGlobalPlanner {
           CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, tables[i]), tpch.getSchema(tables[i]), m,
           CommonTestingUtil.getTestDir());
       d.setStats(stats);
+
+      if (tables[i].equals(TPCH.CUSTOMER_PARTS)) {
+        Schema expressionSchema = new Schema();
+        expressionSchema.addColumn("c_nationkey", TajoDataTypes.Type.INT4);
+        PartitionMethodDesc partitionMethodDesc = new PartitionMethodDesc(
+            DEFAULT_DATABASE_NAME,
+            tables[i],
+            CatalogProtos.PartitionType.COLUMN,
+            "c_nationkey",
+            expressionSchema);
+
+        d.setPartitionMethod(partitionMethodDesc);
+      }
       catalog.createTable(d);
     }
 
@@ -174,10 +187,10 @@ public class TestGlobalPlanner {
     visitChildExecutionBLock(plan, root, evalMap);
 
     // Find required shuffleKey.
-    Assert.assertTrue(evalMap.get(eval1).booleanValue());
+    assertTrue(evalMap.get(eval1).booleanValue());
 
     // Find that ShuffleKeys only includes equi-join conditions
-    Assert.assertFalse(evalMap.get(eval2).booleanValue());
+    assertFalse(evalMap.get(eval2).booleanValue());
   }
 
   private void visitChildExecutionBLock(MasterPlan plan, ExecutionBlock parentBlock,
@@ -290,4 +303,43 @@ public class TestGlobalPlanner {
   public void testTPCH_Q5() throws Exception {
     buildPlan(FileUtil.readTextFile(new File("benchmark/tpch/q5.sql")));
   }
+
+  @Test
+  public void testCheckIfSimpleQuery() throws Exception {
+    MasterPlan plan = buildPlan("select * from customer");
+    assertTrue(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+
+    // partitioned table
+    plan = buildPlan("select * from customer_parts");
+    assertTrue(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+
+    plan = buildPlan("select * from customer where c_nationkey = 1");
+    assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+
+    plan = buildPlan("select * from customer_parts where c_nationkey = 1");
+    assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+
+    // same column order
+    plan = buildPlan("select c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment" +
+        " from customer");
+    assertTrue(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+
+    plan = buildPlan("select c_custkey, c_name, c_address, c_phone, c_acctbal, c_mktsegment, c_comment, c_nationkey " +
+        " from customer_parts");
+    assertTrue(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+
+    // different column order
+    plan = buildPlan("select c_name, c_custkey, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment" +
+        " from customer");
+    assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+
+    plan = buildPlan("select c_name, c_custkey, c_address, c_phone, c_acctbal, c_mktsegment, c_comment, c_nationkey " +
+        " from customer_parts");
+    assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+
+    plan = buildPlan("insert into customer_parts " +
+        " select c_name, c_custkey, c_address, c_phone, c_acctbal, c_mktsegment, c_comment, c_nationkey " +
+        " from customer");
+    assertFalse(PlannerUtil.checkIfSimpleQuery(plan.getLogicalPlan()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/resources/queries/TestSelectQuery/customer_ddl.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/customer_ddl.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/customer_ddl.sql
new file mode 100644
index 0000000..ca43710
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSelectQuery/customer_ddl.sql
@@ -0,0 +1,9 @@
+CREATE TABLE customer_parts (
+  c_custkey    INT4,
+  c_name    TEXT,
+  c_address    TEXT,
+  c_phone    TEXT,
+  c_acctbal    FLOAT8,
+  c_mktsegment    TEXT,
+  c_comment    TEXT
+) PARTITION BY COLUMN (c_nationkey INT4);
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/resources/queries/TestSelectQuery/insert_into_customer.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/insert_into_customer.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/insert_into_customer.sql
new file mode 100644
index 0000000..8767ba4
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSelectQuery/insert_into_customer.sql
@@ -0,0 +1,11 @@
+INSERT INTO customer_parts
+  SELECT
+    c_custkey,
+    c_name,
+    c_address,
+    c_phone,
+    c_acctbal,
+    c_mktsegment,
+    c_comment,
+    c_nationkey
+  FROM customer;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/resources/queries/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.sql
new file mode 100644
index 0000000..42362b6
--- /dev/null
+++ b/tajo-core/src/test/resources/queries/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.sql
@@ -0,0 +1 @@
+select * from customer_parts limit 10;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/resources/results/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.result b/tajo-core/src/test/resources/results/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.result
new file mode 100644
index 0000000..5704ccb
--- /dev/null
+++ b/tajo-core/src/test/resources/results/TestSelectQuery/testSimpleQueryWithLimitPartitionedTable.result
@@ -0,0 +1,12 @@
+c_custkey,c_name,c_address,c_phone,c_acctbal,c_mktsegment,c_comment,c_nationkey
+-------------------------------
+3,Customer#000000003,MG9kdTD2WBHm,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,1
+3,Customer#000000003,MG9kdTD2WBHm,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,1
+3,Customer#000000003,MG9kdTD2WBHm,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,1
+3,Customer#000000003,MG9kdTD2WBHm,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,1
+3,Customer#000000003,MG9kdTD2WBHm,11-719-748-3364,7498.12,AUTOMOBILE, deposits eat slyly ironic, even instructions. express foxes detect slyly. blithely even accounts abov,1
+2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,13
+2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,13
+2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,13
+2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,13
+2,Customer#000000002,XSTf4,NCwDVaWNe6tEgvwfmRchLXak,23-768-687-3665,121.65,AUTOMOBILE,l accounts. blithely ironic theodolites integrate boldly: caref,13
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/7510f886/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index 0b5e28f..e6b12b1 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -34,4 +34,5 @@ Available Session Variables:
 \set NULL_CHAR [text value] - null char of text file output
 \set CODEGEN [true or false] - Runtime code generation enabled (experiment)
 \set ARITHABORT [true or false] - If true, a running query will be terminated when an overflow or divide-by-zero occurs.
+\set FETCH_ROWNUM [int value] - Sets the number of rows at a time from Master
 \set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled
\ No newline at end of file