You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2022/04/21 11:00:24 UTC
[accumulo] branch main updated: Fix NullTServer and ZombieTServer thrift processor setup (#2645)
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 933ce3a32a Fix NullTServer and ZombieTServer thrift processor setup (#2645)
933ce3a32a is described below
commit 933ce3a32ab170101ad7c1dee3fc611d81da60f6
Author: Dave Marion <dl...@apache.org>
AuthorDate: Thu Apr 21 07:00:18 2022 -0400
Fix NullTServer and ZombieTServer thrift processor setup (#2645)
---
.../accumulo/server/rpc/ThriftProcessorTypes.java | 14 +++++++----
.../accumulo/test/functional/ZombieTServer.java | 26 +++++++++++++++++----
.../accumulo/test/performance/NullTserver.java | 27 ++++++++++++++++------
3 files changed, 51 insertions(+), 16 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java
index caf044f4d9..e8ef43ea62 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java
@@ -39,16 +39,20 @@ import org.apache.thrift.TProcessor;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
+import com.google.common.annotations.VisibleForTesting;
+
public class ThriftProcessorTypes {
- private static class ProcessorType<C extends TServiceClient,F extends TServiceClientFactory<C>>
+ @VisibleForTesting
+ public static class ProcessorType<C extends TServiceClient,F extends TServiceClientFactory<C>>
extends ThriftClientType<C,F> {
public ProcessorType(ThriftClientType<C,F> type) {
super(type.getServiceName(), type.getClientFactory());
}
- private <I,H extends I,P extends TBaseProcessor<?>> TProcessor getTProcessor(
+ @VisibleForTesting
+ public <I,H extends I,P extends TBaseProcessor<?>> TProcessor getTProcessor(
Class<P> processorClass, Class<I> interfaceClass, H serviceHandler, ServerContext context,
AccumuloConfiguration conf) {
I rpcProxy = TraceUtil.wrapService(serviceHandler);
@@ -65,7 +69,8 @@ public class ThriftProcessorTypes {
}
}
- private static final ProcessorType<ClientService.Client,ClientService.Client.Factory> CLIENT =
+ @VisibleForTesting
+ public static final ProcessorType<ClientService.Client,ClientService.Client.Factory> CLIENT =
new ProcessorType<>(ThriftClientTypes.CLIENT);
private static final ProcessorType<CompactorService.Client,
@@ -92,7 +97,8 @@ public class ThriftProcessorTypes {
ReplicationServicer.Client.Factory> REPLICATION_SERVICER =
new ProcessorType<>(ThriftClientTypes.REPLICATION_SERVICER);
- private static final ProcessorType<TabletClientService.Client,
+ @VisibleForTesting
+ public static final ProcessorType<TabletClientService.Client,
TabletClientService.Client.Factory> TABLET_SERVER =
new ProcessorType<>(ThriftClientTypes.TABLET_SERVER);
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index b02761e54f..1b354385d0 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -27,11 +27,12 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.clientImpl.thrift.ClientService;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.rpc.ThriftClientTypes;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.HostAndPort;
@@ -44,9 +45,13 @@ import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.client.ClientServiceHandler;
import org.apache.accumulo.server.rpc.ServerAddress;
import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher;
+import org.apache.thrift.TMultiplexedProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +64,8 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
public class ZombieTServer {
public static class ZombieTServerThriftClientHandler
- extends org.apache.accumulo.test.performance.NullTserver.NullTServerThriftClientHandler {
+ extends org.apache.accumulo.test.performance.NullTserver.NullTServerTabletClientHandler
+ implements TabletClientService.Iface {
int statusCount = 0;
@@ -97,11 +103,21 @@ public class ZombieTServer {
public static void main(String[] args) throws Exception {
int port = random.nextInt(30000) + 2000;
var context = new ServerContext(SiteConfiguration.auto());
+ final ClientServiceHandler csh =
+ new ClientServiceHandler(context, new TransactionWatcher(context));
final ZombieTServerThriftClientHandler tch = new ZombieTServerThriftClientHandler();
- Processor<Iface> processor = new Processor<>(tch);
+
+ TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor();
+ muxProcessor.registerProcessor(ThriftClientTypes.CLIENT.getServiceName(),
+ ThriftProcessorTypes.CLIENT.getTProcessor(ClientService.Processor.class,
+ ClientService.Iface.class, csh, context, context.getConfiguration()));
+ muxProcessor.registerProcessor(ThriftClientTypes.TABLET_SERVER.getServiceName(),
+ ThriftProcessorTypes.TABLET_SERVER.getTProcessor(TabletClientService.Processor.class,
+ TabletClientService.Iface.class, tch, context, context.getConfiguration()));
+
ServerAddress serverPort =
TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA,
- processor, "ZombieTServer", "walking dead", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS,
+ muxProcessor, "ZombieTServer", "walking dead", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS,
1000, 10 * 1024 * 1024, null, null, -1, HostAndPort.fromParts("0.0.0.0", port));
String addressString = serverPort.address.toString();
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
index ac754e63d2..9094c470ab 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.clientImpl.thrift.ClientService;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.Property;
@@ -58,6 +59,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletLocationState;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.rpc.ThriftClientTypes;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
@@ -67,19 +69,21 @@ import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.client.ClientServiceHandler;
import org.apache.accumulo.server.manager.state.Assignment;
import org.apache.accumulo.server.manager.state.MetaDataTableScanner;
import org.apache.accumulo.server.manager.state.TabletStateStore;
import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher;
import org.apache.thrift.TException;
+import org.apache.thrift.TMultiplexedProcessor;
import com.beust.jcommander.Parameter;
@@ -90,7 +94,7 @@ import com.beust.jcommander.Parameter;
*/
public class NullTserver {
- public static class NullTServerThriftClientHandler implements TabletClientService.Iface {
+ public static class NullTServerTabletClientHandler implements TabletClientService.Iface {
private long updateSession = 1;
@@ -317,10 +321,19 @@ public class NullTserver {
(int) DefaultConfiguration.getInstance().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
var siteConfig = SiteConfiguration.auto();
ServerContext context = ServerContext.override(siteConfig, opts.iname, opts.keepers, zkTimeOut);
- NullTServerThriftClientHandler tch = new NullTServerThriftClientHandler();
- Processor<Iface> processor = new Processor<>(tch);
- TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA, processor,
- "NullTServer", "null tserver", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000,
+ ClientServiceHandler csh = new ClientServiceHandler(context, new TransactionWatcher(context));
+ NullTServerTabletClientHandler tch = new NullTServerTabletClientHandler();
+
+ TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor();
+ muxProcessor.registerProcessor(ThriftClientTypes.CLIENT.getServiceName(),
+ ThriftProcessorTypes.CLIENT.getTProcessor(ClientService.Processor.class,
+ ClientService.Iface.class, csh, context, context.getConfiguration()));
+ muxProcessor.registerProcessor(ThriftClientTypes.TABLET_SERVER.getServiceName(),
+ ThriftProcessorTypes.TABLET_SERVER.getTProcessor(TabletClientService.Processor.class,
+ TabletClientService.Iface.class, tch, context, context.getConfiguration()));
+
+ TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA,
+ muxProcessor, "NullTServer", "null tserver", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000,
10 * 1024 * 1024, null, null, -1, HostAndPort.fromParts("0.0.0.0", opts.port));
HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port);