You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2022/04/21 11:00:24 UTC

[accumulo] branch main updated: Fix NullTServer and ZombieTServer thrift processor setup (#2645)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 933ce3a32a Fix NullTServer and ZombieTServer thrift processor setup (#2645)
933ce3a32a is described below

commit 933ce3a32ab170101ad7c1dee3fc611d81da60f6
Author: Dave Marion <dl...@apache.org>
AuthorDate: Thu Apr 21 07:00:18 2022 -0400

    Fix NullTServer and ZombieTServer thrift processor setup (#2645)
---
 .../accumulo/server/rpc/ThriftProcessorTypes.java  | 14 +++++++----
 .../accumulo/test/functional/ZombieTServer.java    | 26 +++++++++++++++++----
 .../accumulo/test/performance/NullTserver.java     | 27 ++++++++++++++++------
 3 files changed, 51 insertions(+), 16 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java
index caf044f4d9..e8ef43ea62 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftProcessorTypes.java
@@ -39,16 +39,20 @@ import org.apache.thrift.TProcessor;
 import org.apache.thrift.TServiceClient;
 import org.apache.thrift.TServiceClientFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 public class ThriftProcessorTypes {
 
-  private static class ProcessorType<C extends TServiceClient,F extends TServiceClientFactory<C>>
+  @VisibleForTesting
+  public static class ProcessorType<C extends TServiceClient,F extends TServiceClientFactory<C>>
       extends ThriftClientType<C,F> {
 
     public ProcessorType(ThriftClientType<C,F> type) {
       super(type.getServiceName(), type.getClientFactory());
     }
 
-    private <I,H extends I,P extends TBaseProcessor<?>> TProcessor getTProcessor(
+    @VisibleForTesting
+    public <I,H extends I,P extends TBaseProcessor<?>> TProcessor getTProcessor(
         Class<P> processorClass, Class<I> interfaceClass, H serviceHandler, ServerContext context,
         AccumuloConfiguration conf) {
       I rpcProxy = TraceUtil.wrapService(serviceHandler);
@@ -65,7 +69,8 @@ public class ThriftProcessorTypes {
     }
   }
 
-  private static final ProcessorType<ClientService.Client,ClientService.Client.Factory> CLIENT =
+  @VisibleForTesting
+  public static final ProcessorType<ClientService.Client,ClientService.Client.Factory> CLIENT =
       new ProcessorType<>(ThriftClientTypes.CLIENT);
 
   private static final ProcessorType<CompactorService.Client,
@@ -92,7 +97,8 @@ public class ThriftProcessorTypes {
       ReplicationServicer.Client.Factory> REPLICATION_SERVICER =
           new ProcessorType<>(ThriftClientTypes.REPLICATION_SERVICER);
 
-  private static final ProcessorType<TabletClientService.Client,
+  @VisibleForTesting
+  public static final ProcessorType<TabletClientService.Client,
       TabletClientService.Client.Factory> TABLET_SERVER =
           new ProcessorType<>(ThriftClientTypes.TABLET_SERVER);
 
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index b02761e54f..1b354385d0 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -27,11 +27,12 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.clientImpl.thrift.ClientService;
 import org.apache.accumulo.core.conf.SiteConfiguration;
 import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.rpc.ThriftClientTypes;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
 import org.apache.accumulo.core.trace.TraceUtil;
 import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.core.util.HostAndPort;
@@ -44,9 +45,13 @@ import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
 import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.client.ClientServiceHandler;
 import org.apache.accumulo.server.rpc.ServerAddress;
 import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
 import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher;
+import org.apache.thrift.TMultiplexedProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +64,8 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 public class ZombieTServer {
 
   public static class ZombieTServerThriftClientHandler
-      extends org.apache.accumulo.test.performance.NullTserver.NullTServerThriftClientHandler {
+      extends org.apache.accumulo.test.performance.NullTserver.NullTServerTabletClientHandler
+      implements TabletClientService.Iface {
 
     int statusCount = 0;
 
@@ -97,11 +103,21 @@ public class ZombieTServer {
   public static void main(String[] args) throws Exception {
     int port = random.nextInt(30000) + 2000;
     var context = new ServerContext(SiteConfiguration.auto());
+    final ClientServiceHandler csh =
+        new ClientServiceHandler(context, new TransactionWatcher(context));
     final ZombieTServerThriftClientHandler tch = new ZombieTServerThriftClientHandler();
-    Processor<Iface> processor = new Processor<>(tch);
+
+    TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor();
+    muxProcessor.registerProcessor(ThriftClientTypes.CLIENT.getServiceName(),
+        ThriftProcessorTypes.CLIENT.getTProcessor(ClientService.Processor.class,
+            ClientService.Iface.class, csh, context, context.getConfiguration()));
+    muxProcessor.registerProcessor(ThriftClientTypes.TABLET_SERVER.getServiceName(),
+        ThriftProcessorTypes.TABLET_SERVER.getTProcessor(TabletClientService.Processor.class,
+            TabletClientService.Iface.class, tch, context, context.getConfiguration()));
+
     ServerAddress serverPort =
         TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA,
-            processor, "ZombieTServer", "walking dead", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS,
+            muxProcessor, "ZombieTServer", "walking dead", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS,
             1000, 10 * 1024 * 1024, null, null, -1, HostAndPort.fromParts("0.0.0.0", port));
 
     String addressString = serverPort.address.toString();
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
index ac754e63d2..9094c470ab 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.cli.Help;
+import org.apache.accumulo.core.clientImpl.thrift.ClientService;
 import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
 import org.apache.accumulo.core.conf.DefaultConfiguration;
 import org.apache.accumulo.core.conf.Property;
@@ -58,6 +59,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.TServerInstance;
 import org.apache.accumulo.core.metadata.TabletLocationState;
 import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.rpc.ThriftClientTypes;
 import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
 import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
 import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
@@ -67,19 +69,21 @@ import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
 import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
 import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
-import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Processor;
 import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
 import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.client.ClientServiceHandler;
 import org.apache.accumulo.server.manager.state.Assignment;
 import org.apache.accumulo.server.manager.state.MetaDataTableScanner;
 import org.apache.accumulo.server.manager.state.TabletStateStore;
 import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
 import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.zookeeper.TransactionWatcher;
 import org.apache.thrift.TException;
+import org.apache.thrift.TMultiplexedProcessor;
 
 import com.beust.jcommander.Parameter;
 
@@ -90,7 +94,7 @@ import com.beust.jcommander.Parameter;
  */
 public class NullTserver {
 
-  public static class NullTServerThriftClientHandler implements TabletClientService.Iface {
+  public static class NullTServerTabletClientHandler implements TabletClientService.Iface {
 
     private long updateSession = 1;
 
@@ -317,10 +321,19 @@ public class NullTserver {
         (int) DefaultConfiguration.getInstance().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
     var siteConfig = SiteConfiguration.auto();
     ServerContext context = ServerContext.override(siteConfig, opts.iname, opts.keepers, zkTimeOut);
-    NullTServerThriftClientHandler tch = new NullTServerThriftClientHandler();
-    Processor<Iface> processor = new Processor<>(tch);
-    TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA, processor,
-        "NullTServer", "null tserver", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000,
+    ClientServiceHandler csh = new ClientServiceHandler(context, new TransactionWatcher(context));
+    NullTServerTabletClientHandler tch = new NullTServerTabletClientHandler();
+
+    TMultiplexedProcessor muxProcessor = new TMultiplexedProcessor();
+    muxProcessor.registerProcessor(ThriftClientTypes.CLIENT.getServiceName(),
+        ThriftProcessorTypes.CLIENT.getTProcessor(ClientService.Processor.class,
+            ClientService.Iface.class, csh, context, context.getConfiguration()));
+    muxProcessor.registerProcessor(ThriftClientTypes.TABLET_SERVER.getServiceName(),
+        ThriftProcessorTypes.TABLET_SERVER.getTProcessor(TabletClientService.Processor.class,
+            TabletClientService.Iface.class, tch, context, context.getConfiguration()));
+
+    TServerUtils.startTServer(context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA,
+        muxProcessor, "NullTServer", "null tserver", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000,
         10 * 1024 * 1024, null, null, -1, HostAndPort.fromParts("0.0.0.0", opts.port));
 
     HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port);