You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/05/07 18:36:01 UTC
[2/6] tajo git commit: TAJO-1583: Remove ServerCallable in RPC client. (jinho)
TAJO-1583: Remove ServerCallable in RPC client. (jinho)
Closes #556
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/47554105
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/47554105
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/47554105
Branch: refs/heads/index_support
Commit: 475541057891518e08e5a18ebbbf916c1ad60c10
Parents: 9540f16
Author: Jinho Kim <jh...@apache.org>
Authored: Thu Apr 30 16:51:56 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Thu Apr 30 16:51:56 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../tajo/catalog/AbstractCatalogClient.java | 569 ++++++-------------
.../org/apache/tajo/catalog/CatalogClient.java | 49 +-
.../org/apache/tajo/catalog/CatalogServer.java | 8 +-
.../tajo/catalog/LocalCatalogWrapper.java | 20 +-
.../tajo/client/CatalogAdminClientImpl.java | 236 +++-----
.../org/apache/tajo/client/QueryClientImpl.java | 328 +++++------
.../apache/tajo/client/SessionConnection.java | 275 ++++-----
.../cli/tsql/TestDefaultCliOutputFormatter.java | 4 -
.../apache/tajo/querymaster/TestKillQuery.java | 63 +-
.../org/apache/tajo/rpc/NettyClientBase.java | 3 +-
.../tajo/rpc/RetriesExhaustedException.java | 104 ----
.../org/apache/tajo/rpc/ServerCallable.java | 148 -----
.../org/apache/tajo/rpc/TestBlockingRpc.java | 39 --
14 files changed, 618 insertions(+), 1230 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 8bda2bd..952f852 100644
--- a/CHANGES
+++ b/CHANGES
@@ -214,6 +214,8 @@ Release 0.11.0 - unreleased
TASKS
+ TAJO-1583: Remove ServerCallable in RPC client. (jinho)
+
TAJO-1587: Upgrade java version to 1.7 for Travis CI. (jihoon)
TAJO-1559: Fix data model description (tinyint, smallint).
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
index 49be29a..766f6c2 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java
@@ -29,16 +29,12 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.common.TajoDataTypes.DataType;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.RpcClientManager;
-import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.exception.InvalidOperationException;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.NullProto;
-import org.apache.tajo.service.ServiceTracker;
-import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.util.ProtoUtil;
-import java.net.InetSocketAddress;
+import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -46,50 +42,27 @@ import java.util.List;
/**
* CatalogClient provides a client API to access the catalog server.
*/
-public abstract class AbstractCatalogClient implements CatalogService {
- private final Log LOG = LogFactory.getLog(AbstractCatalogClient.class);
+public abstract class AbstractCatalogClient implements CatalogService, Closeable {
+ protected final Log LOG = LogFactory.getLog(AbstractCatalogClient.class);
- protected ServiceTracker serviceTracker;
- protected RpcClientManager manager;
- protected InetSocketAddress catalogServerAddr;
protected TajoConf conf;
- abstract CatalogProtocolService.BlockingInterface getStub(NettyClientBase client);
-
- public AbstractCatalogClient(TajoConf conf, InetSocketAddress catalogServerAddr) {
- this.manager = RpcClientManager.getInstance();
- this.catalogServerAddr = catalogServerAddr;
- this.serviceTracker = ServiceTrackerFactory.get(conf);
+ public AbstractCatalogClient(TajoConf conf) {
this.conf = conf;
}
- private InetSocketAddress getCatalogServerAddr() {
- if (catalogServerAddr == null) {
- return null;
- } else {
-
- if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
- return catalogServerAddr;
- } else {
- return serviceTracker.getCatalogAddress();
- }
- }
- }
+ abstract CatalogProtocolService.BlockingInterface getStub() throws ServiceException;
@Override
public final Boolean createTablespace(final String tablespaceName, final String tablespaceUri) {
try {
- return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
+ CatalogProtocolService.BlockingInterface stub = getStub();
- CreateTablespaceRequest.Builder builder = CreateTablespaceRequest.newBuilder();
- builder.setTablespaceName(tablespaceName);
- builder.setTablespaceUri(tablespaceUri);
- return stub.createTablespace(null, builder.build()).getValue();
- }
- }.withRetries();
- } catch (ServiceException e) {
+ CreateTablespaceRequest.Builder builder = CreateTablespaceRequest.newBuilder();
+ builder.setTablespaceName(tablespaceName);
+ builder.setTablespaceUri(tablespaceUri);
+ return stub.createTablespace(null, builder.build()).getValue();
+ } catch (Exception e) {
LOG.error(e.getMessage(), e);
return Boolean.FALSE;
}
@@ -98,12 +71,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Boolean dropTablespace(final String tablespaceName) {
try {
- return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.dropTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.dropTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return Boolean.FALSE;
@@ -113,12 +82,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Boolean existTablespace(final String tablespaceName) {
try {
- return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.existTablespace(null, ProtoUtil.convertString(tablespaceName)).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return Boolean.FALSE;
@@ -128,46 +93,32 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Collection<String> getAllTablespaceNames() {
try {
- return new ServerCallable<Collection<String>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Collection<String> call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- PrimitiveProtos.StringListProto response = stub.getAllTablespaceNames(null, ProtoUtil.NULL_PROTO);
- return ProtoUtil.convertStrings(response);
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ PrimitiveProtos.StringListProto response = stub.getAllTablespaceNames(null, ProtoUtil.NULL_PROTO);
+ return ProtoUtil.convertStrings(response);
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<String>();
}
}
@Override
public List<TablespaceProto> getAllTablespaces() {
try {
- return new ServerCallable<List<TablespaceProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<TablespaceProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- CatalogProtos.GetTablespacesProto response = stub.getAllTablespaces(null, ProtoUtil.NULL_PROTO);
- return response.getTablespaceList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ CatalogProtos.GetTablespacesProto response = stub.getAllTablespaces(null, ProtoUtil.NULL_PROTO);
+ return response.getTablespaceList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<TablespaceProto>();
}
}
@Override
public TablespaceProto getTablespace(final String tablespaceName) {
try {
- return new ServerCallable<TablespaceProto>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public TablespaceProto call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.getTablespace(null, ProtoUtil.convertString(tablespaceName));
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.getTablespace(null, ProtoUtil.convertString(tablespaceName));
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return null;
@@ -177,12 +128,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public Boolean alterTablespace(final AlterTablespaceProto alterTablespace) {
try {
- return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.alterTablespace(null, alterTablespace).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.alterTablespace(null, alterTablespace).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -192,18 +139,14 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Boolean createDatabase(final String databaseName, @Nullable final String tablespaceName) {
try {
- return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
+ CatalogProtocolService.BlockingInterface stub = getStub();
- CreateDatabaseRequest.Builder builder = CreateDatabaseRequest.newBuilder();
- builder.setDatabaseName(databaseName);
- if (tablespaceName != null) {
- builder.setTablespaceName(tablespaceName);
- }
- return stub.createDatabase(null, builder.build()).getValue();
- }
- }.withRetries();
+ CreateDatabaseRequest.Builder builder = CreateDatabaseRequest.newBuilder();
+ builder.setDatabaseName(databaseName);
+ if (tablespaceName != null) {
+ builder.setTablespaceName(tablespaceName);
+ }
+ return stub.createDatabase(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return Boolean.FALSE;
@@ -213,12 +156,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Boolean dropDatabase(final String databaseName) {
try {
- return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.dropDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.dropDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return Boolean.FALSE;
@@ -228,12 +167,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Boolean existDatabase(final String databaseName) {
try {
- return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.existDatabase(null, ProtoUtil.convertString(databaseName)).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return Boolean.FALSE;
@@ -243,50 +178,36 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final Collection<String> getAllDatabaseNames() {
try {
- return new ServerCallable<Collection<String>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Collection<String> call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- PrimitiveProtos.StringListProto response = stub.getAllDatabaseNames(null, ProtoUtil.NULL_PROTO);
- return ProtoUtil.convertStrings(response);
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ PrimitiveProtos.StringListProto response = stub.getAllDatabaseNames(null, ProtoUtil.NULL_PROTO);
+ return ProtoUtil.convertStrings(response);
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<String>();
}
}
@Override
public List<DatabaseProto> getAllDatabases() {
try {
- return new ServerCallable<List<DatabaseProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<DatabaseProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetDatabasesProto response = stub.getAllDatabases(null, ProtoUtil.NULL_PROTO);
- return response.getDatabaseList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ GetDatabasesProto response = stub.getAllDatabases(null, ProtoUtil.NULL_PROTO);
+ return response.getDatabaseList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<DatabaseProto>();
}
}
@Override
public final TableDesc getTableDesc(final String databaseName, final String tableName) {
try {
- return new ServerCallable<TableDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public TableDesc call(NettyClientBase client) throws ServiceException {
- TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setTableName(tableName);
+ TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(tableName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return CatalogUtil.newTableDesc(stub.getTableDesc(null, builder.build()));
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return CatalogUtil.newTableDesc(stub.getTableDesc(null, builder.build()));
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return null;
@@ -302,89 +223,60 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public List<TableDescriptorProto> getAllTables() {
try {
- return new ServerCallable<List<TableDescriptorProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<TableDescriptorProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetTablesProto response = stub.getAllTables(null, ProtoUtil.NULL_PROTO);
- return response.getTableList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ GetTablesProto response = stub.getAllTables(null, ProtoUtil.NULL_PROTO);
+ return response.getTableList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<TableDescriptorProto>();
}
}
@Override
public List<TableOptionProto> getAllTableOptions() {
try {
- return new ServerCallable<List<TableOptionProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<TableOptionProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetTableOptionsProto response = stub.getAllTableOptions(null, ProtoUtil.NULL_PROTO);
- return response.getTableOptionList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ GetTableOptionsProto response = stub.getAllTableOptions(null, ProtoUtil.NULL_PROTO);
+ return response.getTableOptionList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<TableOptionProto>();
}
}
@Override
public List<TableStatsProto> getAllTableStats() {
try {
- return new ServerCallable<List<TableStatsProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<TableStatsProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetTableStatsProto response = stub.getAllTableStats(null, ProtoUtil.NULL_PROTO);
- return response.getStatList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ GetTableStatsProto response = stub.getAllTableStats(null, ProtoUtil.NULL_PROTO);
+ return response.getStatList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<TableStatsProto>();
}
}
@Override
public List<ColumnProto> getAllColumns() {
try {
- return new ServerCallable<List<ColumnProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<ColumnProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetColumnsProto response = stub.getAllColumns(null, ProtoUtil.NULL_PROTO);
- return response.getColumnList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ GetColumnsProto response = stub.getAllColumns(null, ProtoUtil.NULL_PROTO);
+ return response.getColumnList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<ColumnProto>();
}
}
@Override
public final PartitionMethodDesc getPartitionMethod(final String databaseName, final String tableName) {
try {
- return new ServerCallable<PartitionMethodDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public PartitionMethodDesc call(NettyClientBase client) throws ServiceException {
-
- TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setTableName(tableName);
+ TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(tableName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build()));
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return CatalogUtil.newPartitionMethodDesc(stub.getPartitionMethodByTableName(null, builder.build()));
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return null;
@@ -394,17 +286,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final boolean existPartitionMethod(final String databaseName, final String tableName) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setTableName(tableName);
+ TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(tableName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existPartitionMethod(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.existPartitionMethod(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -415,18 +302,13 @@ public abstract class AbstractCatalogClient implements CatalogService {
public final PartitionDescProto getPartition(final String databaseName, final String tableName,
final String partitionName) {
try {
- return new ServerCallable<PartitionDescProto>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public PartitionDescProto call(NettyClientBase client) throws ServiceException {
-
- PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setTableName(tableName);
- builder.setPartitionName(partitionName);
+ PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(tableName);
+ builder.setPartitionName(partitionName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.getPartitionByPartitionName(null, builder.build());
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.getPartitionByPartitionName(null, builder.build());
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return null;
@@ -436,94 +318,70 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final List<PartitionDescProto> getPartitions(final String databaseName, final String tableName) {
try {
- return new ServerCallable<List<PartitionDescProto>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class,
- false) {
- public List<PartitionDescProto> call(NettyClientBase client) throws ServiceException {
+ PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(tableName);
- PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setTableName(tableName);
-
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- PartitionsProto response = stub.getPartitionsByTableName(null, builder.build());
- return response.getPartitionList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ PartitionsProto response = stub.getPartitionsByTableName(null, builder.build());
+ return response.getPartitionList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<PartitionDescProto>();
}
}
@Override
public List<TablePartitionProto> getAllPartitions() {
try {
- return new ServerCallable<List<TablePartitionProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<TablePartitionProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetTablePartitionsProto response = stub.getAllPartitions(null, ProtoUtil.NULL_PROTO);
- return response.getPartList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ GetTablePartitionsProto response = stub.getAllPartitions(null, ProtoUtil.NULL_PROTO);
+ return response.getPartList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<TablePartitionProto>();
}
}
@Override
public final Collection<String> getAllTableNames(final String databaseName) {
try {
- return new ServerCallable<Collection<String>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Collection<String> call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- PrimitiveProtos.StringListProto response = stub.getAllTableNames(null, ProtoUtil.convertString(databaseName));
- return ProtoUtil.convertStrings(response);
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ PrimitiveProtos.StringListProto response = stub.getAllTableNames(null, ProtoUtil.convertString(databaseName));
+ return ProtoUtil.convertStrings(response);
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<String>();
}
}
@Override
public final Collection<FunctionDesc> getFunctions() {
- try {
- return new ServerCallable<Collection<FunctionDesc>>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Collection<FunctionDesc> call(NettyClientBase client) throws ServiceException {
- List<FunctionDesc> list = new ArrayList<FunctionDesc>();
- GetFunctionsResponse response;
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- response = stub.getFunctions(null, NullProto.newBuilder().build());
- int size = response.getFunctionDescCount();
- for (int i = 0; i < size; i++) {
- try {
- list.add(new FunctionDesc(response.getFunctionDesc(i)));
- } catch (ClassNotFoundException e) {
- LOG.error(e, e);
- return null;
- }
- }
+ List<FunctionDesc> list = new ArrayList<FunctionDesc>();
+ try {
+ GetFunctionsResponse response;
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ response = stub.getFunctions(null, NullProto.newBuilder().build());
+ int size = response.getFunctionDescCount();
+ for (int i = 0; i < size; i++) {
+ try {
+ list.add(new FunctionDesc(response.getFunctionDesc(i)));
+ } catch (ClassNotFoundException e) {
+ LOG.error(e, e);
return list;
}
- }.withRetries();
+ }
+ return list;
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return list;
}
}
@Override
public final boolean createTable(final TableDesc desc) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.createTable(null, desc.getProto()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.createTable(null, desc.getProto()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -537,17 +395,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
final String simpleName = splitted[1];
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setTableName(simpleName);
+ TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(simpleName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.dropTable(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.dropTable(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -561,17 +414,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
"tableName cannot be composed of multiple parts, but it is \"" + tableName + "\"");
}
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setTableName(tableName);
+ TableIdentifierProto.Builder builder = TableIdentifierProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setTableName(tableName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existsTable(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.existsTable(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -586,12 +434,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final boolean createIndex(final IndexDesc index) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.createIndex(null, index.getProto()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.createIndex(null, index.getProto()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -601,16 +445,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final boolean existIndexByName(final String databaseName, final String indexName) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- IndexNameProto.Builder builder = IndexNameProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setIndexName(indexName);
+ IndexNameProto.Builder builder = IndexNameProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setIndexName(indexName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existIndexByName(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.existIndexByName(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -620,17 +460,13 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public boolean existIndexByColumn(final String databaseName, final String tableName, final String columnName) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
- builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
- builder.setColumnName(columnName);
+ GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
+ builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
+ builder.setColumnName(columnName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.existIndexByColumn(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.existIndexByColumn(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -640,17 +476,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final IndexDesc getIndexByName(final String databaseName, final String indexName) {
try {
- return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public IndexDesc call(NettyClientBase client) throws ServiceException {
-
- IndexNameProto.Builder builder = IndexNameProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setIndexName(indexName);
+ IndexNameProto.Builder builder = IndexNameProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setIndexName(indexName);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return new IndexDesc(stub.getIndexByName(null, builder.build()));
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return new IndexDesc(stub.getIndexByName(null, builder.build()));
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return null;
@@ -662,17 +493,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
final String tableName,
final String columnName) {
try {
- return new ServerCallable<IndexDesc>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public IndexDesc call(NettyClientBase client) throws ServiceException {
+ GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
+ builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
+ builder.setColumnName(columnName);
- GetIndexByColumnRequest.Builder builder = GetIndexByColumnRequest.newBuilder();
- builder.setTableIdentifier(CatalogUtil.buildTableIdentifier(databaseName, tableName));
- builder.setColumnName(columnName);
-
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return new IndexDesc(stub.getIndexByColumn(null, builder.build()));
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return null;
@@ -683,17 +509,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
public boolean dropIndex(final String databaseName,
final String indexName) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
+ IndexNameProto.Builder builder = IndexNameProto.newBuilder();
+ builder.setDatabaseName(databaseName);
+ builder.setIndexName(indexName);
- IndexNameProto.Builder builder = IndexNameProto.newBuilder();
- builder.setDatabaseName(databaseName);
- builder.setIndexName(indexName);
-
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.dropIndex(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.dropIndex(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -703,30 +524,20 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public List<IndexProto> getAllIndexes() {
try {
- return new ServerCallable<List<IndexProto>>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
-
- @Override
- public List<IndexProto> call(NettyClientBase client) throws Exception {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
- return response.getIndexList();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ GetIndexesProto response = stub.getAllIndexes(null, ProtoUtil.NULL_PROTO);
+ return response.getIndexList();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return null;
+ return new ArrayList<IndexProto>();
}
}
@Override
public final boolean createFunction(final FunctionDesc funcDesc) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.createFunction(null, funcDesc.getProto()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.createFunction(null, funcDesc.getProto()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -736,15 +547,11 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public final boolean dropFunction(final String signature) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder();
- builder.setSignature(signature);
+ UnregisterFunctionRequest.Builder builder = UnregisterFunctionRequest.newBuilder();
+ builder.setSignature(signature);
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.dropFunction(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.dropFunction(null, builder.build()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -769,24 +576,12 @@ public abstract class AbstractCatalogClient implements CatalogService {
FunctionDescProto descProto = null;
try {
- descProto = new ServerCallable<FunctionDescProto>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public FunctionDescProto call(NettyClientBase client) throws ServiceException {
- try {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.getFunctionMeta(null, builder.build());
- } catch (NoSuchFunctionException e) {
- abort();
- throw e;
- }
- }
- }.withRetries();
- } catch(ServiceException e) {
- // this is not good. we need to define user massage exception
- if(e.getCause() instanceof NoSuchFunctionException){
- LOG.debug(e.getMessage());
- } else {
- LOG.error(e.getMessage(), e);
- }
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ descProto = stub.getFunctionMeta(null, builder.build());
+ } catch (NoSuchFunctionException e) {
+ LOG.debug(e.getMessage());
+ } catch (ServiceException e) {
+ LOG.error(e.getMessage(), e);
}
if (descProto == null) {
@@ -819,27 +614,21 @@ public abstract class AbstractCatalogClient implements CatalogService {
}
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.containFunction(null, builder.build()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.containFunction(null, builder.build()).getValue();
+ } catch (InvalidOperationException e) {
+ LOG.error(e.getMessage());
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
- return false;
}
+ return false;
}
@Override
public final boolean alterTable(final AlterTableDesc desc) {
try {
- return new ServerCallable<Boolean>(this.manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.alterTable(null, desc.getProto()).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.alterTable(null, desc.getProto()).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
@@ -849,12 +638,8 @@ public abstract class AbstractCatalogClient implements CatalogService {
@Override
public boolean updateTableStats(final UpdateTableStatsProto updateTableStatsProto) {
try {
- return new ServerCallable<Boolean>(manager, getCatalogServerAddr(), CatalogProtocol.class, false) {
- public Boolean call(NettyClientBase client) throws ServiceException {
- CatalogProtocolService.BlockingInterface stub = getStub(client);
- return stub.updateTableStats(null, updateTableStatsProto).getValue();
- }
- }.withRetries();
+ CatalogProtocolService.BlockingInterface stub = getStub();
+ return stub.updateTableStats(null, updateTableStatsProto).getValue();
} catch (ServiceException e) {
LOG.error(e.getMessage(), e);
return false;
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
index 7666a97..80ded4a 100644
--- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
+++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/CatalogClient.java
@@ -18,35 +18,72 @@
package org.apache.tajo.catalog;
+import com.google.protobuf.ServiceException;
import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService.BlockingInterface;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.rpc.NettyClientBase;
+import org.apache.tajo.rpc.RpcClientManager;
+import org.apache.tajo.rpc.RpcConstants;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.service.ServiceTrackerFactory;
import org.apache.tajo.util.NetUtils;
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
/**
* CatalogClient provides a client API to access the catalog server.
*/
public class CatalogClient extends AbstractCatalogClient {
+ protected NettyClientBase client;
+ protected ServiceTracker serviceTracker;
+ protected InetSocketAddress catalogServerAddr;
/**
* @throws java.io.IOException
*
*/
public CatalogClient(final TajoConf conf) throws IOException {
- super(conf, NetUtils.createSocketAddr(conf.getVar(ConfVars.CATALOG_ADDRESS)));
+ super(conf);
+ this.catalogServerAddr = NetUtils.createSocketAddr(conf.getVar(ConfVars.CATALOG_ADDRESS));
+ this.serviceTracker = ServiceTrackerFactory.get(conf);
}
- public CatalogClient(TajoConf conf, String host, int port) throws IOException {
- super(conf, NetUtils.createSocketAddr(host, port));
- }
@Override
- BlockingInterface getStub(NettyClientBase client) {
- return client.getStub();
+ BlockingInterface getStub() throws ServiceException {
+ return getCatalogConnection().getStub();
+ }
+
+ private InetSocketAddress getCatalogServerAddr() {
+ if (!conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) {
+ return catalogServerAddr;
+ } else {
+ return serviceTracker.getCatalogAddress();
+ }
}
+ public synchronized NettyClientBase getCatalogConnection() throws ServiceException {
+ if (client != null && client.isConnected()) return client;
+ else {
+ try {
+ if (client != null && client.isConnected()) return client;
+ RpcClientManager.cleanup(client);
+
+ int retry = conf.getInt(RpcConstants.RPC_CLIENT_RETRY_MAX, RpcConstants.DEFAULT_RPC_RETRIES);
+ // Client do not closed on idle state for support high available
+ this.client = RpcClientManager.getInstance().newClient(getCatalogServerAddr(), CatalogProtocol.class, false,
+ retry, 0, TimeUnit.SECONDS, false);
+ } catch (Exception e) {
+ throw new ServiceException(e);
+ }
+ return client;
+ }
+ }
+
+ @Override
public void close() {
+ RpcClientManager.cleanup(client);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
index e9fb177..f2e9795 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java
@@ -33,7 +33,6 @@ import org.apache.tajo.annotation.ThreadSafe;
import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService;
import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary;
import org.apache.tajo.catalog.exception.*;
-import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.*;
import org.apache.tajo.catalog.store.CatalogStore;
import org.apache.tajo.catalog.store.DerbyStore;
@@ -61,7 +60,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.tajo.catalog.proto.CatalogProtos.AlterTablespaceProto.AlterTablespaceCommand;
import static org.apache.tajo.catalog.proto.CatalogProtos.FunctionType.*;
import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListProto;
-import static org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto;
/**
* This class provides the catalog service. The catalog service enables clients
@@ -1192,7 +1190,7 @@ public class CatalogServer extends AbstractService {
if (functions.containsKey(funcDesc.getSignature())) {
FunctionDescProto found = findFunctionStrictType(funcDesc, true);
if (found != null) {
- throw new AlreadyExistsFunctionException(signature.toString());
+ throw new ServiceException(new AlreadyExistsFunctionException(signature.toString()));
}
}
@@ -1209,7 +1207,7 @@ public class CatalogServer extends AbstractService {
throws ServiceException {
if (!containFunction(request.getSignature())) {
- throw new NoSuchFunctionException(request.getSignature(), new DataType[] {});
+ throw new ServiceException(new NoSuchFunctionException(request.getSignature(), new DataType[]{}));
}
functions.remove(request.getSignature());
@@ -1231,7 +1229,7 @@ public class CatalogServer extends AbstractService {
}
if (function == null) {
- throw new NoSuchFunctionException(request.getSignature(), request.getParameterTypesList());
+ throw new ServiceException(new NoSuchFunctionException(request.getSignature(), request.getParameterTypesList()));
} else {
return function;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
index df9bd2c..35e9e2e 100644
--- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
+++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/LocalCatalogWrapper.java
@@ -22,9 +22,6 @@
package org.apache.tajo.catalog;
import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.rpc.NettyClientBase;
-
-import java.io.IOException;
/**
* This class provides a catalog service interface in
@@ -34,20 +31,12 @@ public class LocalCatalogWrapper extends AbstractCatalogClient {
private CatalogServer catalog;
private CatalogProtocol.CatalogProtocolService.BlockingInterface stub;
- public LocalCatalogWrapper(final TajoConf conf) throws IOException {
- super(conf, null);
- this.catalog = new CatalogServer();
- this.catalog.init(conf);
- this.catalog.start();
- this.stub = catalog.getHandler();
- }
-
public LocalCatalogWrapper(final CatalogServer server) {
this(server, server.getConf());
}
public LocalCatalogWrapper(final CatalogServer server, final TajoConf conf) {
- super(conf, null);
+ super(conf);
this.catalog = server;
this.stub = server.getHandler();
}
@@ -57,7 +46,12 @@ public class LocalCatalogWrapper extends AbstractCatalogClient {
}
@Override
- CatalogProtocol.CatalogProtocolService.BlockingInterface getStub(NettyClientBase client) {
+ CatalogProtocol.CatalogProtocolService.BlockingInterface getStub() {
return stub;
}
+
+ @Override
+ public void close() {
+ //nothing to do
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
index 9d0e427..9397fcf 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/CatalogAdminClientImpl.java
@@ -27,10 +27,8 @@ import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.ipc.ClientProtos;
-import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.jdbc.SQLStates;
import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.ServerCallable;
import java.io.IOException;
import java.net.URI;
@@ -48,79 +46,45 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
@Override
public boolean createDatabase(final String databaseName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMaster = client.getStub();
- return tajoMaster.createDatabase(null, connection.convertSessionedString(databaseName)).getValue();
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMaster = client.getStub();
+ return tajoMaster.createDatabase(null, connection.convertSessionedString(databaseName)).getValue();
}
@Override
public boolean existDatabase(final String databaseName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMaster = client.getStub();
- return tajoMaster.existDatabase(null, connection.convertSessionedString(databaseName)).getValue();
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMaster = client.getStub();
+ return tajoMaster.existDatabase(null, connection.convertSessionedString(databaseName)).getValue();
}
@Override
public boolean dropDatabase(final String databaseName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.dropDatabase(null, connection.convertSessionedString(databaseName)).getValue();
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+ return tajoMasterService.dropDatabase(null, connection.convertSessionedString(databaseName)).getValue();
}
@Override
public List<String> getAllDatabaseNames() throws ServiceException {
- return new ServerCallable<List<String>>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public List<String> call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.getAllDatabases(null, connection.sessionId).getValuesList();
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+ return tajoMasterService.getAllDatabases(null, connection.sessionId).getValuesList();
}
public boolean existTable(final String tableName) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.existTable(null, connection.convertSessionedString(tableName)).getValue();
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+ return tajoMasterService.existTable(null, connection.convertSessionedString(tableName)).getValue();
}
@Override
@@ -133,32 +97,25 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
final TableMeta meta, final PartitionMethodDesc partitionMethodDesc)
throws SQLException, ServiceException {
- return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setName(tableName);
- builder.setSchema(schema.getProto());
- builder.setMeta(meta.getProto());
- builder.setPath(path.toString());
- if (partitionMethodDesc != null) {
- builder.setPartition(partitionMethodDesc.getProto());
- }
- ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
- if (res.getResultCode() == ClientProtos.ResultCode.OK) {
- return CatalogUtil.newTableDesc(res.getTableDesc());
- } else {
- throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
- }
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+
+ ClientProtos.CreateTableRequest.Builder builder = ClientProtos.CreateTableRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setName(tableName);
+ builder.setSchema(schema.getProto());
+ builder.setMeta(meta.getProto());
+ builder.setPath(path.toString());
+ if (partitionMethodDesc != null) {
+ builder.setPartition(partitionMethodDesc.getProto());
+ }
+ ClientProtos.TableResponse res = tajoMasterService.createExternalTable(null, builder.build());
+ if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+ return CatalogUtil.newTableDesc(res.getTableDesc());
+ } else {
+ throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
+ }
}
@Override
@@ -169,94 +126,67 @@ public class CatalogAdminClientImpl implements CatalogAdminClient {
@Override
public boolean dropTable(final String tableName, final boolean purge) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.DropTableRequest.Builder builder = ClientProtos.DropTableRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setName(tableName);
- builder.setPurge(purge);
- return tajoMasterService.dropTable(null, builder.build()).getValue();
- }
-
- }.withRetries();
+ ClientProtos.DropTableRequest.Builder builder = ClientProtos.DropTableRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setName(tableName);
+ builder.setPurge(purge);
+ return tajoMasterService.dropTable(null, builder.build()).getValue();
}
@Override
public List<String> getTableList(@Nullable final String databaseName) throws ServiceException {
- return new ServerCallable<List<String>>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public List<String> call(NettyClientBase client) throws ServiceException {
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.GetTableListRequest.Builder builder = ClientProtos.GetTableListRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- if (databaseName != null) {
- builder.setDatabaseName(databaseName);
- }
- ClientProtos.GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
- return res.getTablesList();
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+
+ ClientProtos.GetTableListRequest.Builder builder = ClientProtos.GetTableListRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ if (databaseName != null) {
+ builder.setDatabaseName(databaseName);
+ }
+ ClientProtos.GetTableListResponse res = tajoMasterService.getTableList(null, builder.build());
+ return res.getTablesList();
}
@Override
public TableDesc getTableDesc(final String tableName) throws ServiceException {
- return new ServerCallable<TableDesc>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public TableDesc call(NettyClientBase client) throws ServiceException, SQLException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setTableName(tableName);
- ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
- if (res.getResultCode() == ClientProtos.ResultCode.OK) {
- return CatalogUtil.newTableDesc(res.getTableDesc());
- } else {
- throw new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState());
- }
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+
+ ClientProtos.GetTableDescRequest.Builder builder = ClientProtos.GetTableDescRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setTableName(tableName);
+ ClientProtos.TableResponse res = tajoMasterService.getTableDesc(null, builder.build());
+ if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+ return CatalogUtil.newTableDesc(res.getTableDesc());
+ } else {
+ throw new ServiceException(new SQLException(res.getErrorMessage(), SQLStates.ER_NO_SUCH_TABLE.getState()));
+ }
}
@Override
public List<CatalogProtos.FunctionDescProto> getFunctions(final String functionName) throws ServiceException {
- return new ServerCallable<List<CatalogProtos.FunctionDescProto>>(connection.manager,
- connection.getTajoMasterAddr(), TajoMasterClientProtocol.class, false) {
-
- public List<CatalogProtos.FunctionDescProto> call(NettyClientBase client) throws ServiceException, SQLException {
-
- connection.checkSessionAndGet(client);
- BlockingInterface tajoMasterService = client.getStub();
-
- String paramFunctionName = functionName == null ? "" : functionName;
- ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
- connection.convertSessionedString(paramFunctionName));
- if (res.getResultCode() == ClientProtos.ResultCode.OK) {
- return res.getFunctionsList();
- } else {
- throw new SQLException(res.getErrorMessage());
- }
- }
-
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ BlockingInterface tajoMasterService = client.getStub();
+
+ String paramFunctionName = functionName == null ? "" : functionName;
+ ClientProtos.FunctionResponse res = tajoMasterService.getFunctionList(null,
+ connection.convertSessionedString(paramFunctionName));
+ if (res.getResultCode() == ClientProtos.ResultCode.OK) {
+ return res.getFunctionsList();
+ } else {
+ throw new ServiceException(new SQLException(res.getErrorMessage()));
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/47554105/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
index 99c58b6..53889fe 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java
@@ -28,11 +28,10 @@ import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.ipc.QueryMasterClientProtocol;
-import org.apache.tajo.ipc.TajoMasterClientProtocol;
import org.apache.tajo.jdbc.FetchResultSet;
import org.apache.tajo.jdbc.TajoMemoryResultSet;
import org.apache.tajo.rpc.NettyClientBase;
-import org.apache.tajo.rpc.ServerCallable;
+import org.apache.tajo.rpc.RpcClientManager;
import org.apache.tajo.util.ProtoUtil;
import java.io.IOException;
@@ -40,6 +39,7 @@ import java.net.InetSocketAddress;
import java.sql.ResultSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.apache.tajo.ipc.ClientProtos.*;
import static org.apache.tajo.ipc.QueryMasterClientProtocol.QueryMasterClientProtocolService;
@@ -102,7 +102,7 @@ public class QueryClientImpl implements QueryClient {
public void closeNonForwardQuery(QueryId queryId) {
NettyClientBase tmClient = null;
try {
- tmClient = connection.getTajoMasterConnection(false);
+ tmClient = connection.getTajoMasterConnection();
TajoMasterClientProtocolService.BlockingInterface tajoMaster = tmClient.getStub();
connection.checkSessionAndGet(tmClient);
@@ -153,50 +153,37 @@ public class QueryClientImpl implements QueryClient {
@Override
public ClientProtos.SubmitQueryResponse executeQuery(final String sql) throws ServiceException {
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
- return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
+ final QueryRequest.Builder builder = QueryRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQuery(sql);
+ builder.setIsJson(false);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
- connection.checkSessionAndGet(client);
-
- final QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQuery(sql);
- builder.setIsJson(false);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
-
- SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
- if (response.getResultCode() == ResultCode.OK) {
- connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- }
- return response;
- }
- }.withRetries();
+ SubmitQueryResponse response = tajoMasterService.submitQuery(null, builder.build());
+ if (response.getResultCode() == ResultCode.OK) {
+ connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+ }
+ return response;
}
@Override
public ClientProtos.SubmitQueryResponse executeQueryWithJson(final String json) throws ServiceException {
- return new ServerCallable<ClientProtos.SubmitQueryResponse>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public ClientProtos.SubmitQueryResponse call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
- final QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQuery(json);
- builder.setIsJson(true);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ final QueryRequest.Builder builder = QueryRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQuery(json);
+ builder.setIsJson(true);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- return tajoMasterService.submitQuery(null, builder.build());
- }
- }.withRetries();
+ return tajoMasterService.submitQuery(null, builder.build());
}
@Override
@@ -308,7 +295,7 @@ public class QueryClientImpl implements QueryClient {
NettyClientBase tmClient = null;
try {
- tmClient = connection.getTajoMasterConnection(false);
+ tmClient = connection.getTajoMasterConnection();
connection.checkSessionAndGet(tmClient);
builder.setSessionId(connection.sessionId);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
@@ -348,7 +335,7 @@ public class QueryClientImpl implements QueryClient {
try {
- tmClient = connection.getTajoMasterConnection(false);
+ tmClient = connection.getTajoMasterConnection();
connection.checkSessionAndGet(tmClient);
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
@@ -369,42 +356,26 @@ public class QueryClientImpl implements QueryClient {
throws ServiceException {
try {
- final ServerCallable<ClientProtos.SerializedResultSet> callable =
- new ServerCallable<ClientProtos.SerializedResultSet>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public ClientProtos.SerializedResultSet call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQueryId(queryId.getProto());
- builder.setFetchRowNum(fetchRowNum);
- try {
- GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
- if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
- abort();
- throw new ServiceException(response.getErrorMessage());
- }
-
- return response.getResultSet();
- } catch (ServiceException e) {
- abort();
- throw e;
- } catch (Throwable t) {
- throw new ServiceException(t.getMessage(), t);
- }
- }
- };
-
- ClientProtos.SerializedResultSet serializedResultSet = callable.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+ GetQueryResultDataRequest.Builder builder = GetQueryResultDataRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQueryId(queryId.getProto());
+ builder.setFetchRowNum(fetchRowNum);
+
+ GetQueryResultDataResponse response = tajoMasterService.getQueryResultData(null, builder.build());
+ if (response.getResultCode() == ClientProtos.ResultCode.ERROR) {
+ throw new ServiceException(response.getErrorMessage());
+ }
+
+ ClientProtos.SerializedResultSet resultSet = response.getResultSet();
return new TajoMemoryResultSet(queryId,
- new Schema(serializedResultSet.getSchema()),
- serializedResultSet.getSerializedTuplesList(),
- serializedResultSet.getSerializedTuplesCount(),
+ new Schema(resultSet.getSchema()),
+ resultSet.getSerializedTuplesList(),
+ resultSet.getSerializedTuplesCount(),
getClientSideSessionVars());
} catch (ServiceException e) {
throw e;
@@ -416,119 +387,86 @@ public class QueryClientImpl implements QueryClient {
@Override
public boolean updateQuery(final String sql) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- public Boolean call(NettyClientBase client) throws ServiceException {
+ QueryRequest.Builder builder = QueryRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQuery(sql);
+ builder.setIsJson(false);
+ ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQuery(sql);
- builder.setIsJson(false);
- ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
-
- if (response.getResultCode() == ClientProtos.ResultCode.OK) {
- connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
- return true;
- } else {
- if (response.hasErrorMessage()) {
- System.err.println("ERROR: " + response.getErrorMessage());
- }
- return false;
- }
+ if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+ connection.updateSessionVarsCache(ProtoUtil.convertToMap(response.getSessionVars()));
+ return true;
+ } else {
+ if (response.hasErrorMessage()) {
+ LOG.error("ERROR: " + response.getErrorMessage());
}
- }.withRetries();
+ return false;
+ }
}
@Override
public boolean updateQueryWithJson(final String json) throws ServiceException {
- return new ServerCallable<Boolean>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
-
- public Boolean call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- QueryRequest.Builder builder = QueryRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQuery(json);
- builder.setIsJson(true);
- ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
- if (response.getResultCode() == ClientProtos.ResultCode.OK) {
- return true;
- } else {
- if (response.hasErrorMessage()) {
- System.err.println("ERROR: " + response.getErrorMessage());
- }
- return false;
- }
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+
+ QueryRequest.Builder builder = QueryRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQuery(json);
+ builder.setIsJson(true);
+ ClientProtos.UpdateQueryResponse response = tajoMasterService.updateQuery(null, builder.build());
+ if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+ return true;
+ } else {
+ if (response.hasErrorMessage()) {
+ LOG.error("ERROR: " + response.getErrorMessage());
}
- }.withRetries();
+ return false;
+ }
}
@Override
public List<ClientProtos.BriefQueryInfo> getRunningQueryList() throws ServiceException {
- return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- ClientProtos.GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build());
- return res.getQueryListList();
-
- }
- }.withRetries();
+ ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ ClientProtos.GetQueryListResponse res = tajoMasterService.getRunningQueryList(null, builder.build());
+ return res.getQueryListList();
}
@Override
public List<ClientProtos.BriefQueryInfo> getFinishedQueryList() throws ServiceException {
- return new ServerCallable<List<ClientProtos.BriefQueryInfo>>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- public List<ClientProtos.BriefQueryInfo> call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- ClientProtos.GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build());
- return res.getQueryListList();
-
- }
- }.withRetries();
+ ClientProtos.GetQueryListRequest.Builder builder = ClientProtos.GetQueryListRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ ClientProtos.GetQueryListResponse res = tajoMasterService.getFinishedQueryList(null, builder.build());
+ return res.getQueryListList();
}
@Override
public List<ClientProtos.WorkerResourceInfo> getClusterInfo() throws ServiceException {
- return new ServerCallable<List<ClientProtos.WorkerResourceInfo>>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- public List<ClientProtos.WorkerResourceInfo> call(NettyClientBase client) throws ServiceException {
-
- connection.checkSessionAndGet(client);
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
-
- ClientProtos.GetClusterInfoRequest.Builder builder = ClientProtos.GetClusterInfoRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- ClientProtos.GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build());
- return res.getWorkerListList();
- }
-
- }.withRetries();
+ ClientProtos.GetClusterInfoRequest.Builder builder = ClientProtos.GetClusterInfoRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ ClientProtos.GetClusterInfoResponse res = tajoMasterService.getClusterInfo(null, builder.build());
+ return res.getWorkerListList();
}
@Override
@@ -540,7 +478,7 @@ public class QueryClientImpl implements QueryClient {
NettyClientBase tmClient = null;
try {
/* send a kill to the TM */
- tmClient = connection.getTajoMasterConnection(false);
+ tmClient = connection.getTajoMasterConnection();
TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub();
connection.checkSessionAndGet(tmClient);
@@ -581,25 +519,20 @@ public class QueryClientImpl implements QueryClient {
}
public QueryInfoProto getQueryInfo(final QueryId queryId) throws ServiceException {
- return new ServerCallable<QueryInfoProto>(connection.manager, connection.getTajoMasterAddr(),
- TajoMasterClientProtocol.class, false) {
- public QueryInfoProto call(NettyClientBase client) throws ServiceException {
- connection.checkSessionAndGet(client);
-
- QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQueryId(queryId.getProto());
-
- TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
- GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
- if (res.getResultCode() == ResultCode.OK) {
- return res.getQueryInfo();
- } else {
- abort();
- throw new ServiceException(res.getErrorMessage());
- }
- }
- }.withRetries();
+ NettyClientBase client = connection.getTajoMasterConnection();
+ connection.checkSessionAndGet(client);
+
+ QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQueryId(queryId.getProto());
+
+ TajoMasterClientProtocolService.BlockingInterface tajoMasterService = client.getStub();
+ GetQueryInfoResponse res = tajoMasterService.getQueryInfo(null,builder.build());
+ if (res.getResultCode() == ResultCode.OK) {
+ return res.getQueryInfo();
+ } else {
+ throw new ServiceException(res.getErrorMessage());
+ }
}
public QueryHistoryProto getQueryHistory(final QueryId queryId) throws ServiceException {
@@ -611,24 +544,31 @@ public class QueryClientImpl implements QueryClient {
InetSocketAddress qmAddress = new InetSocketAddress(
queryInfo.getHostNameOfQM(), queryInfo.getQueryMasterClientPort());
- return new ServerCallable<QueryHistoryProto>(connection.manager, qmAddress,
- QueryMasterClientProtocol.class, false) {
- public QueryHistoryProto call(NettyClientBase client) throws ServiceException {
- connection.checkSessionAndGet(client);
+ RpcClientManager manager = RpcClientManager.getInstance();
+ NettyClientBase queryMasterClient;
+ try {
+ queryMasterClient = manager.newClient(qmAddress, QueryMasterClientProtocol.class, false,
+ manager.getRetries(), manager.getTimeoutSeconds(), TimeUnit.SECONDS, false);
+ } catch (Exception e) {
+ throw new ServiceException(e);
+ }
- QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
- builder.setSessionId(connection.sessionId);
- builder.setQueryId(queryId.getProto());
+ try {
+ connection.checkSessionAndGet(connection.getTajoMasterConnection());
- QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub();
- GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null,builder.build());
- if (res.getResultCode() == ResultCode.OK) {
- return res.getQueryHistory();
- } else {
- abort();
- throw new ServiceException(res.getErrorMessage());
- }
+ QueryIdRequest.Builder builder = QueryIdRequest.newBuilder();
+ builder.setSessionId(connection.sessionId);
+ builder.setQueryId(queryId.getProto());
+
+ QueryMasterClientProtocolService.BlockingInterface queryMasterService = queryMasterClient.getStub();
+ GetQueryHistoryResponse res = queryMasterService.getQueryHistory(null, builder.build());
+ if (res.getResultCode() == ResultCode.OK) {
+ return res.getQueryHistory();
+ } else {
+ throw new ServiceException(res.getErrorMessage());
}
- }.withRetries();
+ } finally {
+ queryMasterClient.close();
+ }
}
}