You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/06/23 06:47:39 UTC

tajo git commit: TAJO-751: JDBC driver should support cancel() method.

Repository: tajo
Updated Branches:
  refs/heads/branch-0.10.2 3f3f3b7e6 -> 9accd2e49


TAJO-751: JDBC driver should support cancel() method.

Signed-off-by: Jihoon Son <ji...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/9accd2e4
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/9accd2e4
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/9accd2e4

Branch: refs/heads/branch-0.10.2
Commit: 9accd2e49cc6b778ce96a146be4c78ea2fc5f4d3
Parents: 3f3f3b7
Author: navis.ryu <na...@apache.org>
Authored: Tue Jun 23 13:46:34 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Tue Jun 23 13:46:34 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../org/apache/tajo/client/QueryClientImpl.java |  12 +-
 .../apache/tajo/client/SessionConnection.java   |  14 +-
 .../java/org/apache/tajo/client/TajoClient.java |   3 +
 .../org/apache/tajo/client/TajoClientUtil.java  |  21 +-
 .../org/apache/tajo/jdbc/FetchResultSet.java    |  15 +-
 .../apache/tajo/jdbc/TajoMemoryResultSet.java   |  10 +-
 .../org/apache/tajo/jdbc/TajoResultSetBase.java |  19 +-
 .../org/apache/tajo/jdbc/WaitingResultSet.java  |  71 +++++
 .../java/org/apache/tajo/OverridableConf.java   |   4 +
 .../src/main/java/org/apache/tajo/QueryId.java  |   4 +
 .../main/java/org/apache/tajo/SessionVars.java  |   4 +-
 .../java/org/apache/tajo/conf/TajoConf.java     |   1 +
 .../java/org/apache/tajo/util/KeyValueSet.java  |  59 +++-
 .../org/apache/tajo/master/QueryInProgress.java |  10 +-
 .../tajo/master/TajoMasterClientService.java    |   6 +
 .../apache/tajo/querymaster/Repartitioner.java  |   4 +-
 .../java/org/apache/tajo/jdbc/TestTajoJdbc.java |  37 ++-
 .../TestTajoCli/testHelpSessionVars.result      |   1 +
 .../org/apache/tajo/jdbc/JdbcConnection.java    |   6 +-
 .../apache/tajo/jdbc/TajoMetaDataResultSet.java |   9 +-
 .../apache/tajo/jdbc/TajoPreparedStatement.java | 276 ++-----------------
 .../org/apache/tajo/jdbc/TajoStatement.java     | 199 ++++++++-----
 23 files changed, 394 insertions(+), 394 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index dc9e32e..e13413a 100644
--- a/CHANGES
+++ b/CHANGES
@@ -6,6 +6,9 @@ Release 0.10.2 - Released
 
   IMPROVEMENT
 
+    TAJO-751: JDBC driver should support cancel() method.
+    (Contributed by navis, Committed by jihoon)
+
   BUG FIXES
 
   TASKS

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index 99c58b6..75d2d3b 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -271,17 +271,7 @@ public class QueryClientImpl implements QueryClient {
       return createNullResultSet(queryId);
     }
 
