You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2015/08/04 03:31:28 UTC

[3/4] tajo git commit: TAJO-1699: Tajo Java Client version 2.

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index 4900188..a97cb33 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@ -26,6 +26,9 @@ import org.apache.tajo.TajoIdProtos;
 import org.apache.tajo.annotation.NotNull;
 import org.apache.tajo.annotation.Nullable;
 import org.apache.tajo.auth.UserRoleInfo;
+import org.apache.tajo.catalog.exception.UndefinedDatabaseException;
+import org.apache.tajo.client.v2.exception.ClientConnectionException;
+import org.apache.tajo.exception.NoSuchSessionVariableException;
 import org.apache.tajo.exception.SQLExceptionUtil;
 import org.apache.tajo.ipc.ClientProtos;
 import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse;
@@ -36,7 +39,6 @@ import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.rpc.RpcClientManager;
 import org.apache.tajo.rpc.RpcConstants;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetResponse;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringResponse;
@@ -57,14 +59,12 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.tajo.error.Errors.ResultCode.NO_SUCH_SESSION_VARIABLE;
-import static org.apache.tajo.exception.ReturnStateUtil.isError;
-import static org.apache.tajo.exception.ReturnStateUtil.isSuccess;
-import static org.apache.tajo.exception.ReturnStateUtil.isThisError;
-import static org.apache.tajo.exception.SQLExceptionUtil.toSQLException;
+import static org.apache.tajo.error.Errors.ResultCode.UNDEFINED_DATABASE;
+import static org.apache.tajo.exception.ReturnStateUtil.*;
 import static org.apache.tajo.exception.SQLExceptionUtil.throwIfError;
+import static org.apache.tajo.exception.SQLExceptionUtil.toSQLException;
 import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest;
 import static org.apache.tajo.ipc.ClientProtos.CreateSessionResponse;
