You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/13 08:18:26 UTC
[iotdb] branch master updated: [IOTDB-2871] Data node client to connect with config node (#5488)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 47e2af285a [IOTDB-2871] Data node client to connect with config node (#5488)
47e2af285a is described below
commit 47e2af285a41bf47a72e16cb91eab3f68b840795
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Wed Apr 13 16:18:21 2022 +0800
[IOTDB-2871] Data node client to connect with config node (#5488)
---
.../iotdb/confignode/manager/ConfigManager.java | 10 +-
.../iotdb/confignode/manager/ConsensusManager.java | 4 +-
.../apache/iotdb/db/client/ConfigNodeClient.java | 309 +++++++++++++++++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 27 ++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 8 +
.../java/org/apache/iotdb/db/service/DataNode.java | 65 ++---
6 files changed, 369 insertions(+), 54 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 1a4fbf07f8..54b0e294d5 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.confignode.manager;
import org.apache.iotdb.common.rpc.thrift.EndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.partition.SeriesPartitionSlot;
import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationDataSet;
import org.apache.iotdb.confignode.consensus.response.DataNodesInfoDataSet;
@@ -37,6 +36,7 @@ import org.apache.iotdb.confignode.physical.sys.QueryDataNodeInfoPlan;
import org.apache.iotdb.confignode.physical.sys.RegisterDataNodePlan;
import org.apache.iotdb.confignode.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.db.mpp.common.schematree.PathPatternTree;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -250,14 +250,16 @@ public class ConfigManager implements Manager {
if (getConsensusManager().isLeader()) {
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} else {
- Endpoint endpoint = getConsensusManager().getLeader();
- if (endpoint == null) {
+ Peer peer = getConsensusManager().getLeader();
+ if (peer == null) {
return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode())
.setMessage(
"The current ConfigNode is not leader. And ConfigNodeGroup is in leader election. Please redirect with a random ConfigNode.");
} else {
+ // TODO Get rpc port of leader
return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode())
- .setRedirectNode(new EndPoint(endpoint.getIp(), endpoint.getPort()))
+ .setRedirectNode(
+ new EndPoint(peer.getEndpoint().getIp(), peer.getEndpoint().getPort() - 1))
.setMessage("The current ConfigNode is not leader. Please redirect.");
}
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
index e264556ddd..09d1223df7 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConsensusManager.java
@@ -101,8 +101,8 @@ public class ConsensusManager {
return consensusImpl.isLeader(consensusGroupId);
}
- public Endpoint getLeader() {
- return consensusImpl.getLeader(consensusGroupId).getEndpoint();
+ /**
+ * Returns the peer that the consensus implementation reports as the leader of this manager's
+ * consensus group.
+ *
+ * <p>NOTE(review): ConfigManager checks the result against {@code null} and treats that case as
+ * "leader election in progress" - confirm that the consensus implementation really returns
+ * {@code null} then.
+ */
+ public Peer getLeader() {
+ return consensusImpl.getLeader(consensusGroupId);
}
// TODO: Interfaces for LoadBalancer control
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
new file mode 100644
index 0000000000..a963ffe476
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.client;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.Endpoint;
+import org.apache.iotdb.commons.exception.BadNodeUrlException;
+import org.apache.iotdb.commons.utils.CommonUtils;
+import org.apache.iotdb.confignode.rpc.thrift.*;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Client used by a data node to talk to the config node group over Thrift.
+ *
+ * <p>The client keeps one open connection at a time. Every request is sent over that connection;
+ * when the contacted config node answers with {@code NEED_REDIRECTION}, or the call fails with a
+ * {@link TException}, the client reconnects (preferring the leader named in the redirection) and
+ * retries, up to {@code RETRY_NUM} times.
+ *
+ * <p>The connection state is kept in plain mutable fields without any synchronization, so an
+ * instance should not be shared between threads without external locking.
+ */
+public class ConfigNodeClient {
+  private static final Logger logger = LoggerFactory.getLogger(ConfigNodeClient.class);
+
+  /** Timeout, in milliseconds, handed to the transport factory when opening a connection. */
+  private static final int TIMEOUT_MS = 2000;
+
+  /** Maximum number of attempts made for a single request. */
+  private static final int RETRY_NUM = 5;
+
+  public static final String MSG_RECONNECTION_FAIL =
+      "Fail to connect to any config node. Please check the status of the config nodes";
+
+  /** Thrift stub bound to {@link #transport}; replaced on every successful connect. */
+  private ConfigIService.Iface client;
+
+  /** Transport of the active connection, or {@code null} when there is none. */
+  private TTransport transport;
+
+  /** Last known leader of the config node group, or {@code null} when unknown. */
+  private Endpoint configLeader;
+
+  /** Candidate config nodes that are tried when the leader is unknown or unreachable. */
+  private final List<Endpoint> configNodes;
+
+  /**
+   * Creates a client for the config nodes listed in the data node configuration and connects to
+   * one of them.
+   *
+   * @throws BadNodeUrlException if the configured config node urls cannot be parsed
+   * @throws IoTDBConnectionException if none of the config nodes can be reached
+   */
+  public ConfigNodeClient() throws BadNodeUrlException, IoTDBConnectionException {
+    // Read config nodes from configuration
+    configNodes =
+        CommonUtils.parseNodeUrls(IoTDBDescriptor.getInstance().getConfig().getConfigNodeUrls());
+    init();
+  }
+
+  /**
+   * Creates a client for the given config nodes and connects to one of them.
+   *
+   * @param configNodes candidate config nodes
+   * @throws IoTDBConnectionException if none of the config nodes can be reached
+   */
+  public ConfigNodeClient(List<Endpoint> configNodes) throws IoTDBConnectionException {
+    this.configNodes = configNodes;
+    init();
+  }
+
+  /**
+   * Creates a client for the given config nodes, trying {@code configLeader} first.
+   *
+   * @param configNodes candidate config nodes used when the leader cannot be reached
+   * @param configLeader endpoint that is tried before any other node; may be {@code null}
+   * @throws IoTDBConnectionException if neither the leader nor any candidate can be reached
+   */
+  public ConfigNodeClient(List<Endpoint> configNodes, Endpoint configLeader)
+      throws IoTDBConnectionException {
+    this.configNodes = configNodes;
+    this.configLeader = configLeader;
+    init();
+  }
+
+  /**
+   * Establishes the initial connection.
+   *
+   * @throws IoTDBConnectionException if no config node can be reached
+   */
+  public void init() throws IoTDBConnectionException {
+    reconnect();
+  }
+
+  /**
+   * Opens a connection to the given config node and makes it the active one.
+   *
+   * <p>The previously active transport, if any, is closed only after the new one has been opened
+   * successfully. A transport that fails to open is closed before the exception is thrown, so the
+   * active connection is left untouched in that case.
+   *
+   * @param endpoint address of the config node to connect to
+   * @throws IoTDBConnectionException if the transport cannot be opened
+   */
+  public void connect(Endpoint endpoint) throws IoTDBConnectionException {
+    TTransport newTransport = null;
+    try {
+      newTransport =
+          RpcTransportFactory.INSTANCE.getTransport(
+              // as there is a try-catch already, we do not need to use TSocket.wrap
+              endpoint.getIp(), endpoint.getPort(), TIMEOUT_MS);
+      newTransport.open();
+    } catch (TTransportException e) {
+      if (newTransport != null) {
+        // Do not keep a half-opened transport around.
+        newTransport.close();
+      }
+      throw new IoTDBConnectionException(e);
+    }
+
+    // Release the connection that is being replaced so that it does not leak.
+    closeTransport();
+    transport = newTransport;
+
+    if (IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
+      client = new ConfigIService.Client(new TCompactProtocol(transport));
+    } else {
+      client = new ConfigIService.Client(new TBinaryProtocol(transport));
+    }
+  }
+
+  /**
+   * Drops the active connection and opens a new one.
+   *
+   * <p>The known leader is tried first. If it is unknown or unreachable, every candidate config
+   * node is tried once, starting at a random position and wrapping around the list.
+   *
+   * @throws IoTDBConnectionException if no config node can be reached
+   */
+  private void reconnect() throws IoTDBConnectionException {
+    // Close the stale connection up front, whichever node ends up being used next.
+    closeTransport();
+
+    if (configLeader != null) {
+      try {
+        connect(configLeader);
+        return;
+      } catch (IoTDBConnectionException e) {
+        logger.warn("The current node may have been down {},try next node", configLeader);
+        configLeader = null;
+      }
+    }
+
+    if (configNodes == null || configNodes.isEmpty()) {
+      // Random#nextInt rejects a bound of 0; report the problem as a connection failure instead.
+      throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+    }
+
+    int nodeCount = configNodes.size();
+    int startIndex = new Random().nextInt(nodeCount);
+    for (int offset = 0; offset < nodeCount; offset++) {
+      Endpoint tryEndpoint = configNodes.get((startIndex + offset) % nodeCount);
+      try {
+        connect(tryEndpoint);
+        return;
+      } catch (IoTDBConnectionException e) {
+        logger.warn("The current node may have been down {},try next node", tryEndpoint);
+      }
+    }
+    throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+  }
+
+  /** Closes the active connection, if there is one. Safe to call more than once. */
+  public void close() {
+    closeTransport();
+  }
+
+  /** Closes and forgets the active transport; does nothing when there is none. */
+  private void closeTransport() {
+    if (transport != null) {
+      transport.close();
+      transport = null;
+    }
+  }
+
+  /**
+   * Inspects a response status for a redirection and records the leader it names.
+   *
+   * @param status status returned by a config node
+   * @return {@code true} if the status asks for a redirection, in which case the request has to be
+   *     sent again; {@code false} otherwise
+   */
+  private boolean updateConfigNodeLeader(TSStatus status) {
+    if (status.getCode() != TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
+      return false;
+    }
+    if (status.isSetRedirectNode()) {
+      configLeader =
+          new Endpoint(status.getRedirectNode().getIp(), status.getRedirectNode().getPort());
+    } else {
+      // No leader was named, fall back to a random config node.
+      configLeader = null;
+    }
+    return true;
+  }
+
+  /** A single remote call issued through the currently connected {@link #client}. */
+  @FunctionalInterface
+  private interface ConfigNodeRequest<T> {
+    T call() throws TException;
+  }
+
+  /** Extracts the {@link TSStatus} carried by a response of type {@code T}. */
+  @FunctionalInterface
+  private interface StatusGetter<T> {
+    TSStatus statusOf(T response);
+  }
+
+  /**
+   * Runs a request with the retry policy shared by all remote calls of this client.
+   *
+   * <p>The request is sent up to {@code RETRY_NUM} times. A response is returned as soon as its
+   * status is not a redirection. After a redirection or a {@link TException} the client
+   * reconnects before the next attempt.
+   *
+   * @param request the remote call; it must read {@link #client} each time it is invoked because
+   *     the stub is replaced on reconnect
+   * @param statusGetter extracts the status from the response
+   * @return the first response that does not ask for a redirection
+   * @throws IoTDBConnectionException if reconnecting fails or all attempts are used up
+   */
+  private <T> T executeWithRetry(ConfigNodeRequest<T> request, StatusGetter<T> statusGetter)
+      throws IoTDBConnectionException {
+    for (int i = 0; i < RETRY_NUM; i++) {
+      try {
+        T resp = request.call();
+        if (!updateConfigNodeLeader(statusGetter.statusOf(resp))) {
+          return resp;
+        }
+      } catch (TException e) {
+        logger.warn("Request to config node failed, reconnect and retry: {}", e.getMessage());
+        // The node that was used can no longer be trusted to be the leader.
+        configLeader = null;
+      }
+      reconnect();
+    }
+    throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+  }
+
+  /**
+   * Sends a {@code registerDataNode} request, following redirections and retrying on failure.
+   *
+   * @throws IoTDBConnectionException if the request cannot be completed
+   */
+  public TDataNodeRegisterResp registerDataNode(TDataNodeRegisterReq req)
+      throws IoTDBConnectionException {
+    return executeWithRetry(() -> client.registerDataNode(req), resp -> resp.status);
+  }
+
+  /**
+   * Sends a {@code getDataNodesMessage} request, following redirections and retrying on failure.
+   *
+   * @throws IoTDBConnectionException if the request cannot be completed
+   */
+  public TDataNodeMessageResp getDataNodesMessage(int dataNodeID) throws IoTDBConnectionException {
+    return executeWithRetry(() -> client.getDataNodesMessage(dataNodeID), resp -> resp.status);
+  }
+
+  /**
+   * Sends a {@code setStorageGroup} request, following redirections and retrying on failure.
+   *
+   * @throws IoTDBConnectionException if the request cannot be completed
+   */
+  public TSStatus setStorageGroup(TSetStorageGroupReq req) throws IoTDBConnectionException {
+    return executeWithRetry(() -> client.setStorageGroup(req), status -> status);
+  }
+
+  /**
+   * Sends a {@code deleteStorageGroup} request, following redirections and retrying on failure.
+   *
+   * @throws IoTDBConnectionException if the request cannot be completed
+   */
+  public TSStatus deleteStorageGroup(TDeleteStorageGroupReq req) throws IoTDBConnectionException {
+    return executeWithRetry(() -> client.deleteStorageGroup(req), status -> status);
+  }
+
+  /**
+   * Sends a {@code getStorageGroupsSchema} request, following redirections and retrying on
+   * failure.
+   *
+   * @throws IoTDBConnectionException if the request cannot be completed
+   */
+  public TStorageGroupSchemaResp getStorageGroupsSchema() throws IoTDBConnectionException {
+    return executeWithRetry(() -> client.getStorageGroupsSchema(), resp -> resp.status);
+  }
+
+  /**
+   * Sends a {@code getSchemaPartition} request, following redirections and retrying on failure.
+   *
+   * @throws IoTDBConnectionException if the request cannot be completed
+   */
+  public TSchemaPartitionResp getSchemaPartition(TSchemaPartitionReq req)
+      throws IoTDBConnectionException {
+    return executeWithRetry(() -> client.getSchemaPartition(req), resp -> resp.status);
+  }
+
+  /**
+   * Sends a {@code getOrCreateSchemaPartition} request, following redirections and retrying on
+   * failure.
+   *
+   * @throws IoTDBConnectionException if the request cannot be completed
+   */
+  public TSchemaPartitionResp getOrCreateSchemaPartition(TSchemaPartitionReq req)
+      throws IoTDBConnectionException {
+    return executeWithRetry(() -> client.getOrCreateSchemaPartition(req), resp -> resp.status);
+  }
+
+  /**
+   * Sends a {@code getDataPartition} request, following redirections and retrying on failure.
+   *
+   * @throws IoTDBConnectionException if the request cannot be completed
+   */
+  public TDataPartitionResp getDataPartition(TDataPartitionReq req)
+      throws IoTDBConnectionException {
+    return executeWithRetry(() -> client.getDataPartition(req), resp -> resp.status);
+  }
+
+  /**
+   * Sends a {@code getOrCreateDataPartition} request, following redirections and retrying on
+   * failure.
+   *
+   * @throws IoTDBConnectionException if the request cannot be completed
+   */
+  public TDataPartitionResp getOrCreateDataPartition(TDataPartitionReq req)
+      throws IoTDBConnectionException {
+    return executeWithRetry(() -> client.getOrCreateDataPartition(req), resp -> resp.status);
+  }
+
+  /**
+   * Sends an {@code operatePermission} request, following redirections and retrying on failure.
+   *
+   * @throws IoTDBConnectionException if the request cannot be completed
+   */
+  public TSStatus operatePermission(TAuthorizerReq req) throws IoTDBConnectionException {
+    return executeWithRetry(() -> client.operatePermission(req), status -> status);
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a5f860c194..a7603576a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -865,6 +865,17 @@ public class IoTDBConfig {
*/
private String consensusProtocolClass = "org.apache.iotdb.consensus.ratis.RatisConsensus";
+ /**
+ * The series partition executor class. The Datanode should communicate with ConfigNode on startup
+ * and set this variable so that the correct class name can be obtained later when calculating the
+ * series partition
+ */
+ private String seriesPartitionExecutorClass =
+ "org.apache.iotdb.commons.partition.executor.hash.APHashExecutor";
+
+ /** The number of series partitions in a storage group */
+ private int seriesPartitionSlotNum = 10000;
+
/** Port that data block manager thrift service listen to. */
private int dataBlockManagerPort = 7777;
@@ -2741,6 +2752,22 @@ public class IoTDBConfig {
this.consensusProtocolClass = consensusProtocolClass;
}
+ /** Returns the class name currently stored as the series partition executor. */
+ public String getSeriesPartitionExecutorClass() {
+ return seriesPartitionExecutorClass;
+ }
+
+ /** Replaces the stored series partition executor class name; the value is not validated here. */
+ public void setSeriesPartitionExecutorClass(String seriesPartitionExecutorClass) {
+ this.seriesPartitionExecutorClass = seriesPartitionExecutorClass;
+ }
+
+ /** Returns the number of series partition slots currently stored in this configuration. */
+ public int getSeriesPartitionSlotNum() {
+ return seriesPartitionSlotNum;
+ }
+
+ /** Replaces the stored number of series partition slots; the value is not validated here. */
+ public void setSeriesPartitionSlotNum(int seriesPartitionSlotNum) {
+ this.seriesPartitionSlotNum = seriesPartitionSlotNum;
+ }
+
public int getMppPort() {
return mppPort;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index b34464173f..f8597bfb9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.conf;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.compaction.constant.CompactionPriority;
@@ -1618,6 +1619,13 @@ public class IoTDBDescriptor {
return urlList;
}
+ /**
+ * Copies the settings carried by {@code globalConfig} into the local configuration: the series
+ * partition executor class, the data node consensus protocol class and the series partition
+ * slot number. These configurations are received from config node when registering.
+ *
+ * <p>NOTE(review): {@code globalConfig} is dereferenced without a null check - confirm that
+ * every caller passes a populated object.
+ *
+ * @param globalConfig global settings returned by the config node
+ */
+ public void loadGlobalConfig(TGlobalConfig globalConfig) {
+ conf.setSeriesPartitionExecutorClass(globalConfig.getSeriesPartitionExecutorClass());
+ conf.setConsensusProtocolClass(globalConfig.getDataNodeConsensusProtocolClass());
+ conf.setSeriesPartitionSlotNum(globalConfig.getSeriesPartitionSlotNum());
+ }
+
private static class IoTDBDescriptorHolder {
private static final IoTDBDescriptor INSTANCE = new IoTDBDescriptor();
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 5d022f9692..a14fe6feaa 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -26,29 +26,20 @@ import org.apache.iotdb.commons.exception.ConfigurationException;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.JMXService;
import org.apache.iotdb.commons.service.RegisterManager;
-import org.apache.iotdb.commons.utils.CommonUtils;
-import org.apache.iotdb.confignode.rpc.thrift.ConfigIService;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRegisterResp;
+import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConfigCheck;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.service.thrift.impl.DataNodeManagementServiceImpl;
import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.List;
-import java.util.Random;
public class DataNode implements DataNodeMBean {
private static final Logger logger = LoggerFactory.getLogger(DataNode.class);
@@ -121,45 +112,44 @@ public class DataNode implements DataNodeMBean {
}
+ /**
+ * Registers this data node with the config node group.
+ *
+ * <p>A ConfigNodeClient is created first; if that fails, startup is aborted. Registration is then
+ * attempted up to DEFAULT_JOIN_RETRY times. A response with SUCCESS_STATUS or
+ * DATANODE_ALREADY_REGISTERED ends the method after the returned data node id has been stored
+ * and the global config of the response has been loaded into IoTDBDescriptor.
+ *
+ * <p>NOTE(review): the sleep between tries is only reached when a response with another status
+ * code was received; when registerDataNode throws IoTDBConnectionException the next try starts
+ * immediately. The ConfigNodeClient is not closed on any path of this method - confirm that
+ * both are intended.
+ *
+ * @throws StartupException if the client cannot be created or all tries fail
+ */
public void joinCluster() throws StartupException {
- List<Endpoint> configNodes;
+ int retry = DEFAULT_JOIN_RETRY;
+ ConfigNodeClient configNodeClient = null;
try {
- configNodes =
- CommonUtils.parseNodeUrls(IoTDBDescriptor.getInstance().getConfig().getConfigNodeUrls());
- } catch (BadNodeUrlException e) {
+ configNodeClient = new ConfigNodeClient();
+ } catch (IoTDBConnectionException | BadNodeUrlException e) {
throw new StartupException(e.getMessage());
}
- int retry = DEFAULT_JOIN_RETRY;
while (retry > 0) {
- // randomly pick up a config node to try
- Random random = new Random();
- Endpoint configNode = configNodes.get(random.nextInt(configNodes.size()));
- logger.info("start joining the cluster with the help of {}", configNode);
+ logger.info("start joining the cluster.");
try {
- ConfigIService.Client client = createClient(configNode);
TDataNodeRegisterResp dataNodeRegisterResp =
- client.registerDataNode(
+ configNodeClient.registerDataNode(
new TDataNodeRegisterReq(new EndPoint(thisNode.getIp(), thisNode.getPort())));
if (dataNodeRegisterResp.getStatus().getCode()
- == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || dataNodeRegisterResp.getStatus().getCode()
+ == TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode()) {
dataNodeID = dataNodeRegisterResp.getDataNodeID();
- logger.info("Joined a cluster successfully");
+ IoTDBDescriptor.getInstance().loadGlobalConfig(dataNodeRegisterResp.globalConfig);
+ logger.info("Joined the cluster successfully");
return;
}
+
// wait 5s to start the next try
Thread.sleep(IoTDBDescriptor.getInstance().getConfig().getJoinClusterTimeOutMs());
- } catch (TException | IoTDBConnectionException e) {
- logger.warn("Cannot join the cluster from {}, because:", configNode, e);
+ } catch (IoTDBConnectionException e) {
+ logger.warn("Cannot join the cluster, because: {}", e.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- logger.warn("Unexpected interruption when waiting to join a cluster", e);
+ logger.warn("Unexpected interruption when waiting to join the cluster", e);
}
// start the next try
retry--;
}
// all tries failed
logger.error("Cannot join the cluster after {} retries", DEFAULT_JOIN_RETRY);
- throw new StartupException("Can not connect with the config nodes, please check the network");
+ throw new StartupException("Cannot join the cluster.");
}
public void active() throws StartupException {
@@ -198,25 +188,4 @@ public class DataNode implements DataNodeMBean {
private DataNodeHolder() {}
}
-
- private ConfigIService.Client createClient(Endpoint endpoint) throws IoTDBConnectionException {
- TTransport transport;
- try {
- transport =
- RpcTransportFactory.INSTANCE.getTransport(
- // as there is a try-catch already, we do not need to use TSocket.wrap
- endpoint.getIp(), endpoint.getPort(), 2000);
- transport.open();
- } catch (TTransportException e) {
- throw new IoTDBConnectionException(e);
- }
-
- ConfigIService.Client client;
- if (IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) {
- client = new ConfigIService.Client(new TCompactProtocol(transport));
- } else {
- client = new ConfigIService.Client(new TBinaryProtocol(transport));
- }
- return client;
- }
}