-    QueryStatus status = getQueryStatus(queryId);
-
-    while(status != null && !TajoClientUtil.isQueryComplete(status.getState())) {
-      try {
-        Thread.sleep(500);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-      }
-
-      status = getQueryStatus(queryId);
-    }
+    QueryStatus status = TajoClientUtil.waitCompletion(this, queryId);
 
     if (status.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
       if (status.hasResult()) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index 187af33..63edc6b 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoIdProtos;
+import org.apache.tajo.annotation.NotNull;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.auth.UserRoleInfo;
 import org.apache.tajo.ipc.ClientProtos;
@@ -68,14 +69,14 @@ public class SessionConnection implements Closeable {
 
   volatile TajoIdProtos.SessionIdProto sessionId;
 
-  private AtomicBoolean closed = new AtomicBoolean(false);
+  private final AtomicBoolean closed = new AtomicBoolean(false);
 
   /** session variable cache */
   private final Map<String, String> sessionVarsCache = new HashMap<String, String>();
 
-  private ServiceTracker serviceTracker;
+  private final ServiceTracker serviceTracker;
 
-  private KeyValueSet properties;
+  private final KeyValueSet properties;
 
   /**
    * Connect to TajoMaster
@@ -88,14 +89,13 @@ public class SessionConnection implements Closeable {
    */
   public SessionConnection(ServiceTracker tracker, @Nullable String baseDatabase,
                            KeyValueSet properties) throws IOException {
-
+    this.serviceTracker = tracker;
+    this.baseDatabase = baseDatabase;
     this.properties = properties;
 
     this.manager = RpcClientManager.getInstance();
     this.userInfo = UserRoleInfo.getCurrentUser();
-    this.baseDatabase = baseDatabase != null ? baseDatabase : null;
 
-    this.serviceTracker = tracker;
     connections.incrementAndGet();
   }
 
@@ -113,7 +113,7 @@ public class SessionConnection implements Closeable {
     return manager.getClient(addr, protocolClass, asyncMode);
   }
 
-  protected KeyValueSet getProperties() {
+  public KeyValueSet getProperties() {
     return properties;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 376f63f..85929e8 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -19,9 +19,12 @@
 package org.apache.tajo.client;
 
 import org.apache.tajo.annotation.ThreadSafe;
+import org.apache.tajo.util.KeyValueSet;
 
 import java.io.Closeable;
 
 @ThreadSafe
 public interface TajoClient extends QueryClient, CatalogAdminClient, Closeable {
+
+  KeyValueSet getProperties();
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
index ea15aed..bab699e 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
@@ -18,7 +18,9 @@
 
 package org.apache.tajo.client;
 
+import com.google.protobuf.ServiceException;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.catalog.CatalogUtil;
@@ -27,6 +29,8 @@ import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.jdbc.FetchResultSet;
 import org.apache.tajo.jdbc.TajoMemoryResultSet;
+import org.apache.tajo.jdbc.TajoResultSetBase;
+import org.apache.tajo.rpc.RpcUtils;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 
 import java.io.IOException;
@@ -56,6 +60,21 @@ public class TajoClientUtil {
     return !isQueryWaitingForSchedule(state) && !isQueryRunning(state);
   }
 
+  public static QueryStatus waitCompletion(QueryClient client, QueryId queryId) throws ServiceException {
+    QueryStatus status = client.getQueryStatus(queryId);
+
+    while(!isQueryComplete(status.getState())) {
+      try {
+        Thread.sleep(500);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+
+      status = client.getQueryStatus(queryId);
+    }
+    return status;
+  }
+
   public static ResultSet createResultSet(TajoClient client, QueryId queryId,
                                           ClientProtos.GetQueryResultResponse response, int fetchRows)
       throws IOException {
@@ -91,7 +110,7 @@ public class TajoClientUtil {
   }
 
   public static ResultSet createNullResultSet() {
-    return new TajoMemoryResultSet(null, new Schema(), null, 0, null);
+    return new TajoMemoryResultSet(QueryIdFactory.NULL_QUERY_ID, new Schema(), null, 0, null);
   }
 
   public static ResultSet createNullResultSet(QueryId queryId) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
index efe070e..869d7c4 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
@@ -27,26 +27,19 @@ import java.io.IOException;
 import java.sql.SQLException;
 
 public class FetchResultSet extends TajoResultSetBase {
-  private QueryClient tajoClient;
-  private QueryId queryId;
+  protected QueryClient tajoClient;
   private int fetchRowNum;
   private TajoMemoryResultSet currentResultSet;
-  private boolean finished = false;
-// maxRows number is limit value of resultSet. The value must be >= 0, and 0 means there is not limit.
+  private boolean finished;
+  // maxRows number is limit value of resultSet. The value must be >= 0, and 0 means there is not limit.
   private int maxRows;
 
   public FetchResultSet(QueryClient tajoClient, Schema schema, QueryId queryId, int fetchRowNum) {
-    super(tajoClient.getClientSideSessionVars());
+    super(queryId, schema, tajoClient.getClientSideSessionVars());
     this.tajoClient = tajoClient;
     this.maxRows = tajoClient.getMaxRows();
-    this.queryId = queryId;
     this.fetchRowNum = fetchRowNum;
     this.totalRow = Integer.MAX_VALUE;
-    this.schema = schema;
-  }
-
-  public QueryId getQueryId() {
-    return queryId;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java
index 33cb838..4114d03 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoMemoryResultSet.java
@@ -31,20 +31,16 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TajoMemoryResultSet extends TajoResultSetBase {
-  private QueryId queryId;
   private List<ByteString> serializedTuples;
   private AtomicBoolean closed = new AtomicBoolean(false);
   private RowStoreUtil.RowStoreDecoder decoder;
 
   public TajoMemoryResultSet(QueryId queryId, Schema schema, List<ByteString> serializedTuples, int maxRowNum,
                              Map<String, String> clientSideSessionVars) {
-    super(clientSideSessionVars);
-    this.queryId = queryId;
-    this.schema = schema;
+    super(queryId, schema, clientSideSessionVars);
     this.totalRow = maxRowNum;
     this.serializedTuples = serializedTuples;
     this.decoder = RowStoreUtil.createDecoder(schema);
-    init();
   }
 
   @Override
@@ -53,10 +49,6 @@ public class TajoMemoryResultSet extends TajoResultSetBase {
     curRow = 0;
   }
 
-  public QueryId getQueryId() {
-    return queryId;
-  }
-
   @Override
   public synchronized void close() throws SQLException {
     if (closed.getAndSet(true)) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
index 77cbbf2..684d7ba 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSetBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.tajo.jdbc;
 
+import org.apache.tajo.QueryId;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes;
@@ -47,7 +48,11 @@ public abstract class TajoResultSetBase implements ResultSet {
   protected Schema schema;
   protected Tuple cur;
 
-  public TajoResultSetBase(@Nullable Map<String, String> clientSideSessionVars) {
+  protected final QueryId queryId;
+
+  public TajoResultSetBase(QueryId queryId, Schema schema, @Nullable Map<String, String> clientSideSessionVars) {
+    this.queryId = queryId;
+    this.schema = schema;
     this.clientSideSessionVars = clientSideSessionVars;
 
     if (clientSideSessionVars != null) {
@@ -73,6 +78,14 @@ public abstract class TajoResultSetBase implements ResultSet {
     wasNull = (d instanceof NullDatum);
   }
 
+  protected Schema getSchema() throws SQLException {
+    return schema;
+  }
+
+  public QueryId getQueryId() {
+    return queryId;
+  }
+
   public Tuple getCurrentTuple() {
     return cur;
   }
@@ -477,7 +490,7 @@ public abstract class TajoResultSetBase implements ResultSet {
 
   @Override
   public int findColumn(String colName) throws SQLException {
-    return schema.getColumnIdByName(colName);
+    return getSchema().getColumnIdByName(colName);
   }
 
   @Override
@@ -593,7 +606,7 @@ public abstract class TajoResultSetBase implements ResultSet {
 
   @Override
   public ResultSetMetaData getMetaData() throws SQLException {
-    return new TajoResultSetMetaData(schema);
+    return new TajoResultSetMetaData(getSchema());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-client/src/main/java/org/apache/tajo/jdbc/WaitingResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/WaitingResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/WaitingResultSet.java
new file mode 100644
index 0000000..b9f8df5
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/WaitingResultSet.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.jdbc;
+
+import com.google.protobuf.ServiceException;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.client.QueryClient;
+import org.apache.tajo.client.QueryStatus;
+import org.apache.tajo.client.TajoClientUtil;
+import org.apache.tajo.ipc.ClientProtos;
+
+import java.sql.SQLException;
+
+/**
+ * Blocks on schema retrieval if it's not ready
+ */
+public class WaitingResultSet extends FetchResultSet {
+
+  public WaitingResultSet(QueryClient tajoClient, QueryId queryId, int fetchRowNum)
+      throws SQLException {
+    super(tajoClient, null, queryId, fetchRowNum);
+  }
+
+  @Override
+  public boolean next() throws SQLException {
+    getSchema();
+    return super.next();
+  }
+
+  @Override
+  protected Schema getSchema() throws SQLException {
+    return schema == null ? schema = waitOnResult() : schema;
+  }
+
+  private Schema waitOnResult() throws SQLException {
+    try {
+      QueryStatus status = TajoClientUtil.waitCompletion(tajoClient, queryId);
+
+      if (status.getState() != TajoProtos.QueryState.QUERY_SUCCEEDED) {
+        throw new ServiceException(status.getErrorMessage() != null ? status.getErrorMessage() :
+            status.getErrorTrace() != null ? status.getErrorTrace() :
+                "Failed to execute query by unknown reason");
+      }
+      ClientProtos.GetQueryResultResponse response = tajoClient.getResultResponse(queryId);
+      TableDesc tableDesc = CatalogUtil.newTableDesc(response.getTableDesc());
+      return tableDesc.getLogicalSchema();
+    } catch (ServiceException e) {
+      throw new SQLException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
index 61bbb5a..c22f054 100644
--- a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
@@ -98,6 +98,7 @@ public class OverridableConf extends KeyValueSet {
     }
   }
 
+  @Override
   public boolean getBool(ConfigKey key) {
     return getBool(key, null);
   }
@@ -123,6 +124,7 @@ public class OverridableConf extends KeyValueSet {
     }
   }
 
+  @Override
   public int getInt(ConfigKey key) {
     return getInt(key, null);
   }
@@ -148,6 +150,7 @@ public class OverridableConf extends KeyValueSet {
     }
   }
 
+  @Override
   public long getLong(ConfigKey key) {
     return getLong(key, null);
   }
@@ -173,6 +176,7 @@ public class OverridableConf extends KeyValueSet {
     }
   }
 
+  @Override
   public float getFloat(ConfigKey key) {
     return getLong(key, null);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-common/src/main/java/org/apache/tajo/QueryId.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryId.java b/tajo-common/src/main/java/org/apache/tajo/QueryId.java
index 85882c1..35ca75c 100644
--- a/tajo-common/src/main/java/org/apache/tajo/QueryId.java
+++ b/tajo-common/src/main/java/org/apache/tajo/QueryId.java
@@ -44,6 +44,10 @@ public class QueryId implements Comparable<QueryId> {
     return seq;
   }
 
+  public boolean isNull() {
+    return this.equals(QueryIdFactory.NULL_QUERY_ID);
+  }
+
   @Override
   public String toString() {
     return QUERY_ID_PREFIX + SEPARATOR + toStringNoPrefix();

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
index 15ee73a..4882b27 100644
--- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
+++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java
@@ -102,7 +102,7 @@ public enum SessionVars implements ConfigKey {
   GROUPBY_PER_SHUFFLE_SIZE(ConfVars.$DIST_QUERY_GROUPBY_PARTITION_VOLUME, "shuffle output size for sort (mb)", DEFAULT,
       Integer.class, Validators.min("1")),
   TABLE_PARTITION_PER_SHUFFLE_SIZE(ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME,
-      "shuffle output size for partition table write (mb)", DEFAULT, Long.class, Validators.min("1")),
+      "shuffle output size for partition table write (mb)", DEFAULT, Integer.class, Validators.min("1")),
 
   GROUPBY_MULTI_LEVEL_ENABLED(ConfVars.$GROUPBY_MULTI_LEVEL_ENABLED, "Multiple level groupby enabled", DEFAULT,
       Boolean.class, Validators.bool()),
@@ -130,6 +130,8 @@ public enum SessionVars implements ConfigKey {
   // ResultSet ----------------------------------------------------------------
   FETCH_ROWNUM(ConfVars.$RESULT_SET_FETCH_ROWNUM, "Sets the number of rows at a time from Master", DEFAULT,
       Integer.class, Validators.min("0")),
+  BLOCK_ON_RESULT(ConfVars.$RESULT_SET_BLOCK_WAIT, "Whether to block result set on query execution", DEFAULT,
+      Boolean.class, Validators.bool()),
 
   //-------------------------------------------------------------------------------
   // Only for Unit Testing

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 1cc1240..95ef4bc 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -361,6 +361,7 @@ public class TajoConf extends Configuration {
 
     // ResultSet ---------------------------------------------------------
     $RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200),
+    $RESULT_SET_BLOCK_WAIT("tajo.resultset.block.wait", true),
     ;
 
     public final String varname;

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
index 6af0c9e..392552d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
@@ -21,7 +21,10 @@ package org.apache.tajo.util;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.gson.annotations.Expose;
+import org.apache.tajo.ConfigKey;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.common.ProtoObject;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.json.CommonGsonHelper;
 import org.apache.tajo.json.GsonObject;
 
@@ -37,7 +40,7 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
   public static final String FALSE_STR = "false";
 
   @Expose private Map<String,String> keyVals;
-	
+
 	public KeyValueSet() {
     keyVals = TUtil.newHashMap();
 	}
@@ -46,23 +49,23 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
     this();
     putAll(keyVals);
   }
-	
+
 	public KeyValueSet(KeyValueSetProto proto) {
     this.keyVals = TUtil.newHashMap();
     for(KeyValueProto keyval : proto.getKeyvalList()) {
       this.keyVals.put(keyval.getKey(), keyval.getValue());
     }
 	}
-	
+
 	public KeyValueSet(KeyValueSet keyValueSet) {
 	  this();
 	  this.keyVals.putAll(keyValueSet.keyVals);
 	}
-	
+
 	public static KeyValueSet create() {
 	  return new KeyValueSet();
 	}
-	
+
 	public static KeyValueSet create(KeyValueSet keyValueSet) {
     return new KeyValueSet(keyValueSet);
   }
@@ -119,7 +122,7 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
   public boolean getBool(String key, Boolean defaultVal) {
     if (containsKey(key)) {
       String strVal = get(key, null);
-      return strVal != null ? strVal.equalsIgnoreCase(TRUE_STR) : false;
+      return strVal != null && strVal.equalsIgnoreCase(TRUE_STR);
     } else if (defaultVal != null) {
       return defaultVal;
     } else {
@@ -131,6 +134,16 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
     return getBool(key, null);
   }
 
+  public boolean getBool(ConfigKey key) {
+    String keyName = key.keyname();
+    if (key instanceof SessionVars) {
+      return getBool(keyName, ((SessionVars)key).getConfVars().defaultBoolVal);
+    } else if (key instanceof TajoConf.ConfVars) {
+      return getBool(keyName, ((TajoConf.ConfVars)key).defaultBoolVal);
+    }
+    return getBool(keyName);
+  }
+
   public void setInt(String key, int val) {
     set(key, String.valueOf(val));
   }
@@ -150,6 +163,16 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
     return getInt(key, null);
   }
 
+  public int getInt(ConfigKey key) {
+    String keyName = key.keyname();
+    if (key instanceof SessionVars) {
+      return getInt(keyName, ((SessionVars) key).getConfVars().defaultIntVal);
+    } else if (key instanceof TajoConf.ConfVars) {
+      return getInt(keyName, ((TajoConf.ConfVars) key).defaultIntVal);
+    }
+    return getInt(keyName);
+  }
+
   public void setLong(String key, long val) {
     set(key, String.valueOf(val));
   }
@@ -169,6 +192,16 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
     return getLong(key, null);
   }
 
+  public long getLong(ConfigKey key) {
+    String keyName = key.keyname();
+    if (key instanceof SessionVars) {
+      return getLong(keyName, ((SessionVars) key).getConfVars().defaultLongVal);
+    } else if (key instanceof TajoConf.ConfVars) {
+      return getLong(keyName, ((TajoConf.ConfVars) key).defaultLongVal);
+    }
+    return getLong(keyName);
+  }
+
   public void setFloat(String key, float val) {
     set(key, String.valueOf(val));
   }
@@ -185,7 +218,7 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
         throw new IllegalArgumentException("No such a config key: "  + key);
       }
     } else if (defaultVal != null) {
-      return defaultVal.floatValue();
+      return defaultVal;
     } else {
       throw new IllegalArgumentException("No such a config key: "  + key);
     }
@@ -194,7 +227,17 @@ public class KeyValueSet implements ProtoObject<KeyValueSetProto>, Cloneable, Gs
   public float getFloat(String key) {
     return getFloat(key, null);
   }
-	
+
+  public float getFloat(ConfigKey key) {
+    String keyName = key.keyname();
+    if (key instanceof SessionVars) {
+      return getFloat(keyName, ((SessionVars) key).getConfVars().defaultFloatVal);
+    } else if (key instanceof TajoConf.ConfVars) {
+      return getFloat(keyName, ((TajoConf.ConfVars) key).defaultFloatVal);
+    }
+    return getFloat(keyName);
+  }
+
 	public String remove(String key) {
 		return keyVals.remove(key);
 	}

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
index d2286cf..8cd9d80 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java
@@ -246,7 +246,7 @@ public class QueryInProgress {
 
       // terminal state will let client to retrieve a query result
       // So, we must set the query result before changing query state
-      if (isFinishState(this.queryInfo.getQueryState())) {
+      if (isFinishState()) {
         if (queryInfo.hasResultdesc()) {
           this.queryInfo.setResultDesc(queryInfo.getResultDesc());
         }
@@ -259,7 +259,13 @@ public class QueryInProgress {
     }
   }
 
-  private boolean isFinishState(TajoProtos.QueryState state) {
+  public boolean isKillWait() {
+    TajoProtos.QueryState state = queryInfo.getQueryState();
+    return state == TajoProtos.QueryState.QUERY_KILL_WAIT;
+  }
+
+  public boolean isFinishState() {
+    TajoProtos.QueryState state = queryInfo.getQueryState();
     return state == TajoProtos.QueryState.QUERY_FAILED ||
         state == TajoProtos.QueryState.QUERY_ERROR ||
         state == TajoProtos.QueryState.QUERY_KILLED ||

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
index 418c30b..9a71d2c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java
@@ -623,6 +623,12 @@ public class TajoMasterClientService extends AbstractService {
       try {
         context.getSessionManager().touch(request.getSessionId().getId());
         QueryId queryId = new QueryId(request.getQueryId());
+
+        QueryInProgress progress = context.getQueryJobManager().getQueryInProgress(queryId);
+        if (progress == null || progress.isFinishState() || progress.isKillWait()) {
+          return BOOL_TRUE;
+        }
+
         QueryManager queryManager = context.getQueryJobManager();
         queryManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL,
             new QueryInfo(queryId)));

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
index 8e9e343..21ab409 100644
--- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java
@@ -992,8 +992,8 @@ public class Repartitioner {
   public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext schedulerContext,
        Stage stage, Map<ExecutionBlockId, List<IntermediateEntry>> intermediates,
        String tableName) {
-    long splitVolume = StorageUnit.MB *
-        stage.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE);
+    long splitVolume = (long)StorageUnit.MB *
+        stage.getMasterPlan().getContext().getInt(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE);
     long pageSize = ((long)StorageUnit.MB) *
         stage.getContext().getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME); // in bytes
     if (pageSize >= splitVolume) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
index 36bbd94..b9371ef 100644
--- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
+++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
@@ -19,13 +19,12 @@
 package org.apache.tajo.jdbc;
 
 import com.google.common.collect.Maps;
-import org.apache.tajo.IntegrationTest;
-import org.apache.tajo.QueryTestCaseBase;
-import org.apache.tajo.TajoConstants;
+import org.apache.tajo.*;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.client.QueryClient;
+import org.apache.tajo.client.QueryStatus;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -33,10 +32,7 @@ import org.junit.experimental.categories.Category;
 
 import java.net.InetSocketAddress;
 import java.sql.*;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 
 import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.junit.Assert.*;
@@ -673,4 +669,31 @@ public class TestTajoJdbc extends QueryTestCaseBase {
       }
     }
   }
+
+  @Test
+  public final void testCancel() throws Exception {
+    String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), tajoMasterAddress.getPort(),
+        DEFAULT_DATABASE_NAME);
+    Properties props = new Properties();
+    props.setProperty(SessionVars.BLOCK_ON_RESULT.keyname(), "false");
+
+    Connection conn = new JdbcConnection(connUri, props);
+    PreparedStatement statement = conn.prepareStatement("select sleep(1) from lineitem");
+    try {
+      assertTrue("should have result set", statement.execute());
+      TajoResultSetBase result = (TajoResultSetBase) statement.getResultSet();
+      Thread.sleep(1000);   // todo query master is not killed properly if it's compiling the query (use 100, if you want see)
+      statement.cancel();
+
+      QueryStatus status = client.getQueryStatus(result.getQueryId());
+      assertEquals(TajoProtos.QueryState.QUERY_KILLED, status.getState());
+    } finally {
+      if (statement != null) {
+        statement.close();
+      }
+      if (conn != null) {
+        conn.close();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
index bcd8970..19b2ee1 100644
--- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
+++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result
@@ -36,4 +36,5 @@ Available Session Variables:
 \set CODEGEN [true or false] - Runtime code generation enabled (experiment)
 \set ARITHABORT [true or false] - If true, a running query will be terminated when an overflow or divide-by-zero occurs.
 \set FETCH_ROWNUM [int value] - Sets the number of rows at a time from Master
+\set BLOCK_ON_RESULT [true or false] - Whether to block result set on query execution
 \set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
index 85b6af3..e5f2170 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/JdbcConnection.java
@@ -21,11 +21,13 @@ package org.apache.tajo.jdbc;
 import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.client.CatalogAdminClient;
 import org.apache.tajo.client.QueryClient;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientImpl;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.jdbc.util.QueryStringDecoder;
 import org.apache.tajo.rpc.RpcUtils;
 import org.apache.tajo.util.KeyValueSet;
@@ -53,6 +55,8 @@ public class JdbcConnection implements Connection {
   /** it will be used soon. */
   private final Map<String, List<String>> params;
 
+  private final KeyValueSet clientProperties;
+
   public JdbcConnection(String rawURI, Properties properties) throws SQLException {
     this.rawURI = rawURI;
     this.properties = properties;
@@ -99,7 +103,7 @@ public class JdbcConnection implements Connection {
       throw new SQLException("Invalid JDBC URI: " + rawURI, "TAJO-001");
     }
 
-    KeyValueSet clientProperties = new KeyValueSet();
+    clientProperties = new KeyValueSet();
     if(properties != null) {
       for(Map.Entry<Object, Object> entry: properties.entrySet()) {
         clientProperties.set(entry.getKey().toString(), entry.getValue().toString());

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java
index 8f5bed6..1813377 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoMetaDataResultSet.java
@@ -31,17 +31,12 @@ public class TajoMetaDataResultSet extends TajoResultSetBase {
   private List<MetaDataTuple> values;
 
   public TajoMetaDataResultSet(Schema schema, List<MetaDataTuple> values) {
-    super(null);
-    init();
-    this.schema = schema;
+    super(null, schema, null);
     setDataTuples(values);
   }
 
   public TajoMetaDataResultSet(List<String> columns, List<Type> types, List<MetaDataTuple> values) {
-    super(null);
-    init();
-    schema = new Schema();
-
+    super(null, new Schema(), null);
     int index = 0;
     if(columns != null) {
       for(String columnName: columns) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java
index 229587a..0574bf9 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoPreparedStatement.java
@@ -30,35 +30,16 @@ import java.util.HashMap;
  * TajoPreparedStatement.
  *
  */
-public class TajoPreparedStatement implements PreparedStatement {
-  private JdbcConnection conn;
+public class TajoPreparedStatement extends TajoStatement implements PreparedStatement {
+
   private final String sql;
-  private TajoClient tajoClient;
+
   /**
    * save the SQL parameters {paramLoc:paramValue}
    */
   private final HashMap<Integer, String> parameters=new HashMap<Integer, String>();
 
   /**
-   * We need to keep a reference to the result set to support the following:
-   * <code>
-   * statement.execute(String sql);
-   * statement.getResultSet();
-   * </code>.
-   */
-  private ResultSet resultSet = null;
-
-  /**
-   * Add SQLWarnings to the warningChain if needed.
-   */
-  //private  SQLWarning warningChain = null;
-
-  /**
-   * Keep state so we can fail certain calls made after close().
-   */
-  private boolean isClosed = false;
-
-  /**
    * keep the current ResultRet update count
    */
   private int updateCount = 0;
@@ -69,8 +50,7 @@ public class TajoPreparedStatement implements PreparedStatement {
   public TajoPreparedStatement(JdbcConnection conn,
                                TajoClient tajoClient,
                                String sql) {
-    this.conn = conn;
-    this.tajoClient = tajoClient;
+    super(conn, tajoClient);
     this.sql = sql;
   }
 
@@ -81,6 +61,7 @@ public class TajoPreparedStatement implements PreparedStatement {
 
   @Override
   public void clearParameters() throws SQLException {
+    checkConnection("Can't clear parameters");
     this.parameters.clear();
   }
 
@@ -101,22 +82,14 @@ public class TajoPreparedStatement implements PreparedStatement {
     return updateCount;
   }
 
-  protected ResultSet executeImmediate(String sql) throws SQLException {
-    if (isClosed) {
-      throw new SQLException("Can't execute after statement has been closed");
-    }
+  protected TajoResultSetBase executeImmediate(String sql) throws SQLException {
+    checkConnection("Can't execute");
 
     try {
       if (sql.contains("?")) {
         sql = updateSql(sql, parameters);
       }
-      if (TajoStatement.isSetVariableQuery(sql)) {
-        return TajoStatement.setSessionVariable(tajoClient, sql);
-      } else if (TajoStatement.isUnSetVariableQuery(sql)) {
-        return TajoStatement.unSetSessionVariable(tajoClient, sql);
-      } else {
-        return tajoClient.executeQueryAndGetResult(sql);
-      }
+      return (TajoResultSetBase) executeSQL(sql);
     } catch (Exception e) {
       throw new SQLException(e.getMessage(), e);
     }
@@ -131,7 +104,7 @@ public class TajoPreparedStatement implements PreparedStatement {
    */
   private String updateSql(final String sql, HashMap<Integer, String> parameters) {
 
-    StringBuffer newSql = new StringBuffer(sql);
+    StringBuilder newSql = new StringBuilder(sql);
 
     int paramLoc = 1;
     while (getCharIndexFromSqlByParamLocation(sql, '?', paramLoc) > 0) {
@@ -179,6 +152,7 @@ public class TajoPreparedStatement implements PreparedStatement {
 
   @Override
   public ResultSetMetaData getMetaData() throws SQLException {
+    checkConnection("Can't get metadata");
     if(resultSet != null) {
       return resultSet.getMetaData();
     } else {
@@ -249,6 +223,7 @@ public class TajoPreparedStatement implements PreparedStatement {
 
   @Override
   public void setBoolean(int parameterIndex, boolean x) throws SQLException {
+    checkConnection("Can't set parameters");
     this.parameters.put(parameterIndex, "" + x);
   }
 
@@ -306,21 +281,25 @@ public class TajoPreparedStatement implements PreparedStatement {
 
   @Override
   public void setDouble(int parameterIndex, double x) throws SQLException {
+    checkConnection("Can't set parameters");
     this.parameters.put(parameterIndex,"" + x);
   }
 
   @Override
   public void setFloat(int parameterIndex, float x) throws SQLException {
+    checkConnection("Can't set parameters");
     this.parameters.put(parameterIndex,"" + x);
   }
 
   @Override
   public void setInt(int parameterIndex, int x) throws SQLException {
+    checkConnection("Can't set parameters");
     this.parameters.put(parameterIndex,"" + x);
   }
 
   @Override
   public void setLong(int parameterIndex, long x) throws SQLException {
+    checkConnection("Can't set parameters");
     this.parameters.put(parameterIndex,"" + x);
   }
 
@@ -399,11 +378,13 @@ public class TajoPreparedStatement implements PreparedStatement {
 
   @Override
   public void setShort(int parameterIndex, short x) throws SQLException {
+    checkConnection("Can't set parameters");
     this.parameters.put(parameterIndex,"" + x);
   }
 
   @Override
   public void setString(int parameterIndex, String x) throws SQLException {
+    checkConnection("Can't set parameters");
      x=x.replace("'", "\\'");
      this.parameters.put(parameterIndex,"'" + x +"'");
   }
@@ -439,227 +420,4 @@ public class TajoPreparedStatement implements PreparedStatement {
       throws SQLException {
     throw new SQLFeatureNotSupportedException("setUnicodeStream not supported");
   }
-
-  @Override
-  public void addBatch(String sql) throws SQLException {
-    throw new SQLFeatureNotSupportedException("addBatch not supported");
-  }
-
-  @Override
-  public void cancel() throws SQLException {
-    throw new SQLFeatureNotSupportedException("cancel not supported");
-  }
-
-  @Override
-  public void clearBatch() throws SQLException {
-    throw new SQLFeatureNotSupportedException("clearBatch not supported");
-  }
-
-  @Override
-  public void clearWarnings() throws SQLException {
-  }
-
-  public void closeOnCompletion() throws SQLException {
-    // JDK 1.7
-    throw new SQLFeatureNotSupportedException("closeOnCompletion");
-  }
-
-  @Override
-  public void close() throws SQLException {
-    if (resultSet!=null) {
-      resultSet.close();
-      resultSet = null;
-    }
-    isClosed = true;
-  }
-
-  @Override
-  public boolean execute(String sql) throws SQLException {
-    throw new SQLFeatureNotSupportedException("execute(sql) not supported");
-  }
-
-  @Override
-  public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
-    throw new SQLFeatureNotSupportedException("execute(sql) not supported");
-  }
-
-  @Override
-  public boolean execute(String sql, int[] columnIndexes) throws SQLException {
-    throw new SQLFeatureNotSupportedException("execute(sql) not supported");
-  }
-
-  @Override
-  public boolean execute(String sql, String[] columnNames) throws SQLException {
-    throw new SQLFeatureNotSupportedException("execute(sql) not supported");
-  }
-
-  @Override
-  public int[] executeBatch() throws SQLException {
-    throw new SQLFeatureNotSupportedException("executeBatch not supported");
-  }
-
-  @Override
-  public ResultSet executeQuery(String sql) throws SQLException {
-    throw new SQLFeatureNotSupportedException("executeQuery(sql) not supported");
-  }
-
-  @Override
-  public int executeUpdate(String sql) throws SQLException {
-    throw new SQLFeatureNotSupportedException("executeUpdate not supported");
-  }
-
-  @Override
-  public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
-    throw new SQLFeatureNotSupportedException("executeUpdate not supported");
-  }
-
-  @Override
-  public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
-    throw new SQLFeatureNotSupportedException("executeUpdate not supported");
-  }
-
-  @Override
-  public int executeUpdate(String sql, String[] columnNames) throws SQLException {
-    throw new SQLFeatureNotSupportedException("executeUpdate not supported");
-  }
-
-  @Override
-  public Connection getConnection() throws SQLException {
-    return conn;
-  }
-
-  @Override
-  public int getFetchDirection() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getFetchDirection not supported");
-  }
-
-  @Override
-  public int getFetchSize() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getFetchSize not supported");
-  }
-
-  @Override
-  public ResultSet getGeneratedKeys() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getGeneratedKeys not supported");
-  }
-
-  @Override
-  public int getMaxFieldSize() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getMaxFieldSize not supported");
-  }
-
-  @Override
-  public int getMaxRows() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getMaxRows not supported");
-  }
-
-  @Override
-  public boolean getMoreResults() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getMoreResults not supported");
-  }
-
-  @Override
-  public boolean getMoreResults(int current) throws SQLException {
-    throw new SQLFeatureNotSupportedException("getMoreResults not supported");
-  }
-
-  @Override
-  public int getQueryTimeout() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getQueryTimeout not supported");
-  }
-
-  @Override
-  public ResultSet getResultSet() throws SQLException {
-    return this.resultSet;
-  }
-
-  @Override
-  public int getResultSetConcurrency() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getResultSetConcurrency not supported");
-  }
-
-  @Override
-  public int getResultSetHoldability() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getResultSetHoldability not supported");
-  }
-
-  @Override
-  public int getResultSetType() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getResultSetType not supported");
-  }
-
-  @Override
-  public int getUpdateCount() throws SQLException {
-    return updateCount;
-  }
-
-  @Override
-  public SQLWarning getWarnings() throws SQLException {
-    return null;
-  }
-
-  @Override
-  public boolean isClosed() throws SQLException {
-    return isClosed;
-  }
-
-   public boolean isCloseOnCompletion() throws SQLException {
-     //JDK 1.7
-     throw new SQLFeatureNotSupportedException("isCloseOnCompletion not supported");
-   }
-
-  @Override
-  public boolean isPoolable() throws SQLException {
-    throw new SQLFeatureNotSupportedException("isPoolable not supported");
-  }
-
-  @Override
-  public void setCursorName(String name) throws SQLException {
-    throw new SQLFeatureNotSupportedException("setCursorName not supported");
-  }
-
-  @Override
-  public void setEscapeProcessing(boolean enable) throws SQLException {
-    throw new SQLFeatureNotSupportedException("setEscapeProcessing not supported");
-  }
-
-  @Override
-  public void setFetchDirection(int direction) throws SQLException {
-    throw new SQLFeatureNotSupportedException("setFetchDirection not supported");
-  }
-
-  @Override
-  public void setFetchSize(int rows) throws SQLException {
-    throw new SQLFeatureNotSupportedException("setFetchSize not supported");
-  }
-
-  @Override
-  public void setMaxFieldSize(int max) throws SQLException {
-    throw new SQLFeatureNotSupportedException("setMaxFieldSize not supported");
-  }
-
-  @Override
-  public void setMaxRows(int max) throws SQLException {
-    throw new SQLFeatureNotSupportedException("setMaxRows not supported");
-  }
-
-  @Override
-  public void setPoolable(boolean poolable) throws SQLException {
-    throw new SQLFeatureNotSupportedException("setPoolable not supported");
-  }
-
-  @Override
-  public void setQueryTimeout(int seconds) throws SQLException {
-    throw new SQLFeatureNotSupportedException("setQueryTimeout not supported");
-  }
-
-  @Override
-  public boolean isWrapperFor(Class<?> iface) throws SQLException {
-    throw new SQLFeatureNotSupportedException("isWrapperFor not supported");
-  }
-
-  @Override
-  public <T> T unwrap(Class<T> iface) throws SQLException {
-    throw new SQLFeatureNotSupportedException("unwrap not supported");
-  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/9accd2e4/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
----------------------------------------------------------------------
diff --git a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
index 820e350..0f80ddf 100644
--- a/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
+++ b/tajo-jdbc/src/main/java/org/apache/tajo/jdbc/TajoStatement.java
@@ -19,17 +19,21 @@ package org.apache.tajo.jdbc;
 
 import com.google.common.collect.Lists;
 import com.google.protobuf.ServiceException;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientUtil;
+import org.apache.tajo.ipc.ClientProtos;
 
+import java.io.IOException;
 import java.sql.*;
 import java.util.HashMap;
 import java.util.Map;
 
 public class TajoStatement implements Statement {
-  private JdbcConnection conn;
-  private TajoClient tajoClient;
-  private int fetchSize = 200;
+  protected JdbcConnection conn;
+  protected TajoClient tajoClient;
+  protected int fetchSize = SessionVars.FETCH_ROWNUM.getConfVars().defaultIntVal;
 
   /**
    * We need to keep a reference to the result set to support the following:
@@ -38,35 +42,65 @@ public class TajoStatement implements Statement {
    * statement.getResultSet();
    * </code>.
    */
-  private ResultSet resultSet = null;
+  protected TajoResultSetBase resultSet = null;
+
+  /**
+   * Add SQLWarnings to the warningChain if needed.
+   */
+  protected SQLWarning warningChain = null;
 
   /**
    * Keep state so we can fail certain calls made after close().
    */
-  private boolean isClosed = false;
+  private boolean isClosed;
+
+  private boolean blockWait;
 
   public TajoStatement(JdbcConnection conn, TajoClient tajoClient) {
     this.conn = conn;
     this.tajoClient = tajoClient;
+    this.blockWait = tajoClient.getProperties().getBool(SessionVars.BLOCK_ON_RESULT);
   }
 
+  /*
+   * NOTICE
+   *
+   * For unimplemented methods, this class throws an exception or prints an error message.
+   * If the unimplemented method can cause unexpected result to user application when it is called,
+   * it should throw an exception.
+   * Otherwise, it is enough that prints an error message.
+   */
+
   @Override
   public void addBatch(String sql) throws SQLException {
-    throw new SQLFeatureNotSupportedException("addBatch not supported");
+    throw new SQLFeatureNotSupportedException("addBatch() is not supported yet.");
   }
 
   @Override
   public void cancel() throws SQLException {
-    throw new SQLFeatureNotSupportedException("cancel not supported");
+    checkConnection("Can't cancel query");
+    if (resultSet == null || resultSet.getQueryId().isNull()) {
+      return;
+    }
+    try {
+      tajoClient.killQuery(resultSet.getQueryId());
+    } catch (Exception e) {
+      throw new SQLException(e);
+    } finally {
+      resultSet = null;
+    }
   }
 
   @Override
   public void clearBatch() throws SQLException {
-    throw new SQLFeatureNotSupportedException("clearBatch not supported");
+    throw new SQLFeatureNotSupportedException("clearBatch() is not supported yet.");
   }
 
   @Override
-  public void clearWarnings() throws SQLException {}
+  public void clearWarnings() throws SQLException {
+    checkConnection("Can't clear warnings");
+    warningChain = null;
+  }
 
   @Override
   public void close() throws SQLException {
@@ -79,55 +113,87 @@ public class TajoStatement implements Statement {
 
   public void closeOnCompletion() throws SQLException {
      // JDK 1.7
-     throw new SQLFeatureNotSupportedException("closeOnCompletion not supported");
+    throw new SQLFeatureNotSupportedException("closeOnCompletion() is not supported yet.");
   }
 
   @Override
   public boolean execute(String sql) throws SQLException {
-    resultSet = executeQuery(sql);
+    resultSet = (TajoResultSetBase) executeQuery(sql);
 
     return resultSet != null;
   }
 
   @Override
   public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
-    throw new SQLFeatureNotSupportedException("execute not supported");
+    throw new SQLFeatureNotSupportedException("execute() is not supported yet.");
   }
 
   @Override
   public boolean execute(String sql, int[] columnIndexes) throws SQLException {
-    throw new SQLFeatureNotSupportedException("execute not supported");
+    throw new SQLFeatureNotSupportedException("execute() is not supported yet.");
   }
 
   @Override
   public boolean execute(String sql, String[] columnNames) throws SQLException {
-    throw new SQLFeatureNotSupportedException("execute not supported");
+    throw new SQLFeatureNotSupportedException("execute() is not supported yet.");
   }
 
   @Override
   public int[] executeBatch() throws SQLException {
-    throw new SQLFeatureNotSupportedException("executeBatch not supported");
+    throw new SQLFeatureNotSupportedException("executeBatch() is not supported yet.");
   }
 
   @Override
   public ResultSet executeQuery(String sql) throws SQLException {
-    if (isClosed) {
-      throw new SQLException("Can't execute after statement has been closed");
-    }
+    checkConnection("Can't execute");
 
     try {
-      if (isSetVariableQuery(sql)) {
-        return setSessionVariable(tajoClient, sql);
-      } else if (isUnSetVariableQuery(sql)) {
-        return unSetSessionVariable(tajoClient, sql);
-      } else {
-        return tajoClient.executeQueryAndGetResult(sql);
-      }
+      return executeSQL(sql);
     } catch (Exception e) {
       throw new SQLException(e.getMessage(), e);
     }
   }
 
+  protected ResultSet executeSQL(String sql) throws SQLException, ServiceException, IOException {
+    if (isSetVariableQuery(sql)) {
+      return setSessionVariable(tajoClient, sql);
+    }
+    if (isUnSetVariableQuery(sql)) {
+      return unSetSessionVariable(tajoClient, sql);
+    }
+
+    ClientProtos.SubmitQueryResponse response = tajoClient.executeQuery(sql);
+    if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
+      if (response.hasErrorMessage()) {
+        throw new ServiceException(response.getErrorMessage());
+      }
+      if (response.hasErrorTrace()) {
+        throw new ServiceException(response.getErrorTrace());
+      }
+      throw new ServiceException("Failed to submit query by unknown reason");
+    }
+
+    QueryId queryId = new QueryId(response.getQueryId());
+    if (response.getIsForwarded() && !queryId.isNull()) {
+      WaitingResultSet result = new WaitingResultSet(tajoClient, queryId, fetchSize);
+      if (blockWait) {
+        result.getSchema();
+      }
+      return result;
+    }
+
+    if (response.hasResultSet() || response.hasTableDesc()) {
+      return TajoClientUtil.createResultSet(tajoClient, response, fetchSize);
+    }
+    return TajoClientUtil.createNullResultSet(queryId);
+  }
+
+  protected void checkConnection(String errorMsg) throws SQLException {
+    if (isClosed) {
+      throw new SQLException(errorMsg + " after statement has been closed");
+    }
+  }
+
   public static boolean isSetVariableQuery(String sql) {
     if (sql == null || sql.trim().isEmpty()) {
       return false;
@@ -144,7 +210,7 @@ public class TajoStatement implements Statement {
     return sql.trim().toLowerCase().startsWith("unset");
   }
 
-  public static ResultSet setSessionVariable(TajoClient client, String sql) throws SQLException {
+  private ResultSet setSessionVariable(TajoClient client, String sql) throws SQLException {
     int index = sql.toLowerCase().indexOf("set");
     if (index < 0) {
       throw new SQLException("SET statement should be started 'SET' keyword: " + sql);
@@ -165,7 +231,7 @@ public class TajoStatement implements Statement {
     return TajoClientUtil.createNullResultSet();
   }
 
-  public static ResultSet unSetSessionVariable(TajoClient client, String sql) throws SQLException {
+  private ResultSet unSetSessionVariable(TajoClient client, String sql) throws SQLException {
     int index = sql.toLowerCase().indexOf("unset");
     if (index < 0) {
       throw new SQLException("UNSET statement should be started 'UNSET' keyword: " + sql);
@@ -186,57 +252,57 @@ public class TajoStatement implements Statement {
 
   @Override
   public int executeUpdate(String sql) throws SQLException {
+    checkConnection("Can't execute update");
     try {
       tajoClient.executeQuery(sql);
 
       return 1;
     } catch (Exception ex) {
-      throw new SQLFeatureNotSupportedException(ex.toString());
+      throw new SQLException(ex.toString());
     }
   }
 
   @Override
   public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
-    throw new SQLFeatureNotSupportedException("executeUpdate not supported");
+    throw new SQLFeatureNotSupportedException("executeUpdate() is not supported yet.");
   }
 
   @Override
   public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
-    throw new SQLFeatureNotSupportedException("executeUpdate not supported");
+    throw new SQLFeatureNotSupportedException("executeUpdate() is not supported yet.");
   }
 
   @Override
   public int executeUpdate(String sql, String[] columnNames) throws SQLException {
-    throw new SQLFeatureNotSupportedException("executeUpdate not supported");
+    throw new SQLFeatureNotSupportedException("executeUpdate() is not supported yet.");
   }
 
   @Override
   public Connection getConnection() throws SQLException {
-    if (isClosed)
-      throw new SQLException("Can't get connection after statement has been closed");
+    checkConnection("Can't get connection");
     return conn;
   }
 
   @Override
   public int getFetchDirection() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getFetchDirection not supported");
+    checkConnection("Can't get fetch direction");
+    return ResultSet.FETCH_FORWARD;
   }
 
   @Override
   public int getFetchSize() throws SQLException {
-    if (isClosed)
-      throw new SQLException("Can't get fetch size after statement has been closed");
+    checkConnection("Can't get fetch size");
     return fetchSize;
   }
 
   @Override
   public ResultSet getGeneratedKeys() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getGeneratedKeys not supported");
+    throw new SQLFeatureNotSupportedException("getGeneratedKeys() is not supported yet.");
   }
 
   @Override
   public int getMaxFieldSize() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getMaxFieldSize not supported");
+    throw new SQLFeatureNotSupportedException("getMaxFieldSize() is not supported yet.");
   }
 
   @Override
@@ -246,53 +312,54 @@ public class TajoStatement implements Statement {
 
   @Override
   public boolean getMoreResults() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getMoreResults not supported");
+    throw new SQLFeatureNotSupportedException("getMoreResults() is not supported yet.");
   }
 
   @Override
   public boolean getMoreResults(int current) throws SQLException {
-    throw new SQLFeatureNotSupportedException("getMoreResults not supported");
+    throw new SQLFeatureNotSupportedException("getMoreResults() is not supported yet.");
   }
 
   @Override
   public int getQueryTimeout() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getQueryTimeout not supported");
+    System.err.println("getResultSetConcurrency() is not supported yet.");
+    return -1;
   }
 
   @Override
   public ResultSet getResultSet() throws SQLException {
-    if (isClosed)
-      throw new SQLException("Can't get result set after statement has been closed");
+    checkConnection("Can't get result set");
     return resultSet;
   }
 
   @Override
   public int getResultSetConcurrency() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getResultSetConcurrency not supported");
+    System.err.println("getResultSetConcurrency() is not supported yet.");
+    return -1;
   }
 
   @Override
   public int getResultSetHoldability() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getResultSetHoldability not supported");
+    System.err.println("getResultSetHoldability() is not supported yet.");
+    return -1;
   }
 
   @Override
   public int getResultSetType() throws SQLException {
-    throw new SQLFeatureNotSupportedException("getResultSetType not supported");
+    System.err.println("getResultSetType() is not supported yet.");
+    return -1;
   }
 
   @Override
   public int getUpdateCount() throws SQLException {
-    if (isClosed)
-      throw new SQLException("Can't get update count after statement has been closed");
-    return 0;
+    System.err.println("getResultSetType() is not supported yet.");
+    return -1;
   }
 
   @Override
   public SQLWarning getWarnings() throws SQLException {
-    if (isClosed)
-      throw new SQLException("Can't get warnings after statement has been closed");
-    return null;
+    checkConnection("Can't get warnings");
+    return warningChain;
   }
 
   @Override
@@ -302,40 +369,41 @@ public class TajoStatement implements Statement {
 
   public boolean isCloseOnCompletion() throws SQLException {
     // JDK 1.7
-    throw new SQLFeatureNotSupportedException("isCloseOnCompletion not supported");
+    throw new SQLFeatureNotSupportedException("isCloseOnCompletion() is not supported yet.");
   }
 
   @Override
   public boolean isPoolable() throws SQLException {
-    throw new SQLFeatureNotSupportedException("isPoolable not supported");
+    throw new SQLFeatureNotSupportedException("isPoolable() is not supported yet.");
   }
 
   @Override
   public void setCursorName(String name) throws SQLException {
-    throw new SQLFeatureNotSupportedException("setCursorName not supported");
+    System.err.println("setCursorName() is not supported yet.");
   }
 
   /**
    * Not necessary.
    */
   @Override
-  public void setEscapeProcessing(boolean enable) throws SQLException {}
+  public void setEscapeProcessing(boolean enable) throws SQLException {
+    System.err.println("setEscapeProcessing() is not supported yet.");
+  }
 
   @Override
   public void setFetchDirection(int direction) throws SQLException {
-    throw new SQLFeatureNotSupportedException("setFetchDirection not supported");
+    System.err.println("setFetchDirection() is not supported yet.");
   }
 
   @Override
   public void setFetchSize(int rows) throws SQLException {
-    if (isClosed)
-      throw new SQLException("Can't set fetch size after statement has been closed");
+    checkConnection("Can't set fetch size");
     fetchSize = rows;
   }
 
   @Override
   public void setMaxFieldSize(int max) throws SQLException {
-    throw new SQLFeatureNotSupportedException("setMaxFieldSize not supported");
+    System.err.println("setMaxFieldSize() is not supported yet.");
   }
 
   @Override
@@ -348,22 +416,23 @@ public class TajoStatement implements Statement {
 
   @Override
   public void setPoolable(boolean poolable) throws SQLException {
-    throw new SQLFeatureNotSupportedException("setPoolable not supported");
+    System.err.println("setPoolable() is not supported yet.");
   }
 
   @Override
   public void setQueryTimeout(int seconds) throws SQLException {
-    throw new SQLFeatureNotSupportedException("setQueryTimeout not supported");
+    System.err.println("setQueryTimeout() is not supported yet.");
   }
 
   @Override
   public boolean isWrapperFor(Class<?> iface) throws SQLException {
-    throw new SQLFeatureNotSupportedException("isWrapperFor not supported");
+    System.err.println("isWrapperFor() is not supported yet.");
+    return false;
   }
 
   @Override
   public <T> T unwrap(Class<T> iface) throws SQLException {
-    throw new SQLFeatureNotSupportedException("unwrap not supported");
+    System.err.println("unwrap() is not supported yet.");
+    return null;
   }
-
 }