-import static org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
 
 public class SessionConnection implements Closeable {
 
@@ -101,7 +101,7 @@ public class SessionConnection implements Closeable {
    * @throws SQLException
    */
   public SessionConnection(@NotNull ServiceTracker tracker, @Nullable String baseDatabase,
-                           @NotNull KeyValueSet properties) throws SQLException {
+                           @NotNull KeyValueSet properties) {
     this.serviceTracker = tracker;
     this.baseDatabase = baseDatabase;
     this.properties = properties;
@@ -117,7 +117,7 @@ public class SessionConnection implements Closeable {
     return Collections.unmodifiableMap(sessionVarsCache);
   }
 
-  public synchronized NettyClientBase getTajoMasterConnection() throws SQLException {
+  public synchronized NettyClientBase getTajoMasterConnection() {
 
     if (client != null && client.isConnected()) {
       return client;
@@ -138,14 +138,14 @@ public class SessionConnection implements Closeable {
         connections.incrementAndGet();
 
       } catch (Throwable t) {
-        throw SQLExceptionUtil.makeUnableToEstablishConnection(t);
+        throw new ClientConnectionException(t);
       }
 
       return client;
     }
   }
 
-  protected BlockingInterface getTMStub() throws SQLException {
+  protected BlockingInterface getTMStub() {
     NettyClientBase tmClient;
     tmClient = getTajoMasterConnection();
     BlockingInterface stub = tmClient.getStub();
@@ -185,7 +185,7 @@ public class SessionConnection implements Closeable {
     return userInfo;
   }
 
-  public String getCurrentDatabase() throws SQLException {
+  public String getCurrentDatabase() {
     NettyClientBase client = getTajoMasterConnection();
     checkSessionAndGet(client);
 
@@ -198,11 +198,11 @@ public class SessionConnection implements Closeable {
       throw new RuntimeException(e);
     }
 
-    throwIfError(response.getState());
+    ensureOk(response.getState());
     return response.getValue();
   }
 
-  public Map<String, String> updateSessionVariables(final Map<String, String> variables) throws SQLException {
+  public Map<String, String> updateSessionVariables(final Map<String, String> variables) {
     NettyClientBase client = getTajoMasterConnection();
     checkSessionAndGet(client);
 
@@ -221,15 +221,12 @@ public class SessionConnection implements Closeable {
       throw new RuntimeException(e);
     }
 
-    if (isSuccess(response.getState())) {
-      updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-      return Collections.unmodifiableMap(sessionVarsCache);
-    } else {
-      throw toSQLException(response.getState());
-    }
+    ensureOk(response.getState());
+    updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+    return Collections.unmodifiableMap(sessionVarsCache);
   }
 
-  public Map<String, String> unsetSessionVariables(final List<String> variables) throws SQLException {
+  public Map<String, String> unsetSessionVariables(final List<String> variables) throws NoSuchSessionVariableException {
 
     final BlockingInterface stub = getTMStub();
     final UpdateSessionVariableRequest request = UpdateSessionVariableRequest.newBuilder()
@@ -244,12 +241,13 @@ public class SessionConnection implements Closeable {
       throw new RuntimeException(e);
     }
 
-    if (isSuccess(response.getState())) {
-      updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
-      return Collections.unmodifiableMap(sessionVarsCache);
-    } else {
-      throw toSQLException(response.getState());
+    if (isThisError(response.getState(), NO_SUCH_SESSION_VARIABLE)) {
+      throw new NoSuchSessionVariableException(response.getState());
     }
+
+    ensureOk(response.getState());
+    updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+    return Collections.unmodifiableMap(sessionVarsCache);
   }
 
   void updateSessionVarsCache(Map<String, String> variables) {
@@ -259,7 +257,7 @@ public class SessionConnection implements Closeable {
     }
   }
 
-  public String getSessionVariable(final String varname) throws SQLException {
+  public String getSessionVariable(final String varname) throws NoSuchSessionVariableException {
     synchronized (sessionVarsCache) {
       // If a desired variable is client side one and exists in the cache, immediately return the variable.
       if (sessionVarsCache.containsKey(varname)) {
@@ -271,35 +269,41 @@ public class SessionConnection implements Closeable {
     checkSessionAndGet(client);
 
     BlockingInterface stub = client.getStub();
-
+    StringResponse response;
     try {
-      return stub.getSessionVariable(null, getSessionedString(varname)).getValue();
+      response = stub.getSessionVariable(null, getSessionedString(varname));
 
     } catch (ServiceException e) {
       throw new RuntimeException(e);
     }
+
+    if (isThisError(response.getState(), NO_SUCH_SESSION_VARIABLE)) {
+      throw new NoSuchSessionVariableException(response.getState());
+    }
+
+    ensureOk(response.getState());
+    return response.getValue();
   }
 
-  public Boolean existSessionVariable(final String varname) throws SQLException {
+  public Boolean existSessionVariable(final String varname) {
 
+    ReturnState state;
     try {
       final BlockingInterface stub = getTMStub();
-      ReturnState state = stub.existSessionVariable(null, getSessionedString(varname));
-
-      if (isThisError(state, NO_SUCH_SESSION_VARIABLE)) {
-        return false;
-      } else if (isError(state)){
-        throw SQLExceptionUtil.toSQLException(state);
-      }
-
-      return isSuccess(state);
-
+      state = stub.existSessionVariable(null, getSessionedString(varname));
     } catch (ServiceException e) {
       throw new RuntimeException(e);
     }
+
+    if (isThisError(state, NO_SUCH_SESSION_VARIABLE)) {
+      return false;
+    }
+
+    ensureOk(state);
+    return true;
   }
 
-  public Map<String, String> getAllSessionVariables() throws SQLException {
+  public Map<String, String> getAllSessionVariables() {
     NettyClientBase client = getTajoMasterConnection();
     checkSessionAndGet(client);
 
@@ -311,22 +315,29 @@ public class SessionConnection implements Closeable {
       throw new RuntimeException(e);
     }
 
-    throwIfError(response.getState());
+    ensureOk(response.getState());
     return ProtoUtil.convertToMap(response.getValue());
   }
 
-  public Boolean selectDatabase(final String databaseName) throws SQLException {
+  public Boolean selectDatabase(final String dbName) throws UndefinedDatabaseException {
 
     BlockingInterface stub = getTMStub();
     boolean selected;
     try {
-      selected = isSuccess(stub.selectDatabase(null, getSessionedString(databaseName)));
+      ReturnState state = stub.selectDatabase(null, getSessionedString(dbName));
+
+      if (isThisError(state, UNDEFINED_DATABASE)) {
+        throw new UndefinedDatabaseException(dbName);
+      }
+
+      selected = ensureOk(state);
+
     } catch (ServiceException e) {
       throw new RuntimeException(e);
     }
 
     if (selected) {
-      this.baseDatabase = databaseName;
+      this.baseDatabase = dbName;
     }
     return selected;
   }
@@ -362,7 +373,7 @@ public class SessionConnection implements Closeable {
     return serviceTracker.getClientServiceAddress();
   }
 
-  protected void checkSessionAndGet(NettyClientBase client) throws SQLException {
+  protected void checkSessionAndGet(NettyClientBase client) {
 
     if (sessionId == null) {
 
@@ -390,7 +401,7 @@ public class SessionConnection implements Closeable {
           LOG.debug(String.format("Got session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
         }
       } else {
-        throw SQLExceptionUtil.toSQLException(response.getState());
+        throw new InvalidClientSessionException(sessionId.getId());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
index b66d451..8c167a4 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
@@ -26,8 +26,10 @@ import org.apache.tajo.annotation.ThreadSafe;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableDesc;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.exception.*;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
 import org.apache.tajo.ipc.ClientProtos.*;
 import org.apache.tajo.jdbc.TajoMemoryResultSet;
@@ -90,7 +92,7 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que
   // QueryClient wrappers
   /*------------------------------------------------------------------------*/
 
-  public void closeQuery(final QueryId queryId) throws SQLException {
+  public void closeQuery(final QueryId queryId) {
     queryClient.closeQuery(queryId);
   }
 
@@ -98,23 +100,23 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que
     queryClient.closeNonForwardQuery(queryId);
   }
 
-  public SubmitQueryResponse executeQuery(final String sql) throws SQLException {
+  public SubmitQueryResponse executeQuery(final String sql) {
     return queryClient.executeQuery(sql);
   }
 
-  public SubmitQueryResponse executeQueryWithJson(final String json) throws SQLException {
+  public SubmitQueryResponse executeQueryWithJson(final String json) {
     return queryClient.executeQueryWithJson(json);
   }
 
-  public ResultSet executeQueryAndGetResult(final String sql) throws SQLException {
+  public ResultSet executeQueryAndGetResult(final String sql) throws TajoException {
     return queryClient.executeQueryAndGetResult(sql);
   }
 
-  public ResultSet executeJsonQueryAndGetResult(final String json) throws SQLException {
+  public ResultSet executeJsonQueryAndGetResult(final String json) throws TajoException {
     return queryClient.executeJsonQueryAndGetResult(json);
   }
 
-  public QueryStatus getQueryStatus(QueryId queryId) throws SQLException {
+  public QueryStatus getQueryStatus(QueryId queryId) {
     return queryClient.getQueryStatus(queryId);
   }
 
@@ -134,15 +136,15 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que
     return queryClient.fetchNextQueryResult(queryId, fetchRowNum);
   }
 
-  public boolean updateQuery(final String sql) throws SQLException {
+  public boolean updateQuery(final String sql) throws TajoException {
     return queryClient.updateQuery(sql);
   }
 
-  public boolean updateQueryWithJson(final String json) throws SQLException {
+  public boolean updateQueryWithJson(final String json) throws TajoException {
     return queryClient.updateQueryWithJson(json);
   }
 
-  public QueryStatus killQuery(final QueryId queryId) throws SQLException {
+  public QueryStatus killQuery(final QueryId queryId) {
     return queryClient.killQuery(queryId);
   }
 
@@ -150,7 +152,7 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que
     return queryClient.getRunningQueryList();
   }
 
-  public List<BriefQueryInfo> getFinishedQueryList() throws SQLException {
+  public List<BriefQueryInfo> getFinishedQueryList() {
     return queryClient.getFinishedQueryList();
   }
 
@@ -178,54 +180,54 @@ public class TajoClientImpl extends SessionConnection implements TajoClient, Que
   // CatalogClient wrappers
   /*------------------------------------------------------------------------*/
 
-  public boolean createDatabase(final String databaseName) throws SQLException {
+  public boolean createDatabase(final String databaseName) throws DuplicateDatabaseException {
     return catalogClient.createDatabase(databaseName);
   }
 
-  public boolean existDatabase(final String databaseName) throws SQLException {
+  public boolean existDatabase(final String databaseName) {
     return catalogClient.existDatabase(databaseName);
   }
 
-  public boolean dropDatabase(final String databaseName) throws SQLException {
+  public boolean dropDatabase(final String databaseName) throws UndefinedDatabaseException {
     return catalogClient.dropDatabase(databaseName);
   }
 
-  public List<String> getAllDatabaseNames() throws SQLException {
+  public List<String> getAllDatabaseNames() {
     return catalogClient.getAllDatabaseNames();
   }
 
-  public boolean existTable(final String tableName) throws SQLException {
+  public boolean existTable(final String tableName) {
     return catalogClient.existTable(tableName);
   }
 
   public TableDesc createExternalTable(final String tableName, final Schema schema, final URI path,
-                                       final TableMeta meta) throws SQLException {
+                                       final TableMeta meta) throws DuplicateTableException {
     return catalogClient.createExternalTable(tableName, schema, path, meta);
   }
 
   public TableDesc createExternalTable(final String tableName, final Schema schema, final URI path,
                                        final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
-      throws SQLException {
+      throws DuplicateTableException {
     return catalogClient.createExternalTable(tableName, schema, path, meta, partitionMethodDesc);
   }
 
-  public boolean dropTable(final String tableName) throws SQLException {
+  public boolean dropTable(final String tableName) throws UndefinedTableException {
     return dropTable(tableName, false);
   }
 
-  public boolean dropTable(final String tableName, final boolean purge) throws SQLException {
+  public boolean dropTable(final String tableName, final boolean purge) throws UndefinedTableException {
     return catalogClient.dropTable(tableName, purge);
   }
 
-  public List<String> getTableList(@Nullable final String databaseName) throws SQLException {
+  public List<String> getTableList(@Nullable final String databaseName) {
     return catalogClient.getTableList(databaseName);
   }
 
-  public TableDesc getTableDesc(final String tableName) throws SQLException {
+  public TableDesc getTableDesc(final String tableName) throws UndefinedTableException {
     return catalogClient.getTableDesc(tableName);
   }
 
-  public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws SQLException {
+  public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) {
     return catalogClient.getFunctions(functionName);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
index 358f1a0..c79b756 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
@@ -58,7 +58,7 @@ public class TajoClientUtil {
     return !isQueryWaitingForSchedule(state) && !isQueryRunning(state);
   }
 
-  public static QueryStatus waitCompletion(QueryClient client, QueryId queryId) throws SQLException {
+  public static QueryStatus waitCompletion(QueryClient client, QueryId queryId) {
     QueryStatus status = client.getQueryStatus(queryId);
 
     while(!isQueryComplete(status.getState())) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegate.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegate.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegate.java
new file mode 100644
index 0000000..8dce7c4
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegate.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+import org.apache.tajo.catalog.exception.UndefinedDatabaseException;
+import org.apache.tajo.exception.TajoException;
+
+import java.io.Closeable;
+import java.sql.ResultSet;
+
+/**
+ * ClientDelegate is a delegate for various wire protocols such as protocol buffers, REST API, and proxy.
+ */
+public interface ClientDelegate extends Closeable {
+  /** Runs {@code sql} as an update. NOTE(review): the meaning of the returned int is not specified here. */
+  int executeUpdate(String sql) throws TajoException;
+  /** Executes {@code sql} and returns its {@link ResultSet}. */
+  ResultSet executeSQL(String sql) throws TajoException;
+  /** Submits {@code sql} and returns a {@link QueryFuture} through which the result is obtained. */
+  QueryFuture executeSQLAsync(String sql) throws TajoException;
+  /** Returns the current database. */
+  String currentDB();
+  /** Selects {@code db} as the current database; presumably throws when {@code db} is not defined. */
+  void selectDB(String db) throws UndefinedDatabaseException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java
new file mode 100644
index 0000000..44721b3
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientDelegateFactory.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.client.v2.exception.ClientUnableToConnectException;
+
+import java.util.Map;
+/** Factory for {@link ClientDelegate} instances; both overloads currently build a LegacyClientDelegate. */
+public class ClientDelegateFactory {
+  /** Creates a delegate from an explicit {@code host} and {@code port}; {@code props} is annotated nullable. */
+  public static ClientDelegate newDefaultDelegate(String host,
+                                                  int port,
+                                                  @Nullable Map<String, String> props)
+      throws ClientUnableToConnectException {
+
+    return new LegacyClientDelegate(host, port, props);
+  }
+  /** Creates a delegate backed by the given {@code discovery}; {@code props} is annotated nullable. */
+  public static ClientDelegate newDefaultDelegate(ServiceDiscovery discovery,
+                                                  @Nullable Map<String, String> props)
+      throws ClientUnableToConnectException {
+
+    return new LegacyClientDelegate(discovery, props);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientUtil.java
new file mode 100644
index 0000000..b6a00e2
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/ClientUtil.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+public class ClientUtil {
+
+  public static boolean isOk(QueryState state) {
+    return !(state == QueryState.ERROR || state == QueryState.FAILED);
+  }
+
+  public static boolean isFailed(QueryState state) {
+    return state == QueryState.ERROR || state == QueryState.FAILED;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/FutureListener.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/FutureListener.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/FutureListener.java
new file mode 100644
index 0000000..ac6283e
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/FutureListener.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+import java.util.EventListener;
+/** Listener whose {@link #processingCompleted} callback receives a future of type {@code V}. */
+public interface FutureListener<V> extends EventListener {
+  void processingCompleted(V future); // presumably invoked once the future has completed — confirm
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
new file mode 100644
index 0000000..a17311b
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/LegacyClientDelegate.java
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.AbstractFuture;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.annotation.ThreadSafe;
+import org.apache.tajo.auth.UserRoleInfo;
+import org.apache.tajo.catalog.exception.UndefinedDatabaseException;
+import org.apache.tajo.client.*;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.TajoException;
+import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.GetQueryStatusResponse;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerException;
+import org.apache.tajo.service.TajoMasterInfo;
+import org.apache.tajo.util.KeyValueSet;
+import org.apache.tajo.util.NetUtils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.sql.ResultSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.tajo.exception.ReturnStateUtil.ensureOk;
+
+@ThreadSafe
+public class LegacyClientDelegate extends SessionConnection implements ClientDelegate {
+
+  private QueryClientImpl queryClient;
+  private final ExecutorService executor = Executors.newFixedThreadPool(8);
+
+  public LegacyClientDelegate(String host, int port, Map<String, String> props) {
+    super(new DummyServiceTracker(NetUtils.createSocketAddr(host, port)), null, new KeyValueSet(props));
+    queryClient = new QueryClientImpl(this);
+  }
+
+  public LegacyClientDelegate(ServiceDiscovery discovery, Map<String, String> props) {
+    super(new DelegateServiceTracker(discovery), null, new KeyValueSet(props));
+    queryClient = new QueryClientImpl(this);
+  }
+
+  @Override
+  public int executeUpdate(String sql) throws TajoException {
+    queryClient.updateQuery(sql);
+    return 0;
+  }
+
+  @Override
+  public ResultSet executeSQL(String sql) throws TajoException {
+    try {
+      return executeSQLAsync(sql).get();
+    } catch (InterruptedException | ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public QueryFuture executeSQLAsync(String sql) throws TajoException {
+    ClientProtos.SubmitQueryResponse response = queryClient.executeQuery(sql);
+    ClientExceptionUtil.throwIfError(response.getState());
+
+    QueryId queryId = new QueryId(response.getQueryId());
+
+    switch (response.getResultType()) {
+    case ENCLOSED:
+      return new QueryFutureForEnclosed(queryId, TajoClientUtil.createResultSet(this.queryClient, response, 200));
+    case FETCH:
+      AsyncQueryFuture future = new AsyncQueryFuture(queryId);
+      executor.execute(future);
+      return future;
+    default:
+      return new QueryFutureForNoFetch(queryId);
+    }
+  }
+
+  @Override
+  public String currentDB() {
+    return getCurrentDatabase();
+  }
+
+  @Override
+  public void selectDB(String db) throws UndefinedDatabaseException {
+    selectDatabase(db);
+  }
+
+  private class QueryFutureForNoFetch implements QueryFuture { // future that is already complete when created
+    protected final QueryId id;
+    private final long now = System.currentTimeMillis(); // creation time; reported as submit, start and finish time
+
+    QueryFutureForNoFetch(QueryId id) {
+      this.id = id;
+    }
+
+    @Override
+    public String id() {
+      return id.toString();
+    }
+
+    @Override
+    public String queue() {
+      return "default";
+    }
+
+    @Override
+    public QueryState state() {
+      return QueryState.COMPLETED; // fixed: this future never observes any other state
+    }
+
+    @Override
+    public float progress() {
+      return 1.0f;
+    }
+
+    @Override
+    public boolean isOk() {
+      return true;
+    }
+
+    @Override
+    public boolean isSuccessful() {
+      return true;
+    }
+
+    @Override
+    public boolean isFailed() {
+      return false;
+    }
+
+    @Override
+    public boolean isKilled() {
+      return false;
+    }
+
+    @Override
+    public UserRoleInfo user() {
+      return UserRoleInfo.getCurrentUser();
+    }
+
+    @Override
+    public void kill() { // intentionally empty: state() is always COMPLETED, so there is nothing to kill
+    }
+
+    @Override
+    public long submitTime() {
+      return now; // was 0 (the epoch), which placed the submit time decades before startTime()
+    }
+
+    @Override
+    public long startTime() {
+      return now;
+    }
+
+    @Override
+    public long finishTime() {
+      return now;
+    }
+
+    @Override
+    public void release() {
+      queryClient.closeQuery(id);
+    }
+
+    @Override
+    public void addListener(FutureListener<QueryFuture> future) {
+      future.processingCompleted(this); // called synchronously on the caller's thread; no executor is involved
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      return false; // never cancels
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return false;
+    }
+
+    @Override
+    public boolean isDone() {
+      return true;
+    }
+
+    @Override
+    public ResultSet get() {
+      return TajoClientUtil.NULL_RESULT_SET; // subclasses override get() to supply a real result set
+    }
+
+    @Override
+    public ResultSet get(long timeout, TimeUnit unit) {
+      return TajoClientUtil.NULL_RESULT_SET; // returns immediately; the timeout is not consulted
+    }
+  }
+
+  private class QueryFutureForEnclosed extends QueryFutureForNoFetch { // completed future carrying a ready result set
+    private final ResultSet resultSet; // handed in by the creator; returned as-is from both get() variants
+    QueryFutureForEnclosed(QueryId id, ResultSet resultSet) {
+      super(id);
+      this.resultSet = resultSet;
+    }
+
+    @Override
+    public ResultSet get() {
+      return resultSet;
+    }
+
+    @Override
+    public ResultSet get(long timeout, TimeUnit unit) {
+      return resultSet; // returns immediately; the timeout is not consulted
+    }
+  }
+
+  private class AsyncQueryFuture extends AbstractFuture<ResultSet> implements QueryFuture, Runnable {
+    private final QueryId queryId;
+    private volatile QueryState lastState; // state seen in the most recent status response
+    private volatile float progress;
+    private final long submitTime = System.currentTimeMillis(); // client-side creation time of this future
+    private volatile long startTime = 0; // 0 until the first status response is processed
+    private volatile long finishTime = 0; // 0 until the first status response is processed
+
+    public AsyncQueryFuture(QueryId queryId) {
+      this.queryId = queryId;
+      this.lastState = QueryState.SCHEDULED;
+    }
+
+    @Override
+    public String id() {
+      return queryId.toString();
+    }
+
+    @Override
+    public boolean isOk() {
+      return ClientUtil.isOk(lastState);
+    }
+
+    @Override
+    public boolean isSuccessful() {
+      return lastState == QueryState.COMPLETED;
+    }
+
+    @Override
+    public boolean isFailed() {
+      return ClientUtil.isFailed(lastState);
+    }
+
+    @Override
+    public boolean isKilled() { // unlike the other predicates, this asks queryClient on every call instead of using lastState
+      return queryClient.getQueryStatus(queryId).getState() == TajoProtos.QueryState.QUERY_KILLED;
+    }
+
+    @Override
+    public QueryState state() {
+      return lastState;
+    }
+
+    @Override
+    public String queue() {
+      return "default";
+    }
+
+    @Override
+    public UserRoleInfo user() {
+      return UserRoleInfo.getCurrentUser();
+    }
+
+    @Override
+    public float progress() {
+      return progress;
+    }
+
+    @Override
+    public void kill() {
+      queryClient.killQuery(queryId).getState(); // the returned state is not inspected
+    }
+
+    @Override
+    public long submitTime() {
+      return submitTime;
+    }
+
+    @Override
+    public long startTime() {
+      return startTime;
+    }
+
+    @Override
+    public long finishTime() {
+      return finishTime;
+    }
+
+    @Override
+    public void release() {
+      queryClient.closeQuery(queryId);
+    }
+
+    @Override
+    public void addListener(final FutureListener<QueryFuture> listener) {
+      final QueryFuture f = this;
+      addListener(new Runnable() { // adapt the listener to the (Runnable, Executor) form, run on this client's executor
+        @Override
+        public void run() {
+          listener.processingCompleted(f);
+        }},
+        executor);
+    }
+
+    private void updateState(GetQueryStatusResponse lastState) { // copies the response fields into the cached values
+      this.startTime = lastState.getSubmitTime(); // startTime() reports the response's submit time
+      this.finishTime = lastState.getFinishTime();
+      this.progress = lastState.getProgress();
+      this.lastState = convert(lastState.getQueryState());
+    }
+
+    GetQueryStatusResponse waitCompletion() { // polls the query status every 500 ms until the query is complete
+      GetQueryStatusResponse response = queryClient.getRawQueryStatus(queryId);
+      ensureOk(response.getState());
+      updateState(response);
+
+      while(!TajoClientUtil.isQueryComplete(response.getQueryState())) {
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt(); // restore the flag, then stop polling instead of spinning
+          throw new IllegalStateException("Interrupted while waiting for query " + queryId, e);
+        }
+        response = queryClient.getRawQueryStatus(queryId);
+        ensureOk(response.getState()); // validate first so an error response never overwrites the cached state
+        updateState(response);
+      }
+      return response;
+    }
+
+    @Override
+    public void run() { // waits for the query to finish, then completes this future
+      GetQueryStatusResponse finalResponse;
+      try {
+        finalResponse = waitCompletion();
+      } catch (Throwable t) {
+        setException(t); // any failure while waiting (including interruption) is surfaced through the future
+        return;
+      }
+
+      if (finalResponse.getQueryState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+        if (finalResponse.hasHasResult()) { // NOTE(review): tests presence of the hasResult field, not its value - confirm
+          set(queryClient.getQueryResult(queryId));
+        } else { // when update
+          set(TajoClientUtil.NULL_RESULT_SET);
+        }
+      } else {
+        cancel(false); // failed
+        set(TajoClientUtil.NULL_RESULT_SET); // NOTE(review): likely has no effect after cancel() if this is Guava's AbstractFuture - confirm
+      }
+    }
+  }
+
+  public static class DelegateServiceTracker implements ServiceTracker { // adapts a ServiceDiscovery to ServiceTracker
+
+    private final ServiceDiscovery discovery; // sole source of the client service address
+    DelegateServiceTracker(ServiceDiscovery discovery) {
+      this.discovery = discovery;
+    }
+
+    @Override
+    public boolean isHighAvailable() {
+      return false;
+    }
+
+    @Override
+    public InetSocketAddress getUmbilicalAddress() throws ServiceTrackerException {
+      return null; // NOTE(review): returns null where the other unsupported accessors throw - confirm callers expect this
+    }
+
+    @Override
+    public InetSocketAddress getClientServiceAddress() throws ServiceTrackerException {
+      return discovery.clientAddress(); // the only lookup this tracker actually answers
+    }
+
+    @Override
+    public InetSocketAddress getResourceTrackerAddress() throws ServiceTrackerException {
+      throw new UnimplementedException();
+    }
+
+    @Override
+    public InetSocketAddress getCatalogAddress() throws ServiceTrackerException {
+      throw new UnimplementedException();
+    }
+
+    @Override
+    public InetSocketAddress getMasterHttpInfo() throws ServiceTrackerException {
+      throw new UnimplementedException();
+    }
+
+    @Override
+    public int getState(String masterName, TajoConf conf) throws ServiceTrackerException {
+      throw new UnimplementedException();
+    }
+
+    @Override
+    public int formatHA(TajoConf conf) throws ServiceTrackerException {
+      throw new UnimplementedException();
+    }
+
+    @Override
+    public List<String> getMasters(TajoConf conf) throws ServiceTrackerException {
+      throw new UnimplementedException();
+    }
+
+    @Override
+    public void register() throws IOException {
+      throw new UnimplementedException();
+    }
+
+    @Override
+    public void delete() throws IOException {
+      throw new UnimplementedException();
+    }
+
+    @Override
+    public boolean isActiveMaster() {
+      throw new UnimplementedException();
+    }
+
+    @Override
+    public List<TajoMasterInfo> getMasters() throws IOException {
+      throw new UnimplementedException();
+    }
+  }
+
+  public static QueryState convert(TajoProtos.QueryState state) { // maps a TajoProtos.QueryState onto the v2 QueryState
+    switch (state) {
+    case QUERY_NEW:
+    case QUERY_INIT:
+    case QUERY_NOT_ASSIGNED:
+      return QueryState.SCHEDULED;
+
+    case QUERY_MASTER_INIT:
+    case QUERY_MASTER_LAUNCHED:
+    case QUERY_RUNNING:
+      return QueryState.RUNNING;
+
+    case QUERY_KILL_WAIT:
+      return QueryState.KILLING;
+
+    case QUERY_KILLED:
+      return QueryState.KILLED;
+
+    case QUERY_FAILED:
+      return QueryState.FAILED;
+
+    case QUERY_ERROR:
+      return QueryState.ERROR;
+
+    case QUERY_SUCCEEDED:
+      return QueryState.COMPLETED;
+
+    default: // any state not listed above is rejected rather than mapped
+      throw new RuntimeException("Unknown state:" + state.name());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryFuture.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryFuture.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryFuture.java
new file mode 100644
index 0000000..f1604cd
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryFuture.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+import org.apache.tajo.auth.UserRoleInfo;
+
+import java.sql.ResultSet;
+import java.util.concurrent.Future;
+
+/**
+ * A {@link Future} for a submitted query that also exposes the query's id, state, progress and timing.
+ */
+public interface QueryFuture extends Future<ResultSet> {
+  /**
+   * Get a query id
+   *
+   * @return query id
+   */
+  String id();
+
+  /**
+   * Get the name of the queue in which the query is running
+   *
+   * @return queue name
+   */
+  String queue();
+
+  /**
+   * Get a query state
+   *
+   * @return query state
+   */
+  QueryState state();
+
+  /**
+   * Get the normalized progress (0 ~ 1.0f) of a running query
+   *
+   * @return progress
+   */
+  float progress();
+
+  /**
+   * Get whether the query state is normal, as for a submitted or running query
+   *
+   * @return True if a query state is normal
+   */
+  boolean isOk();
+
+  /**
+   * Get whether the query is successfully completed or not.
+   *
+   * @return True if the query is successfully completed.
+   */
+  boolean isSuccessful();
+
+  /**
+   * Get whether the query is aborted due to an error.
+   *
+   * @return True if the query is aborted due to an error.
+   */
+  boolean isFailed();
+
+  /**
+   * Get whether the query is killed. This is equivalent to
+   * {@link Future#cancel}.
+   *
+   * @return True if the query is already killed.
+   */
+  boolean isKilled();
+
+  /**
+   * Get the user information
+   *
+   * @return UserRoleInfo
+   */
+  UserRoleInfo user();
+
+  /**
+   * Kill this query
+   */
+  void kill();
+
+  /**
+   * Get the time when a query is submitted.
+   * This time can be different from {@link QueryFuture#startTime}
+   * due to scheduling delay.
+   *
+   * @return Milliseconds since epoch
+   */
+  long submitTime();
+
+  /**
+   * Get the time when a query is actually launched.
+   *
+   * @return Milliseconds since epoch
+   */
+  long startTime();
+
+  /**
+   * Get the time when a query is finished.
+   *
+   * @return Milliseconds since epoch
+   */
+  long finishTime();
+
+  /**
+   * Release a query future. It will be automatically released after the session invalidation.
+   */
+  void release();
+
+  /**
+   * Add a listener which will be executed after this query completes, errors out, fails or is killed.
+   *
+   * @param future listener to be notified
+   */
+  void addListener(FutureListener<QueryFuture> future);
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryState.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryState.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryState.java
new file mode 100644
index 0000000..24ea386
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/QueryState.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+public enum QueryState {
+  /** Successfully submitted */
+  SCHEDULED,
+  /** Running */
+  RUNNING,
+  /** Error before a query execution */
+  ERROR,
+  /** Failure after a query launches */
+  FAILED,
+  /** Killed */
+  KILLED,
+  /** Kill requested; waiting for the query to be completely killed */
+  KILLING,
+  /** Successfully completed */
+  COMPLETED
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/ServiceDiscovery.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/ServiceDiscovery.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/ServiceDiscovery.java
new file mode 100644
index 0000000..e69ca8a
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/ServiceDiscovery.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Client service discovery interface: supplies the address of the client service to connect to.
+ */
+public interface ServiceDiscovery {
+  InetSocketAddress clientAddress(); // address handed to the client as the client service address
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java
new file mode 100644
index 0000000..08a921d
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/TajoClient.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2;
+
+import org.apache.tajo.catalog.exception.UndefinedDatabaseException;
+import org.apache.tajo.client.v2.exception.ClientUnableToConnectException;
+import org.apache.tajo.exception.TajoException;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.sql.ResultSet;
+import java.util.Map;
+
+public class TajoClient implements Closeable {
+  /**
+   * default client port number
+   */
+  public static final int DEFAULT_PORT = 26002;
+
+  private final ClientDelegate delegate;
+
+  /**
+   * Initialize TajoClient with a hostname and default port 26002.
+   *
+   * @param host hostname to connect
+   */
+  public TajoClient(String host) throws ClientUnableToConnectException {
+    delegate = ClientDelegateFactory.newDefaultDelegate(host, DEFAULT_PORT, null);
+  }
+
+  /**
+   * Initialize TajoClient with a hostname, default port 26002 and connection properties.
+   *
+   * @param host       Hostname to connect
+   * @param properties Connection properties
+   */
+  public TajoClient(String host, Map<String, String> properties) throws ClientUnableToConnectException {
+    delegate = ClientDelegateFactory.newDefaultDelegate(host, DEFAULT_PORT, properties);
+  }
+
+  /**
+   * Initialize TajoClient with a hostname and port
+   *
+   * @param host Hostname to connect
+   * @param port Port number to connect
+   */
+  public TajoClient(String host, int port) throws ClientUnableToConnectException {
+    delegate = ClientDelegateFactory.newDefaultDelegate(host, port, null);
+  }
+
+  /**
+   * Initialize TajoClient with a hostname, port and connection properties
+   *
+   * @param host       Hostname to connect
+   * @param port       Port number to connect
+   * @param properties Connection properties
+   */
+  public TajoClient(String host, int port, Map<String, String> properties) throws ClientUnableToConnectException {
+    delegate = ClientDelegateFactory.newDefaultDelegate(host, port, properties);
+  }
+
+  /**
+   * Initialize TajoClient via service discovery protocol
+   *
+   * @param discovery Service discovery
+   */
+  public TajoClient(ServiceDiscovery discovery) throws ClientUnableToConnectException {
+    delegate = ClientDelegateFactory.newDefaultDelegate(discovery, null);
+  }
+
+  /**
+   * Initialize TajoClient via service discovery protocol
+   *
+   * @param discovery Service discovery
+   * @param properties Connection properties
+   */
+  public TajoClient(ServiceDiscovery discovery, Map<String, String> properties) throws ClientUnableToConnectException {
+    delegate = ClientDelegateFactory.newDefaultDelegate(discovery, properties);
+  }
+
+  /**
+   * Submits and executes the given SQL statement, which may be an <code>INSERT (INTO)</code>
+   * or <code>CREATE TABLE AS SELECT</code> statement, or an SQL statement that returns nothing,
+   * such as an SQL DDL statement.
+   *
+   * @param sql a SQL statement
+   * @return the number of inserted rows
+   * @throws TajoException
+   */
+  public int executeUpdate(String sql) throws TajoException {
+    return delegate.executeUpdate(sql);
+  }
+
+  /**
+   * Submit a SQL query statement
+   *
+   * @param sql a SQL statement
+   * @return result set of the query
+   * @throws TajoException
+   */
+  public ResultSet executeQuery(String sql) throws TajoException {
+    return delegate.executeSQL(sql);
+  }
+
+  /**
+   * Execute a SQL statement through asynchronous API
+   *
+   * @param sql a SQL statement
+   * @return a future for tracking the query and obtaining its result
+   * @throws TajoException
+   */
+  public QueryFuture executeQueryAsync(String sql) throws TajoException {
+    return delegate.executeSQLAsync(sql);
+  }
+
+  public void close() throws IOException {
+    delegate.close();
+  }
+
+  /**
+   * Select working database
+   *
+   * @param database Database name
+   * @throws UndefinedDatabaseException
+   */
+  public void selectDB(String database) throws UndefinedDatabaseException {
+    delegate.selectDB(database);
+  }
+
+  /**
+   * Get the current working database
+   *
+   * @return Current working database
+   */
+  public String currentDB() {
+    return delegate.currentDB();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientConnectionException.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientConnectionException.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientConnectionException.java
new file mode 100644
index 0000000..a7fb08a
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientConnectionException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2.exception;
+
+import org.apache.tajo.error.Errors;
+import org.apache.tajo.exception.TajoRuntimeException;
+
+/**
+ * Unchecked exception reporting an error on a client connection.
+ */
+public class ClientConnectionException extends TajoRuntimeException {
+  /** Placeholder for the first message argument when the remote address is not known. */
+  private static final String UNKNOWN_ADDRESS = "unknown";
+
+  /**
+   * @param t the underlying failure; its message becomes the error detail
+   */
+  public ClientConnectionException(Throwable t) {
+    this(UNKNOWN_ADDRESS, t);
+  }
+
+  /**
+   * @param address the remote address of the failed connection
+   * @param t       the underlying failure; kept as the cause
+   */
+  public ClientConnectionException(String address, Throwable t) {
+    // CLIENT_CONNECTION_EXCEPTION is registered with two message arguments (address, detail).
+    super(Errors.ResultCode.CLIENT_CONNECTION_EXCEPTION, address, String.valueOf(t.getMessage()));
+    initCause(t);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientUnableToConnectException.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientUnableToConnectException.java b/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientUnableToConnectException.java
new file mode 100644
index 0000000..e567d7d
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/v2/exception/ClientUnableToConnectException.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client.v2.exception;
+
+import org.apache.tajo.error.Errors;
+import org.apache.tajo.exception.TajoException;
+
+/**
+ * Checked exception thrown when the client cannot establish a connection.
+ */
+public class ClientUnableToConnectException extends TajoException {
+  /** Placeholder for the message argument when the target address is not known. */
+  private static final String UNKNOWN_ADDRESS = "unknown";
+
+  public ClientUnableToConnectException() {
+    this(UNKNOWN_ADDRESS);
+  }
+
+  /**
+   * @param address the address the client failed to connect to
+   */
+  public ClientUnableToConnectException(String address) {
+    // CLIENT_UNABLE_TO_ESTABLISH_CONNECTION is registered with one message argument (address).
+    super(Errors.ResultCode.CLIENT_UNABLE_TO_ESTABLISH_CONNECTION, address);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
index 869d7c4..8f15710 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
@@ -87,6 +87,6 @@ public class FetchResultSet extends TajoResultSetBase {
       currentResultSet.close();
       currentResultSet = null;
     }
-    tajoClient.closeNonForwardQuery(queryId);
+    tajoClient.closeQuery(queryId);
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/AmbiguousColumnException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/AmbiguousColumnException.java b/tajo-common/src/main/java/org/apache/tajo/exception/AmbiguousColumnException.java
index 7cf6e1e..574dc3b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/AmbiguousColumnException.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/AmbiguousColumnException.java
@@ -18,14 +18,17 @@
 
 package org.apache.tajo.exception;
 
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
+
 import static org.apache.tajo.error.Errors.ResultCode.AMBIGUOUS_COLUMN;
 
 public class AmbiguousColumnException extends TajoException {
   private static final long serialVersionUID = 3102675985226352347L;
 
-  /**
-   * @param fieldName
-   */
+  public AmbiguousColumnException(ReturnState state) {
+    super(state);
+  }
+
   public AmbiguousColumnException(String fieldName) {
     super(AMBIGUOUS_COLUMN, fieldName);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java
index d50164d..3b646e1 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorMessages.java
@@ -21,6 +21,7 @@ package org.apache.tajo.exception;
 import com.google.common.collect.Maps;
 import org.apache.tajo.error.Errors.ResultCode;
 import org.apache.tajo.util.Pair;
+import org.apache.tajo.util.StringUtils;
 
 import java.util.Map;
 
@@ -99,6 +100,9 @@ public class ErrorMessages {
     ADD_MESSAGE(AMBIGUOUS_PARTITION_DIRECTORY, "There is a directory which is assumed to be a partitioned directory" +
       " : '%s'", 1);
 
+
+    ADD_MESSAGE(CLIENT_CONNECTION_EXCEPTION, "Client connection to '%s' has error: %s", 2);
+    ADD_MESSAGE(CLIENT_UNABLE_TO_ESTABLISH_CONNECTION, "Client is unable to establish connection to '%s'", 1);
   }
 
   private static void ADD_MESSAGE(ResultCode code, String msgFormat) {
@@ -138,7 +142,7 @@ public class ErrorMessages {
         }
 
       } else {
-        throw new TajoRuntimeException(code, args);
+        throw new TajoInternalError("Argument mismatch: code=" + code.name() + ", args=" + StringUtils.join(args));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/NoSuchSessionVariableException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/NoSuchSessionVariableException.java b/tajo-common/src/main/java/org/apache/tajo/exception/NoSuchSessionVariableException.java
new file mode 100644
index 0000000..c9ed78d
--- /dev/null
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/NoSuchSessionVariableException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.exception;
+
+import org.apache.tajo.error.Errors;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+
+public class NoSuchSessionVariableException extends TajoException { // checked: TajoException extends Exception
+
+  public NoSuchSessionVariableException(PrimitiveProtos.ReturnState state) {
+    super(state); // message and error code are taken from the given ReturnState
+  }
+
+  public NoSuchSessionVariableException(String variableName) {
+    super(Errors.ResultCode.NO_SUCH_SESSION_VARIABLE, variableName); // variableName is the message argument
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java
index fb6b9a5..81554c4 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java
@@ -45,10 +45,11 @@ public class ReturnStateUtil {
     OK = builder.build();
   }
 
-  public static void ensureOk(ReturnState state) {
+  public static boolean ensureOk(ReturnState state) {
     if (isError(state)) {
       throw new TajoRuntimeException(state);
     }
+    return true;
   }
 
   public static StringListResponse returnStringList(Collection<String> values) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java
index 10b5aff..deeb7f9 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/SQLExceptionUtil.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.exception;
 
 import com.google.common.collect.Maps;
+import org.apache.log4j.spi.ErrorCode;
 import org.apache.tajo.error.Errors.ResultCode;
 import org.apache.tajo.exception.ErrorMessages;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
@@ -54,27 +55,34 @@ public class SQLExceptionUtil {
     }
   }
 
-  public static SQLException toSQLException(ReturnState state) throws SQLException {
-
-    if (SQLSTATES.containsKey(state.getReturnCode())) {
+  private static SQLException toSQLException(ResultCode code, String message) throws SQLException {
+    if (SQLSTATES.containsKey(code)) {
 
       return new SQLException(
-          state.getMessage(),
-          SQLSTATES.get(state.getReturnCode()),
-          state.getReturnCode().getNumber()
+          message,
+          SQLSTATES.get(code),
+          code.getNumber()
       );
 
     } else {
       // If there is no SQLState corresponding to error code,
       // It will make SQLState '42000' (Syntax Error Or Access Rule Violation).
       return new SQLException(
-          state.getMessage(),
+          message,
           "42000",
           ResultCode.SYNTAX_ERROR_OR_ACCESS_RULE_VIOLATION_VALUE
       );
     }
   }
 
+  public static SQLException toSQLException(TajoException e) throws SQLException {
+    return toSQLException(e.getErrorCode(), e.getMessage());
+  }
+
+  public static SQLException toSQLException(ReturnState state) throws SQLException {
+    return toSQLException(state.getReturnCode(), state.getMessage());
+  }
+
   public static SQLException makeSQLException(ResultCode code, String ...args) {
     if (SQLSTATES.containsKey(code)) {
       return new SQLException(

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/TajoError.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/TajoError.java b/tajo-common/src/main/java/org/apache/tajo/exception/TajoError.java
index dbb2748..765ead3 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/TajoError.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/TajoError.java
@@ -19,6 +19,8 @@
 package org.apache.tajo.exception;
 
 import org.apache.tajo.error.Errors.ResultCode;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
 
 /**
  * Unrecoverable errors
@@ -26,6 +28,11 @@ import org.apache.tajo.error.Errors.ResultCode;
 public class TajoError extends Error implements TajoExceptionInterface {
   private ResultCode code;
 
+  public TajoError(ReturnState state) {
+    super(state.getMessage());
+    code = state.getReturnCode();
+  }
+
   public TajoError(ResultCode code) {
     super(ErrorMessages.getMessage(code));
     this.code = code;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/TajoException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/TajoException.java b/tajo-common/src/main/java/org/apache/tajo/exception/TajoException.java
index 781d1a0..e0e2ccb 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/TajoException.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/TajoException.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.exception;
 
 import org.apache.tajo.error.Errors.ResultCode;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
 
 /**
  * TajoException contains all exceptions with any exact reason.
@@ -27,6 +28,16 @@ import org.apache.tajo.error.Errors.ResultCode;
 public class TajoException extends Exception implements TajoExceptionInterface {
   private ResultCode code;
 
+  public TajoException(ReturnState e) {
+    super(e.getMessage());
+    this.code = e.getReturnCode();
+  }
+
+  public TajoException(TajoRuntimeException e) {
+    super(e.getMessage());
+    this.code = e.getErrorCode();
+  }
+
   public TajoException(ResultCode code) {
     super(ErrorMessages.getMessage(code));
     this.code = code;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/TajoInternalError.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/TajoInternalError.java b/tajo-common/src/main/java/org/apache/tajo/exception/TajoInternalError.java
index 767c13c..072636b 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/TajoInternalError.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/TajoInternalError.java
@@ -19,12 +19,18 @@
 package org.apache.tajo.exception;
 
 import org.apache.tajo.error.Errors.ResultCode;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
 
 /**
  * Exception for Internal Bugs and Unexpected exception
  */
 public class TajoInternalError extends TajoError {
 
+  public TajoInternalError(ReturnState state) {
+    super(state);
+  }
+
   public TajoInternalError(String message) {
     super(ResultCode.INTERNAL_ERROR, message);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedOperatorException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedOperatorException.java b/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedOperatorException.java
index 9d91946..70586c9 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedOperatorException.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/UndefinedOperatorException.java
@@ -19,9 +19,14 @@
 package org.apache.tajo.exception;
 
 import org.apache.tajo.error.Errors;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
 
 public class UndefinedOperatorException extends TajoException {
 
+  public UndefinedOperatorException(ReturnState state) {
+    super(state);
+  }
+
   public UndefinedOperatorException(String operation) {
     super(Errors.ResultCode.UNDEFINED_OPERATOR, operation);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-common/src/main/java/org/apache/tajo/exception/UnsupportedException.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/UnsupportedException.java b/tajo-common/src/main/java/org/apache/tajo/exception/UnsupportedException.java
index 9ca5539..a7a3915 100644
--- a/tajo-common/src/main/java/org/apache/tajo/exception/UnsupportedException.java
+++ b/tajo-common/src/main/java/org/apache/tajo/exception/UnsupportedException.java
@@ -19,10 +19,15 @@
 package org.apache.tajo.exception;
 
 import org.apache.tajo.error.Errors;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState;
 
 public class UnsupportedException extends TajoRuntimeException {
   private static final long serialVersionUID = 6702291354858193578L;
 
+  public UnsupportedException(ReturnState state) {
+    super(state);
+  }
+
   public UnsupportedException(String featureName) {
     super(Errors.ResultCode.FEATURE_NOT_SUPPORTED, featureName);
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml
index 7489503..60e02a6 100644
--- a/tajo-core/pom.xml
+++ b/tajo-core/pom.xml
@@ -54,8 +54,8 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-compiler-plugin</artifactId>
         <configuration>
-          <source>1.6</source>
-          <target>1.6</target>
+          <source>1.7</source>
+          <target>1.7</target>
           <encoding>${project.build.sourceEncoding}</encoding>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
index b4a28db..90c95a1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
@@ -28,6 +28,7 @@ import org.apache.tajo.client.TajoClient;
 import org.apache.tajo.client.TajoClientImpl;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.service.ServiceTracker;
 import org.apache.tajo.service.ServiceTrackerFactory;
 import org.apache.tajo.util.FileUtil;
@@ -91,7 +92,7 @@ public abstract class BenchmarkSet {
 
   public abstract void loadQueries() throws IOException;
 
-  public abstract void loadTables() throws SQLException;
+  public abstract void loadTables() throws TajoException;
 
   public String [] getTableNames() {
     return schemas.keySet().toArray(new String[schemas.size()]);

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
index 91a3b66..9739767 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java
@@ -32,6 +32,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.storage.StorageConstants;
 
 import java.io.File;
@@ -192,7 +193,7 @@ public class TPCH extends BenchmarkSet {
     loadQueries(BENCHMARK_DIR);
   }
 
-  public void loadTables() throws SQLException {
+  public void loadTables() throws TajoException {
     loadTable(LINEITEM);
     loadTable(CUSTOMER);
     loadTable(CUSTOMER_PARTS);
@@ -206,7 +207,7 @@ public class TPCH extends BenchmarkSet {
 
   }
 
-  public void loadTable(String tableName) throws SQLException {
+  public void loadTable(String tableName) throws TajoException {
     TableMeta meta = CatalogUtil.newTableMeta("TEXT");
     meta.putOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
index 67f782a..07445a4 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java
@@ -29,6 +29,9 @@ import org.apache.tajo.ExecutionBlockId;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.algebra.JoinType;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.exception.AmbiguousFunctionException;
+import org.apache.tajo.catalog.exception.CatalogException;
+import org.apache.tajo.catalog.exception.UndefinedFunctionException;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
@@ -37,10 +40,7 @@ import org.apache.tajo.engine.planner.global.builder.DistinctGroupbyBuilder;
 import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteEngine;
 import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanRewriteRuleProvider;
 import org.apache.tajo.engine.query.QueryContext;
-import org.apache.tajo.exception.InternalException;
-import org.apache.tajo.exception.TajoException;
-import org.apache.tajo.exception.TajoInternalError;
-import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.exception.*;
 import org.apache.tajo.plan.LogicalPlan;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.Target;
@@ -346,31 +346,32 @@ public class GlobalPlanner {
     }
   }
 
-  private AggregationFunctionCallEval createSumFunction(EvalNode[] args) throws InternalException {
-    FunctionDesc functionDesc = getCatalog().getFunction("sum", CatalogProtos.FunctionType.AGGREGATION,
+  private AggregationFunctionCallEval createSumFunction(EvalNode[] args) throws CatalogException {
+    FunctionDesc functionDesc = null;
+    functionDesc = getCatalog().getFunction("sum", CatalogProtos.FunctionType.AGGREGATION,
         args[0].getValueType());
     return new AggregationFunctionCallEval(functionDesc, args);
   }
 
-  private AggregationFunctionCallEval createCountFunction(EvalNode [] args) throws InternalException {
+  private AggregationFunctionCallEval createCountFunction(EvalNode [] args) throws CatalogException {
     FunctionDesc functionDesc = getCatalog().getFunction("count", CatalogProtos.FunctionType.AGGREGATION,
         args[0].getValueType());
     return new AggregationFunctionCallEval(functionDesc, args);
   }
 
-  private AggregationFunctionCallEval createCountRowFunction(EvalNode[] args) throws InternalException {
+  private AggregationFunctionCallEval createCountRowFunction(EvalNode[] args) throws CatalogException {
     FunctionDesc functionDesc = getCatalog().getFunction("count", CatalogProtos.FunctionType.AGGREGATION,
         new TajoDataTypes.DataType[]{});
     return new AggregationFunctionCallEval(functionDesc, args);
   }
 
-  private AggregationFunctionCallEval createMaxFunction(EvalNode [] args) throws InternalException {
+  private AggregationFunctionCallEval createMaxFunction(EvalNode [] args) throws CatalogException {
     FunctionDesc functionDesc = getCatalog().getFunction("max", CatalogProtos.FunctionType.AGGREGATION,
         args[0].getValueType());
     return new AggregationFunctionCallEval(functionDesc, args);
   }
 
-  private AggregationFunctionCallEval createMinFunction(EvalNode [] args) throws InternalException {
+  private AggregationFunctionCallEval createMinFunction(EvalNode [] args) throws CatalogException {
     FunctionDesc functionDesc = getCatalog().getFunction("min", CatalogProtos.FunctionType.AGGREGATION,
         args[0].getValueType());
     return new AggregationFunctionCallEval(functionDesc, args);
@@ -428,57 +429,53 @@ public class GlobalPlanner {
    */
   private RewrittenFunctions rewriteAggFunctionsForDistinctAggregation(GlobalPlanContext context,
                                                                        AggregationFunctionCallEval function)
-      throws PlanningException {
+      throws TajoException {
 
     LogicalPlan plan = context.plan.getLogicalPlan();
     RewrittenFunctions rewritten = null;
 
-    try {
-      if (function.getName().equalsIgnoreCase("count")) {
-        rewritten = new RewrittenFunctions(1);
-
-        if (function.getArgs().length == 0) {
-          rewritten.firstStageEvals[0] = createCountRowFunction(function.getArgs());
-        } else {
-          rewritten.firstStageEvals[0] = createCountFunction(function.getArgs());
-        }
-        String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
-        FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
-        rewritten.firstStageTargets[0] = new Target(fieldEval);
-        rewritten.secondStageEvals = createSumFunction(new EvalNode[] {fieldEval});
-      } else if (function.getName().equalsIgnoreCase("sum")) {
-        rewritten = new RewrittenFunctions(1);
-
-        rewritten.firstStageEvals[0] = createSumFunction(function.getArgs());
-        String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
-        FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
-        rewritten.firstStageTargets[0] = new Target(fieldEval);
-        rewritten.secondStageEvals = createSumFunction(new EvalNode[] {fieldEval});
-
-      } else if (function.getName().equals("max")) {
-        rewritten = new RewrittenFunctions(1);
-
-        rewritten.firstStageEvals[0] = createMaxFunction(function.getArgs());
-        String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
-        FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
-        rewritten.firstStageTargets[0] = new Target(fieldEval);
-        rewritten.secondStageEvals = createMaxFunction(new EvalNode[]{fieldEval});
-
-      } else if (function.getName().equals("min")) {
-
-        rewritten = new RewrittenFunctions(1);
-
-        rewritten.firstStageEvals[0] = createMinFunction(function.getArgs());
-        String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
-        FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
-        rewritten.firstStageTargets[0] = new Target(fieldEval);
-        rewritten.secondStageEvals = createMinFunction(new EvalNode[]{fieldEval});
+    if (function.getName().equalsIgnoreCase("count")) {
+      rewritten = new RewrittenFunctions(1);
 
+      if (function.getArgs().length == 0) {
+        rewritten.firstStageEvals[0] = createCountRowFunction(function.getArgs());
       } else {
-        throw new PlanningException("Cannot support a mix of other functions");
+        rewritten.firstStageEvals[0] = createCountFunction(function.getArgs());
       }
-    } catch (InternalException e) {
-      LOG.error(e, e);
+      String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
+      FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
+      rewritten.firstStageTargets[0] = new Target(fieldEval);
+      rewritten.secondStageEvals = createSumFunction(new EvalNode[]{fieldEval});
+    } else if (function.getName().equalsIgnoreCase("sum")) {
+      rewritten = new RewrittenFunctions(1);
+
+      rewritten.firstStageEvals[0] = createSumFunction(function.getArgs());
+      String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
+      FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
+      rewritten.firstStageTargets[0] = new Target(fieldEval);
+      rewritten.secondStageEvals = createSumFunction(new EvalNode[]{fieldEval});
+
+    } else if (function.getName().equals("max")) {
+      rewritten = new RewrittenFunctions(1);
+
+      rewritten.firstStageEvals[0] = createMaxFunction(function.getArgs());
+      String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
+      FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
+      rewritten.firstStageTargets[0] = new Target(fieldEval);
+      rewritten.secondStageEvals = createMaxFunction(new EvalNode[]{fieldEval});
+
+    } else if (function.getName().equals("min")) {
+
+      rewritten = new RewrittenFunctions(1);
+
+      rewritten.firstStageEvals[0] = createMinFunction(function.getArgs());
+      String referenceName = plan.generateUniqueColumnName(rewritten.firstStageEvals[0]);
+      FieldEval fieldEval = new FieldEval(referenceName, rewritten.firstStageEvals[0].getValueType());
+      rewritten.firstStageTargets[0] = new Target(fieldEval);
+      rewritten.secondStageEvals = createMinFunction(new EvalNode[]{fieldEval});
+
+    } else {
+      throw new UnsupportedException("Cannot support a mix of other functions");
     }
 
     return rewritten;
@@ -523,7 +520,7 @@ public class GlobalPlanner {
    */
   private ExecutionBlock buildGroupByIncludingDistinctFunctionsMultiStage(GlobalPlanContext context,
                                                                 ExecutionBlock latestExecBlock,
-                                                                GroupbyNode groupbyNode) throws PlanningException {
+                                                                GroupbyNode groupbyNode) throws TajoException {
 
     Column [] originalGroupingColumns = groupbyNode.getGroupingColumns();
     LinkedHashSet<Column> firstStageGroupingColumns =

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index b70a79f..d16f7d1 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -38,6 +38,7 @@ import org.apache.tajo.catalog.CatalogServer;
 import org.apache.tajo.catalog.CatalogService;
 import org.apache.tajo.catalog.FunctionDesc;
 import org.apache.tajo.catalog.LocalCatalogWrapper;
+import org.apache.tajo.catalog.exception.DuplicateDatabaseException;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.function.FunctionLoader;
@@ -369,7 +370,7 @@ public class TajoMaster extends CompositeService {
     }
   }
 
-  private void checkBaseTBSpaceAndDatabase() throws IOException {
+  private void checkBaseTBSpaceAndDatabase() throws IOException, DuplicateDatabaseException {
     if (!catalog.existTablespace(DEFAULT_TABLESPACE_NAME)) {
       catalog.createTablespace(DEFAULT_TABLESPACE_NAME, context.getConf().getVar(ConfVars.WAREHOUSE_DIR));
     } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/4253f1b6/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java
index 40ebf4e..a43b95e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/CreateTableExecutor.java
@@ -28,6 +28,7 @@ import org.apache.tajo.catalog.exception.DuplicateTableException;
 import org.apache.tajo.catalog.exception.UndefinedTablespaceException;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
 import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.exception.TajoException;
 import org.apache.tajo.exception.TajoInternalError;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.plan.logical.CreateTableNode;
@@ -52,7 +53,7 @@ public class CreateTableExecutor {
   }
 
   public TableDesc create(QueryContext queryContext, CreateTableNode createTable, boolean ifNotExists)
-      throws IOException {
+      throws IOException, TajoException {
 
     TableMeta meta;
     if (createTable.hasOptions()) {
@@ -85,7 +86,7 @@ public class CreateTableExecutor {
                           @Nullable URI uri,
                           boolean isExternal,
                           @Nullable PartitionMethodDesc partitionDesc,
-                          boolean ifNotExists) throws IOException {
+                          boolean ifNotExists) throws IOException, TajoException {
 
     Pair<String, String> separatedNames = getQualifiedName(queryContext.getCurrentDatabase(), tableName);
     String databaseName = separatedNames.getFirst();
@@ -119,7 +120,7 @@ public class CreateTableExecutor {
     }
   }
 
-  private TableDesc handlExistence(boolean ifNotExists, String qualifiedName) {
+  private TableDesc handlExistence(boolean ifNotExists, String qualifiedName) throws DuplicateTableException {
     if (ifNotExists) {
       LOG.info("relation \"" + qualifiedName + "\" is already exists.");
       return catalog.getTableDesc(qualifiedName);
@@ -131,13 +132,15 @@ public class CreateTableExecutor {
   private Pair<String, String> getQualifiedName(String currentDatabase, String tableName) {
     if (CatalogUtil.isFQTableName(tableName)) {
       String [] splitted = CatalogUtil.splitFQTableName(tableName);
-      return new Pair<String, String>(splitted[0], splitted[1]);
+      return new Pair<>(splitted[0], splitted[1]);
     } else {
-      return new Pair<String, String>(currentDatabase, tableName);
+      return new Pair<>(currentDatabase, tableName);
     }
   }
 
-  private Tablespace getTablespaceHandler(@Nullable String tableSpaceName, @Nullable URI tableUri) {
+  private Tablespace getTablespaceHandler(@Nullable String tableSpaceName, @Nullable URI tableUri)
+      throws UndefinedTablespaceException {
+
     if (tableSpaceName != null) {
       Optional<Tablespace> ts = (Optional<Tablespace>) TablespaceManager.getByName(tableSpaceName);
       if (ts.isPresent()) {