You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/03/01 19:39:37 UTC

[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r817083714



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
##########
@@ -0,0 +1,896 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.clientImpl.ScanServerDiscovery;
+import org.apache.accumulo.core.clientImpl.thrift.ConfigurationType;
+import org.apache.accumulo.core.clientImpl.thrift.TDiskUsage;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
+import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
+import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.TCMResult;
+import org.apache.accumulo.core.dataImpl.thrift.TColumn;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalSession;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import org.apache.accumulo.core.dataImpl.thrift.TRowRange;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaries;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaryRequest;
+import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.CompactionServices;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
+import org.apache.accumulo.core.tabletserver.thrift.TDurability;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
+import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ServiceLock;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.security.SecurityUtil;
+import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.compactions.Compactable;
+import org.apache.accumulo.tserver.compactions.CompactionManager;
+import org.apache.accumulo.tserver.compactions.ExternalCompactionJob;
+import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletData;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ScanServer extends TabletServer implements TabletClientService.Iface {
+
+  /**
+   * A {@link CompactionManager} whose operations are all no-ops.
+   *
+   * <p>
+   * {@link ScanServer#run()} installs an instance so that code which expects a compaction manager
+   * to be present does not fail with a NullPointerException. Every notification method does
+   * nothing, every counter reports zero, and collection-returning methods return empty
+   * collections rather than {@code null} so that callers may iterate over the result safely.
+   */
+  private static class ScanServerCompactionManager extends CompactionManager {
+
+    /**
+     * @param context
+     *          server context, passed through to the superclass
+     * @param ceMetrics
+     *          compaction executor metrics, passed through to the superclass
+     */
+    public ScanServerCompactionManager(ServerContext context,
+        CompactionExecutorsMetrics ceMetrics) {
+      // An empty list is handed to the superclass; presumably this is the collection of
+      // compactables to manage - TODO confirm against CompactionManager.
+      super(new ArrayList<>(), context, ceMetrics);
+    }
+
+    /** No-op. */
+    @Override
+    public void compactableChanged(Compactable compactable) {}
+
+    /** No-op; nothing is started. */
+    @Override
+    public void start() {}
+
+    /**
+     * @return always {@code null}. NOTE(review): no empty {@link CompactionServices} can be built
+     *         from what is visible here, so callers must tolerate {@code null}.
+     */
+    @Override
+    public CompactionServices getServices() {
+      return null;
+    }
+
+    /** @return always {@code false} */
+    @Override
+    public boolean isCompactionQueued(KeyExtent extent, Set<CompactionServiceId> servicesUsed) {
+      return false;
+    }
+
+    /** @return always {@code 0} */
+    @Override
+    public int getCompactionsRunning() {
+      return 0;
+    }
+
+    /** @return always {@code 0} */
+    @Override
+    public int getCompactionsQueued() {
+      return 0;
+    }
+
+    /** @return always {@code null}; no external compaction is ever reserved */
+    @Override
+    public ExternalCompactionJob reserveExternalCompaction(String queueName, long priority,
+        String compactorId, ExternalCompactionId externalCompactionId) {
+      return null;
+    }
+
+    /** No-op. */
+    @Override
+    public void registerExternalCompaction(ExternalCompactionId ecid, KeyExtent extent,
+        CompactionExecutorId ceid) {}
+
+    /** No-op. */
+    @Override
+    public void commitExternalCompaction(ExternalCompactionId extCompactionId,
+        KeyExtent extentCompacted, Map<KeyExtent,Tablet> currentTablets, long fileSize,
+        long entries) {}
+
+    /** No-op. */
+    @Override
+    public void externalCompactionFailed(ExternalCompactionId ecid, KeyExtent extentCompacted,
+        Map<KeyExtent,Tablet> currentTablets) {}
+
+    /** @return an empty list (never {@code null}) */
+    @Override
+    public List<TCompactionQueueSummary> getCompactionQueueSummaries() {
+      return new ArrayList<>();
+    }
+
+    /** @return an empty collection (never {@code null}) */
+    @Override
+    public Collection<ExtCompMetric> getExternalMetrics() {
+      return new ArrayList<>();
+    }
+
+    /** No-op. */
+    @Override
+    public void compactableClosed(KeyExtent extent, Set<CompactionServiceId> servicesUsed,
+        Set<ExternalCompactionId> ecids) {}
+
+  }
+
+  /**
+   * Mutable holder describing the scan this server is currently working on. All fields are
+   * {@code null} when no scan is in progress.
+   */
+  protected static class CurrentScan {
+    // Extent being scanned; checkInUse() treats a non-null value as "server in use".
+    protected KeyExtent extent;
+    // Tablet backing the scan; checkInUse() treats a non-null value as "server in use".
+    protected Tablet tablet;
+    // Session id of the scan; cleanupTimedOutSession() ends the scan once the session manager
+    // no longer knows this id.
+    protected Long scanId;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
+
+  // Thrift request handler; assigned in the constructor from getHandler().
+  protected ThriftClientHandler handler;
+  // State of the scan currently held by this server. The object is also used as the monitor
+  // guarding reads and writes of that state.
+  protected CurrentScan currentScan = new CurrentScan();
+
+  /**
+   * Creates a scan server and installs its Thrift request handler.
+   *
+   * @param opts
+   *          server options, passed through to the {@link TabletServer} constructor
+   * @param args
+   *          command-line arguments, passed through to the {@link TabletServer} constructor
+   */
+  public ScanServer(ServerOpts opts, String[] args) {
+    // NOTE(review): the meaning of the third superclass argument is not visible here; presumably
+    // it marks this instance as a scan server - confirm against TabletServer.
+    super(opts, args, true);
+    // getHandler() is overridable, so a subclass may supply its own handler here.
+    this.handler = getHandler();
+  }
+
+  /**
+   * Creates the Thrift request handler for this server. Called from the constructor; protected so
+   * that subclasses can substitute a different handler.
+   *
+   * @return a new {@link ThriftClientHandler} bound to this server
+   */
+  protected ThriftClientHandler getHandler() {
+    return new ThriftClientHandler(this);
+  }
+
+  private void cleanupTimedOutSession() {
+    synchronized (currentScan) {
+      if (currentScan.scanId != null && !sessionManager.exists(currentScan.scanId)) {
+        LOG.info("{} is no longer active, ending scan", currentScan.scanId);
+        endScan();
+      }
+    }
+  }
+
+  /**
+   * Starts the Thrift server that accepts client requests for this scan server.
+   *
+   * <p>
+   * The service is wrapped for tracing and, when the Thrift server type is SASL, additionally
+   * wrapped so that credentials are updated per call. The maximum message size is taken from
+   * {@link Property#SSERV_MAX_MESSAGE_SIZE} when the configuration returns a non-null value for
+   * it, and from {@link Property#GENERAL_MAX_MESSAGE_SIZE} otherwise.
+   *
+   * @return address of the started client service
+   * @throws UnknownHostException
+   *           if the host is unknown
+   */
+  protected ServerAddress startScanServerClientService() throws UnknownHostException {
+    Iface service = TraceUtil.wrapService(this);
+    final boolean usingSasl = getContext().getThriftServerType() == ThriftServerType.SASL;
+    if (usingSasl) {
+      service = TCredentialsUpdatingWrapper.service(service, getClass(), getConfiguration());
+    }
+    final TabletClientService.Processor<Iface> thriftProcessor =
+        new TabletClientService.Processor<>(service);
+
+    // Prefer the scan-server specific limit; fall back to the general one when it is unset.
+    final Property messageSizeProperty;
+    if (getConfiguration().get(Property.SSERV_MAX_MESSAGE_SIZE) == null) {
+      messageSizeProperty = Property.GENERAL_MAX_MESSAGE_SIZE;
+    } else {
+      messageSizeProperty = Property.SSERV_MAX_MESSAGE_SIZE;
+    }
+
+    final ServerAddress serverAddress = TServerUtils.startServer(getContext(), getHostname(),
+        Property.SSERV_CLIENTPORT, thriftProcessor, this.getClass().getSimpleName(),
+        "Thrift Client Server", Property.SSERV_PORTSEARCH, Property.SSERV_MINTHREADS,
+        Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK, messageSizeProperty);
+
+    LOG.info("address = {}", serverAddress.address);
+    return serverAddress;
+  }
+
+  /**
+   * Set up nodes and locks in ZooKeeper for this Compactor
+   */
+  private ServiceLock announceExistence() {
+    ZooReaderWriter zoo = getContext().getZooReaderWriter();
+    try {
+
+      var zLockPath = ServiceLock.path(
+          getContext().getZooKeeperRoot() + Constants.ZSSERVERS + "/" + getClientAddressString());
+
+      try {
+        zoo.putPersistentData(zLockPath.toString(), new byte[] {}, NodeExistsPolicy.SKIP);
+      } catch (KeeperException e) {
+        if (e.code() == KeeperException.Code.NOAUTH) {
+          LOG.error("Failed to write to ZooKeeper. Ensure that"
+              + " accumulo.properties, specifically instance.secret, is consistent.");
+        }
+        throw e;
+      }
+
+      tabletServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, UUID.randomUUID());
+
+      LockWatcher lw = new LockWatcher() {
+
+        @Override
+        public void lostLock(final LockLossReason reason) {
+          Halt.halt(serverStopRequested ? 0 : 1, () -> {
+            if (!serverStopRequested) {
+              LOG.error("Lost tablet server lock (reason = {}), exiting.", reason);
+            }
+            gcLogger.logGCInfo(getConfiguration());
+          });
+        }
+
+        @Override
+        public void unableToMonitorLockNode(final Exception e) {
+          Halt.halt(1, () -> LOG.error("Lost ability to monitor scan server lock, exiting.", e));
+        }
+      };
+
+      byte[] lockContent = new ServerServices(getClientAddressString(), Service.SSERV_CLIENT)
+          .toString().getBytes(UTF_8);
+      for (int i = 0; i < 120 / 5; i++) {
+        zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP);
+
+        if (tabletServerLock.tryLock(lw, lockContent)) {
+          LOG.debug("Obtained scan server lock {}", tabletServerLock.getLockPath());
+          return tabletServerLock;
+        }
+        LOG.info("Waiting for scan server lock");
+        sleepUninterruptibly(5, TimeUnit.SECONDS);
+      }
+      String msg = "Too many retries, exiting.";
+      LOG.info(msg);
+      throw new RuntimeException(msg);
+    } catch (Exception e) {
+      LOG.info("Could not obtain scan server lock, exiting.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Runs the scan server until a stop is requested.
+   *
+   * <p>
+   * Startup: logs in, starts the Thrift client service, acquires the ZooKeeper lock, initializes
+   * metrics, installs the no-op compaction manager, schedules the periodic cleanup of timed-out
+   * scan sessions and marks this server as unreserved in ZooKeeper. It then polls
+   * {@code serverStopRequested} once per second. On exit the Thrift server is stopped, the volume
+   * manager is closed and the ZooKeeper lock is released.
+   *
+   * @throws RuntimeException
+   *           if the client service cannot be started, the lock cannot be obtained, or the
+   *           initial unreserved state cannot be written to ZooKeeper
+   */
+  @Override
+  public void run() {
+    SecurityUtil.serverLogin(getConfiguration());
+
+    ServerAddress address = null;
+    try {
+      address = startScanServerClientService();
+      clientAddress = address.getAddress();
+    } catch (UnknownHostException e1) {
+      throw new RuntimeException("Failed to start the scan server client service", e1);
+    }
+
+    ServiceLock lock = announceExistence();
+
+    try {
+      MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName,
+          clientAddress);
+    } catch (Exception e1) {
+      // Metrics are best-effort; the server keeps running without them.
+      LOG.error("Error initializing metrics, metrics will not be emitted.", e1);
+    }
+    scanMetrics = new TabletServerScanMetrics();
+    MetricsUtil.initializeProducers(scanMetrics);
+
+    // We need to set the compaction manager so that we don't get an NPE in CompactableImpl.close
+    ceMetrics = new CompactionExecutorsMetrics();
+    this.compactionManager = new ScanServerCompactionManager(getContext(), ceMetrics);
+
+    // SessionManager times out sessions when sessions have been idle longer than
+    // TSERV_SESSION_MAXIDLE. Create a background thread that looks for sessions that
+    // have timed out and end the scan.
+    long maxIdle =
+        this.getContext().getConfiguration().getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
+    // Check at half the idle timeout, but never more often than once per second.
+    final long cleanupIntervalMillis = Math.max(maxIdle / 2, 1000);
+    LOG.debug("Looking for timed out scan sessions every {}ms", cleanupIntervalMillis);
+    this.getContext().getScheduledExecutor().scheduleWithFixedDelay(this::cleanupTimedOutSession,
+        cleanupIntervalMillis, cleanupIntervalMillis, TimeUnit.MILLISECONDS);
+
+    try {
+      try {
+        ScanServerDiscovery.unreserve(this.getContext().getZooKeeperRoot(),
+            getContext().getZooReaderWriter(), getClientAddressString());
+      } catch (Exception e2) {
+        throw new RuntimeException("Error setting initial unreserved state in ZooKeeper", e2);
+      }
+
+      // Idle here; all request handling happens on the Thrift server threads.
+      while (!serverStopRequested) {
+        UtilWaitThread.sleep(1000);
+      }
+    } finally {
+      LOG.info("Stopping Thrift Servers");
+      TServerUtils.stopTServer(address.server);
+
+      try {
+        LOG.debug("Closing filesystems");
+        VolumeManager mgr = getContext().getVolumeManager();
+        if (null != mgr) {
+          mgr.close();
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
+      }
+
+      gcLogger.logGCInfo(getConfiguration());
+      LOG.info("stop requested. exiting ... ");
+      try {
+        if (null != lock) {
+          lock.unlock();
+        }
+      } catch (Exception e) {
+        LOG.warn("Failed to release scan server lock", e);
+      }
+
+    }
+  }
+
+  private synchronized void checkInUse() {
+    synchronized (currentScan) {
+      if (currentScan.extent != null || currentScan.tablet != null) {
+        throw new RuntimeException("Scan server in use for another query");
+      }
+    }
+  }
+
+  protected synchronized boolean loadTablet(TKeyExtent textent)
+      throws IllegalArgumentException, IOException, AccumuloException {
+    synchronized (currentScan) {
+      KeyExtent extent = KeyExtent.fromThrift(textent);
+      TabletMetadata tabletMetadata = getContext().getAmple().readTablet(extent);

Review comment:
       This has been added, see `tabletMetadataCache` in ScanServer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org