You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/10/29 06:50:08 UTC
[2/6] TAJO-1140: Separate TajoClient into fine grained parts.
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
index 82bd855..376f63f 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClient.java
@@ -18,1031 +18,10 @@
package org.apache.tajo.client;
-import com.google.protobuf.ServiceException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.QueryIdFactory;
-import org.apache.tajo.SessionVars;
-import org.apache.tajo.TajoIdProtos;
-import org.apache.tajo.TajoProtos.QueryState;
-import org.apache.tajo.annotation.Nullable;
import org.apache.tajo.annotation.ThreadSafe;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.partition.PartitionDesc;
-import org.apache.tajo.catalog.partition.PartitionMethodDesc;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.proto.CatalogProtos.PartitionMethodProto;
-import org.apache.tajo.cli.InvalidClientSessionException;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.ipc.ClientProtos;
-import org.apache.tajo.ipc.ClientProtos.*;
-import org.apache.tajo.ipc.QueryMasterClientProtocol;
-import org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
-import org.apache.tajo.ipc.TajoMasterClientProtocol;
-import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService;
-import org.apache.tajo.jdbc.FetchResultSet;
-import org.apache.tajo.jdbc.SQLStates;
-import org.apache.tajo.jdbc.TajoMemoryResultSet;
-import org.apache.tajo.jdbc.TajoResultSet;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.rpc.ServerCallable;
-import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto;
-import org.apache.tajo.util.HAServiceUtil;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.util.NetUtils;
import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.apache.tajo.ipc.ClientProtos.SerializedResultSet;
@ThreadSafe
-public class TajoClient implements Closeable {
- public static final int UNKNOWN_ROW_NUMBER = -1;
-
- private final Log LOG = LogFactory.getLog(TajoClient.class);
-
- private final TajoConf conf;
-
- private final Map<QueryId, InetSocketAddress> queryMasterMap = new ConcurrentHashMap<QueryId, InetSocketAddress>();
-
- private final InetSocketAddress tajoMasterAddr;
-
- private final RpcConnectionPool connPool;
-
- private final String baseDatabase;
-
- private final UserGroupInformation userInfo;
-
- private volatile TajoIdProtos.SessionIdProto sessionId;
-
- private AtomicBoolean closed = new AtomicBoolean(false);
-
- public TajoClient(TajoConf conf) throws IOException {
- this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null);
- }
-
- public TajoClient(TajoConf conf, @Nullable String baseDatabase) throws IOException {
- this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), baseDatabase);
- }
-
- /**
- * Connect to TajoMaster
- *
- * @param conf TajoConf
- * @param addr TajoMaster address
- * @param baseDatabase The base database name. It is case sensitive. If it is null,
- * the 'default' database will be used.
- * @throws IOException
- */
- public TajoClient(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException {
- this.conf = conf;
- this.conf.set("tajo.disk.scheduler.report.interval", "0");
- this.tajoMasterAddr = addr;
- int workerNum = conf.getIntVar(TajoConf.ConfVars.RPC_CLIENT_WORKER_THREAD_NUM);
- // Don't share connection pool per client
- connPool = RpcConnectionPool.newPool(conf, getClass().getSimpleName(), workerNum);
- userInfo = UserGroupInformation.getCurrentUser();
- this.baseDatabase = baseDatabase != null ? baseDatabase : null;
- }
-
- public void setSessionId(TajoIdProtos.SessionIdProto sessionId) {
- this.sessionId = sessionId;
- }
-
- public boolean isConnected() {
- if(!closed.get()){
- try {
- return connPool.getConnection(tajoMasterAddr, TajoMasterClientProtocol.class, false).isConnected();
- } catch (Throwable e) {
- return false;
- }
- }
- return false;
- }
-
- public TajoClient(InetSocketAddress addr) throws IOException {
- this(new TajoConf(), addr, null);
- }
-
- public TajoClient(String hostname, int port, String baseDatabase) throws IOException {
- this(new TajoConf(), NetUtils.createSocketAddr(hostname, port), baseDatabase);
- }
-
- public TajoIdProtos.SessionIdProto getSessionId() {
- return sessionId;
- }
-
- private InetSocketAddress getTajoMasterAddr() {
- if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- return tajoMasterAddr;
- } else {
- if (!HAServiceUtil.isMasterAlive(tajoMasterAddr, conf)) {
- return HAServiceUtil.getMasterClientAddress(conf);
- } else {
- return tajoMasterAddr;
- }
- }
- }
-
- public String getBaseDatabase() {
- return baseDatabase;
- }
-
- @Override
- public void close() {
- if(closed.getAndSet(true)){
- return;
- }
-
- // remove session
- try {
- NettyClientBase client = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
- TajoMasterClientProtocolService.BlockingInterface tajoMaster = client.getStub();
- tajoMaster.removeSession(null, sessionId);
- } catch (Throwable e) {
- }
-
- if(connPool != null) {
- connPool.shutdown();
- }
- queryMasterMap.clear();
- }
-
- public TajoConf getConf() {
- return conf;
- }
-
- public UserGroupInformation getUserInfo() {
- return userInfo;
- }
-
- /**
- * Call to QueryMaster closing query resources
- * @param queryId
- */
- public void closeQuery(final QueryId queryId) {
- if(queryMasterMap.containsKey(queryId)) {
- NettyClientBase qmClient = null;
- try {
- qmClient = connPool.getConnection(queryMasterMap.get(queryId), QueryMasterClientProtocol.class, false);
- QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
- queryMasterService.closeQuery(null, queryId.getProto());
- } catch (Exception e) {
- LOG.warn("Fail to close a QueryMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e);
- } finally {
- connPool.closeConnection(qmClient);
- queryMasterMap.remove(queryId);
- }
- }
- }
-
- public void closeNonForwardQuery(final QueryId queryId) {
- NettyClientBase tmClient = null;
- try {
- tmClient = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
- checkSessionAndGet(tmClient);
-
- QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
-
- builder.setSessionId(getSessionId());
- builder.setQueryId(queryId.getProto());
- tajoMasterService.closeNonForwardQuery(null, builder.build());
- } catch (Exception e) {
- LOG.warn("Fail to close a TajoMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e);
- } finally {
- connPool.closeConnection(tmClient);
- }
- }
-
- private void checkSessionAndGet(NettyClientBase client) throws ServiceException {
- if (sessionId == null) {
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
- builder.setUsername(userInfo.getUserName()).build();
- if (baseDatabase != null) {
- builder.setBaseDatabaseName(baseDatabase);
- }
- CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
- if (response.getState() == CreateSessionResponse.ResultState.SUCCESS) {
- sessionId = response.getSessionId();
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Got session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
- }
- } else {
- throw new InvalidClientSessionException(response.getMessage());
- }
- }
- }
-
- private SessionedStringProto convertSessionedString(String str) {
- SessionedStringProto.Builder builder = SessionedStringProto.newBuilder();
- builder.setSessionId(sessionId);
- builder.setValue(str);
- return builder.build();
- }
-
- public String getCurrentDatabase() throws ServiceException {
- return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-
- public String call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.getCurrentDatabase(null, sessionId).getValue();
- }
- }.withRetries();
- }
-
- public Boolean selectDatabase(final String databaseName) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.selectDatabase(null, convertSessionedString(databaseName)).getValue();
- }
- }.withRetries();
- }
-
- public Boolean updateSessionVariables(final Map<String, String> variables) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- KeyValueSet keyValueSet = new KeyValueSet();
- keyValueSet.putAll(variables);
- UpdateSessionVariableRequest request = UpdateSessionVariableRequest.newBuilder()
- .setSessionId(sessionId)
- .setSetVariables(keyValueSet.getProto()).build();
-
- return tajoMasterService.updateSessionVariables(null, request).getValue();
- }
- }.withRetries();
- }
-
- public Boolean unsetSessionVariables(final List<String> variables) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- UpdateSessionVariableRequest request = UpdateSessionVariableRequest.newBuilder()
- .setSessionId(sessionId)
- .addAllUnsetVariables(variables).build();
- return tajoMasterService.updateSessionVariables(null, request).getValue();
- }
- }.withRetries();
- }
-
- public String getSessionVariable(final String varname) throws ServiceException {
- return new ServerCallable<String>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-
- public String call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.getSessionVariable(null, convertSessionedString(varname)).getValue();
- }
- }.withRetries();
- }
-
- public Boolean existSessionVariable(final String varname) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.existSessionVariable(null, convertSessionedString(varname)).getValue();
- }
- }.withRetries();
- }
-
- public Map<String, String> getAllSessionVariables() throws ServiceException {
- return new ServerCallable<Map<String, String>>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class,
- false, true) {
-
- public Map<String, String> call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- KeyValueSet keyValueSet = new KeyValueSet(tajoMasterService.getAllSessionVariables(null, sessionId));
- return keyValueSet.getAllKeyValus();
- }
- }.withRetries();
- }
-
- /**
- * It submits a query statement and get a response immediately.
- * The response only contains a query id, and submission status.
- * In order to get the result, you should use {@link #getQueryResult(org.apache.tajo.QueryId)}
- * or {@link #getQueryResultAndWait(org.apache.tajo.QueryId)}.
- */
- public SubmitQueryResponse executeQuery(final String sql) throws ServiceException {
- return new ServerCallable<SubmitQueryResponse>(connPool, getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false, true) {
- public SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- final QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(sessionId);
- builder.setQuery(sql);
- builder.setIsJson(false);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.submitQuery(null, builder.build());
- }
- }.withRetries();
- }
-
- public SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException {
- return new ServerCallable<SubmitQueryResponse>(connPool, getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false, true) {
- public SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- final QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(sessionId);
- builder.setQuery(json);
- builder.setIsJson(true);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.submitQuery(null, builder.build());
- }
- }.withRetries();
- }
-
- /**
- * It submits a query statement and get a response.
- * The main difference from {@link #executeQuery(String)}
- * is a blocking method. So, this method is wait for
- * the finish of the submitted query.
- *
- * @return If failed, return null.
- */
- public ResultSet executeQueryAndGetResult(final String sql)
- throws ServiceException, IOException {
- SubmitQueryResponse response = executeQuery(sql);
-
- if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
- throw new ServiceException(response.getErrorTrace());
- }
- QueryId queryId = new QueryId(response.getQueryId());
- if (response.getIsForwarded()) {
- if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
- return this.createNullResultSet(queryId);
- } else {
- return this.getQueryResultAndWait(queryId);
- }
- } else {
- // If a non-forwarded insert into query
- if (queryId.equals(QueryIdFactory.NULL_QUERY_ID) && response.getMaxRowNum() == 0) {
- return this.createNullResultSet(queryId);
- } else {
- if (response.hasResultSet() || response.hasTableDesc()) {
- return createResultSet(this, response);
- } else {
- return this.createNullResultSet(queryId);
- }
- }
- }
- }
-
- public ResultSet executeJsonQueryAndGetResult(final String json) throws ServiceException, IOException {
- SubmitQueryResponse response = executeQueryWithJson(json);
- if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
- throw new ServiceException(response.getErrorTrace());
- }
- QueryId queryId = new QueryId(response.getQueryId());
- if (response.getIsForwarded()) {
- if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
- return this.createNullResultSet(queryId);
- } else {
- return this.getQueryResultAndWait(queryId);
- }
- } else {
- if (response.hasResultSet() || response.hasTableDesc()) {
- return createResultSet(this, response);
- } else {
- return this.createNullResultSet(queryId);
- }
- }
- }
-
- public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException {
- GetQueryStatusRequest.Builder builder = GetQueryStatusRequest.newBuilder();
- builder.setQueryId(queryId.getProto());
-
- GetQueryStatusResponse res = null;
- if(queryMasterMap.containsKey(queryId)) {
- NettyClientBase qmClient = null;
- try {
- qmClient = connPool.getConnection(queryMasterMap.get(queryId),
- QueryMasterClientProtocol.class, false);
- QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
- res = queryMasterService.getQueryStatus(null, builder.build());
- } catch (Exception e) {
- throw new ServiceException(e.getMessage(), e);
- } finally {
- connPool.releaseConnection(qmClient);
- }
- } else {
- NettyClientBase tmClient = null;
- try {
- tmClient = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
-
- checkSessionAndGet(tmClient);
- builder.setSessionId(sessionId);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
- res = tajoMasterService.getQueryStatus(null, builder.build());
-
- String queryMasterHost = res.getQueryMasterHost();
- if(queryMasterHost != null && !queryMasterHost.isEmpty()) {
- NettyClientBase qmClient = null;
- try {
- InetSocketAddress qmAddr = NetUtils.createSocketAddr(queryMasterHost, res.getQueryMasterPort());
- qmClient = connPool.getConnection(
- qmAddr, QueryMasterClientProtocol.class, false);
- QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub();
- res = queryMasterService.getQueryStatus(null, builder.build());
-
- queryMasterMap.put(queryId, qmAddr);
- } catch (Exception e) {
- throw new ServiceException(e.getMessage(), e);
- } finally {
- connPool.releaseConnection(qmClient);
- }
- }
- } catch (Exception e) {
- throw new ServiceException(e.getMessage(), e);
- } finally {
- connPool.releaseConnection(tmClient);
- }
- }
- return new QueryStatus(res);
- }
-
- /* query submit */
- public static boolean isInPreNewState(QueryState state) {
- return state == QueryState.QUERY_NOT_ASSIGNED ||
- state == QueryState.QUERY_MASTER_INIT ||
- state == QueryState.QUERY_MASTER_LAUNCHED;
- }
-
- /* query submitted. but is not running */
- public static boolean isInInitState(QueryState state) {
- return state == QueryState.QUERY_NEW || state == QueryState.QUERY_INIT;
- }
-
- /* query started. but is not complete */
- public static boolean isInRunningState(QueryState state) {
- return isInInitState(state) || state == QueryState.QUERY_RUNNING;
- }
-
- /* query complete */
- public static boolean isInCompleteState(QueryState state) {
- return !isInPreNewState(state) && !isInRunningState(state);
- }
-
- public ResultSet getQueryResult(QueryId queryId)
- throws ServiceException, IOException {
- if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
- return createNullResultSet(queryId);
- }
- GetQueryResultResponse response = getResultResponse(queryId);
- TableDesc tableDesc = CatalogUtil.newTableDesc(response.getTableDesc());
- conf.setVar(ConfVars.USERNAME, response.getTajoUserName());
- return new TajoResultSet(this, queryId, conf, tableDesc);
- }
-
- public static ResultSet createResultSet(TajoClient client, QueryId queryId, GetQueryResultResponse response)
- throws IOException {
- TableDesc desc = CatalogUtil.newTableDesc(response.getTableDesc());
- TajoConf conf = new TajoConf(client.getConf());
- conf.setVar(ConfVars.USERNAME, response.getTajoUserName());
- return new TajoResultSet(client, queryId, conf, desc);
- }
-
- public static ResultSet createResultSet(TajoClient client, SubmitQueryResponse response) throws IOException {
- if (response.hasTableDesc()) {
- // non-forward query
- // select * from table1 [limit 10]
- int fetchRowNum = client.getConf().getIntVar(ConfVars.$RESULT_SET_FETCH_ROWNUM);
- if (response.hasSessionVariables()) {
- for (KeyValueProto eachKeyValue: response.getSessionVariables().getKeyvalList()) {
- if (eachKeyValue.getKey().equals(SessionVars.FETCH_ROWNUM.keyname())) {
- fetchRowNum = Integer.parseInt(eachKeyValue.getValue());
- }
- }
- }
- TableDesc tableDesc = new TableDesc(response.getTableDesc());
- return new FetchResultSet(client, tableDesc.getLogicalSchema(), new QueryId(response.getQueryId()), fetchRowNum);
- } else {
- // simple eval query
- // select substr('abc', 1, 2)
- SerializedResultSet serializedResultSet = response.getResultSet();
- return new TajoMemoryResultSet(
- new Schema(serializedResultSet.getSchema()),
- serializedResultSet.getSerializedTuplesList(),
- response.getMaxRowNum());
- }
- }
-
- private ResultSet getQueryResultAndWait(QueryId queryId)
- throws ServiceException, IOException {
- if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
- return createNullResultSet(queryId);
- }
- QueryStatus status = getQueryStatus(queryId);
-
- while(status != null && !isInCompleteState(status.getState())) {
- try {
- Thread.sleep(500);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- status = getQueryStatus(queryId);
- }
-
- if (status.getState() == QueryState.QUERY_SUCCEEDED) {
- if (status.hasResult()) {
- return getQueryResult(queryId);
- } else {
- return createNullResultSet(queryId);
- }
-
- } else {
- LOG.warn("Query (" + status.getQueryId() + ") failed: " + status.getState());
-
- //TODO throw SQLException(?)
- return createNullResultSet(queryId);
- }
- }
-
- public ResultSet createNullResultSet(QueryId queryId) throws IOException {
- return new TajoResultSet(this, queryId);
- }
-
- public GetQueryResultResponse getResultResponse(QueryId queryId) throws ServiceException {
- if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
- return null;
- }
-
- NettyClientBase client = null;
- try {
- InetSocketAddress queryMasterAddr = queryMasterMap.get(queryId);
- if(queryMasterAddr == null) {
- LOG.warn("No Connection to QueryMaster for " + queryId);
- return null;
- }
- client = connPool.getConnection(queryMasterAddr, QueryMasterClientProtocol.class, false);
- QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
- GetQueryResultRequest.Builder builder = GetQueryResultRequest.newBuilder();
- builder.setQueryId(queryId.getProto());
- GetQueryResultResponse response = queryMasterService.getQueryResult(null,
- builder.build());
-
- return response;
- } catch (Exception e) {
- throw new ServiceException(e.getMessage(), e);
- } finally {
- connPool.releaseConnection(client);
- }
- }
-
- public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws ServiceException {
- try {
- ServerCallable<SerializedResultSet> callable =
- new ServerCallable<SerializedResultSet>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
- public SerializedResultSet call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
- builder.setSessionId(sessionId);
- builder.setQueryId(queryId.getProto());
- builder.setFetchRowNum(fetchRowNum);
- try {
- GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
- if (response.getResultCode() == ResultCode.ERROR) {
- throw new ServiceException(response.getErrorTrace());
- }
-
- return response.getResultSet();
- } catch (ServiceException e) {
- abort();
- throw e;
- } catch (Throwable t) {
- throw new ServiceException(t.getMessage(), t);
- }
- }
- };
-
- SerializedResultSet serializedResultSet = callable.withRetries();
-
- return new TajoMemoryResultSet(
- new Schema(serializedResultSet.getSchema()),
- serializedResultSet.getSerializedTuplesList(),
- serializedResultSet.getSerializedTuplesCount());
- } catch (Exception e) {
- throw new ServiceException(e.getMessage(), e);
- }
- }
-
- public boolean updateQuery(final String sql) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false, true) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(sessionId);
- builder.setQuery(sql);
- builder.setIsJson(false);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
- if (response.getResultCode() == ResultCode.OK) {
- return true;
- } else {
- if (response.hasErrorMessage()) {
- System.err.println("ERROR: " + response.getErrorMessage());
- }
- return false;
- }
- }
- }.withRetries();
- }
-
- public boolean updateQueryWithJson(final String json) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false, true) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(sessionId);
- builder.setQuery(json);
- builder.setIsJson(true);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
- if (response.getResultCode() == ResultCode.OK) {
- return true;
- } else {
- if (response.hasErrorMessage()) {
- System.err.println("ERROR: " + response.getErrorMessage());
- }
- return false;
- }
- }
- }.withRetries();
- }
-
- /**
- * Create a database.
- *
- * @param databaseName The database name to be created. This name is case sensitive.
- * @return True if created successfully.
- * @throws ServiceException
- */
- public boolean createDatabase(final String databaseName) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.createDatabase(null, convertSessionedString(databaseName)).getValue();
- }
- }.withRetries();
- }
-
- /**
- * Does the database exist?
- *
- * @param databaseName The database name to be checked. This name is case sensitive.
- * @return True if so.
- * @throws ServiceException
- */
- public boolean existDatabase(final String databaseName) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.existDatabase(null, convertSessionedString(databaseName)).getValue();
- }
- }.withRetries();
- }
-
- /**
- * Drop the database
- *
- * @param databaseName The database name to be dropped. This name is case sensitive.
- * @return True if the database is dropped successfully.
- * @throws ServiceException
- */
- public boolean dropDatabase(final String databaseName) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.dropDatabase(null, convertSessionedString(databaseName)).getValue();
- }
- }.withRetries();
- }
-
- public List<String> getAllDatabaseNames() throws ServiceException {
- return new ServerCallable<List<String>>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
- public List<String> call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.getAllDatabases(null, sessionId).getValuesList();
- }
- }.withRetries();
- }
-
- /**
- * Does the table exist?
- *
- * @param tableName The table name to be checked. This name is case sensitive.
- * @return True if so.
- */
- public boolean existTable(final String tableName) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false, true) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.existTable(null, convertSessionedString(tableName)).getValue();
- }
- }.withRetries();
- }
-
- /**
- * Create an external table.
- *
- * @param tableName The table name to be created. This name is case sensitive. This name can be qualified or not.
- * If the table name is not qualified, the current database in the session will be used.
- * @param schema The schema
- * @param path The external table location
- * @param meta Table meta
- * @return the created table description.
- * @throws SQLException
- * @throws ServiceException
- */
- public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
- final TableMeta meta)
- throws SQLException, ServiceException {
- return createExternalTable(tableName, schema, path, meta, null);
- }
-
- /**
- * Create an external table.
- *
- * @param tableName The table name to be created. This name is case sensitive. This name can be qualified or not.
- * If the table name is not qualified, the current database in the session will be used.
- * @param schema The schema
- * @param path The external table location
- * @param meta Table meta
- * @param partitionMethodDesc Table partition description
- * @return the created table description.
- * @throws SQLException
- * @throws ServiceException
- */
- public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
- final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
- throws SQLException, ServiceException {
- return new ServerCallable<TableDesc>(connPool, getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false, true) {
- public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- CreateTableRequest.Builder builder = CreateTableRequest.newBuilder();
- builder.setSessionId(sessionId);
- builder.setName(tableName);
- builder.setSchema(schema.getProto());
- builder.setMeta(meta.getProto());
- builder.setPath(path.toUri().toString());
- if (partitionMethodDesc != null) {
- builder.setPartition(partitionMethodDesc.getProto());
- }
- TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
- if (res.getResultCode() == ResultCode.OK) {
- return CatalogUtil.newTableDesc(res.getTableDesc());
- } else {
- throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
- }
- }
- }.withRetries();
- }
-
- /**
- * Drop a table
- *
- * @param tableName The table name to be dropped. This name is case sensitive.
- * @return True if the table is dropped successfully.
- */
- public boolean dropTable(final String tableName) throws ServiceException {
- return dropTable(tableName, false);
- }
-
- /**
- * Drop a table.
- *
- * @param tableName The table name to be dropped. This name is case sensitive.
- * @param purge If purge is true, this call will remove the entry in catalog as well as the table contents.
- * @return True if the table is dropped successfully.
- */
- public boolean dropTable(final String tableName, final boolean purge) throws ServiceException {
- return new ServerCallable<Boolean>(connPool, getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false, true) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- DropTableRequest.Builder builder = DropTableRequest.newBuilder();
- builder.setSessionId(sessionId);
- builder.setName(tableName);
- builder.setPurge(purge);
- return tajoMasterService.dropTable(null, builder.build()).getValue();
- }
- }.withRetries();
-
- }
-
- public List<BriefQueryInfo> getRunningQueryList() throws ServiceException {
- return new ServerCallable<List<BriefQueryInfo>>(connPool, getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false, true) {
- public List<BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- GetQueryListRequest.Builder builder = GetQueryListRequest.newBuilder();
- builder.setSessionId(sessionId);
- GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build());
- return res.getQueryListList();
- }
- }.withRetries();
- }
-
- public List<BriefQueryInfo> getFinishedQueryList() throws ServiceException {
- return new ServerCallable<List<BriefQueryInfo>>(connPool, getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false, true) {
- public List<BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- GetQueryListRequest.Builder builder = GetQueryListRequest.newBuilder();
- builder.setSessionId(sessionId);
- GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build());
- return res.getQueryListList();
- }
- }.withRetries();
- }
-
- public List<WorkerResourceInfo> getClusterInfo() throws ServiceException {
- return new ServerCallable<List<WorkerResourceInfo>>(connPool, getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false, true) {
- public List<WorkerResourceInfo> call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- GetClusterInfoRequest.Builder builder = GetClusterInfoRequest.newBuilder();
- builder.setSessionId(sessionId);
- GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build());
- return res.getWorkerListList();
- }
- }.withRetries();
- }
-
- /**
- * Get a list of table names.
- *
- * @param databaseName The database name to show all tables. This name is case sensitive.
- * If it is null, this method will show all tables
- * in the current database of this session.
- */
- public List<String> getTableList(@Nullable final String databaseName) throws ServiceException {
- return new ServerCallable<List<String>>(connPool, getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false, true) {
- public List<String> call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- GetTableListRequest.Builder builder = GetTableListRequest.newBuilder();
- builder.setSessionId(sessionId);
- if (databaseName != null) {
- builder.setDatabaseName(databaseName);
- }
- GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
- return res.getTablesList();
- }
- }.withRetries();
- }
-
- /**
- * Get a table description
- *
- * @param tableName The table name to get. This name is case sensitive.
- * @return Table description
- */
- public TableDesc getTableDesc(final String tableName) throws ServiceException {
- return new ServerCallable<TableDesc>(connPool, getTajoMasterAddr(), TajoMasterClientProtocol.class, false, true) {
- public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- GetTableDescRequest.Builder builder = GetTableDescRequest.newBuilder();
- builder.setSessionId(sessionId);
- builder.setTableName(tableName);
- TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
- if (res.getResultCode() == ResultCode.OK) {
- return CatalogUtil.newTableDesc(res.getTableDesc());
- } else {
- throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
- }
- }
- }.withRetries();
- }
-
- public QueryStatus killQuery(final QueryId queryId)
- throws ServiceException, IOException {
-
- QueryStatus status = getQueryStatus(queryId);
-
- NettyClientBase tmClient = null;
- try {
- /* send a kill to the TM */
- tmClient = connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, false);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
-
- checkSessionAndGet(tmClient);
-
- QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
- builder.setSessionId(sessionId);
- builder.setQueryId(queryId.getProto());
- tajoMasterService.killQuery(null, builder.build());
-
- long currentTimeMillis = System.currentTimeMillis();
- long timeKillIssued = currentTimeMillis;
- while ((currentTimeMillis < timeKillIssued + 10000L)
- && ((status.getState() != QueryState.QUERY_KILLED)
- || (status.getState() == QueryState.QUERY_KILL_WAIT))) {
- try {
- Thread.sleep(100L);
- } catch(InterruptedException ie) {
- break;
- }
- currentTimeMillis = System.currentTimeMillis();
- status = getQueryStatus(queryId);
- }
-
- } catch(Exception e) {
- LOG.debug("Error when checking for application status", e);
- } finally {
- connPool.releaseConnection(tmClient);
- }
- return status;
- }
-
- public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
- return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connPool, getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false, true) {
- public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- String paramFunctionName = functionName == null ? "" : functionName;
- FunctionResponse res = tajoMasterService.getFunctionList(null,convertSessionedString(paramFunctionName));
- if (res.getResultCode() == ResultCode.OK) {
- return res.getFunctionsList();
- } else {
- throw new SQLException(res.getErrorMessage());
- }
- }
- }.withRetries();
- }
+public interface TajoClient extends QueryClient, CatalogAdminClient, Closeable {
+  // No members are declared here: the whole API of this interface comes from
+  // QueryClient, CatalogAdminClient and Closeable.
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
new file mode 100644
index 0000000..75de492
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientImpl.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.annotation.Nullable;
+import org.apache.tajo.annotation.ThreadSafe;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.partition.PartitionMethodDesc;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
+import org.apache.tajo.ipc.ClientProtos.GetQueryResultResponse;
+import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;
+import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
+import org.apache.tajo.jdbc.TajoMemoryResultSet;
+import org.apache.tajo.jdbc.TajoResultSet;
+import org.apache.tajo.util.NetUtils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+/**
+ * {@link TajoClient} implementation built on top of {@link SessionConnection}.
+ *
+ * <p>Every query-related call is forwarded to a {@link QueryClientImpl} and every
+ * catalog-related call to a {@link CatalogAdminClientImpl}; both delegates are created
+ * in the constructors with this connection as their argument.</p>
+ */
+@ThreadSafe
+public class TajoClientImpl extends SessionConnection implements TajoClient, QueryClient, CatalogAdminClient {
+
+  // Shared by all instances; a logger does not need to be allocated per client.
+  private static final Log LOG = LogFactory.getLog(TajoClientImpl.class);
+
+  // Delegates are assigned exactly once in the constructors and never replaced.
+  final QueryClient queryClient;
+  final CatalogAdminClient catalogClient;
+
+  /**
+   * Connect to the TajoMaster whose address is read from
+   * {@link ConfVars#TAJO_MASTER_CLIENT_RPC_ADDRESS}, without a base database.
+   *
+   * @param conf TajoConf
+   * @throws java.io.IOException
+   */
+  public TajoClientImpl(TajoConf conf) throws IOException {
+    this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), null);
+  }
+
+  /**
+   * Connect to the TajoMaster whose address is read from
+   * {@link ConfVars#TAJO_MASTER_CLIENT_RPC_ADDRESS}.
+   *
+   * @param conf TajoConf
+   * @param baseDatabase The base database name; may be null.
+   * @throws java.io.IOException
+   */
+  public TajoClientImpl(TajoConf conf, @Nullable String baseDatabase) throws IOException {
+    this(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS)), baseDatabase);
+  }
+
+  /**
+   * Connect to the given TajoMaster address with a freshly created {@link TajoConf}
+   * and no base database.
+   *
+   * @param addr TajoMaster address
+   * @throws java.io.IOException
+   */
+  public TajoClientImpl(InetSocketAddress addr) throws IOException {
+    this(new TajoConf(), addr, null);
+  }
+
+  /**
+   * Connect to TajoMaster
+   *
+   * @param conf TajoConf
+   * @param addr TajoMaster address
+   * @param baseDatabase The base database name. It is case sensitive. If it is null,
+   *                     the 'default' database will be used.
+   * @throws java.io.IOException
+   */
+  public TajoClientImpl(TajoConf conf, InetSocketAddress addr, @Nullable String baseDatabase) throws IOException {
+    super(conf, addr, baseDatabase);
+    this.queryClient = new QueryClientImpl(this);
+    this.catalogClient = new CatalogAdminClientImpl(this);
+  }
+
+  /**
+   * Connect to TajoMaster identified by a host name and a port.
+   *
+   * @param hostName TajoMaster host name
+   * @param port TajoMaster port
+   * @param baseDatabase The base database name; may be null.
+   * @throws java.io.IOException
+   */
+  public TajoClientImpl(String hostName, int port, @Nullable String baseDatabase) throws IOException {
+    super(hostName, port, baseDatabase);
+    this.queryClient = new QueryClientImpl(this);
+    this.catalogClient = new CatalogAdminClientImpl(this);
+  }
+
+  /*------------------------------------------------------------------------*/
+  // QueryClient wrappers
+  /*------------------------------------------------------------------------*/
+
+  /** Forwards to the query client. */
+  public void closeQuery(final QueryId queryId) {
+    queryClient.closeQuery(queryId);
+  }
+
+  /** Forwards to the query client. */
+  public void closeNonForwardQuery(final QueryId queryId) {
+    queryClient.closeNonForwardQuery(queryId);
+  }
+
+  /** Forwards the SQL statement to the query client and returns its response. */
+  public SubmitQueryResponse executeQuery(final String sql) throws ServiceException {
+    return queryClient.executeQuery(sql);
+  }
+
+  /** Forwards the JSON-encoded query to the query client and returns its response. */
+  public SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException {
+    return queryClient.executeQueryWithJson(json);
+  }
+
+  /** Forwards to the query client and returns the result set it produces. */
+  public ResultSet executeQueryAndGetResult(final String sql) throws ServiceException, IOException {
+    return queryClient.executeQueryAndGetResult(sql);
+  }
+
+  /** Forwards to the query client and returns the result set it produces. */
+  public ResultSet executeJsonQueryAndGetResult(final String json) throws ServiceException, IOException {
+    return queryClient.executeJsonQueryAndGetResult(json);
+  }
+
+  /** Forwards to the query client. */
+  public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException {
+    return queryClient.getQueryStatus(queryId);
+  }
+
+  /** Forwards to the query client. */
+  public ResultSet getQueryResult(QueryId queryId) throws ServiceException, IOException {
+    return queryClient.getQueryResult(queryId);
+  }
+
+  /**
+   * Builds a {@link TajoResultSet} bound to this client and the given query id.
+   * Unlike the other wrappers in this section, this one is not forwarded to the query client.
+   */
+  public ResultSet createNullResultSet(QueryId queryId) throws IOException {
+    return new TajoResultSet(this, queryId);
+  }
+
+  /** Forwards to the query client. */
+  public GetQueryResultResponse getResultResponse(QueryId queryId) throws ServiceException {
+    return queryClient.getResultResponse(queryId);
+  }
+
+  /** Forwards to the query client. */
+  public TajoMemoryResultSet fetchNextQueryResult(final QueryId queryId, final int fetchRowNum) throws ServiceException {
+    return queryClient.fetchNextQueryResult(queryId, fetchRowNum);
+  }
+
+  /** Forwards to the query client. */
+  public boolean updateQuery(final String sql) throws ServiceException {
+    return queryClient.updateQuery(sql);
+  }
+
+  /** Forwards to the query client. */
+  public boolean updateQueryWithJson(final String json) throws ServiceException {
+    return queryClient.updateQueryWithJson(json);
+  }
+
+  /** Forwards to the query client. */
+  public QueryStatus killQuery(final QueryId queryId) throws ServiceException, IOException {
+    return queryClient.killQuery(queryId);
+  }
+
+  /** Forwards to the query client. */
+  public List<BriefQueryInfo> getRunningQueryList() throws ServiceException {
+    return queryClient.getRunningQueryList();
+  }
+
+  /** Forwards to the query client. */
+  public List<BriefQueryInfo> getFinishedQueryList() throws ServiceException {
+    return queryClient.getFinishedQueryList();
+  }
+
+  /** Forwards to the query client. */
+  public List<WorkerResourceInfo> getClusterInfo() throws ServiceException {
+    return queryClient.getClusterInfo();
+  }
+
+  /*------------------------------------------------------------------------*/
+  // CatalogClient wrappers
+  /*------------------------------------------------------------------------*/
+
+  /** Forwards to the catalog client. */
+  public boolean createDatabase(final String databaseName) throws ServiceException {
+    return catalogClient.createDatabase(databaseName);
+  }
+
+  /** Forwards to the catalog client. */
+  public boolean existDatabase(final String databaseName) throws ServiceException {
+    return catalogClient.existDatabase(databaseName);
+  }
+
+  /** Forwards to the catalog client. */
+  public boolean dropDatabase(final String databaseName) throws ServiceException {
+    return catalogClient.dropDatabase(databaseName);
+  }
+
+  /** Forwards to the catalog client. */
+  public List<String> getAllDatabaseNames() throws ServiceException {
+    return catalogClient.getAllDatabaseNames();
+  }
+
+  /** Forwards to the catalog client. */
+  public boolean existTable(final String tableName) throws ServiceException {
+    return catalogClient.existTable(tableName);
+  }
+
+  /** Forwards to the catalog client (variant without a partition description). */
+  public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
+                                       final TableMeta meta) throws SQLException, ServiceException {
+    return catalogClient.createExternalTable(tableName, schema, path, meta);
+  }
+
+  /** Forwards to the catalog client (variant with a partition description). */
+  public TableDesc createExternalTable(final String tableName, final Schema schema, final Path path,
+                                       final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
+      throws SQLException, ServiceException {
+    return catalogClient.createExternalTable(tableName, schema, path, meta, partitionMethodDesc);
+  }
+
+  /** Same as {@code dropTable(tableName, false)}, i.e. the purge flag is off. */
+  public boolean dropTable(final String tableName) throws ServiceException {
+    return dropTable(tableName, false);
+  }
+
+  /** Forwards to the catalog client. */
+  public boolean dropTable(final String tableName, final boolean purge) throws ServiceException {
+    return catalogClient.dropTable(tableName, purge);
+  }
+
+  /** Forwards to the catalog client; {@code databaseName} may be null. */
+  public List<String> getTableList(@Nullable final String databaseName) throws ServiceException {
+    return catalogClient.getTableList(databaseName);
+  }
+
+  /** Forwards to the catalog client. */
+  public TableDesc getTableDesc(final String tableName) throws ServiceException {
+    return catalogClient.getTableDesc(tableName);
+  }
+
+  /** Forwards to the catalog client. */
+  public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
+    return catalogClient.getFunctions(functionName);
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
new file mode 100644
index 0000000..7aed335
--- /dev/null
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoClientUtil.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.client;
+
+import org.apache.tajo.QueryId;
+import org.apache.tajo.SessionVars;
+import org.apache.tajo.TajoProtos;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.jdbc.FetchResultSet;
+import org.apache.tajo.jdbc.TajoMemoryResultSet;
+import org.apache.tajo.jdbc.TajoResultSet;
+import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
+
+import java.io.IOException;
+import java.sql.ResultSet;
+
+/**
+ * Static helpers for client code: predicates over {@link TajoProtos.QueryState}
+ * and factories that turn server responses into JDBC {@link ResultSet}s.
+ */
+public class TajoClientUtil {
+
+  /**
+   * Query has been submitted but is still waiting to be scheduled.
+   *
+   * @param state the query state to test
+   * @return true if the state is QUERY_NOT_ASSIGNED, QUERY_MASTER_INIT or QUERY_MASTER_LAUNCHED
+   */
+  public static boolean isQueryWaitingForSchedule(TajoProtos.QueryState state) {
+    final boolean unassigned = state == TajoProtos.QueryState.QUERY_NOT_ASSIGNED;
+    final boolean masterStarting = state == TajoProtos.QueryState.QUERY_MASTER_INIT
+        || state == TajoProtos.QueryState.QUERY_MASTER_LAUNCHED;
+    return unassigned || masterStarting;
+  }
+
+  /**
+   * Query has been submitted, but is not running yet.
+   *
+   * @param state the query state to test
+   * @return true if the state is QUERY_NEW or QUERY_INIT
+   */
+  public static boolean isQueryInited(TajoProtos.QueryState state) {
+    if (state == TajoProtos.QueryState.QUERY_NEW) {
+      return true;
+    }
+    return state == TajoProtos.QueryState.QUERY_INIT;
+  }
+
+  /**
+   * Query has started, but is not complete.
+   *
+   * @param state the query state to test
+   * @return true if the state is QUERY_RUNNING or satisfies {@link #isQueryInited}
+   */
+  public static boolean isQueryRunning(TajoProtos.QueryState state) {
+    return state == TajoProtos.QueryState.QUERY_RUNNING || isQueryInited(state);
+  }
+
+  /**
+   * Query is complete, i.e. it is neither waiting for scheduling nor running.
+   *
+   * @param state the query state to test
+   * @return true if both {@link #isQueryWaitingForSchedule} and {@link #isQueryRunning} are false
+   */
+  public static boolean isQueryComplete(TajoProtos.QueryState state) {
+    return !(isQueryWaitingForSchedule(state) || isQueryRunning(state));
+  }
+
+  /**
+   * Creates a {@link TajoResultSet} for the table described in a query result response.
+   *
+   * <p>Note that this method writes the user name carried by the response into
+   * {@code conf} ({@code ConfVars.USERNAME}) before building the result set.</p>
+   *
+   * @param conf configuration handed to the result set; modified as described above
+   * @param client client handed to the result set
+   * @param queryId id of the query whose result is read
+   * @param response server response carrying the table description and the user name
+   * @return the created result set
+   * @throws IOException declared for failures while constructing the result set
+   */
+  public static ResultSet createResultSet(TajoConf conf, TajoClient client, QueryId queryId,
+                                          ClientProtos.GetQueryResultResponse response)
+      throws IOException {
+    final TableDesc resultTable = CatalogUtil.newTableDesc(response.getTableDesc());
+    final String userName = response.getTajoUserName();
+    conf.setVar(TajoConf.ConfVars.USERNAME, userName);
+    return new TajoResultSet(client, queryId, conf, resultTable);
+  }
+
+  /**
+   * Creates a result set directly from a submit-query response.
+   *
+   * <p>When the response has no table description (simple eval query, e.g.
+   * {@code select substr('abc', 1, 2)}) the serialized rows in the response are wrapped in a
+   * {@link TajoMemoryResultSet}. Otherwise (non-forward query, e.g.
+   * {@code select * from table1 [limit 10]}) a {@link FetchResultSet} is returned.</p>
+   *
+   * @param conf configuration supplying the default fetch row number
+   * @param client client handed to the {@link FetchResultSet}
+   * @param response the submit-query response
+   * @return the created result set
+   * @throws IOException declared by the signature
+   */
+  public static ResultSet createResultSet(TajoConf conf, QueryClient client, ClientProtos.SubmitQueryResponse response)
+      throws IOException {
+    if (!response.hasTableDesc()) {
+      // simple eval query: the rows travel inside the response itself
+      final ClientProtos.SerializedResultSet payload = response.getResultSet();
+      return new TajoMemoryResultSet(
+          new Schema(payload.getSchema()),
+          payload.getSerializedTuplesList(),
+          response.getMaxRowNum());
+    }
+
+    // non-forward query: rows are fetched later, fetchRowNum at a time
+    final int fetchRowNum = resolveFetchRowNum(conf, response);
+    final TableDesc fetchedTable = new TableDesc(response.getTableDesc());
+    final Schema logicalSchema = fetchedTable.getLogicalSchema();
+    final QueryId queryId = new QueryId(response.getQueryId());
+    return new FetchResultSet(client, logicalSchema, queryId, fetchRowNum);
+  }
+
+  /**
+   * Reads the fetch row number from {@code conf} and lets a FETCH_ROWNUM session variable
+   * in the response, if any, override it. If the variable occurs more than once the last
+   * occurrence is used.
+   */
+  private static int resolveFetchRowNum(TajoConf conf, ClientProtos.SubmitQueryResponse response) {
+    int rowNum = conf.getIntVar(TajoConf.ConfVars.$RESULT_SET_FETCH_ROWNUM);
+    if (!response.hasSessionVariables()) {
+      return rowNum;
+    }
+    for (PrimitiveProtos.KeyValueProto sessionVar : response.getSessionVariables().getKeyvalList()) {
+      if (sessionVar.getKey().equals(SessionVars.FETCH_ROWNUM.keyname())) {
+        rowNum = Integer.parseInt(sessionVar.getValue());
+      }
+    }
+    return rowNum;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoDump.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoDump.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoDump.java
index 7628d9d..540f54b 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoDump.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoDump.java
@@ -104,9 +104,9 @@ public class TajoDump {
System.exit(-1);
} else if (hostName != null && port != null) {
conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName+":"+port);
- client = new TajoClient(conf);
+ client = new TajoClientImpl(conf);
} else {
- client = new TajoClient(conf);
+ client = new TajoClientImpl(conf);
}
PrintWriter writer = new PrintWriter(System.out);
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
index 2377427..88ab491 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
@@ -112,9 +112,9 @@ public class TajoGetConf {
return;
} else if (hostName != null && port != null) {
tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port);
- tajoClient = new TajoClient(tajoConf);
+ tajoClient = new TajoClientImpl(tajoConf);
} else if (hostName == null && port == null) {
- tajoClient = new TajoClient(tajoConf);
+ tajoClient = new TajoClientImpl(tajoConf);
}
processConfKey(writer, param);
@@ -123,13 +123,13 @@ public class TajoGetConf {
private void processConfKey(Writer writer, String param) throws ParseException, IOException,
ServiceException, SQLException {
- String value = tajoClient.getConf().getTrimmed(param);
+ String value = tajoConf.getTrimmed(param);
// If there is no value in the configuration file, we need to find all ConfVars.
if (value == null) {
for(TajoConf.ConfVars vars : TajoConf.ConfVars.values()) {
if (vars.varname.equalsIgnoreCase(param)) {
- value = tajoClient.getConf().getVar(vars);
+ value = tajoConf.getVar(vars);
break;
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java
index 11cb4ed..5d5cf71 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAAdmin.java
@@ -126,9 +126,9 @@ public class TajoHAAdmin {
return;
} else if (hostName != null && port != null) {
tajoConf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS, hostName + ":" + port);
- tajoClient = new TajoClient(tajoConf);
+ tajoClient = new TajoClientImpl(tajoConf);
} else if (hostName == null && port == null) {
- tajoClient = new TajoClient(tajoConf);
+ tajoClient = new TajoClientImpl(tajoConf);
}
if (!tajoConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
@@ -159,7 +159,7 @@ public class TajoHAAdmin {
private void getState(Writer writer, String param) throws ParseException, IOException,
ServiceException {
tajoClient = TajoHAClientUtil.getTajoClient(tajoConf, tajoClient);
- int retValue = HAServiceUtil.getState(param, tajoClient.getConf());
+ int retValue = HAServiceUtil.getState(param, tajoConf);
switch (retValue) {
case 1:
@@ -179,7 +179,7 @@ public class TajoHAAdmin {
private void formatHA(Writer writer) throws ParseException, IOException,
ServiceException {
- int retValue = HAServiceUtil.formatHA(tajoClient.getConf());
+ int retValue = HAServiceUtil.formatHA(tajoConf);
switch (retValue) {
case 1:
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
index f22d5ba..b93590c 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoHAClientUtil.java
@@ -71,7 +71,7 @@ public class TajoHAClientUtil {
conf.setVar(TajoConf.ConfVars.TAJO_MASTER_CLIENT_RPC_ADDRESS,
HAServiceUtil.getMasterClientName(conf));
client.close();
- tajoClient = new TajoClient(conf, baseDatabase);
+ tajoClient = new TajoClientImpl(conf, baseDatabase);
if (context != null && context.getCurrentDatabase() != null) {
tajoClient.selectDatabase(context.getCurrentDatabase());
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
index 7ebce91..78674b1 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/FetchResultSet.java
@@ -20,6 +20,7 @@ package org.apache.tajo.jdbc;
import org.apache.tajo.QueryId;
import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.client.QueryClient;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.storage.Tuple;
@@ -27,13 +28,13 @@ import java.io.IOException;
import java.sql.SQLException;
public class FetchResultSet extends TajoResultSetBase {
- private TajoClient tajoClient;
+ private QueryClient tajoClient;
private QueryId queryId;
private int fetchRowNum;
private TajoMemoryResultSet currentResultSet;
private boolean finished = false;
- public FetchResultSet(TajoClient tajoClient, Schema schema, QueryId queryId, int fetchRowNum) {
+ public FetchResultSet(QueryClient tajoClient, Schema schema, QueryId queryId, int fetchRowNum) {
this.tajoClient = tajoClient;
this.queryId = queryId;
this.fetchRowNum = fetchRowNum;
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
index 65954f1..d78b04f 100644
--- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
+++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.catalog.TableDesc;
+import org.apache.tajo.client.QueryClient;
import org.apache.tajo.client.TajoClient;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.FileScanner;
@@ -47,20 +48,20 @@ public class TajoResultSet extends TajoResultSetBase {
private FileSystem fs;
private Scanner scanner;
- private TajoClient tajoClient;
+ private QueryClient tajoClient;
private TajoConf conf;
private TableDesc desc;
private Long maxRowNum = null;
private QueryId queryId;
private AtomicBoolean closed = new AtomicBoolean(false);
- public TajoResultSet(TajoClient tajoClient, QueryId queryId) {
+ public TajoResultSet(QueryClient tajoClient, QueryId queryId) {
this.tajoClient = tajoClient;
this.queryId = queryId;
init();
}
- public TajoResultSet(TajoClient tajoClient, QueryId queryId, TajoConf conf, TableDesc table) throws IOException {
+ public TajoResultSet(QueryClient tajoClient, QueryId queryId, TajoConf conf, TableDesc table) throws IOException {
this.tajoClient = tajoClient;
this.queryId = queryId;
this.conf = conf;
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
index 91dcea1..b1b6450 100644
--- a/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/BenchmarkSet.java
@@ -24,6 +24,7 @@ import org.apache.tajo.catalog.CatalogConstants;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.store.MemStore;
import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.util.FileUtil;
@@ -43,11 +44,11 @@ public abstract class BenchmarkSet {
public void init(TajoConf conf, String dataDir) throws IOException {
this.dataDir = dataDir;
if (System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname) != null) {
- tajo = new TajoClient(NetUtils.createSocketAddr(
+ tajo = new TajoClientImpl(NetUtils.createSocketAddr(
System.getProperty(ConfVars.WORKER_PEER_RPC_ADDRESS.varname)));
} else {
conf.set(CatalogConstants.STORE_CLASS, MemStore.class.getCanonicalName());
- tajo = new TajoClient(conf);
+ tajo = new TajoClientImpl(conf);
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index cd3be98..f7c7b11 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -41,7 +41,7 @@ import org.apache.tajo.catalog.exception.*;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.QueryClient;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.DatumFactory;
@@ -761,7 +761,7 @@ public class GlobalEngine extends AbstractService {
stats.setNumBytes(totalSize);
if (isExternal) { // if it is an external table, there is no way to know the exact row number without processing.
- stats.setNumRows(TajoClient.UNKNOWN_ROW_NUMBER);
+ stats.setNumRows(QueryClient.UNKNOWN_ROW_NUMBER);
}
TableDesc desc = new TableDesc(CatalogUtil.buildFQName(databaseName, simpleTableName),
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
index 7ea2e48..7790ac6 100644
--- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
@@ -9,13 +9,10 @@ import org.apache.tajo.QueryIdFactory;
import org.apache.tajo.TajoProtos;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.client.QueryStatus;
-import org.apache.tajo.client.TajoClient;
-import org.apache.tajo.client.TajoHAClientUtil;
+import org.apache.tajo.client.*;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.jdbc.TajoResultSet;
-import org.apache.tajo.util.HAServiceUtil;
import org.apache.tajo.util.JSPUtil;
import org.apache.tajo.util.TajoIdUtils;
import org.codehaus.jackson.map.DeserializationConfig;
@@ -67,18 +64,21 @@ public class QueryExecutorServlet extends HttpServlet {
//queryRunnerId -> QueryRunner
private final Map<String, QueryRunner> queryRunners = new HashMap<String, QueryRunner>();
+ private TajoConf tajoConf;
private TajoClient tajoClient;
private ExecutorService queryRunnerExecutor = Executors.newFixedThreadPool(5);
private QueryRunnerCleaner queryRunnerCleaner;
+
@Override
public void init(ServletConfig config) throws ServletException {
om.getDeserializationConfig().disable(
DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES);
try {
- tajoClient = new TajoClient(new TajoConf());
+ tajoConf = new TajoConf();
+ tajoClient = new TajoClientImpl(tajoConf);
queryRunnerCleaner = new QueryRunnerCleaner();
queryRunnerCleaner.start();
@@ -273,8 +273,7 @@ public class QueryExecutorServlet extends HttpServlet {
public void run() {
startTime = System.currentTimeMillis();
try {
- TajoConf conf = tajoClient.getConf();
- tajoClient = TajoHAClientUtil.getTajoClient(conf, tajoClient);
+ tajoClient = TajoHAClientUtil.getTajoClient(tajoConf, tajoClient);
response = tajoClient.executeQuery(query);
@@ -319,7 +318,7 @@ public class QueryExecutorServlet extends HttpServlet {
// non-forwarded INSERT INTO query does not have any query id.
// In this case, it just returns succeeded query information without printing the query results.
} else {
- res = TajoClient.createResultSet(tajoClient, response);
+ res = TajoClientUtil.createResultSet(tajoConf, tajoClient, response);
MakeResultText(res, desc);
}
progress.set(100);
@@ -399,8 +398,8 @@ public class QueryExecutorServlet extends HttpServlet {
try {
ClientProtos.GetQueryResultResponse response = tajoClient.getResultResponse(tajoQueryId);
TableDesc desc = CatalogUtil.newTableDesc(response.getTableDesc());
- tajoClient.getConf().setVar(TajoConf.ConfVars.USERNAME, response.getTajoUserName());
- res = new TajoResultSet(tajoClient, queryId, tajoClient.getConf(), desc);
+ tajoConf.setVar(TajoConf.ConfVars.USERNAME, response.getTajoUserName());
+ res = new TajoResultSet(tajoClient, queryId, tajoConf, desc);
MakeResultText(res, desc);
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
index 805fe06..c6cac32 100644
--- a/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
+++ b/tajo-core/src/test/java/org/apache/tajo/LocalTajoTestingUtility.java
@@ -31,6 +31,7 @@ import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.planner.global.MasterPlan;
import org.apache.tajo.engine.query.QueryContext;
@@ -102,7 +103,7 @@ public class LocalTajoTestingUtility {
util = new TajoTestingCluster();
util.startMiniCluster(1);
conf = util.getConfiguration();
- client = new TajoClient(conf);
+ client = new TajoClientImpl(conf);
FileSystem fs = util.getDefaultFileSystem();
Path rootDir = util.getMaster().getStorageManager().getWarehouseDir();
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
index a272b15..becb73e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
+++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java
@@ -32,6 +32,7 @@ import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.cli.ParsedResult;
import org.apache.tajo.cli.SimpleParser;
import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.query.QueryContext;
@@ -178,7 +179,7 @@ public class QueryTestCaseBase {
@BeforeClass
public static void setUpClass() throws IOException {
conf = testBase.getTestingCluster().getConfiguration();
- client = new TajoClient(conf);
+ client = new TajoClientImpl(conf);
}
@AfterClass
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
index aec11f6..452a17e 100644
--- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
+++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
+import org.apache.tajo.client.TajoClientUtil;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.master.TajoMaster;
@@ -583,7 +585,7 @@ public class TajoTestingCluster {
Thread.sleep(1000);
}
TajoConf conf = util.getConfiguration();
- return new TajoClient(conf);
+ return new TajoClientImpl(conf);
}
public static ResultSet run(String[] names,
@@ -620,7 +622,7 @@ public class TajoTestingCluster {
Thread.sleep(1000);
}
TajoConf conf = util.getConfiguration();
- TajoClient client = new TajoClient(conf);
+ TajoClient client = new TajoClientImpl(conf);
try {
return run(names, schemas, tableOption, tables, query, client);
@@ -645,7 +647,7 @@ public class TajoTestingCluster {
Thread.sleep(1000);
}
TajoConf conf = util.getConfiguration();
- TajoClient client = new TajoClient(conf);
+ TajoClient client = new TajoClientImpl(conf);
try {
FileSystem fs = util.getDefaultFileSystem();
Path rootDir = util.getMaster().
@@ -721,7 +723,7 @@ public class TajoTestingCluster {
QueryMasterTask qmt = null;
int i = 0;
- while (qmt == null || TajoClient.isInPreNewState(qmt.getState())) {
+ while (qmt == null || TajoClientUtil.isQueryWaitingForSchedule(qmt.getState())) {
try {
Thread.sleep(delay);
if(qmt == null){
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
index 4ede88e..719a775 100644
--- a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
+++ b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java
@@ -62,7 +62,7 @@ public class TestTajoClient {
public static void setUp() throws Exception {
cluster = TpchTestBase.getInstance().getTestingCluster();
conf = cluster.getConfiguration();
- client = new TajoClient(conf);
+ client = new TajoClientImpl(conf);
testDir = CommonTestingUtil.getTestDir();
}
@@ -658,7 +658,7 @@ public class TestTajoClient {
QueryStatus queryStatus = client.getQueryStatus(queryId);
assertNotNull(queryStatus);
- assertTrue(TajoClient.isInCompleteState(queryStatus.getState()));
+ assertTrue(TajoClientUtil.isQueryComplete(queryStatus.getState()));
TajoResultSet resultSet = (TajoResultSet) client.getQueryResult(queryId);
assertNotNull(resultSet);
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
index e477939..08535ef 100644
--- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
+++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java
@@ -25,7 +25,7 @@ import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.QueryClient;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -419,7 +419,7 @@ public class TestTajoJdbc extends QueryTestCaseBase {
assertNotNull(rsmd);
assertEquals(0, rsmd.getColumnCount());
- TajoClient connTajoClient = ((TajoConnection)stmt.getConnection()).getTajoClient();
+ QueryClient connTajoClient = ((JdbcConnection)stmt.getConnection()).getQueryClient();
Map<String, String> variables = connTajoClient.getAllSessionVariables();
String value = variables.get("JOIN_TASK_INPUT_SIZE");
assertNotNull(value);
@@ -461,7 +461,7 @@ public class TestTajoJdbc extends QueryTestCaseBase {
assertNotNull(rsmd);
assertEquals(0, rsmd.getColumnCount());
- TajoClient connTajoClient = ((TajoConnection)stmt.getConnection()).getTajoClient();
+ QueryClient connTajoClient = ((JdbcConnection)stmt.getConnection()).getQueryClient();
Map<String, String> variables = connTajoClient.getAllSessionVariables();
String value = variables.get("JOIN_TASK_INPUT_SIZE");
assertNotNull(value);
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java b/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java
index 86d18eb..249afae 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/ha/TestHAServiceHDFSImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.TajoMaster;
import org.junit.Test;
@@ -55,7 +56,7 @@ public class TestHAServiceHDFSImpl {
cluster.startMiniCluster(1);
conf = cluster.getConfiguration();
- client = new TajoClient(conf);
+ client = new TajoClientImpl(conf);
FileSystem fs = cluster.getDefaultFileSystem();
startBackupMasters();
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java
index 0f925bb..4a6ca00 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryProgress.java
@@ -21,6 +21,8 @@ package org.apache.tajo.master.querymaster;
import org.apache.tajo.*;
import org.apache.tajo.client.QueryStatus;
import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
+import org.apache.tajo.client.TajoClientUtil;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ClientProtos;
@@ -41,7 +43,7 @@ public class TestQueryProgress {
public static void setUp() throws Exception {
cluster = TpchTestBase.getInstance().getTestingCluster();
conf = cluster.getConfiguration();
- client = new TajoClient(conf);
+ client = new TajoClientImpl(conf);
}
@AfterClass
@@ -67,7 +69,7 @@ public class TestQueryProgress {
prevProgress = progress;
assertTrue(progress <= 1.0f);
- if (TajoClient.isInCompleteState(status.getState())) break;
+ if (TajoClientUtil.isQueryComplete(status.getState())) break;
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
index 069ee27..18764c2 100644
--- a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
+++ b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java
@@ -20,6 +20,8 @@ package org.apache.tajo.scheduler;
import org.apache.tajo.*;
import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
+import org.apache.tajo.client.TajoClientUtil;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.ipc.ClientProtos;
import org.junit.AfterClass;
@@ -41,7 +43,7 @@ public class TestFifoScheduler {
public static void setUp() throws Exception {
cluster = TpchTestBase.getInstance().getTestingCluster();
conf = cluster.getConfiguration();
- client = new TajoClient(conf);
+ client = new TajoClientImpl(conf);
}
@AfterClass
@@ -75,7 +77,7 @@ public class TestFifoScheduler {
cluster.waitForQueryRunning(queryId);
assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState());
- ResultSet resSet = TajoClient.createResultSet(client, res2);
+ ResultSet resSet = TajoClientUtil.createResultSet(conf, client, res2);
assertNotNull(resSet);
client.killQuery(queryId); //cleanup
@@ -95,7 +97,7 @@ public class TestFifoScheduler {
cluster.waitForQueryRunning(queryId);
- assertTrue(TajoClient.isInRunningState(client.getQueryStatus(queryId).getState()));
+ assertTrue(TajoClientUtil.isQueryRunning(client.getQueryStatus(queryId).getState()));
assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState());
assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState());
http://git-wip-us.apache.org/repos/asf/tajo/blob/f80beaf6/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
index 3a85c14..c68d3a4 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java
@@ -25,6 +25,7 @@ import org.apache.tajo.TajoProtos;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.TpchTestBase;
import org.apache.tajo.client.TajoClient;
+import org.apache.tajo.client.TajoClientImpl;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.master.TajoMaster;
import org.apache.tajo.master.querymaster.QueryInProgress;
@@ -49,7 +50,7 @@ public class TestHistory {
cluster = TpchTestBase.getInstance().getTestingCluster();
master = cluster.getMaster();
conf = cluster.getConfiguration();
- client = new TajoClient(conf);
+ client = new TajoClientImpl(conf);
}
@After