You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/05/07 18:36:05 UTC
[6/6] tajo git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/tajo into index_support
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/tajo into index_support
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/42bcf2de
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/42bcf2de
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/42bcf2de
Branch: refs/heads/index_support
Commit: 42bcf2de090bf1bb5b5ec711427654056a2866e2
Parents: 86c97b2 9b3824b
Author: Jihoon Son <ji...@apache.org>
Authored: Fri May 8 01:35:19 2015 +0900
Committer: Jihoon Son <ji...@apache.org>
Committed: Fri May 8 01:35:19 2015 +0900
----------------------------------------------------------------------
CHANGES | 10 +
.../tajo/catalog/AbstractCatalogClient.java | 638 +++++++------------
.../org/apache/tajo/catalog/CatalogClient.java | 49 +-
.../java/org/apache/tajo/catalog/Schema.java | 12 +-
.../org/apache/tajo/catalog/CatalogServer.java | 8 +-
.../tajo/catalog/LocalCatalogWrapper.java | 20 +-
.../tajo/client/CatalogAdminClientImpl.java | 518 ++++++++-------
.../org/apache/tajo/client/QueryClientImpl.java | 469 ++++++++------
.../apache/tajo/client/SessionConnection.java | 351 +++++-----
.../java/org/apache/tajo/conf/TajoConf.java | 2 -
.../org/apache/tajo/master/QueryInProgress.java | 31 +-
.../querymaster/QueryMasterManagerService.java | 135 ++--
.../tajo/worker/ExecutionBlockContext.java | 32 +-
.../java/org/apache/tajo/worker/TajoWorker.java | 1 +
.../tajo/worker/TajoWorkerManagerService.java | 2 +
.../main/java/org/apache/tajo/worker/Task.java | 4 +-
.../java/org/apache/tajo/worker/TaskRunner.java | 43 +-
.../src/main/proto/QueryMasterProtocol.proto | 14 +-
.../cli/tsql/TestDefaultCliOutputFormatter.java | 4 -
.../tajo/engine/planner/TestLogicalPlanner.java | 23 +-
.../tajo/engine/query/TestInsertQuery.java | 19 +
.../apache/tajo/querymaster/TestKillQuery.java | 63 +-
.../TestInsertQuery/nation_diff_col_order.ddl | 1 +
.../testInsertWithDifferentColumnOrder.sql | 1 +
.../testInsertWithDifferentColumnOrder.result | 27 +
.../org/apache/tajo/plan/LogicalPlanner.java | 9 +-
.../org/apache/tajo/rpc/NettyClientBase.java | 10 +-
.../tajo/rpc/RetriesExhaustedException.java | 104 ---
.../org/apache/tajo/rpc/RpcClientManager.java | 9 +
.../org/apache/tajo/rpc/ServerCallable.java | 148 -----
.../org/apache/tajo/rpc/TestBlockingRpc.java | 39 --
.../org/apache/tajo/storage/rcfile/RCFile.java | 14 +-
.../tajo/storage/text/CSVLineDeserializer.java | 18 +-
.../org/apache/tajo/storage/TestStorages.java | 59 ++
34 files changed, 1365 insertions(+), 1522 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --cc tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index b52b2f5,766f6c2..c872f8b
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@@ -29,17 -29,12 +29,13 @@@ import org.apache.tajo.catalog.proto.Ca
import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.conf.TajoConf;
- import org.apache.tajo.rpc.NettyClientBase;
- import org.apache.tajo.rpc.RpcClientManager;
- import org.apache.tajo.rpc.ServerCallable;
+ import org.apache.tajo.exception.InvalidOperationException;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
- import org.apache.tajo.service.ServiceTracker;
- import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.util.ProtoUtil;
+import org.apache.tajo.util.TUtil;
- import java.net.InetSocketAddress;
+ import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@@ -373,37 -269,14 +270,26 @@@ public abstract class AbstractCatalogCl
}
@Override
+ public List<IndexDescProto> getAllIndexes() {
+ try {
- return new ServerCallable<List<IndexDescProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<IndexDescProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
- return response.getIndexList();
- }
- }.withRetries();
++ CatalogProtocolService.BlockingInterface stub = getStub();
++ GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
++ return response.getIndexList();
+ } catch (ServiceException e) {
+ LOG.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
+ @Override
public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) {
try {
- return new ServerCallable<PartitionMethodDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public PartitionMethodDesc call(NettyClientBase client) throws ServiceException {
-
- TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setTableName(tableName);
+ TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(tableName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build()));
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build()));
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return null;
@@@ -637,42 -458,15 +471,42 @@@
}
@Override
- public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) {
+ public boolean existIndexByColumns(final String databaseName, final String tableName, final Column [] columns) {
+ return existIndexByColumnNames(databaseName, tableName, extractColumnNames(columns));
+ }
+
+ @Override
+ public boolean existIndexByColumnNames(final String databaseName, final String tableName, final String [] columnNames) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
- builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
- for (String colunName : columnNames) {
- builder.addColumnNames(colunName);
- }
- GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
++//<<<<<<< HEAD
++ GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
+ builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
- builder.setColumnName(columnName);
++ for (String colunName : columnNames) {
++ builder.addColumnNames(colunName);
++ }
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existIndexByColumnNames(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
- return stub.existIndexByColumn(null, builder.build()).getValue();
++ return stub.existIndexByColumnNames(null, builder.build()).getValue();
+ } catch (ServiceException e) {
+ LOG.error(e.getMessage(), e);
+ return false;
+ }
+ }
+
+ @Override
+ public boolean existIndexesByTable(final String databaseName, final String tableName) {
+ try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existIndexesByTable(null, CatalogUtil.buildTableIdentifier(databaseName, tableName)).getValue();
- }
- }.withRetries();
++ CatalogProtocolService.BlockingInterface stub = getStub();
++ return stub.existIndexesByTable(null, CatalogUtil.buildTableIdentifier(databaseName, tableName)).getValue();
++//=======
++// GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
++// builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
++// builder.setColumnName(columnName);
++//
++// CatalogProtocolService.BlockingInterface stub = getStub();
++// return stub.existIndexByColumn(null, builder.build()).getValue();
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@@ -699,62 -488,17 +528,61 @@@
}
}
+ private static String[] extractColumnNames(Column[] columns) {
+ String[] columnNames = new String [columns.length];
+ for (int i = 0; i < columnNames.length; i++) {
+ columnNames[i] = columns[i].getSimpleName();
+ }
+ return columnNames;
+ }
+
+ @Override
+ public final IndexDesc getIndexByColumns(final String databaseName,
+ final String tableName,
+ final Column [] columns) {
+ return getIndexByColumnNames(databaseName, tableName, extractColumnNames(columns));
+ }
+
@Override
- public final IndexDesc getIndexByColumn(final String databaseName,
- final String tableName,
- final String columnName) {
+ public final IndexDesc getIndexByColumnNames(final String databaseName,
+ final String tableName,
+ final String [] columnNames) {
try {
- return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public IndexDesc call(NettyClientBase client) throws ServiceException {
-
- GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
- builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
- for (String columnName : columnNames) {
- builder.addColumnNames(columnName);
- }
- GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
++// GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
++// builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
++// builder.setColumnName(columnName);
++//
++//<<<<<<< HEAD
++ GetIndexByColumnNamesRequest.Builder builder = GetIndexByColumnNamesRequest.newBuilder();
+ builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
- builder.setColumnName(columnName);
++ for (String columnName : columnNames) {
++ builder.addColumnNames(columnName);
++ }
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return new IndexDesc(stub.getIndexByColumnNames(null, builder.build()));
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
- return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
++ return new IndexDesc(stub.getIndexByColumnNames(null, builder.build()));
+ } catch (ServiceException e) {
+ LOG.error(e.getMessage(), e);
+ return null;
+ }
+ }
+
+ @Override
+ public final Collection<IndexDesc> getAllIndexesByTable(final String databaseName,
+ final String tableName) {
+ try {
- return new ServerCallable<Collection<IndexDesc>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- @Override
- public Collection<IndexDesc> call(NettyClientBase client) throws Exception {
- TableIdentifierProto proto = CatalogUtil.buildTableIdentifier(databaseName, tableName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetAllIndexesResponse response = stub.getAllIndexesByTable(null, proto);
- List<IndexDesc> indexDescs = TUtil.newList();
- for (IndexDescProto descProto : response.getIndexDescList()) {
- indexDescs.add(new IndexDesc(descProto));
- }
- return indexDescs;
- }
- }.withRetries();
++ TableIdentifierProto proto = CatalogUtil.buildTableIdentifier(databaseName, tableName);
++ CatalogProtocolService.BlockingInterface stub = getStub();
++ GetAllIndexesResponse response = stub.getAllIndexesByTable(null, proto);
++ List<IndexDesc> indexDescs = TUtil.newList();
++ for (IndexDescProto descProto : response.getIndexDescList()) {
++ indexDescs.add(new IndexDesc(descProto));
++ }
++ return indexDescs;
++//=======
++// CatalogProtocolService.BlockingInterface stub = getStub();
++// return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return null;
@@@ -781,6 -520,18 +604,21 @@@
return false;
}
}
-
- @Override
- public List<IndexProto> getAllIndexes() {
- try {
- CatalogProtocolService.BlockingInterface stub = getStub();
- GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
- return response.getIndexList();
- } catch (ServiceException e) {
- LOG.error(e.getMessage(), e);
- return new ArrayList<IndexProto>();
- }
- }
++//<<<<<<< HEAD
++//=======
++//
++// @Override
++// public List<IndexProto> getAllIndexes() {
++// try {
++// CatalogProtocolService.BlockingInterface stub = getStub();
++// GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
++// return response.getIndexList();
++// } catch (ServiceException e) {
++// LOG.error(e.getMessage(), e);
++// return new ArrayList<IndexProto>();
++// }
++// }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
@Override
public final boolean createFunction(final FunctionDesc funcDesc) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
----------------------------------------------------------------------
diff --cc tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
index 5fa1c67,9397fcf..5a04892
--- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
@@@ -20,16 -20,15 +20,15 @@@ package org.apache.tajo.client
import com.google.protobuf.ServiceException;
import org.apache.tajo.annotation.Nullable;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableDesc;
-import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.proto.CatalogProtos.IndexDescProto;
import org.apache.tajo.ipc.ClientProtos;
+import org.apache.tajo.ipc.ClientProtos.*;
+import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.jdbc.SQLStates;
import org.apache.tajo.rpc.NettyClientBase;
- import org.apache.tajo.rpc.ServerCallable;
import java.io.IOException;
import java.net.URI;
@@@ -132,32 -97,25 +97,57 @@@ public class CatalogAdminClientImpl imp
final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
throws SQLException, ServiceException {
- return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setName(tableName);
- builder.setSchema(schema.getProto());
- builder.setMeta(meta.getProto());
- builder.setPath(path.toString());
- if (partitionMethodDesc != null) {
- builder.setPartition(partitionMethodDesc.getProto());
- }
- ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
- if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
- return CatalogUtil.newTableDesc(res.getTableDesc());
- } else {
- throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
- }
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+
+ ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setName(tableName);
+ builder.setSchema(schema.getProto());
+ builder.setMeta(meta.getProto());
+ builder.setPath(path.toString());
+ if (partitionMethodDesc != null) {
+ builder.setPartition(partitionMethodDesc.getProto());
+ }
+ ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
- if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++ if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+ return CatalogUtil.newTableDesc(res.getTableDesc());
+ } else {
- throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
++ throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
+ }
++
++//<<<<<<< HEAD
++// return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
++// TajoMasterClientProtocol.class, false) {
++//
++// public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
++//
++//
++// }
++//
++// }.withRetries();
++//=======
++// NettyClientBase client = connection.getTajoMasterConnection();
++// connection.checkSessionAndGet(client);
++// BlockingInterface tajoMasterService = client.getStub();
++//
++// ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
++// builder.setSessionId(connection.sessionId);
++// builder.setName(tableName);
++// builder.setSchema(schema.getProto());
++// builder.setMeta(meta.getProto());
++// builder.setPath(path.toString());
++// if (partitionMethodDesc != null) {
++// builder.setPartition(partitionMethodDesc.getProto());
++// }
++// ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
++// if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++// return CatalogUtil.newTableDesc(res.getTableDesc());
++// } else {
++// throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
++// }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
@Override
@@@ -211,169 -156,37 +188,222 @@@
@Override
public TableDesc getTableDesc(final String tableName) throws ServiceException {
-
- return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setTableName(tableName);
- ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
- if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
- return CatalogUtil.newTableDesc(res.getTableDesc());
- } else {
- throw new SQLException(res.getResult().getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
- }
- }
--
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+
+ ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setTableName(tableName);
+ ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
- if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++ if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+ return CatalogUtil.newTableDesc(res.getTableDesc());
+ } else {
- throw new ServiceException(new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()));
++ throw new ServiceException(new SQLException(res.getResult().getErrorMessage(),
++ SQLStates.ER_NO_SUCH_TABLE.getState()));
+ }
++
++//<<<<<<< HEAD
++// return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
++// TajoMasterClientProtocol.class, false) {
++//
++// public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
++//
++//
++// }
++//
++// }.withRetries();
++//=======
++// NettyClientBase client = connection.getTajoMasterConnection();
++// connection.checkSessionAndGet(client);
++// BlockingInterface tajoMasterService = client.getStub();
++//
++// ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
++// builder.setSessionId(connection.sessionId);
++// builder.setTableName(tableName);
++// ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
++// if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++// return CatalogUtil.newTableDesc(res.getTableDesc());
++// } else {
++// throw new ServiceException(new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()));
++// }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
@Override
public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
-
- return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
-
- String paramFunctionName = functionName == null ? "" : functionName;
- ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
- connection.convertSessionedString(paramFunctionName));
- if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
- return res.getFunctionsList();
- } else {
- throw new SQLException(res.getResult().getErrorMessage());
- }
- }
--
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+
+ String paramFunctionName = functionName == null ? "" : functionName;
+ ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
+ connection.convertSessionedString(paramFunctionName));
- if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++ if (res.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+ return res.getFunctionsList();
+ } else {
- throw new ServiceException(new SQLException(res.getErrorMessage()));
++ throw new ServiceException(res.getResult().getErrorMessage());
++ }
++
++//<<<<<<< HEAD
++// return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager,
++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++// public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
++//
++//
++// }
++//
++// }.withRetries();
++//=======
++// NettyClientBase client = connection.getTajoMasterConnection();
++// connection.checkSessionAndGet(client);
++// BlockingInterface tajoMasterService = client.getStub();
++//
++// String paramFunctionName = functionName == null ? "" : functionName;
++// ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
++// connection.convertSessionedString(paramFunctionName));
++// if (res.getResultCode() == ClientProtos.ResultCode.OK) {
++// return res.getFunctionsList();
++// } else {
++// throw new ServiceException(new SQLException(res.getErrorMessage()));
++// }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
+ }
+
+ @Override
+ public IndexDescProto getIndex(final String indexName) throws ServiceException {
- return new ServerCallable<IndexDescProto>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- @Override
- public IndexDescProto call(NettyClientBase client) throws Exception {
- BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.getIndexWithName(null,
- connection.convertSessionedString(indexName));
- }
- }.withRetries();
++ NettyClientBase client = connection.getTajoMasterConnection();
++ connection.checkSessionAndGet(client);
++ BlockingInterface tajoMasterService = client.getStub();
++ return tajoMasterService.getIndexWithName(null,
++ connection.convertSessionedString(indexName));
+ }
+
+ @Override
+ public boolean existIndex(final String indexName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- @Override
- public Boolean call(NettyClientBase client) throws Exception {
- BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.existIndexWithName(null,
- connection.convertSessionedString(indexName)).getValue();
- }
- }.withRetries();
++ NettyClientBase client = connection.getTajoMasterConnection();
++ connection.checkSessionAndGet(client);
++ BlockingInterface tajoMasterService = client.getStub();
++ return tajoMasterService.existIndexWithName(null,
++ connection.convertSessionedString(indexName)).getValue();
++// return new ServerCallable<Boolean>(connection.manager,
++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++// @Override
++// public Boolean call(NettyClientBase client) throws Exception {
++//
++// }
++// }.withRetries();
+ }
+
+ @Override
+ public List<IndexDescProto> getIndexes(final String tableName) throws ServiceException {
- return new ServerCallable<List<IndexDescProto>>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- @Override
- public List<IndexDescProto> call(NettyClientBase client) throws Exception {
- BlockingInterface tajoMasterService = client.getStub();
- GetIndexesResponse response = tajoMasterService.getIndexesForTable(null,
- connection.convertSessionedString(tableName));
- if (response.getResult().getResultCode() == ResultCode.OK) {
- return response.getIndexesList();
- } else {
- throw new SQLException(response.getResult().getErrorMessage());
- }
- }
- }.withRetries();
++ NettyClientBase client = connection.getTajoMasterConnection();
++ connection.checkSessionAndGet(client);
++ BlockingInterface tajoMasterService = client.getStub();
++ GetIndexesResponse response = tajoMasterService.getIndexesForTable(null,
++ connection.convertSessionedString(tableName));
++ if (response.getResult().getResultCode() == ResultCode.OK) {
++ return response.getIndexesList();
++ } else {
++ throw new ServiceException(response.getResult().getErrorMessage());
++ }
++// return new ServerCallable<List<IndexDescProto>>(connection.manager,
++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++// @Override
++// public List<IndexDescProto> call(NettyClientBase client) throws Exception {
++//
++// }
++// }.withRetries();
+ }
+
+ @Override
+ public boolean hasIndexes(final String tableName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- @Override
- public Boolean call(NettyClientBase client) throws Exception {
- BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.existIndexesForTable(null,
- connection.convertSessionedString(tableName)).getValue();
- }
- }.withRetries();
++ NettyClientBase client = connection.getTajoMasterConnection();
++ connection.checkSessionAndGet(client);
++ BlockingInterface tajoMasterService = client.getStub();
++ return tajoMasterService.existIndexesForTable(null,
++ connection.convertSessionedString(tableName)).getValue();
++
++// return new ServerCallable<Boolean>(connection.manager,
++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++// @Override
++// public Boolean call(NettyClientBase client) throws Exception {
++//
++// }
++// }.withRetries();
+ }
+
+ @Override
+ public IndexDescProto getIndex(final String tableName, final String[] columnNames) throws ServiceException {
- return new ServerCallable<IndexDescProto>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- @Override
- public IndexDescProto call(NettyClientBase client) throws Exception {
- BlockingInterface tajoMasterService = client.getStub();
- GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setTableName(tableName);
- for (String eachColumnName : columnNames) {
- builder.addColumnNames(eachColumnName);
- }
- GetIndexWithColumnsResponse response = tajoMasterService.getIndexWithColumns(null, builder.build());
- if (response.getResult().getResultCode() == ResultCode.OK) {
- return response.getIndexDesc();
- } else {
- throw new SQLException(response.getResult().getErrorMessage());
- }
- }
- }.withRetries();
++ NettyClientBase client = connection.getTajoMasterConnection();
++ connection.checkSessionAndGet(client);
++ BlockingInterface tajoMasterService = client.getStub();
++ GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
++ builder.setSessionId(connection.sessionId);
++ builder.setTableName(tableName);
++ for (String eachColumnName : columnNames) {
++ builder.addColumnNames(eachColumnName);
++ }
++ GetIndexWithColumnsResponse response = tajoMasterService.getIndexWithColumns(null, builder.build());
++ if (response.getResult().getResultCode() == ResultCode.OK) {
++ return response.getIndexDesc();
++ } else {
++ throw new ServiceException(response.getResult().getErrorMessage());
+ }
++
++// return new ServerCallable<IndexDescProto>(connection.manager,
++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++// @Override
++// public IndexDescProto call(NettyClientBase client) throws Exception {
++//
++// }
++// }.withRetries();
+ }
+
+ @Override
+ public boolean existIndex(final String tableName, final String[] columnName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- @Override
- public Boolean call(NettyClientBase client) throws Exception {
- BlockingInterface tajoMasterService = client.getStub();
- GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setTableName(tableName);
- for (String eachColumnName : columnName) {
- builder.addColumnNames(eachColumnName);
- }
- return tajoMasterService.existIndexWithColumns(null, builder.build()).getValue();
- }
- }.withRetries();
++ NettyClientBase client = connection.getTajoMasterConnection();
++ connection.checkSessionAndGet(client);
++ BlockingInterface tajoMasterService = client.getStub();
++ GetIndexWithColumnsRequest.Builder builder = GetIndexWithColumnsRequest.newBuilder();
++ builder.setSessionId(connection.sessionId);
++ builder.setTableName(tableName);
++ for (String eachColumnName : columnName) {
++ builder.addColumnNames(eachColumnName);
++ }
++ return tajoMasterService.existIndexWithColumns(null, builder.build()).getValue();
++
++// return new ServerCallable<Boolean>(connection.manager,
++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++// @Override
++// public Boolean call(NettyClientBase client) throws Exception {
++//
++// }
++// }.withRetries();
+ }
+
+ @Override
+ public boolean dropIndex(final String indexName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- @Override
- public Boolean call(NettyClientBase client) throws Exception {
- BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.dropIndex(null,
- connection.convertSessionedString(indexName)).getValue();
- }
- }.withRetries();
++ NettyClientBase client = connection.getTajoMasterConnection();
++ connection.checkSessionAndGet(client);
++ BlockingInterface tajoMasterService = client.getStub();
++ return tajoMasterService.dropIndex(null,
++ connection.convertSessionedString(indexName)).getValue();
++
++// return new ServerCallable<Boolean>(connection.manager,
++// connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
++//
++// @Override
++// public Boolean call(NettyClientBase client) throws Exception {
++//
++// }
++// }.withRetries();
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --cc tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index 73abc4c,53889fe..007c010
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@@ -153,28 -153,21 +153,40 @@@ public class QueryClientImpl implement
@Override
public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException {
-
- return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
-
- final QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQuery(sql);
- builder.setIsJson(false);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-
- SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
- if (response.getResult().getResultCode() == ResultCode.OK) {
- connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- }
- return response;
- }
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+
+ final QueryRequest.Builder builder = QueryRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQuery(sql);
+ builder.setIsJson(false);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-
+ SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
- if (response.getResultCode() == ResultCode.OK) {
++ if (response.getResult().getResultCode() == ResultCode.OK) {
+ connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+ }
+ return response;
++
++//<<<<<<< HEAD
++// connection.checkSessionAndGet(client);
++//
++// final QueryRequest.Builder builder = QueryRequest.newBuilder();
++// builder.setSessionId(connection.sessionId);
++// builder.setQuery(sql);
++// builder.setIsJson(false);
++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//
++//
++//
++// }
++// }.withRetries();
++//=======
++// SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
++// if (response.getResultCode() == ResultCode.OK) {
++// connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
++// }
++// return response;
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
@Override
@@@ -369,42 -356,26 +375,60 @@@
throws ServiceException {
try {
- final ServerCallable<ClientProtos.SerializedResultSet> callable =
- new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQueryId(queryId.getProto());
- builder.setFetchRowNum(fetchRowNum);
- try {
- GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
- if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) {
- abort();
- throw new ServiceException(response.getResult().getErrorMessage());
- }
-
- return response.getResultSet();
- } catch (ServiceException e) {
- abort();
- throw e;
- } catch (Throwable t) {
- throw new ServiceException(t.getMessage(), t);
- }
- }
- };
-
- ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries();
++//<<<<<<< HEAD
++// final ServerCallable<ClientProtos.SerializedResultSet> callable =
++// new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(),
++// TajoMasterClientProtocol.class, false) {
++//
++// public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException {
++//
++// connection.checkSessionAndGet(client);
++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//
++// GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
++// builder.setSessionId(connection.sessionId);
++// builder.setQueryId(queryId.getProto());
++// builder.setFetchRowNum(fetchRowNum);
++// try {
++// GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
++// if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) {
++// abort();
++// throw new ServiceException(response.getResult().getErrorMessage());
++// }
++//
++// return response.getResultSet();
++// } catch (ServiceException e) {
++// abort();
++// throw e;
++// } catch (Throwable t) {
++// throw new ServiceException(t.getMessage(), t);
++// }
++// }
++// };
++//
++// ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries();
++//=======
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+ GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQueryId(queryId.getProto());
+ builder.setFetchRowNum(fetchRowNum);
+
+ GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
- if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
- throw new ServiceException(response.getErrorMessage());
++ if (response.getResult().getResultCode() == ClientProtos.ResultCode.ERROR) {
++ throw new ServiceException(response.getResult().getErrorMessage());
+ }
+
+ ClientProtos.SerializedResultSet resultSet = response.getResultSet();
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
return new TajoMemoryResultSet(queryId,
- new Schema(serializedResultSet.getSchema()),
- serializedResultSet.getSerializedTuplesList(),
- serializedResultSet.getSerializedTuplesCount(),
+ new Schema(resultSet.getSchema()),
+ resultSet.getSerializedTuplesList(),
+ resultSet.getSerializedTuplesCount(),
getClientSideSessionVars());
} catch (ServiceException e) {
throw e;
@@@ -416,59 -387,47 +440,92 @@@
@Override
public boolean updateQuery(final String sql) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQuery(sql);
- builder.setIsJson(false);
- ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
-
- if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
- connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- return true;
- } else {
- if (response.getResult().hasErrorMessage()) {
- System.err.println("ERROR: " + response.getResult().getErrorMessage());
- }
- return false;
- }
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+ QueryRequest.Builder builder = QueryRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQuery(sql);
+ builder.setIsJson(false);
+ ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
+
- if (response.getResultCode() == ClientProtos.ResultCode.OK) {
++//<<<<<<< HEAD
++// connection.checkSessionAndGet(client);
++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//
++// QueryRequest.Builder builder = QueryRequest.newBuilder();
++// builder.setSessionId(connection.sessionId);
++// builder.setQuery(sql);
++// builder.setIsJson(false);
++// ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
++//
++// if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
++// connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
++// return true;
++// } else {
++// if (response.getResult().hasErrorMessage()) {
++// System.err.println("ERROR: " + response.getResult().getErrorMessage());
++// }
++// return false;
++// }
++//=======
++ if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+ connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+ return true;
+ } else {
- if (response.hasErrorMessage()) {
- LOG.error("ERROR: " + response.getErrorMessage());
++ if (response.getResult().hasErrorMessage()) {
++ LOG.error("ERROR: " + response.getResult().getErrorMessage());
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
- }.withRetries();
+ return false;
+ }
}
@Override
public boolean updateQueryWithJson(final String json) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQuery(json);
- builder.setIsJson(true);
- ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
- if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
- return true;
- } else {
- if (response.getResult().hasErrorMessage()) {
- System.err.println("ERROR: " + response.getResult().getErrorMessage());
- }
- return false;
- }
++//<<<<<<< HEAD
++// return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
++// TajoMasterClientProtocol.class, false) {
++//
++// public Boolean call(NettyClientBase client) throws ServiceException {
++//
++// connection.checkSessionAndGet(client);
++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++//
++// QueryRequest.Builder builder = QueryRequest.newBuilder();
++// builder.setSessionId(connection.sessionId);
++// builder.setQuery(json);
++// builder.setIsJson(true);
++// ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
++// if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
++// return true;
++// } else {
++// if (response.getResult().hasErrorMessage()) {
++// System.err.println("ERROR: " + response.getResult().getErrorMessage());
++// }
++// return false;
++// }
++//=======
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+ QueryRequest.Builder builder = QueryRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQuery(json);
+ builder.setIsJson(true);
+ ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
- if (response.getResultCode() == ClientProtos.ResultCode.OK) {
++ if (response.getResult().getResultCode() == ClientProtos.ResultCode.OK) {
+ return true;
+ } else {
- if (response.hasErrorMessage()) {
- LOG.error("ERROR: " + response.getErrorMessage());
++ if (response.getResult().hasErrorMessage()) {
++ LOG.error("ERROR: " + response.getResult().getErrorMessage());
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
- }.withRetries();
+ return false;
+ }
}
@Override
@@@ -581,25 -519,20 +617,42 @@@
}
public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException {
- return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
- public QueryInfoProto call(NettyClientBase client) throws ServiceException {
- connection.checkSessionAndGet(client);
-
- QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQueryId(queryId.getProto());
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
- if (res.getResult().getResultCode() == ResultCode.OK) {
- return res.getQueryInfo();
- } else {
- abort();
- throw new ServiceException(res.getResult().getErrorMessage());
- }
- }
- }.withRetries();
++//<<<<<<< HEAD
++// return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(),
++// TajoMasterClientProtocol.class, false) {
++// public QueryInfoProto call(NettyClientBase client) throws ServiceException {
++// connection.checkSessionAndGet(client);
++//
++// QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
++// builder.setSessionId(connection.sessionId);
++// builder.setQueryId(queryId.getProto());
++//
++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++// GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
++// if (res.getResult().getResultCode() == ResultCode.OK) {
++// return res.getQueryInfo();
++// } else {
++// abort();
++// throw new ServiceException(res.getResult().getErrorMessage());
++// }
++// }
++// }.withRetries();
++//=======
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+
+ QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQueryId(queryId.getProto());
+
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
- if (res.getResultCode() == ResultCode.OK) {
++ if (res.getResult().getResultCode() == ResultCode.OK) {
+ return res.getQueryInfo();
+ } else {
- throw new ServiceException(res.getErrorMessage());
++ throw new ServiceException(res.getResult().getErrorMessage());
+ }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException {
@@@ -611,24 -544,31 +664,42 @@@
InetSocketAddress qmAddress = new InetSocketAddress(
queryInfo.getHostNameOfQM(), queryInfo.getQueryMasterClientPort());
- return new ServerCallable<QueryHistoryProto>(connection.manager, qmAddress,
- QueryMasterClientProtocol.class, false) {
- public QueryHistoryProto call(NettyClientBase client) throws ServiceException {
- connection.checkSessionAndGet(client);
+ RpcClientManager manager = RpcClientManager.getInstance();
+ NettyClientBase queryMasterClient;
+ try {
+ queryMasterClient = manager.newClient(qmAddress, QueryMasterClientProtocol.class, false,
+ manager.getRetries(), manager.getTimeoutSeconds(), TimeUnit.SECONDS, false);
+ } catch (Exception e) {
+ throw new ServiceException(e);
+ }
- QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQueryId(queryId.getProto());
+ try {
+ connection.checkSessionAndGet(connection.getTajoMasterConnection());
+
++//<<<<<<< HEAD
++// QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
++// GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build());
++// if (res.getResult().getResultCode() == ResultCode.OK) {
++// return res.getQueryHistory();
++// } else {
++// abort();
++// throw new ServiceException(res.getResult().getErrorMessage());
++// }
++//=======
+ QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQueryId(queryId.getProto());
- QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
- GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build());
- if (res.getResult().getResultCode() == ResultCode.OK) {
- return res.getQueryHistory();
- } else {
- abort();
- throw new ServiceException(res.getResult().getErrorMessage());
- }
+ QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterClient.getStub();
+ GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null, builder.build());
- if (res.getResultCode() == ResultCode.OK) {
++ if (res.getResult().getResultCode() == ResultCode.OK) {
+ return res.getQueryHistory();
+ } else {
- throw new ServiceException(res.getErrorMessage());
++ throw new ServiceException(res.getResult().getErrorMessage());
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
- }.withRetries();
+ } finally {
+ queryMasterClient.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
----------------------------------------------------------------------
diff --cc tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
index bb15981,84decd5..1bb0e16
--- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java
@@@ -157,52 -160,43 +160,81 @@@ public class SessionConnection implemen
}
public Map<String, String> updateSessionVariables(final Map<String, String> variables) throws ServiceException {
- return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Map<String, String> call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- KeyValueSet keyValueSet = new KeyValueSet();
- keyValueSet.putAll(variables);
- ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
- .setSessionId(sessionId)
- .setSessionVars(keyValueSet.getProto()).build();
-
- SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
-
- if (response.getResult().getResultCode() == ResultCode.OK) {
- updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- return Collections.unmodifiableMap(sessionVarsCache);
- } else {
- throw new ServiceException(response.getResult().getErrorMessage());
- }
- }
- }.withRetries();
++//<<<<<<< HEAD
++// return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(),
++// TajoMasterClientProtocol.class, false) {
++//
++// public Map<String, String> call(NettyClientBase client) throws ServiceException {
++// checkSessionAndGet(client);
++//
++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++// KeyValueSet keyValueSet = new KeyValueSet();
++// keyValueSet.putAll(variables);
++// ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
++// .setSessionId(sessionId)
++// .setSessionVars(keyValueSet.getProto()).build();
++//
++// SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
++//
++// if (response.getResult().getResultCode() == ResultCode.OK) {
++// updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
++// return Collections.unmodifiableMap(sessionVarsCache);
++// } else {
++// throw new ServiceException(response.getResult().getErrorMessage());
++// }
++// }
++// }.withRetries();
++// }
++//=======
+ NettyClientBase client = getTajoMasterConnection();
+ checkSessionAndGet(client);
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
+
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ KeyValueSet keyValueSet = new KeyValueSet();
+ keyValueSet.putAll(variables);
+ ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+ .setSessionId(sessionId)
+ .setSessionVars(keyValueSet.getProto()).build();
+
+ SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
+
- if (response.getResultCode() == ResultCode.OK) {
++ if (response.getResult().getResultCode() == ResultCode.OK) {
+ updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+ return Collections.unmodifiableMap(sessionVarsCache);
+ } else {
- throw new ServiceException(response.getMessage());
++ throw new ServiceException(response.getResult().getErrorMessage());
+ }
}
- public Map<String, String> unsetSessionVariables(final List<String> variables) throws ServiceException {
- return new ServerCallable<Map<String, String>>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- public Map<String, String> call(NettyClientBase client) throws ServiceException {
- checkSessionAndGet(client);
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
- .setSessionId(sessionId)
- .addAllUnsetVariables(variables).build();
-
- SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
-
- if (response.getResult().getResultCode() == ResultCode.OK) {
- updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- return Collections.unmodifiableMap(sessionVarsCache);
- } else {
- throw new ServiceException(response.getResult().getErrorMessage());
- }
- }
- }.withRetries();
+ public Map<String, String> unsetSessionVariables(final List<String> variables) throws ServiceException {
+ NettyClientBase client = getTajoMasterConnection();
+ checkSessionAndGet(client);
+
++//<<<<<<< HEAD
++// if (response.getResult().getResultCode() == ResultCode.OK) {
++// updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
++// return Collections.unmodifiableMap(sessionVarsCache);
++// } else {
++// throw new ServiceException(response.getResult().getErrorMessage());
++// }
++// }
++// }.withRetries();
++//=======
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+ .setSessionId(sessionId)
+ .addAllUnsetVariables(variables).build();
+
+ SessionUpdateResponse response = tajoMasterService.updateSessionVariables(null, request);
+
- if (response.getResultCode() == ResultCode.OK) {
++ if (response.getResult().getResultCode() == ResultCode.OK) {
+ updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+ return Collections.unmodifiableMap(sessionVarsCache);
+ } else {
- throw new ServiceException(response.getMessage());
++ throw new ServiceException(response.getResult().getErrorMessage());
+ }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
void updateSessionVarsCache(Map<String, String> variables) {
@@@ -333,55 -308,51 +346,81 @@@
}
public boolean reconnect() throws Exception {
- return new ServerCallable<Boolean>(manager, getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
- CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
- builder.setUsername(userInfo.getUserName()).build();
- if (baseDatabase != null) {
- builder.setBaseDatabaseName(baseDatabase);
- }
-
+ CreateSessionRequest.Builder builder = CreateSessionRequest.newBuilder();
+ builder.setUsername(userInfo.getUserName()).build();
+ if (baseDatabase != null) {
+ builder.setBaseDatabaseName(baseDatabase);
+ }
- // create new session
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
- if (response.getResult().getResultCode() != ResultCode.OK) {
- return false;
- }
+ NettyClientBase client = getTajoMasterConnection();
+
++//<<<<<<< HEAD
++// // create new session
++// TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
++// CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
++// if (response.getResult().getResultCode() != ResultCode.OK) {
++// return false;
++// }
++//=======
+ // create new session
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ CreateSessionResponse response = tajoMasterService.createSession(null, builder.build());
- if (response.getResultCode() != ResultCode.OK) {
++ if (response.getResult().getResultCode() != ResultCode.OK) {
+ return false;
+ }
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
- // Invalidate some session variables in client cache
- sessionId = response.getSessionId();
- Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars());
- synchronized (sessionVarsCache) {
- for (SessionVars var : UPDATE_ON_RECONNECT) {
- String value = sessionVars.get(var.keyname());
- if (value != null) {
- sessionVarsCache.put(var.keyname(), value);
- }
- }
+ // Invalidate some session variables in client cache
+ sessionId = response.getSessionId();
+ Map<String, String> sessionVars = ProtoUtil.convertToMap(response.getSessionVars());
+ synchronized (sessionVarsCache) {
+ for (SessionVars var : UPDATE_ON_RECONNECT) {
+ String value = sessionVars.get(var.keyname());
+ if (value != null) {
+ sessionVarsCache.put(var.keyname(), value);
}
+ }
+ }
- // Update the session variables in server side
- try {
- KeyValueSet keyValueSet = new KeyValueSet();
- keyValueSet.putAll(sessionVarsCache);
- ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
- .setSessionId(sessionId)
- .setSessionVars(keyValueSet.getProto()).build();
-
- if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) {
- tajoMasterService.removeSession(null, sessionId);
- return false;
- }
- LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
- return true;
- } catch (ServiceException e) {
- tajoMasterService.removeSession(null, sessionId);
- return false;
- }
++//<<<<<<< HEAD
++// // Update the session variables in server side
++// try {
++// KeyValueSet keyValueSet = new KeyValueSet();
++// keyValueSet.putAll(sessionVarsCache);
++// ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
++// .setSessionId(sessionId)
++// .setSessionVars(keyValueSet.getProto()).build();
++//
++// if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) {
++// tajoMasterService.removeSession(null, sessionId);
++// return false;
++// }
++// LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
++// return true;
++// } catch (ServiceException e) {
++// tajoMasterService.removeSession(null, sessionId);
++// return false;
++// }
++//=======
+ // Update the session variables in server side
+ try {
+ KeyValueSet keyValueSet = new KeyValueSet();
+ keyValueSet.putAll(sessionVarsCache);
+ ClientProtos.UpdateSessionVariableRequest request = ClientProtos.UpdateSessionVariableRequest.newBuilder()
+ .setSessionId(sessionId)
+ .setSessionVars(keyValueSet.getProto()).build();
+
- if (tajoMasterService.updateSessionVariables(null, request).getResultCode() != ResultCode.OK) {
++ if (tajoMasterService.updateSessionVariables(null, request).getResult().getResultCode() != ResultCode.OK) {
+ tajoMasterService.removeSession(null, sessionId);
+ return false;
++//>>>>>>> 9b3824b5f0c64af42bfcf0a6bb8d3555c22c5746
}
- }.withRetries();
+ LOG.info(String.format("Reconnected to session %s as a user '%s'.", sessionId.getId(), userInfo.getUserName()));
+ return true;
+ } catch (ServiceException e) {
+ tajoMasterService.removeSession(null, sessionId);
+ return false;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-core/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
----------------------------------------------------------------------
diff --cc tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
index be1cdf2,b1a27fa..7ad658f
--- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
+++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java
@@@ -157,24 -153,55 +153,55 @@@ public class TestKillQuery
@Test
public final void testIgnoreStageStateFromKilled() throws Exception {
- ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr);
- QueryId queryId = new QueryId(res.getQueryId());
- cluster.waitForQuerySubmitted(queryId);
+ SQLAnalyzer analyzer = new SQLAnalyzer();
+ QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+ Session session = LocalTajoTestingUtility.createDummySession();
+ CatalogService catalog = cluster.getMaster().getCatalog();
+
+ LogicalPlanner planner = new LogicalPlanner(catalog);
- LogicalOptimizer optimizer = new LogicalOptimizer(conf);
++ LogicalOptimizer optimizer = new LogicalOptimizer(conf, catalog);
+ Expr expr = analyzer.parse(queryStr);
+ LogicalPlan plan = planner.createPlan(defaultContext, expr);
+
+ optimizer.optimize(plan);
+
+ QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+ QueryContext queryContext = new QueryContext(conf);
+ MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+ GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+ globalPlanner.build(masterPlan);
- QueryMasterTask qmt = cluster.getQueryMasterTask(queryId);
- Query query = qmt.getQuery();
+ CountDownLatch barrier = new CountDownLatch(1);
+ MockAsyncDispatch dispatch = new MockAsyncDispatch(barrier, TajoProtos.QueryState.QUERY_RUNNING);
+
+ QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
+ QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
+ queryId, session, defaultContext, expr.toJson(), dispatch);
- // wait for a stage created
- cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_RUNNING, 10);
- query.handle(new QueryEvent(queryId, QueryEventType.KILL));
+ queryMasterTask.init(conf);
+ queryMasterTask.getQueryTaskContext().getDispatcher().start();
+ queryMasterTask.startQuery();
try{
- cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_KILLED, 50);
- } finally {
- assertEquals(TajoProtos.QueryState.QUERY_KILLED, query.getSynchronizedState());
+ barrier.await(5000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ fail("Query state : " + queryMasterTask.getQuery().getSynchronizedState());
+ }
+
+ Stage stage = queryMasterTask.getQuery().getStages().iterator().next();
+ assertNotNull(stage);
+
+ // fire kill event
+ queryMasterTask.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.KILL));
+
+ try {
+ cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50);
+ assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
+ } finally {
+ queryMasterTask.stop();
}
- List<Stage> stages = Lists.newArrayList(query.getStages());
+ List<Stage> stages = Lists.newArrayList(queryMasterTask.getQuery().getStages());
Stage lastStage = stages.get(stages.size() - 1);
assertEquals(StageState.KILLED, lastStage.getSynchronizedState());
http://git-wip-us.apache.org/repos/asf/tajo/blob/42bcf2de/tajo-plan/src/main/java/org/apache/tajo/plan/LogicalPlanner.java
----------------------------------------------------------------------