Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/01/19 20:32:00 UTC

[GitHub] [accumulo] dlmarion opened a new pull request #2422: Initial implementation of my vision for implementation of Scan Servers.

dlmarion opened a new pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422


   On the client side, ScannerOptions.useScanServer has been added. This option is checked in ThriftScanner and TabletServerBatchReaderIterator. If the option is set, a ScanServerLocator implementation is retrieved from the ClientContext. The default, DefaultScanServerLocator, is used if the user does not supply the class name of a different implementation. ThriftScanner and TabletServerBatchReaderIterator use the ScanServerLocator to find a ScanServer for scanning a Tablet. Once the ScanServer address has been resolved, the existing scan code is used to create a Thrift connection using the TabletClientService.
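A minimal sketch of the plug-in lookup described above, under stated assumptions: `ScanServerLocatorSketch`, `FirstServerLocator`, and `LocatorFactory` are hypothetical names, and the PR's actual `ScanServerLocator` interface and configuration may look different. It only illustrates falling back to a default implementation when no class name is supplied, and otherwise instantiating the user's class reflectively.

```java
import java.util.List;

// Hypothetical stand-in for the client-side ScanServerLocator plug-in point.
interface ScanServerLocatorSketch {
  // Returns the address of a scan server to use for the given tablet.
  String reserveScanServer(String tabletId, List<String> scanServers);
}

// Hypothetical default strategy: always picks the first known scan server.
class FirstServerLocator implements ScanServerLocatorSketch {
  @Override
  public String reserveScanServer(String tabletId, List<String> scanServers) {
    if (scanServers.isEmpty()) {
      throw new IllegalStateException("No scan servers available");
    }
    return scanServers.get(0);
  }
}

class LocatorFactory {
  // Uses the default locator when no class name is configured; otherwise
  // instantiates the user's class through its no-arg constructor.
  static ScanServerLocatorSketch create(String className) {
    if (className == null || className.isBlank()) {
      return new FirstServerLocator();
    }
    try {
      return Class.forName(className).asSubclass(ScanServerLocatorSketch.class)
          .getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot load locator " + className, e);
    }
  }
}
```

Loading the strategy by class name keeps the client free of any compile-time dependency on custom strategies, which is what makes the strategy pluggable.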
   
   On the server side, the ScanServer class extends TabletServer and provides its own implementation of the TabletClientService Thrift interface. The only methods that are implemented are startScan, continueScan, closeScan, startMultiScan (todo), continueMultiScan (todo), and closeMultiScan (todo). The ScanServer performs scans over one Tablet for a single client. Attempts to use a ScanServer that is in use by another client return an error on startScan.
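The one-scan-at-a-time rule can be illustrated with a small guard. This is a hypothetical sketch (`SingleScanGuard` is an illustrative name, not the PR's `ScanServer`); like the PR's `synchronized` `checkInUse()`, it rejects any new scan while one is already active.

```java
// Hypothetical sketch: a server that serves at most one client scan at a time.
class SingleScanGuard {
  // Client currently using this server, or null when idle.
  private String owner;

  // Claims the server for clientId; fails while any scan is active.
  synchronized void startScan(String clientId) {
    if (owner != null) {
      throw new IllegalStateException("Scan server in use for another query");
    }
    owner = clientId;
  }

  // Releases the server, but only if clientId is the current owner.
  synchronized boolean closeScan(String clientId) {
    if (owner == null || !owner.equals(clientId)) {
      return false;
    }
    owner = null;
    return true;
  }
}
```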
   
   The DefaultScanServerLocator has a rudimentary reservation system that likely needs a lot of work, and possibly a different implementation.
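A rudimentary reservation table might look like the following. This is an illustrative, in-memory assumption only: `ReservationTable` is a hypothetical name, and the `ScanServer` code in this PR records reservation state in ZooKeeper (it calls `ScanServerDiscovery.unreserve` at startup). The sketch shows just the bookkeeping: each server is handed to at most one tablet until it is released.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory reservation table: each scan server can be
// reserved for at most one tablet at a time.
class ReservationTable {
  private final List<String> servers;
  // server address -> tablet it is currently reserved for
  private final Map<String,String> reservations = new ConcurrentHashMap<>();

  ReservationTable(List<String> servers) {
    this.servers = List.copyOf(servers);
  }

  // Reserves the first unreserved server for the tablet, if any is free.
  Optional<String> reserve(String tabletId) {
    for (String server : servers) {
      // putIfAbsent returns null only when this call made the reservation
      if (reservations.putIfAbsent(server, tabletId) == null) {
        return Optional.of(server);
      }
    }
    return Optional.empty();
  }

  // Releases a reservation so the server can be handed out again.
  void unreserve(String server) {
    reservations.remove(server);
  }
}
```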
   
   Related to #2411 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Initial implementation of my vision for implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r791015464



##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -497,26 +499,44 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
     for (final String tsLocation : locations) {
 
       final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
-      if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
-        QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns);
-        queryTasks.add(queryTask);
+      if (options.isUseScanServer()) {
+        // Ignore the tablets location and find a scan server to use
+        ScanServerLocator ssl = context.getScanServerLocator();
+        tabletsRanges.forEach((k, v) -> {
+          try {
+            String location = ssl.reserveScanServer(new TabletIdImpl(k));

Review comment:
       As currently implemented, a ScanServer performs one scan at a time, much like the Compactor performs one compaction at a time. I figured that if a ScanServer had many threads and performed more than one scan at a time, we could run into the same memory-usage problems that we have in the TabletServer.







[GitHub] [accumulo] keith-turner commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r792053837



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
##########
@@ -0,0 +1,896 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.clientImpl.ScanServerDiscovery;
+import org.apache.accumulo.core.clientImpl.thrift.ConfigurationType;
+import org.apache.accumulo.core.clientImpl.thrift.TDiskUsage;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
+import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
+import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.TCMResult;
+import org.apache.accumulo.core.dataImpl.thrift.TColumn;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalSession;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import org.apache.accumulo.core.dataImpl.thrift.TRowRange;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaries;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaryRequest;
+import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.CompactionServices;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
+import org.apache.accumulo.core.tabletserver.thrift.TDurability;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
+import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ServiceLock;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.security.SecurityUtil;
+import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.compactions.Compactable;
+import org.apache.accumulo.tserver.compactions.CompactionManager;
+import org.apache.accumulo.tserver.compactions.ExternalCompactionJob;
+import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletData;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ScanServer extends TabletServer implements TabletClientService.Iface {
+
+  /**
+   * A compaction manager that does nothing
+   */
+  private static class ScanServerCompactionManager extends CompactionManager {
+
+    public ScanServerCompactionManager(ServerContext context,
+        CompactionExecutorsMetrics ceMetrics) {
+      super(new ArrayList<>(), context, ceMetrics);
+    }
+
+    @Override
+    public void compactableChanged(Compactable compactable) {}
+
+    @Override
+    public void start() {}
+
+    @Override
+    public CompactionServices getServices() {
+      return null;
+    }
+
+    @Override
+    public boolean isCompactionQueued(KeyExtent extent, Set<CompactionServiceId> servicesUsed) {
+      return false;
+    }
+
+    @Override
+    public int getCompactionsRunning() {
+      return 0;
+    }
+
+    @Override
+    public int getCompactionsQueued() {
+      return 0;
+    }
+
+    @Override
+    public ExternalCompactionJob reserveExternalCompaction(String queueName, long priority,
+        String compactorId, ExternalCompactionId externalCompactionId) {
+      return null;
+    }
+
+    @Override
+    public void registerExternalCompaction(ExternalCompactionId ecid, KeyExtent extent,
+        CompactionExecutorId ceid) {}
+
+    @Override
+    public void commitExternalCompaction(ExternalCompactionId extCompactionId,
+        KeyExtent extentCompacted, Map<KeyExtent,Tablet> currentTablets, long fileSize,
+        long entries) {}
+
+    @Override
+    public void externalCompactionFailed(ExternalCompactionId ecid, KeyExtent extentCompacted,
+        Map<KeyExtent,Tablet> currentTablets) {}
+
+    @Override
+    public List<TCompactionQueueSummary> getCompactionQueueSummaries() {
+      return null;
+    }
+
+    @Override
+    public Collection<ExtCompMetric> getExternalMetrics() {
+      return null;
+    }
+
+    @Override
+    public void compactableClosed(KeyExtent extent, Set<CompactionServiceId> servicesUsed,
+        Set<ExternalCompactionId> ecids) {}
+
+  }
+
+  protected static class CurrentScan {
+    protected KeyExtent extent;
+    protected Tablet tablet;
+    protected Long scanId;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
+
+  protected ThriftClientHandler handler;
+  protected CurrentScan currentScan = new CurrentScan();
+
+  public ScanServer(ServerOpts opts, String[] args) {
+    super(opts, args, true);
+    handler = getHandler();
+  }
+
+  protected ThriftClientHandler getHandler() {
+    return new ThriftClientHandler(this);
+  }
+
+  private void cleanupTimedOutSession() {
+    synchronized (currentScan) {
+      if (currentScan.scanId != null && !sessionManager.exists(currentScan.scanId)) {
+        LOG.info("{} is no longer active, ending scan", currentScan.scanId);
+        endScan();
+      }
+    }
+  }
+
+  /**
+   * Start the thrift service to handle incoming client requests
+   *
+   * @return address of this client service
+   * @throws UnknownHostException
+   *           host unknown
+   */
+  protected ServerAddress startScanServerClientService() throws UnknownHostException {
+    Iface rpcProxy = TraceUtil.wrapService(this);
+    if (getContext().getThriftServerType() == ThriftServerType.SASL) {
+      rpcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, getClass(), getConfiguration());
+    }
+    final TabletClientService.Processor<Iface> processor =
+        new TabletClientService.Processor<>(rpcProxy);
+
+    Property maxMessageSizeProperty =
+        (getConfiguration().get(Property.SSERV_MAX_MESSAGE_SIZE) != null
+            ? Property.SSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
+    ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
+        Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(),
+        "Thrift Client Server", Property.SSERV_PORTSEARCH, Property.SSERV_MINTHREADS,
+        Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK, maxMessageSizeProperty);
+
+    LOG.info("address = {}", sp.address);
+    return sp;
+  }
+
+  /**
+   * Set up nodes and locks in ZooKeeper for this ScanServer
+   */
+  private ServiceLock announceExistence() {
+    ZooReaderWriter zoo = getContext().getZooReaderWriter();
+    try {
+
+      var zLockPath = ServiceLock.path(
+          getContext().getZooKeeperRoot() + Constants.ZSSERVERS + "/" + getClientAddressString());
+
+      try {
+        zoo.putPersistentData(zLockPath.toString(), new byte[] {}, NodeExistsPolicy.SKIP);
+      } catch (KeeperException e) {
+        if (e.code() == KeeperException.Code.NOAUTH) {
+          LOG.error("Failed to write to ZooKeeper. Ensure that"
+              + " accumulo.properties, specifically instance.secret, is consistent.");
+        }
+        throw e;
+      }
+
+      tabletServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, UUID.randomUUID());
+
+      LockWatcher lw = new LockWatcher() {
+
+        @Override
+        public void lostLock(final LockLossReason reason) {
+          Halt.halt(serverStopRequested ? 0 : 1, () -> {
+            if (!serverStopRequested) {
+              LOG.error("Lost scan server lock (reason = {}), exiting.", reason);
+            }
+            gcLogger.logGCInfo(getConfiguration());
+          });
+        }
+
+        @Override
+        public void unableToMonitorLockNode(final Exception e) {
+          Halt.halt(1, () -> LOG.error("Lost ability to monitor scan server lock, exiting.", e));
+        }
+      };
+
+      byte[] lockContent = new ServerServices(getClientAddressString(), Service.SSERV_CLIENT)
+          .toString().getBytes(UTF_8);
+      for (int i = 0; i < 120 / 5; i++) {
+        zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP);
+
+        if (tabletServerLock.tryLock(lw, lockContent)) {
+          LOG.debug("Obtained scan server lock {}", tabletServerLock.getLockPath());
+          return tabletServerLock;
+        }
+        LOG.info("Waiting for scan server lock");
+        sleepUninterruptibly(5, TimeUnit.SECONDS);
+      }
+      String msg = "Too many retries, exiting.";
+      LOG.info(msg);
+      throw new RuntimeException(msg);
+    } catch (Exception e) {
+      LOG.info("Could not obtain scan server lock, exiting.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void run() {
+    SecurityUtil.serverLogin(getConfiguration());
+
+    ServerAddress address = null;
+    try {
+      address = startScanServerClientService();
+      clientAddress = address.getAddress();
+    } catch (UnknownHostException e1) {
+      throw new RuntimeException("Failed to start the scan server client service", e1);
+    }
+
+    ServiceLock lock = announceExistence();
+
+    try {
+      MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName,
+          clientAddress);
+    } catch (Exception e1) {
+      LOG.error("Error initializing metrics, metrics will not be emitted.", e1);
+    }
+    scanMetrics = new TabletServerScanMetrics();
+    MetricsUtil.initializeProducers(scanMetrics);
+
+    // We need to set the compaction manager so that we don't get an NPE in CompactableImpl.close
+    ceMetrics = new CompactionExecutorsMetrics();
+    this.compactionManager = new ScanServerCompactionManager(getContext(), ceMetrics);
+
+    // SessionManager times out sessions when sessions have been idle longer than
+    // TSERV_SESSION_MAXIDLE. Create a background thread that looks for sessions that
+    // have timed out and end the scan.
+    long maxIdle =
+        this.getContext().getConfiguration().getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
+    LOG.debug("Looking for timed out scan sessions every {}ms", Math.max(maxIdle / 2, 1000));
+    this.getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> cleanupTimedOutSession(),
+        Math.max(maxIdle / 2, 1000), Math.max(maxIdle / 2, 1000), TimeUnit.MILLISECONDS);
+
+    try {
+      try {
+        ScanServerDiscovery.unreserve(this.getContext().getZooKeeperRoot(),
+            getContext().getZooReaderWriter(), getClientAddressString());
+      } catch (Exception e2) {
+        throw new RuntimeException("Error setting initial unreserved state in ZooKeeper", e2);
+      }
+
+      while (!serverStopRequested) {
+        UtilWaitThread.sleep(1000);
+      }
+    } finally {
+      LOG.info("Stopping Thrift Servers");
+      TServerUtils.stopTServer(address.server);
+
+      try {
+        LOG.debug("Closing filesystems");
+        VolumeManager mgr = getContext().getVolumeManager();
+        if (null != mgr) {
+          mgr.close();
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
+      }
+
+      gcLogger.logGCInfo(getConfiguration());
+      LOG.info("stop requested. exiting ... ");
+      try {
+        if (null != lock) {
+          lock.unlock();
+        }
+      } catch (Exception e) {
+        LOG.warn("Failed to release scan server lock", e);
+      }
+
+    }
+  }
+
+  private synchronized void checkInUse() {
+    synchronized (currentScan) {
+      if (currentScan.extent != null || currentScan.tablet != null) {
+        throw new RuntimeException("Scan server in use for another query");
+      }
+    }
+  }
+
+  protected synchronized boolean loadTablet(TKeyExtent textent)
+      throws IllegalArgumentException, IOException, AccumuloException {
+    synchronized (currentScan) {
+      KeyExtent extent = KeyExtent.fromThrift(textent);
+      TabletMetadata tabletMetadata = getContext().getAmple().readTablet(extent);

Review comment:
       @dlmarion, when we talked earlier today, we discussed that the Tablet or TabletMetadata could be cached for a limited time. This is something for one of us to look into eventually.
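One way to cache tablet metadata for a limited time is a small expiring cache keyed by extent. A hedged sketch: `ExpiringCache` is hypothetical, and a generic loader function stands in for the metadata read (the diff above uses `getContext().getAmple().readTablet(extent)`). A library cache such as Caffeine with `expireAfterWrite` could serve the same purpose.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical cache that reloads a value once it is older than ttlMillis.
class ExpiringCache<K,V> {
  private static final class Entry<T> {
    final T value;
    final long loadedAtMillis;

    Entry(T value, long loadedAtMillis) {
      this.value = value;
      this.loadedAtMillis = loadedAtMillis;
    }
  }

  private final Map<K,Entry<V>> entries = new ConcurrentHashMap<>();
  private final long ttlMillis;
  private final LongSupplier clock; // injectable so expiry can be tested
  private final Function<K,V> loader; // e.g. a metadata read for an extent

  ExpiringCache(long ttlMillis, LongSupplier clock, Function<K,V> loader) {
    this.ttlMillis = ttlMillis;
    this.clock = clock;
    this.loader = loader;
  }

  // Returns the cached value, loading it when absent or expired. The load
  // runs inside compute(), so concurrent callers for one key load only once.
  V get(K key) {
    long now = clock.getAsLong();
    Entry<V> e = entries.compute(key,
        (k, old) -> (old == null || now - old.loadedAtMillis >= ttlMillis)
            ? new Entry<V>(loader.apply(k), now) : old);
    return e.value;
  }
}
```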







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r840537393



##########
File path: server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
##########
@@ -176,6 +178,25 @@ public void execute(String[] args) throws Exception {
 
       }
 
+      if (opts.zapScanServers) {
+        String sserversPath = Constants.ZROOT + "/" + iid + Constants.ZSSERVERS;
+        try {
+          List<String> children = zoo.getChildren(sserversPath);
+          for (String child : children) {
+            message("Deleting " + sserversPath + "/" + child + " from zookeeper", opts);
+
+            var zLockPath = ServiceLock.path(sserversPath + "/" + child);
+            if (!zoo.getChildren(zLockPath.toString()).isEmpty()) {
+              if (!ServiceLock.deleteLock(zoo, zLockPath, "tserver")) {

Review comment:
       No, this was a copy paste error. I'm going to modify it to call a different `ServiceLock.deleteLock` method.







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r838500436



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
##########
@@ -127,6 +161,9 @@ public T get(long timeout, TimeUnit unit)
     // returned
     resultQueue = null;
 
+    // TODO by not wrapping the error stack information that could be important for debugging is
+    // lost, the error is from a background thread but the stack trace from this foreground thread
+    // is lost.

Review comment:
       I think so.







[GitHub] [accumulo] dlmarion commented on pull request #2422: Initial implementation of my vision for implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#issuecomment-1017869373


   @keith-turner - I don't have a schedule. I figured I would flesh out my idea for an implementation and see if it works. There is not a lot of code in this approach because I am reusing the existing scan code in the client and tablet server. You had commented on some load balancing strategies, which are certainly useful, but I think they can be pushed down the road, or left to users, who can plug their own strategy into the client (which is what I have in this PR).
   
   My current plan is to write some tests and test locally to see how many issues pop up. It's possible that this approach is a total flop and something else is needed. If this happens to be done before 2.1.0, then great. If not, then 2.2.0 is fine too.





[GitHub] [accumulo] dlmarion edited a comment on pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion edited a comment on pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#issuecomment-1024522042


   Some annotated shell output:
   ```bash
   root@test> createtable test (1)
   root@test test> insert a b c d (2)
   root@test test> scan (3)
   a b:c []	d
   root@test test> scan -cl immediate (4)
   a b:c []	d
   root@test test> scan -cl eventual (5)
   root@test test> flush (6)
   2022-01-28T18:58:10,693 [shell.Shell] INFO : Flush of table test  initiated...
   root@test test> scan (7)
   a b:c []	d
   root@test test> scan -cl eventual (8)
   a b:c []	d
   ```
   
   I create a table (1) and insert some data (2). When I run a scan (3, 4) with the `immediate` consistency level, which happens to be the default, the scan uses the normal code path and issues the scan against the tablet server. Data is returned because the tablet server code path also returns data that is in the in-memory map. When I scan with the `eventual` consistency level (5), no data is returned because that code path only uses the data in the tablet's files. After I flush (6) the data to a file in HDFS, subsequent scans with the `immediate` (7) and `eventual` (8) consistency levels both return the data.
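The behavior in the shell session can be modeled in a few lines. This is a hypothetical, self-contained model (`TabletModel` and `Consistency` are illustrative names, not Accumulo classes): an `IMMEDIATE` read merges the in-memory map over the flushed files, while an `EVENTUAL` read sees only the files, so unflushed data appears only after a flush.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical consistency levels mirroring the shell's -cl option values.
enum Consistency { IMMEDIATE, EVENTUAL }

// Hypothetical tablet: an in-memory map plus a list of immutable "files".
class TabletModel {
  private final TreeMap<String,String> inMemory = new TreeMap<>();
  private final List<TreeMap<String,String>> files = new ArrayList<>();

  void insert(String key, String value) {
    inMemory.put(key, value);
  }

  // Writes the in-memory map out as a new file and clears it.
  void flush() {
    if (!inMemory.isEmpty()) {
      files.add(new TreeMap<>(inMemory));
      inMemory.clear();
    }
  }

  Map<String,String> scan(Consistency level) {
    TreeMap<String,String> result = new TreeMap<>();
    for (TreeMap<String,String> file : files) {
      result.putAll(file); // later files override earlier ones
    }
    if (level == Consistency.IMMEDIATE) {
      result.putAll(inMemory); // only the tablet server sees unflushed data
    }
    return result;
  }
}
```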





[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r837502657



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
##########
@@ -353,36 +367,38 @@ public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent t
   }
 
   @Override
-  public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException,
-      NotServingTabletException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
-      TSampleNotPresentException {
+  public ScanResult continueScan(TInfo tinfo, long scanID, long busyTimeout)
+      throws NoSuchScanIDException, NotServingTabletException,
+      org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
+      TSampleNotPresentException, ScanServerBusyException {
     SingleScanSession scanSession =
         (SingleScanSession) server.sessionManager.reserveSession(scanID);
     if (scanSession == null) {
       throw new NoSuchScanIDException();
     }
 
     try {
-      return continueScan(tinfo, scanID, scanSession);
+      return continueScan(tinfo, scanID, scanSession, busyTimeout);
     } finally {
       server.sessionManager.unreserveSession(scanSession);
     }
   }
 
-  private ScanResult continueScan(TInfo tinfo, long scanID, SingleScanSession scanSession)
-      throws NoSuchScanIDException, NotServingTabletException,
+  protected ScanResult continueScan(TInfo tinfo, long scanID, SingleScanSession scanSession,
+      long busyTimeout) throws NoSuchScanIDException, NotServingTabletException,
       org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
-      TSampleNotPresentException {
+      TSampleNotPresentException, ScanServerBusyException {
 
     if (scanSession.nextBatchTask == null) {
+      // TODO look into prioritizing continue scan task in thread pool queue

Review comment:
       Is this another case where we can satisfy this TODO by creating a separate ticket?
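If a separate ticket picks up the prioritization TODO, one possible approach is a priority queue in which continue-scan work sorts ahead of start-scan work, in submission order within each kind. A hypothetical sketch (`ScanWork` is an illustrative name, not an Accumulo class):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical unit of scan work ordered for a PriorityBlockingQueue.
class ScanWork implements Comparable<ScanWork> {
  // Declaration order is priority order: continue-scan work runs first.
  enum Kind { CONTINUE_SCAN, START_SCAN }

  private static final AtomicLong SEQ = new AtomicLong();

  final Kind kind;
  final String name;
  // Tie-breaker that keeps FIFO order among tasks of the same kind.
  private final long seq = SEQ.getAndIncrement();

  ScanWork(Kind kind, String name) {
    this.kind = kind;
    this.name = name;
  }

  @Override
  public int compareTo(ScanWork other) {
    int byKind = kind.compareTo(other.kind);
    return byKind != 0 ? byKind : Long.compare(seq, other.seq);
  }
}
```

To use this ordering with a `ThreadPoolExecutor` backed by a `PriorityBlockingQueue<Runnable>`, the tasks themselves must be `Comparable` and must be passed to `execute()`, because `submit()` wraps them in a `FutureTask`, which is not `Comparable`.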







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r837496595



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
##########
@@ -127,6 +161,9 @@ public T get(long timeout, TimeUnit unit)
     // returned
     resultQueue = null;
 
+    // TODO by not wrapping the error stack information that could be important for debugging is
+    // lost, the error is from a background thread but the stack trace from this foreground thread
+    // is lost.

Review comment:
       I'm not sure that this can happen. In the places where ScanTask.addResult() is called, only Exceptions are added, not Errors.
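For comparison, the usual `Future.get()` convention addresses the concern in the TODO: the failure is rethrown wrapped in an `ExecutionException` created on the calling thread, so the foreground stack trace is kept while `getCause()` still carries the background one. A hypothetical sketch (`ResultSlot` is illustrative, not `ScanTask`):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical result slot filled by a background thread and read by a
// foreground thread.
class ResultSlot<T> {
  private final AtomicReference<Object> result = new AtomicReference<>();

  // Called by the background thread with either a value or a Throwable.
  void addResult(Object valueOrThrowable) {
    result.set(valueOrThrowable);
  }

  // The new ExecutionException's stack trace shows the foreground caller;
  // its cause keeps the background thread's original stack trace.
  @SuppressWarnings("unchecked")
  T get() throws ExecutionException {
    Object r = result.get();
    if (r instanceof Throwable) {
      throw new ExecutionException((Throwable) r);
    }
    return (T) r;
  }
}
```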







[GitHub] [accumulo] EdColeman commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
EdColeman commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r839654152



##########
File path: core/src/test/java/org/apache/accumulo/core/spi/scan/DefaultScanServerDispatcherTest.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import static org.junit.jupiter.api.Assertions.*;
+

Review comment:
       The star import fails checkstyle; it should be:
   
   Suggestion 
   
   ```
   import static org.junit.jupiter.api.Assertions.assertEquals;
   import static org.junit.jupiter.api.Assertions.assertThrows;
   import static org.junit.jupiter.api.Assertions.assertTrue;
   ```







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r837491974



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
##########
@@ -138,6 +175,13 @@ public T get(long timeout, TimeUnit unit)
     return rAsT;
   }
 
+  @Override
+  public T get(long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    // TODO probably no longer makes sense to extend future

Review comment:
       I think we can satisfy this TODO by creating a separate issue for this change. ScanTask implemented Future<> before this PR.







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r840559835



##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
##########
@@ -56,6 +56,7 @@
   private Map<String,String> configuredSiteConig = new HashMap<>();
   private Map<String,String> clientProps = new HashMap<>();
   private int numTservers = 2;
+  private int numScanServers = 2;

Review comment:
       Fixed in 7521ae6







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r840530830



##########
File path: test/src/main/resources/log4j2-test.properties
##########
@@ -79,7 +79,7 @@ logger.16.name = org.apache.accumulo.server.util.ReplicationTableUtil
 logger.16.level = trace
 
 logger.17.name = org.apache.accumulo.core.clientImpl.ThriftScanner
-logger.17.level = info
+logger.17.level = debug

Review comment:
       For now, it's helping me debug some tests. In the long run, no.







[GitHub] [accumulo] dlmarion edited a comment on pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion edited a comment on pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#issuecomment-1021503545


   @keith-turner and I had a discussion about changing the current design. The key points from the discussion are:
   
   1. Modifying the load balancing logic from the current implementation (which uses ZK to know which scan servers are available) to an implementation that selects candidate scan servers to communicate with for scanning an extent. This new implementation would not require ZK and would use a hashing algorithm so that all clients select the same set of scan servers for an extent, in an attempt to make use of the index and data block caches.
   
   2. We may want to have configuration settings for the scan server for things that are inherited from the tablet server, for example the data block cache size. Should the scan server use the current configuration property name, or should it have its own?
   
   3. ~~Modifying the scan server to perform more than one concurrent scan.~~ Added in [0d553de]
   
   4. Modifying the client configuration code such that the name is not tied to the implementation.
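Point 1 leaves the hashing algorithm open. One option, assumed here rather than taken from the discussion, is rendezvous (highest-random-weight) hashing: every client scores each (extent, scan server) pair with the same deterministic hash and picks the top N servers, so all clients agree on the candidates without consulting ZK. `RendezvousSelector`, the extent strings, and the server addresses are hypothetical, not Accumulo APIs.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper: every client computes the same candidate scan servers
// for an extent from the same server list, with no ZK round trip.
final class RendezvousSelector {
  private RendezvousSelector() {}

  // Deterministic 64-bit score for an (extent, server) pair. Characters are folded
  // with a polynomial hash, then mixed with the MurmurHash3 fmix64 finalizer.
  static long score(String extent, String server) {
    String key = extent + "|" + server;
    long h = 1125899906842597L;
    for (int i = 0; i < key.length(); i++) {
      h = 31 * h + key.charAt(i);
    }
    h ^= h >>> 33;
    h *= 0xff51afd7ed558ccdL;
    h ^= h >>> 33;
    h *= 0xc4ceb9fe1a85ec53L;
    h ^= h >>> 33;
    return h;
  }

  /** Returns up to n servers with the highest scores for this extent. */
  static List<String> candidates(String extent, List<String> servers, int n) {
    List<String> sorted = new ArrayList<>(servers);
    // Highest score first; ties broken by name so the ordering is total.
    sorted.sort(Comparator.comparingLong((String s) -> score(extent, s)).reversed()
        .thenComparing(Comparator.naturalOrder()));
    return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
  }
}
```

A property that fits the cache goal: removing a scan server only changes the candidates for extents that had that server in their top N, so most extents keep landing on servers whose index and data block caches are already warm.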





[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r830197238



##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImpl.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.data.TabletId;
+import org.apache.accumulo.core.spi.scan.ScanServerDispatcher.ScanAttempt;
+
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Maps;
+
+public class ScanAttemptsImpl {
+
+  static class ScanAttemptImpl
+      implements org.apache.accumulo.core.spi.scan.ScanServerDispatcher.ScanAttempt {
+
+    private final String server;
+    private final long time;
+    private final Result result;
+    private volatile long mutationCount = Long.MAX_VALUE;
+
+    ScanAttemptImpl(Result result, String server, long time) {
+      this.result = result;
+      this.server = Objects.requireNonNull(server);
+      this.time = time;
+    }
+
+    @Override
+    public String getServer() {
+      return server;
+    }
+
+    @Override
+    public long getEndTime() {
+      return time;
+    }
+
+    @Override
+    public Result getResult() {
+      return result;
+    }
+
+    private void setMutationCount(long mc) {
+      this.mutationCount = mc;
+    }
+
+    public long getMutationCount() {
+      return mutationCount;
+    }
+
+  }
+
+  private Map<TabletId,Collection<ScanAttemptImpl>> attempts = new ConcurrentHashMap<>();
+  private long mutationCounter = 0;
+
+  private AtomicInteger currentIteration = new AtomicInteger(0);

Review comment:
       variable is not used

##########
File path: test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.clientImpl.ThriftScanner.ScanTimedOutException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ScanServerIT extends SharedMiniClusterBase {
+
+  private static class ScanServerITConfiguration implements MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+        org.apache.hadoop.conf.Configuration coreSite) {
+      cfg.setNumScanServers(1);
+      cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s");
+      cfg.setProperty(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "1");
+    }
+  }
+
+  @BeforeClass
+  public static void start() throws Exception {
+    ScanServerITConfiguration c = new ScanServerITConfiguration();
+    SharedMiniClusterBase.startMiniClusterWithConfig(c);
+    SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER,
+        "localhost");
+
+    String zooRoot = getCluster().getServerContext().getZooKeeperRoot();
+    ZooReaderWriter zrw = getCluster().getServerContext().getZooReaderWriter();
+    String scanServerRoot = zooRoot + Constants.ZSSERVERS;
+
+    while (zrw.getChildren(scanServerRoot).size() == 0) {
+      Thread.sleep(500);
+    }
+  }
+
+  @AfterClass
+  public static void stop() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  @Test
+  public void testScan() throws Exception {
+
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+
+      ReadWriteIT.ingest(client, getClientInfo(), 10, 10, 50, 0, tableName);
+
+      client.tableOperations().flush(tableName, null, null, true);
+
+      try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRange(new Range());
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        int count = 0;
+        for (@SuppressWarnings("unused")
+        Entry<Key,Value> entry : scanner) {
+          count++;
+        }
+        assertEquals(100, count);
+      } // when the scanner is closed, all open sessions should be closed
+    }
+  }
+
+  @Test
+  public void testBatchScan() throws Exception {
+
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+
+      ReadWriteIT.ingest(client, getClientInfo(), 10, 10, 50, 0, tableName);
+
+      client.tableOperations().flush(tableName, null, null, true);
+
+      try (BatchScanner scanner = client.createBatchScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRanges(Collections.singletonList(new Range()));
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        int count = 0;
+        for (@SuppressWarnings("unused")
+        Entry<Key,Value> entry : scanner) {
+          count++;
+        }
+        assertEquals(100, count);
+      } // when the scanner is closed, all open sessions should be closed
+    }
+  }
+
+  // TODO: This test currently fails, but we could change the client code to make it work.
+  @Test
+  public void testScanOfflineTable() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+
+      ReadWriteIT.ingest(client, getClientInfo(), 10, 10, 50, 0, tableName);
+
+      client.tableOperations().flush(tableName, null, null, true);
+      client.tableOperations().offline(tableName, true);
+
+      assertThrows(TableOfflineException.class, () -> {
+        try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+          scanner.setRange(new Range());
+          scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+          int count = 0;
+          for (@SuppressWarnings("unused")
+          Entry<Key,Value> entry : scanner) {
+            count++;
+          }
+          assertEquals(100, count);
+        } // when the scanner is closed, all open sessions should be closed
+      });
+    }
+  }
+
+  @Test
+  public void testScanServerBusy() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+
+      ReadWriteIT.ingest(client, getClientInfo(), 10, 10, 50, 0, tableName);
+
+      client.tableOperations().flush(tableName, null, null, true);
+
+      Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY);
+      scanner.setRange(new Range());
+      scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+      // We only inserted 100 rows, default batch size is 1000. If we don't set the
+      // batch size lower, then the server side code will automatically close the
+      // scanner on the first call to continueScan. We want to keep it open, so lower the batch
+      // size.
+      scanner.setBatchSize(10);
+      Iterator<Entry<Key,Value>> iter = scanner.iterator();
+      iter.next();
+      iter.next();
+      // At this point the tablet server will time out this scan after TSERV_SESSION_MAXIDLE
+      // Start up another scanner and set it to time out in 1s. It should fail because there
+      // is no scan server available to run the scan.
+      Scanner scanner2 = client.createScanner(tableName, Authorizations.EMPTY);
+      scanner2.setRange(new Range());
+      scanner2.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+      scanner2.setTimeout(1, TimeUnit.SECONDS);
+      Iterator<Entry<Key,Value>> iter2 = scanner2.iterator();
+      try {
+        iter2.hasNext();
+        assertNotNull(iter2.next());
+        fail("Expecting ScanTimedOutException");

Review comment:
       This is the only test failing. This test starts one scanner, and then starts another, expecting an exception to occur because TSERV_SCAN_EXECUTORS_DEFAULT_THREADS is set to 1 in the MAC config above. An exception is not thrown. @keith-turner - what's the expected behavior here?







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Initial implementation of my vision for implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r791015464



##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -497,26 +499,44 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
     for (final String tsLocation : locations) {
 
       final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
-      if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
-        QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns);
-        queryTasks.add(queryTask);
+      if (options.isUseScanServer()) {
+        // Ignore the tablets location and find a scan server to use
+        ScanServerLocator ssl = context.getScanServerLocator();
+        tabletsRanges.forEach((k, v) -> {
+          try {
+            String location = ssl.reserveScanServer(new TabletIdImpl(k));

Review comment:
       As currently implemented, a ScanServer performs one scan at a time, much like the Compactor performs one compaction at a time. I figured that if a ScanServer had many threads and performed more than one scan at a time, then we would potentially run into the same situation we have in the TabletServer w/r/t memory usage.

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -497,26 +499,44 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
     for (final String tsLocation : locations) {
 
       final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
-      if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
-        QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns);
-        queryTasks.add(queryTask);
+      if (options.isUseScanServer()) {
+        // Ignore the tablets location and find a scan server to use
+        ScanServerLocator ssl = context.getScanServerLocator();
+        tabletsRanges.forEach((k, v) -> {
+          try {
+            String location = ssl.reserveScanServer(new TabletIdImpl(k));

Review comment:
       Another thought w/r/t the load balancer code and the number of RPCs to ZK: we could move the logic into the Manager and reduce the ZK traffic with ZooCache. I'm not tied to the current approach; I was just trying to make a simple reservation system that could be replaced with a user-supplied class that performs more complex logic.
   
   > We could start off implementing the thread pool with syncQ with a hard-coded thread pool size of one and get this really fast busy exception behavior.
   
   I'm good with that. The Scan Server already returns an error if it's working on a scan for another client. I'm currently working on some more ITs and running down an issue I am seeing in one of them. Let's talk more about potential design changes.
   
   
   

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -497,26 +499,44 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
     for (final String tsLocation : locations) {
 
       final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
-      if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
-        QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns);
-        queryTasks.add(queryTask);
+      if (options.isUseScanServer()) {
+        // Ignore the tablets location and find a scan server to use
+        ScanServerLocator ssl = context.getScanServerLocator();
+        tabletsRanges.forEach((k, v) -> {
+          try {
+            String location = ssl.reserveScanServer(new TabletIdImpl(k));

Review comment:
       My latest commit added more tests in ScanServerIT. I need to add a couple more tests that involve the BatchScanner, but I'm pretty well convinced that this approach will work. I think the ScanServer locator piece might be the most complex piece about this change when we are done with it. From a load balancer perspective, it would be nice to know the following for each ScanServer:
   
   - last N extents scanned (increased probability of cache hit?).
   - local TabletServers Tablets, if any (increased probability of locality or page cache hit?)
   
   The extent information is likely sensitive and not something we want client-side. If we choose to use this information for load balancing, then it might have to go into the Manager.
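If something (a scan server, or the Manager given the sensitivity concern above) tracked the "last N extents scanned", a bounded, access-ordered `java.util.LinkedHashMap` is one simple way to keep that history. This is a sketch under that assumption; `RecentExtents` is a hypothetical name and extents are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-scan-server history of the last N distinct extents scanned.
final class RecentExtents {
  private final int capacity;
  private final LinkedHashMap<String, Boolean> lru;

  RecentExtents(int capacity) {
    this.capacity = capacity;
    // accessOrder = true: re-scanning an extent moves it to the most-recent end.
    this.lru = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
        // Evict the least recently scanned extent once more than N are held.
        return size() > RecentExtents.this.capacity;
      }
    };
  }

  synchronized void recordScan(String extent) {
    lru.put(extent, Boolean.TRUE);
  }

  /** containsKey does not change access order, so this check has no side effects. */
  synchronized boolean recentlyScanned(String extent) {
    return lru.containsKey(extent);
  }

  synchronized List<String> mostRecentFirst() {
    List<String> extents = new ArrayList<>(lru.keySet());
    Collections.reverse(extents);
    return extents;
  }
}
```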
   







[GitHub] [accumulo] keith-turner commented on a change in pull request #2422: Initial implementation of my vision for implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r790981594



##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -497,26 +499,44 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
     for (final String tsLocation : locations) {
 
       final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
-      if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
-        QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns);
-        queryTasks.add(queryTask);
+      if (options.isUseScanServer()) {
+        // Ignore the tablets location and find a scan server to use
+        ScanServerLocator ssl = context.getScanServerLocator();
+        tabletsRanges.forEach((k, v) -> {
+          try {
+            String location = ssl.reserveScanServer(new TabletIdImpl(k));

Review comment:
       Doing an RPC to ZK before a scan and another RPC to ZK after the scan to unreserve seems really expensive. I was thinking we could possibly just select a scan server and try to use it, and if it's busy it would throw some sort of thrift exception that indicates it's busy. When the client gets the busy exception it will try another scan server, similar to the reservation failing here. However, this does not require going to ZK, and the busy check on scan servers could probably be orders of magnitude faster than ZK, as it would not require persisting data in the ZK edit log and getting ZK server-side quorum for a write.
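The client side of the busy-exception idea could look roughly like this: try each candidate scan server in turn and move on when one reports busy. `ScanServerBusyException`, `BusyRetry`, and `ScanCall` are hypothetical stand-ins for whatever Thrift exception and client plumbing would actually be used. Returning an empty `Optional` when every candidate is busy leaves the caller free to back off or choose another strategy.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical busy signal a scan server could send instead of queueing a scan.
class ScanServerBusyException extends Exception {
  ScanServerBusyException(String server) {
    super(server + " is busy");
  }
}

final class BusyRetry {
  private BusyRetry() {}

  /** Hypothetical hook that runs a scan against one scan server (non-null result). */
  interface ScanCall<T> {
    T scanOn(String server) throws ScanServerBusyException;
  }

  /** Tries candidates in order; returns empty if every one of them is busy. */
  static <T> Optional<T> scanWithFallback(List<String> candidates, ScanCall<T> call) {
    for (String server : candidates) {
      try {
        return Optional.of(call.scanOn(server));
      } catch (ScanServerBusyException e) {
        // Busy: there is no ZK reservation to undo, so just try the next candidate.
      }
    }
    return Optional.empty();
  }
}
```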
   
   One way a busy check could work on a scan server is that it has a fixed number of threads in a thread pool using a [SynchronousQueue](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/SynchronousQueue.html).  When a scan task is submitted to this thread pool and all threads are busy the thread pool should reject it.  When the thread pool rejection happens then we can turn around and throw the busy thrift exception. All of this should be really quick on the scan server.
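A minimal sketch of that server-side busy check, using only JDK classes: a `ThreadPoolExecutor` with equal core and maximum sizes and a `SynchronousQueue` never queues work, so the default `AbortPolicy` throws `RejectedExecutionException` as soon as every thread is occupied. `BusyAwareScanPool` is a hypothetical name, and the translation from rejection to a busy Thrift exception is only indicated in a comment.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical scan-server executor that refuses work instead of queueing it.
final class BusyAwareScanPool {
  private final ThreadPoolExecutor pool;

  BusyAwareScanPool(int threads) {
    // core == max and a zero-capacity SynchronousQueue: a task is accepted only if
    // a new core thread can be started or an idle thread is waiting for work.
    // Otherwise the default AbortPolicy throws RejectedExecutionException.
    this.pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
        new SynchronousQueue<>());
  }

  /** Returns true if the scan was accepted, false if every thread is busy. */
  boolean trySubmit(Runnable scanTask) {
    try {
      pool.execute(scanTask);
      return true;
    } catch (RejectedExecutionException e) {
      // A real scan server would throw its busy Thrift exception here.
      return false;
    }
  }

  void shutdown() {
    pool.shutdownNow();
  }
}
```

One caveat: a worker that has just finished a task is briefly not waiting on the queue, so a submission at that instant can be rejected even though a thread is about to free up. For a fast busy signal that the client simply retries elsewhere, that seems acceptable.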







[GitHub] [accumulo] keith-turner commented on a change in pull request #2422: Initial implementation of my vision for implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r791032803



##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -497,26 +499,44 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
     for (final String tsLocation : locations) {
 
       final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
-      if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
-        QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns);
-        queryTasks.add(queryTask);
+      if (options.isUseScanServer()) {
+        // Ignore the tablets location and find a scan server to use
+        ScanServerLocator ssl = context.getScanServerLocator();
+        tabletsRanges.forEach((k, v) -> {
+          try {
+            String location = ssl.reserveScanServer(new TabletIdImpl(k));

Review comment:
       >  I figured that if a ScanServer had many threads and performed more than one scan at a time, then we would potentially run into the same situation we have in the TabletServer w/r/t memory usage.
   
   I can see the benefit of a single thread.  We could start off implementing the thread pool with syncQ with a hard-coded thread pool size of one and get this really fast busy exception behavior.  Later on, as we gain experience, we could refine the scan server config and make what causes busy exceptions on a scan server configurable.

##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -497,26 +499,44 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
     for (final String tsLocation : locations) {
 
       final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
-      if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
-        QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns);
-        queryTasks.add(queryTask);
+      if (options.isUseScanServer()) {
+        // Ignore the tablets location and find a scan server to use
+        ScanServerLocator ssl = context.getScanServerLocator();
+        tabletsRanges.forEach((k, v) -> {
+          try {
+            String location = ssl.reserveScanServer(new TabletIdImpl(k));

Review comment:
       Not sure if it is achievable, but it would be nice if we could have information about scan servers from ZK that is cacheable on the client side (so clients do not have to go to ZK frequently), a busy signal from scan servers, and the previous two bits of information available to a client-side plugin that makes decisions about which scan servers to use.  If that is workable, it could avoid any single bottleneck process in the cluster, hopefully allowing this to scale up to many thousands of scan servers and thousands of clients.







[GitHub] [accumulo] keith-turner commented on pull request #2422: Initial implementation of my vision for implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
keith-turner commented on pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#issuecomment-1017857712


   @dlmarion curious if this feature is something you would like to see in 2.1.0?





[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r837508228



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
##########
@@ -0,0 +1,1281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.clientImpl.thrift.ConfigurationType;
+import org.apache.accumulo.core.clientImpl.thrift.TDiskUsage;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
+import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
+import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.TCMResult;
+import org.apache.accumulo.core.dataImpl.thrift.TColumn;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalSession;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import org.apache.accumulo.core.dataImpl.thrift.TRowRange;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaries;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaryRequest;
+import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors;
+import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.CompactionServices;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
+import org.apache.accumulo.core.tabletserver.thrift.TDurability;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
+import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ServiceLock;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.security.SecurityUtil;
+import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.compactions.Compactable;
+import org.apache.accumulo.tserver.compactions.CompactionManager;
+import org.apache.accumulo.tserver.compactions.ExternalCompactionJob;
+import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.tserver.session.MultiScanSession;
+import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.ScanSession.TabletResolver;
+import org.apache.accumulo.tserver.session.SingleScanSession;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletData;
+import org.apache.commons.lang3.tuple.MutableTriple;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+public class ScanServer extends TabletServer implements TabletClientService.Iface {
+
+  static class ScanInformation extends MutableTriple<Long,KeyExtent,Tablet> {
+    private static final long serialVersionUID = 1L;
+
+    public Long getScanId() {
+      return getLeft();
+    }
+
+    public void setScanId(Long scanId) {
+      setLeft(scanId);
+    }
+
+    public KeyExtent getExtent() {
+      return getMiddle();
+    }
+
+    public void setExtent(KeyExtent extent) {
+      setMiddle(extent);
+    }
+
+    public Tablet getTablet() {
+      return getRight();
+    }
+
+    public void setTablet(Tablet tablet) {
+      setRight(tablet);
+    }
+  }
+
+  /**
+   * A compaction manager that does nothing
+   */
+  private static class ScanServerCompactionManager extends CompactionManager {
+
+    public ScanServerCompactionManager(ServerContext context,
+        CompactionExecutorsMetrics ceMetrics) {
+      super(new ArrayList<>(), context, ceMetrics);
+    }
+
+    @Override
+    public void compactableChanged(Compactable compactable) {}
+
+    @Override
+    public void start() {}
+
+    @Override
+    public CompactionServices getServices() {
+      return null;
+    }
+
+    @Override
+    public boolean isCompactionQueued(KeyExtent extent, Set<CompactionServiceId> servicesUsed) {
+      return false;
+    }
+
+    @Override
+    public int getCompactionsRunning() {
+      return 0;
+    }
+
+    @Override
+    public int getCompactionsQueued() {
+      return 0;
+    }
+
+    @Override
+    public ExternalCompactionJob reserveExternalCompaction(String queueName, long priority,
+        String compactorId, ExternalCompactionId externalCompactionId) {
+      return null;
+    }
+
+    @Override
+    public void registerExternalCompaction(ExternalCompactionId ecid, KeyExtent extent,
+        CompactionExecutorId ceid) {}
+
+    @Override
+    public void commitExternalCompaction(ExternalCompactionId extCompactionId,
+        KeyExtent extentCompacted, Map<KeyExtent,Tablet> currentTablets, long fileSize,
+        long entries) {}
+
+    @Override
+    public void externalCompactionFailed(ExternalCompactionId ecid, KeyExtent extentCompacted,
+        Map<KeyExtent,Tablet> currentTablets) {}
+
+    @Override
+    public List<TCompactionQueueSummary> getCompactionQueueSummaries() {
+      return null;
+    }
+
+    @Override
+    public Collection<ExtCompMetric> getExternalMetrics() {
+      return null;
+    }
+
+    @Override
+    public void compactableClosed(KeyExtent extent, Set<CompactionServiceId> servicesUsed,
+        Set<ExternalCompactionId> ecids) {}
+
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
+
+  protected ThriftClientHandler handler;
+  private UUID serverLockUUID;
+  private final TabletMetadataLoader tabletMetadataLoader;
+  private final LoadingCache<KeyExtent,TabletMetadata> tabletMetadataCache;
+  protected Set<StoredTabletFile> lockedFiles = new HashSet<>();
+  protected Map<StoredTabletFile,ReservedFile> reservedFiles = new ConcurrentHashMap<>();
+  protected AtomicLong nextScanReservationId = new AtomicLong();
+
+  private static class TabletMetadataLoader implements CacheLoader<KeyExtent,TabletMetadata> {
+
+    private final Ample ample;
+
+    private TabletMetadataLoader(Ample ample) {
+      this.ample = ample;
+    }
+
+    @Override
+    public @Nullable TabletMetadata load(KeyExtent keyExtent) {
+      long t1 = System.currentTimeMillis();
+      var tm = ample.readTablet(keyExtent);
+      long t2 = System.currentTimeMillis();
+      LOG.trace("Read metadata for 1 tablet in {} ms", t2 - t1);
+      return tm;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Map<? extends KeyExtent,? extends TabletMetadata>
+        loadAll(Set<? extends KeyExtent> keys) {
+      long t1 = System.currentTimeMillis();
+      var tms = ample.readTablets().forTablets((Collection<KeyExtent>) keys).build().stream()
+          .collect(Collectors.toMap(tm -> tm.getExtent(), tm -> tm));
+      long t2 = System.currentTimeMillis();
+      LOG.trace("Read metadata for {} tablets in {} ms", keys.size(), t2 - t1);
+      return tms;
+    }
+  }
+
+  public ScanServer(ServerOpts opts, String[] args) {
+    super(opts, args, true);
+
+    // Note: The way to control the number of concurrent scans that a ScanServer will
+    // perform is by using Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS or the number
+    // of threads in Property.SSERV_SCAN_EXECUTORS_PREFIX.
+
+    long cacheExpiration =
+        getConfiguration().getTimeInMillis(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION);
+
+    long scanServerReservationExpiration =
+        getConfiguration().getTimeInMillis(Property.SSERVER_SCAN_REFERENCE_EXPIRATION_TIME);
+
+    tabletMetadataLoader = new TabletMetadataLoader(getContext().getAmple());
+
+    if (cacheExpiration == 0L) {
+      LOG.warn("Tablet metadata caching is disabled, which may cause excessive scans on the"
+          + " metadata table.");
+      tabletMetadataCache = null;
+    } else {
+      if (cacheExpiration < 60000) {
+        LOG.warn("Tablet metadata cache expiration is less than one minute, which may cause"
+            + " excessive scans on the metadata table.");
+      }
+      tabletMetadataCache =
+          Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS)
+              .scheduler(Scheduler.systemScheduler()).build(tabletMetadataLoader);
+    }
+    handler = getHandler();
+
+    ThreadPools.watchCriticalScheduledTask(getContext().getScheduledExecutor()
+        .scheduleWithFixedDelay(() -> cleanUpReservedFiles(scanServerReservationExpiration),
+            scanServerReservationExpiration, scanServerReservationExpiration,
+            TimeUnit.MILLISECONDS));
+
+  }
+
+  @VisibleForTesting
+  protected ThriftClientHandler getHandler() {
+    return new ThriftClientHandler(this);
+  }
+
+  /**
+   * Start the thrift service to handle incoming client requests
+   *
+   * @return address of this client service
+   * @throws UnknownHostException
+   *           host unknown
+   */
+  protected ServerAddress startScanServerClientService() throws UnknownHostException {
+    Iface rpcProxy = TraceUtil.wrapService(this);
+    if (getContext().getThriftServerType() == ThriftServerType.SASL) {
+      rpcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, getClass(), getConfiguration());
+    }
+    final TabletClientService.Processor<Iface> processor =
+        new TabletClientService.Processor<>(rpcProxy);
+
+    Property maxMessageSizeProperty =
+        (getConfiguration().get(Property.SSERV_MAX_MESSAGE_SIZE) != null
+            ? Property.SSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
+    ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
+        Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(),
+        "Thrift Client Server", Property.SSERV_PORTSEARCH, Property.SSERV_MINTHREADS,
+        Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK, maxMessageSizeProperty);
+
+    LOG.info("address = {}", sp.address);
+    return sp;
+  }
+
+  /**
+   * Set up nodes and locks in ZooKeeper for this ScanServer
+   */
+  private ServiceLock announceExistence() {
+    ZooReaderWriter zoo = getContext().getZooReaderWriter();
+    try {
+
+      var zLockPath = ServiceLock.path(
+          getContext().getZooKeeperRoot() + Constants.ZSSERVERS + "/" + getClientAddressString());
+
+      try {
+        // Old zk nodes can be cleaned up by ZooZap
+        zoo.putPersistentData(zLockPath.toString(), new byte[] {}, NodeExistsPolicy.SKIP);
+      } catch (KeeperException e) {
+        if (e.code() == KeeperException.Code.NOAUTH) {
+          LOG.error("Failed to write to ZooKeeper. Ensure that"
+              + " accumulo.properties, specifically instance.secret, is consistent.");
+        }
+        throw e;
+      }
+
+      serverLockUUID = UUID.randomUUID();
+      tabletServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, serverLockUUID);
+
+      LockWatcher lw = new LockWatcher() {
+
+        @Override
+        public void lostLock(final LockLossReason reason) {
+          Halt.halt(serverStopRequested ? 0 : 1, () -> {
+            if (!serverStopRequested) {
+              LOG.error("Lost scan server lock (reason = {}), exiting.", reason);
+            }
+            gcLogger.logGCInfo(getConfiguration());
+          });
+        }
+
+        @Override
+        public void unableToMonitorLockNode(final Exception e) {
+          Halt.halt(1, () -> LOG.error("Lost ability to monitor scan server lock, exiting.", e));
+        }
+      };
+
+      // Don't use the normal ServerServices lock content, instead put the server UUID here.
+      byte[] lockContent = serverLockUUID.toString().getBytes(UTF_8);
+
+      for (int i = 0; i < 120 / 5; i++) {
+        zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP);
+
+        if (tabletServerLock.tryLock(lw, lockContent)) {
+          LOG.debug("Obtained scan server lock {}", tabletServerLock.getLockPath());
+          return tabletServerLock;
+        }
+        LOG.info("Waiting for scan server lock");
+        sleepUninterruptibly(5, TimeUnit.SECONDS);
+      }
+      String msg = "Too many retries, exiting.";
+      LOG.info(msg);
+      throw new RuntimeException(msg);
+    } catch (Exception e) {
+      LOG.info("Could not obtain scan server lock, exiting.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void run() {
+    SecurityUtil.serverLogin(getConfiguration());
+
+    ServerAddress address = null;
+    try {
+      address = startScanServerClientService();
+      clientAddress = address.getAddress();
+    } catch (UnknownHostException e1) {
+      throw new RuntimeException("Failed to start the scan server client service", e1);
+    }
+
+    try {
+      MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName,
+          clientAddress);
+    } catch (Exception e1) {
+      LOG.error("Error initializing metrics, metrics will not be emitted.", e1);
+    }
+    scanMetrics = new TabletServerScanMetrics();
+    MetricsUtil.initializeProducers(scanMetrics);
+
+    // We need to set the compaction manager so that we don't get an NPE in CompactableImpl.close
+    ceMetrics = new CompactionExecutorsMetrics();
+    this.compactionManager = new ScanServerCompactionManager(getContext(), ceMetrics);
+
+    ServiceLock lock = announceExistence();
+
+    try {
+      while (!serverStopRequested) {
+        UtilWaitThread.sleep(1000);
+      }
+    } finally {
+      LOG.info("Stopping Thrift Servers");
+      address.server.stop();
+
+      LOG.info("Removing server scan references");
+      this.getContext().getAmple().deleteScanServerFileReferences(clientAddress.toString(),
+          serverLockUUID);
+
+      try {
+        LOG.debug("Closing filesystems");
+        VolumeManager mgr = getContext().getVolumeManager();
+        if (null != mgr) {
+          mgr.close();
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
+      }
+
+      gcLogger.logGCInfo(getConfiguration());
+      LOG.info("stop requested. exiting ... ");
+      try {
+        if (null != lock) {
+          lock.unlock();
+        }
+      } catch (Exception e) {
+        LOG.warn("Failed to release scan server lock", e);
+      }
+
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private Map<KeyExtent,TabletMetadata> getTabletMetadata(Collection<KeyExtent> extents) {
+    if (tabletMetadataCache == null) {
+      return (Map<KeyExtent,TabletMetadata>) tabletMetadataLoader
+          .loadAll((Set<? extends KeyExtent>) extents);
+    } else {
+      return tabletMetadataCache.getAll(extents);
+    }
+  }
+
+  static class ReservedFile {
+    Set<Long> activeReservations = new ConcurrentSkipListSet<>();
+    volatile long lastUseTime;
+
+    boolean shouldDelete(long expireTimeMs) {
+      return activeReservations.isEmpty()
+          && System.currentTimeMillis() - lastUseTime > expireTimeMs;
+    }
+  }
+
+  private class FilesLock implements AutoCloseable {
+
+    private final Collection<StoredTabletFile> files;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    public FilesLock(Collection<StoredTabletFile> files) {
+      this.files = files;
+    }
+
+    Collection<StoredTabletFile> getLockedFiles() {
+      return files;
+    }
+
+    @Override
+    public void close() {
+      // only allow close to be called once
+      if (!closed.compareAndSet(false, true)) {
+        return;
+      }
+
+      synchronized (lockedFiles) {
+        for (StoredTabletFile file : files) {
+          if (!lockedFiles.remove(file)) {
+            throw new IllegalStateException("tried to unlock file that was not locked");
+          }
+        }
+
+        lockedFiles.notifyAll();
+      }
+    }
+  }
+
+  private FilesLock lockFiles(Collection<StoredTabletFile> files) {
+
+    // let's ensure we lock and unlock the same set of files even if the passed-in files change
+    var filesCopy = Set.copyOf(files);
+
+    synchronized (lockedFiles) {
+
+      while (!Collections.disjoint(filesCopy, lockedFiles)) {
+        try {
+          lockedFiles.wait();
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      for (StoredTabletFile file : filesCopy) {
+        if (!lockedFiles.add(file)) {
+          throw new IllegalStateException("file unexpectedly not added");
+        }
+      }
+    }
+
+    return new FilesLock(filesCopy);
+  }
+
+  class ScanReservation implements AutoCloseable {
+
+    private final Collection<StoredTabletFile> files;
+    private final long myReservationId;
+    private final Map<KeyExtent,TabletMetadata> tabletsMetadata;
+
+    ScanReservation(Map<KeyExtent,TabletMetadata> tabletsMetadata, long myReservationId) {
+      this.tabletsMetadata = tabletsMetadata;
+      this.files = tabletsMetadata.values().stream().flatMap(tm -> tm.getFiles().stream())
+          .collect(Collectors.toUnmodifiableSet());
+      this.myReservationId = myReservationId;
+    }
+
+    ScanReservation(Collection<StoredTabletFile> files, long myReservationId) {
+      this.tabletsMetadata = null;
+      this.files = files;
+      this.myReservationId = myReservationId;
+    }
+
+    public TabletMetadata getTabletMetadata(KeyExtent extent) {
+      return tabletsMetadata.get(extent);
+    }
+
+    Tablet newTablet(KeyExtent extent) throws IOException {
+      var tabletMetadata = getTabletMetadata(extent);
+      TabletData data = new TabletData(tabletMetadata);
+      TabletResourceManager trm = resourceManager.createTabletResourceManager(
+          tabletMetadata.getExtent(), getTableConfiguration(tabletMetadata.getExtent()));
+      return new Tablet(ScanServer.this, tabletMetadata.getExtent(), trm, data, true);
+    }
+
+    @Override
+    public void close() {
+      try (FilesLock flock = lockFiles(files)) {
+        for (StoredTabletFile file : flock.getLockedFiles()) {
+          var reservedFile = reservedFiles.get(file);
+
+          if (!reservedFile.activeReservations.remove(myReservationId)) {
+            throw new IllegalStateException("reservation id was not in set as expected");
+          }
+
+          LOG.trace("RFFS {} unreserved reference for file {}", myReservationId, file);
+
+          // TODO maybe use nano time
+          reservedFile.lastUseTime = System.currentTimeMillis();
+        }
+      }
+    }
+  }
+
+  private Map<KeyExtent,TabletMetadata> reserveFilesInner(Collection<KeyExtent> extents,
+      long myReservationId) throws NotServingTabletException, AccumuloException {
+    // RFFS is an acronym for "reference files for scan"
+    LOG.trace("RFFS {} ensuring files are referenced for scan of extents {}", myReservationId,
+        extents);
+
+    Map<KeyExtent,TabletMetadata> tabletsMetadata = getTabletMetadata(extents);
+
+    for (KeyExtent extent : extents) {
+      var tabletMetadata = tabletsMetadata.get(extent);
+      if (tabletMetadata == null) {
+        LOG.trace("RFFS {} extent not found in metadata table {}", myReservationId, extent);
+        throw new NotServingTabletException();
+      }
+
+      boolean canLoad =
+          AssignmentHandler.checkTabletMetadata(extent, getTabletSession(), tabletMetadata, true);
+
+      // TODO handle canLoad == false
+    }
+
+    Map<StoredTabletFile,KeyExtent> allFiles = new HashMap<>();
+
+    tabletsMetadata.forEach((extent, tm) -> {
+      tm.getFiles().forEach(file -> allFiles.put(file, extent));
+    });
+
+    try (FilesLock flock = lockFiles(allFiles.keySet())) {
+      Set<StoredTabletFile> filesToReserve = new HashSet<>();
+      List<ScanServerRefTabletFile> refs = new ArrayList<>();
+      Set<KeyExtent> tabletsToCheck = new HashSet<>();
+
+      String serverAddress = clientAddress.toString();
+
+      for (StoredTabletFile file : flock.getLockedFiles()) {
+        if (!reservedFiles.containsKey(file)) {
+          refs.add(new ScanServerRefTabletFile(file.getPathStr(), serverAddress, serverLockUUID));
+          filesToReserve.add(file);
+          tabletsToCheck.add(Objects.requireNonNull(allFiles.get(file)));
+          LOG.trace("RFFS {} need to add scan ref for file {}", myReservationId, file);
+        }
+      }
+
+      if (!filesToReserve.isEmpty()) {
+        getContext().getAmple().putScanServerFileReferences(refs);
+
+        // After we insert the scan server refs we need to check and see if the tablet is still
+        // using the file. As long as the tablet is still using the files then the Accumulo GC
+        // should not have deleted the files. This assumes the Accumulo GC reads scan server refs
+        // after tablet refs from the metadata table.
+
+        if (tabletMetadataCache != null) {
+          // let's clear the cache so we get the latest metadata
+          tabletMetadataCache.invalidateAll(tabletsToCheck);
+        }
+
+        var tabletsToCheckMetadata = getTabletMetadata(tabletsToCheck);
+
+        for (KeyExtent extent : tabletsToCheck) {
+          TabletMetadata metadataAfter = tabletsToCheckMetadata.get(extent);
+          if (metadataAfter == null) {
+            getContext().getAmple().deleteScanServerFileReferences(refs);
+            throw new NotServingTabletException();
+          }
+
+          // remove files that are still referenced
+          filesToReserve.removeAll(metadataAfter.getFiles());
+        }
+
+        // If this is not empty, some files that we reserved are no longer referenced by tablets.
+        // There could have been a window in which nothing referenced such a file, meaning it
+        // could have been GCed.
+        if (!filesToReserve.isEmpty()) {
+          LOG.trace("RFFS {} tablet files changed while attempting to reference files {}",
+              myReservationId, filesToReserve);
+          getContext().getAmple().deleteScanServerFileReferences(refs);
+          return null;
+        }
+      }
+
+      for (StoredTabletFile file : flock.getLockedFiles()) {
+        if (!reservedFiles.computeIfAbsent(file, k -> new ReservedFile()).activeReservations
+            .add(myReservationId)) {
+          throw new IllegalStateException("reservation id unexpectedly already in set");
+        }
+
+        LOG.trace("RFFS {} reserved reference for startScan {}", myReservationId, file);
+      }
+
+    }
+
+    return tabletsMetadata;
+  }
+
+  protected ScanReservation reserveFiles(Collection<KeyExtent> extents)
+      throws NotServingTabletException, AccumuloException {
+
+    long myReservationId = nextScanReservationId.incrementAndGet();
+
+    Map<KeyExtent,TabletMetadata> tabletsMetadata = reserveFilesInner(extents, myReservationId);
+    while (tabletsMetadata == null) {
+      tabletsMetadata = reserveFilesInner(extents, myReservationId);
+    }
+
+    return new ScanReservation(tabletsMetadata, myReservationId);
+  }
+
+  protected ScanReservation reserveFiles(long scanId) throws NoSuchScanIDException {
+    var session = (ScanSession) sessionManager.getSession(scanId);
+    if (session == null) {
+      throw new NoSuchScanIDException();
+    }
+
+    Set<StoredTabletFile> scanSessionFiles;
+
+    if (session instanceof SingleScanSession) {
+      var sss = (SingleScanSession) session;
+      scanSessionFiles =
+          Set.copyOf(session.getTabletResolver().getTablet(sss.extent).getDatafiles().keySet());
+    } else if (session instanceof MultiScanSession) {
+      var mss = (MultiScanSession) session;
+      scanSessionFiles = mss.exents.stream()
+          .flatMap(e -> mss.getTabletResolver().getTablet(e).getDatafiles().keySet().stream())
+          .collect(Collectors.toUnmodifiableSet());
+    } else {
+      throw new IllegalArgumentException("Unknown session type " + session.getClass().getName());
+    }
+
+    long myReservationId = nextScanReservationId.incrementAndGet();
+
+    try (FilesLock flock = lockFiles(scanSessionFiles)) {
+      if (!reservedFiles.keySet().containsAll(scanSessionFiles)) {
+        // the files are no longer reserved in the metadata table, so let's pretend there is no
+        // scan session
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("RFFS {} files are no longer referenced on continue scan {} {}",
+              myReservationId, scanId, Sets.difference(scanSessionFiles, reservedFiles.keySet()));
+        }
+        throw new NoSuchScanIDException();
+      }
+
+      for (StoredTabletFile file : flock.getLockedFiles()) {
+        if (!reservedFiles.get(file).activeReservations.add(myReservationId)) {
+          throw new IllegalStateException("reservation id unexpectedly already in set");
+        }
+
+        LOG.trace("RFFS {} reserved reference for continue scan {} {}", myReservationId, scanId,
+            file);
+      }
+    }
+
+    return new ScanReservation(scanSessionFiles, myReservationId);
+  }
+
+  private void cleanUpReservedFiles(long expireTimeMs) {
+    List<StoredTabletFile> candidates = new ArrayList<>();
+
+    reservedFiles.forEach((file, reservationInfo) -> {
+      if (reservationInfo.shouldDelete(expireTimeMs)) {
+        candidates.add(file);
+      }
+    });
+
+    if (!candidates.isEmpty()) {
+      // gain exclusive access to files to avoid multiple threads adding/deleting file reservations
+      // at the same time
+      try (FilesLock flock = lockFiles(candidates)) {
+        List<ScanServerRefTabletFile> refsToDelete = new ArrayList<>();
+        List<StoredTabletFile> confirmed = new ArrayList<>();
+
+        String serverAddress = clientAddress.toString();
+
+        // check that each file is still a candidate now that the files are locked and no other
+        // thread should be modifying them
+        for (StoredTabletFile candidate : flock.getLockedFiles()) {
+          var reservation = reservedFiles.get(candidate);
+          if (reservation != null && reservation.shouldDelete(expireTimeMs)) {
+            refsToDelete.add(
+                new ScanServerRefTabletFile(candidate.getPathStr(), serverAddress, serverLockUUID));
+            confirmed.add(candidate);
+            LOG.trace("RFFS referenced file has not been used recently, removing reference {}",
+                candidate);
+          }
+        }
+
+        getContext().getAmple().deleteScanServerFileReferences(refsToDelete);
+
+        // those refs were successfully removed from metadata table, so remove them from the map
+        reservedFiles.keySet().removeAll(confirmed);
+
+      }
+    }
+  }
+
+  /*
+   * This simple method exists to be overridden in tests
+   */
+  protected KeyExtent getKeyExtent(TKeyExtent textent) {
+    return KeyExtent.fromThrift(textent);
+  }
+
+  protected TabletResolver getScanTabletResolver(final Tablet tablet) {
+    return new TabletResolver() {
+      @Override
+      public Tablet getTablet(KeyExtent extent) {
+        if (extent.equals(tablet.getExtent())) {
+          return tablet;
+        } else {
+          return null;
+        }
+      }
+    };
+  }
+
+  protected TabletResolver getBatchScanTabletResolver(final HashMap<KeyExtent,Tablet> tablets) {
+    return tablets::get;
+  }
+
+  @Override
+  public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent,
+      TRange range, List<TColumn> columns, int batchSize, List<IterInfo> ssiList,
+      Map<String,Map<String,String>> ssio, List<ByteBuffer> authorizations, boolean waitForWrites,
+      boolean isolated, long readaheadThreshold, TSamplerConfiguration samplerConfig,
+      long batchTimeOut, String classLoaderContext, Map<String,String> executionHints,
+      long busyTimeout) throws ThriftSecurityException, NotServingTabletException,
+      TooManyFilesException, TSampleNotPresentException, TException {
+
+    KeyExtent extent = getKeyExtent(textent);
+
+    try (ScanReservation reservation = reserveFiles(Collections.singleton(extent))) {
+
+      Tablet tablet = reservation.newTablet(extent);
+
+      InitialScan is = handler.startScan(tinfo, credentials, extent, range, columns, batchSize,
+          ssiList, ssio, authorizations, waitForWrites, isolated, readaheadThreshold, samplerConfig,
+          batchTimeOut, classLoaderContext, executionHints, getScanTabletResolver(tablet),
+          busyTimeout);
+
+      return is;
+
+    } catch (AccumuloException | IOException e) {
+      // TODO is this correct?

Review comment:
       I think this should be ok. ThriftClientHandler throws RuntimeException from startScan and continueScan in some cases.
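The `startScan` override above holds a `ScanReservation` in a try-with-resources block and catches `AccumuloException | IOException`. The catch body is cut off in this message, but the review comment suggests that rethrowing an unchecked exception is consistent with what `ThriftClientHandler` already does. The following minimal sketch uses hypothetical stand-ins (`Reservation`, `doScan`) rather than Accumulo classes. It shows that the resource is closed before the catch clause runs, so the file reservation is released on both the success path and the failure path.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for ScanServer.ScanReservation and ScanServer.startScan;
// these are illustrative names, not Accumulo APIs.
class ReservationSketch {

  // Records lifecycle events so the ordering can be observed.
  static final List<String> events = new ArrayList<>();

  // Plays the role of ScanReservation: constructing it "reserves", close() releases.
  static final class Reservation implements AutoCloseable {
    Reservation() {
      events.add("reserved");
    }

    @Override
    public void close() {
      events.add("released");
    }
  }

  // Stand-in for creating the tablet and running the scan; can fail with a checked exception.
  static String doScan(boolean fail) throws IOException {
    if (fail) {
      throw new IOException("could not open tablet files");
    }
    return "batch";
  }

  // Same shape as startScan: the resource is closed before the catch block executes,
  // and the checked exception is rethrown as an unchecked one.
  static String startScan(boolean fail) {
    try (Reservation reservation = new Reservation()) {
      return doScan(fail);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

For example, `ReservationSketch.startScan(true)` throws `UncheckedIOException`, and by then `events` already contains `released`.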

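In the diff, `lockFiles` and `FilesLock` implement an all-or-nothing lock over a set of files using `synchronized`, `wait`, and `notifyAll` on `lockedFiles`. Below is a generic sketch of the same pattern under hypothetical names (`KeySetLock`, `Held`). It waits in a loop until the requested set is disjoint from the held set, which also covers spurious wakeups. It acquires every key at once, so two threads with overlapping sets cannot deadlock by each holding part of what the other needs.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Generic, simplified version of ScanServer.lockFiles/FilesLock; names are hypothetical.
class KeySetLock<K> {

  // Keys currently held by some thread; also serves as the monitor object.
  private final Set<K> locked = new HashSet<>();

  final class Held implements AutoCloseable {
    private final Set<K> keys;
    private boolean closed = false;

    private Held(Set<K> keys) {
      this.keys = keys;
    }

    @Override
    public void close() {
      synchronized (locked) {
        if (closed) {
          return; // idempotent, like the AtomicBoolean guard in FilesLock.close()
        }
        closed = true;
        locked.removeAll(keys);
        locked.notifyAll(); // wake every waiter; each re-checks its own key set
      }
    }
  }

  // Blocks until none of the requested keys are held, then takes all of them at once.
  Held lock(Collection<K> keys) {
    Set<K> copy = Set.copyOf(keys); // lock and unlock exactly the same set
    synchronized (locked) {
      while (!Collections.disjoint(copy, locked)) {
        try {
          locked.wait();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException(e);
        }
      }
      locked.addAll(copy);
    }
    return new Held(copy);
  }

  boolean isLocked(K key) {
    synchronized (locked) {
      return locked.contains(key);
    }
  }
}
```

Because `close()` is idempotent, it can be called from both a try-with-resources block and an error path without corrupting the held set.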

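`reserveFilesInner` writes scan server file references first and only afterwards re-reads tablet metadata. If a newly referenced file has disappeared from the tablet in the meantime, the refs are deleted and `reserveFiles` retries. The comment in the diff notes that this relies on the Accumulo GC reading scan server refs after tablet refs. Below is a single-threaded, in-memory model of that protocol. It assumes a `Supplier` in place of the metadata table read and a plain set in place of the stored refs; `ReserveThenVerify` is a hypothetical name.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical, single-threaded model of the reference-then-verify step in
// ScanServer.reserveFilesInner. Not Accumulo code: a Set stands in for the scan server
// refs stored in the metadata table, and a Supplier stands in for reading tablet metadata.
class ReserveThenVerify {

  // Scan server file references that a garbage collector would consult before deleting a file.
  final Set<String> scanRefs = new HashSet<>();

  /**
   * One attempt: publish refs for files not already referenced, then re-read the tablet's
   * files. Returns false (after rolling back the new refs) if any newly referenced file is no
   * longer used by the tablet, because it might have been collected before the ref existed.
   */
  boolean tryReserve(Set<String> filesSeen, Supplier<Set<String>> readTabletFiles) {
    Set<String> newRefs = new HashSet<>(filesSeen);
    newRefs.removeAll(scanRefs);
    if (newRefs.isEmpty()) {
      return true; // everything was already referenced, nothing to verify
    }
    scanRefs.addAll(newRefs); // 1. write refs before checking
    Set<String> gone = new HashSet<>(newRefs);
    gone.removeAll(readTabletFiles.get()); // 2. re-read the tablet's current files
    if (!gone.isEmpty()) {
      scanRefs.removeAll(newRefs); // 3. possible gap: undo and let the caller retry
      return false;
    }
    return true;
  }

  // Mirrors the retry loop in reserveFiles(Collection<KeyExtent>): retry with fresh metadata.
  Set<String> reserve(Supplier<Set<String>> readTabletFiles) {
    while (true) {
      Set<String> seen = Set.copyOf(readTabletFiles.get());
      if (tryReserve(seen, readTabletFiles)) {
        return seen;
      }
    }
  }
}
```

In this model, a compaction that replaces `f1` with `f3` between the first read and the verification causes one rollback, and the second attempt reserves `f2` and `f3`.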

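`ReservedFile.shouldDelete` and `cleanUpReservedFiles` combine per-file reservation ids with an idle timeout. A file's reference is dropped only when no scan holds it and it has been unused for longer than the configured expiration (`SSERVER_SCAN_REFERENCE_EXPIRATION_TIME` in the constructor). The following is a simplified sketch. Passing `nowMs` explicitly instead of calling `System.currentTimeMillis()` is an assumption made so the logic can be tested without sleeping. Unlike the diff, the sketch skips the re-check under `lockFiles` and the metadata-table delete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;

// Simplified model of ScanServer.ReservedFile and cleanUpReservedFiles; not Accumulo code.
class ReservationTracker {

  static final class ReservedFile {
    final Set<Long> activeReservations = new ConcurrentSkipListSet<>();
    volatile long lastUseTime;

    // Deletable only when no scan holds it AND it has been idle longer than the expiration.
    boolean shouldDelete(long nowMs, long expireTimeMs) {
      return activeReservations.isEmpty() && nowMs - lastUseTime > expireTimeMs;
    }
  }

  final Map<String,ReservedFile> reservedFiles = new ConcurrentHashMap<>();

  void reserve(String file, long reservationId) {
    reservedFiles.computeIfAbsent(file, k -> new ReservedFile()).activeReservations
        .add(reservationId);
  }

  void release(String file, long reservationId, long nowMs) {
    ReservedFile rf = reservedFiles.get(file);
    if (rf == null || !rf.activeReservations.remove(reservationId)) {
      throw new IllegalStateException("reservation id was not in set as expected");
    }
    rf.lastUseTime = nowMs; // expiry is measured from the most recent release
  }

  // Returns the files whose scan references would be deleted.
  List<String> cleanUp(long nowMs, long expireTimeMs) {
    List<String> removed = new ArrayList<>();
    reservedFiles.forEach((file, rf) -> {
      if (rf.shouldDelete(nowMs, expireTimeMs)) {
        removed.add(file);
      }
    });
    reservedFiles.keySet().removeAll(removed);
    return removed;
  }
}
```

Keeping the reference for a while after the last scan releases it lets a follow-up `continueScan` reuse the existing reservation rather than writing new refs to the metadata table.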




[GitHub] [accumulo] keith-turner commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r838049846



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftClientHandler.java
##########
@@ -353,36 +367,38 @@ public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent t
   }
 
   @Override
-  public ScanResult continueScan(TInfo tinfo, long scanID) throws NoSuchScanIDException,
-      NotServingTabletException, org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
-      TSampleNotPresentException {
+  public ScanResult continueScan(TInfo tinfo, long scanID, long busyTimeout)
+      throws NoSuchScanIDException, NotServingTabletException,
+      org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
+      TSampleNotPresentException, ScanServerBusyException {
     SingleScanSession scanSession =
         (SingleScanSession) server.sessionManager.reserveSession(scanID);
     if (scanSession == null) {
       throw new NoSuchScanIDException();
     }
 
     try {
-      return continueScan(tinfo, scanID, scanSession);
+      return continueScan(tinfo, scanID, scanSession, busyTimeout);
     } finally {
       server.sessionManager.unreserveSession(scanSession);
     }
   }
 
-  private ScanResult continueScan(TInfo tinfo, long scanID, SingleScanSession scanSession)
-      throws NoSuchScanIDException, NotServingTabletException,
+  protected ScanResult continueScan(TInfo tinfo, long scanID, SingleScanSession scanSession,
+      long busyTimeout) throws NoSuchScanIDException, NotServingTabletException,
       org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException,
-      TSampleNotPresentException {
+      TSampleNotPresentException, ScanServerBusyException {
 
     if (scanSession.nextBatchTask == null) {
+      // TODO look into prioritizing continue scan task in thread pool queue

Review comment:
       Yeah, I can remove the TODO and possibly open another issue.
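Here is one way the TODO could be approached, sketched with plain JDK classes. A ThreadPoolExecutor is backed by a PriorityBlockingQueue, and hypothetical ScanWork tasks order continue-scan work ahead of new scans, FIFO within each kind. This only illustrates the idea; it is not how Accumulo's scan executors are actually configured.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class PrioritizedScanQueueSketch {

  // Declaration order is priority order: CONTINUE sorts before START.
  enum Kind { CONTINUE, START }

  static final AtomicLong SEQ = new AtomicLong();

  // Hypothetical task wrapper carrying its priority.
  static final class ScanWork implements Runnable, Comparable<ScanWork> {
    final Kind kind;
    final long seq = SEQ.getAndIncrement(); // FIFO tie-breaker within a kind
    final Runnable body;

    ScanWork(Kind kind, Runnable body) {
      this.kind = kind;
      this.body = body;
    }

    @Override
    public void run() {
      body.run();
    }

    @Override
    public int compareTo(ScanWork o) {
      int c = kind.compareTo(o.kind);
      return c != 0 ? c : Long.compare(seq, o.seq);
    }
  }

  // One worker; queued tasks are ordered by ScanWork.compareTo. Use execute(),
  // not submit(): submit() wraps tasks in a non-Comparable FutureTask.
  static ThreadPoolExecutor newPool() {
    return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
        new PriorityBlockingQueue<Runnable>());
  }

  public static List<String> demo() throws InterruptedException {
    ThreadPoolExecutor pool = newPool();
    List<String> order = new CopyOnWriteArrayList<>();
    CountDownLatch gate = new CountDownLatch(1);
    // Occupy the only worker so the following tasks have to queue.
    pool.execute(new ScanWork(Kind.START, () -> {
      try {
        gate.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }));
    pool.execute(new ScanWork(Kind.START, () -> order.add("start-1")));
    pool.execute(new ScanWork(Kind.START, () -> order.add("start-2")));
    pool.execute(new ScanWork(Kind.CONTINUE, () -> order.add("continue-1")));
    gate.countDown();
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    return order;
  }
}
```

Tasks are only reordered while they wait in the queue. A task that is already running is never preempted.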







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r839679664



##########
File path: core/src/test/java/org/apache/accumulo/core/spi/scan/DefaultScanServerDispatcherTest.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import static org.junit.jupiter.api.Assertions.*;
+

Review comment:
       I'm not sure how @keith-turner is building; he might be skipping the formatter. So when I merge changes from him into my PR branch, I have to go back and fix the formatting.







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r840559421



##########
File path: test/src/main/java/org/apache/accumulo/test/functional/AccumuloClientIT.java
##########
@@ -149,6 +149,7 @@ public void testAccumuloClientBuilder() throws Exception {
     props.put(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), zookeepers);
     props.put(ClientProperty.AUTH_PRINCIPAL.getKey(), user1);
     props.put(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.getKey(), "22s");
+    props.put(ClientProperty.SCAN_SERVER_DISPATCHER_OPTS_PREFIX + "enabled", "true");

Review comment:
       Removed in 7521ae6







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r840541182



##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
##########
@@ -56,6 +56,7 @@
   private Map<String,String> configuredSiteConig = new HashMap<>();
   private Map<String,String> clientProps = new HashMap<>();
   private int numTservers = 2;
+  private int numScanServers = 2;

Review comment:
       I'll change this to zero.







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Initial implementation of my vision for implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r791123988



##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -497,26 +499,44 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
     for (final String tsLocation : locations) {
 
       final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
-      if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
-        QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns);
-        queryTasks.add(queryTask);
+      if (options.isUseScanServer()) {
+        // Ignore the tablets location and find a scan server to use
+        ScanServerLocator ssl = context.getScanServerLocator();
+        tabletsRanges.forEach((k, v) -> {
+          try {
+            String location = ssl.reserveScanServer(new TabletIdImpl(k));

Review comment:
       My latest commit added more tests in ScanServerIT. I need to add a couple more tests that involve the BatchScanner, but I'm pretty well convinced that this approach will work. I think the ScanServer locator piece might be the most complex piece about this change when we are done with it. From a load balancer perspective, it would be nice to know the following for each ScanServer:
   
   - last N extents scanned (increased probability of cache hit?).
   - local TabletServers Tablets, if any (increased probability of locality or page cache hit?)
   
   The extent information is likely sensitive and not something we want client-side. If we choose to use this information for load balancing, then it might have to go into the Manager.
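If per-server history were tracked (on the server or in the Manager, given the sensitivity concern above), a bounded LRU is one simple way to keep the "last N extents scanned". This is a minimal sketch using an access-ordered LinkedHashMap. Extents are plain strings here, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: remember the last N extents a scan server has scanned, so a
// balancer could prefer servers that probably still have those blocks cached.
public class RecentExtentsSketch {

  private final Map<String,Boolean> recent;

  public RecentExtentsSketch(int maxEntries) {
    // accessOrder=true: re-scanning an extent moves it to the most-recent end.
    this.recent = new LinkedHashMap<String,Boolean>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String,Boolean> eldest) {
        return size() > maxEntries; // evict the least recently scanned extent
      }
    };
  }

  public synchronized void recordScan(String extent) {
    recent.put(extent, Boolean.TRUE);
  }

  // containsKey does not count as an access, so it does not reorder entries.
  public synchronized boolean recentlyScanned(String extent) {
    return recent.containsKey(extent);
  }

  // Oldest first, most recent last.
  public synchronized List<String> snapshot() {
    return Collections.unmodifiableList(new ArrayList<>(recent.keySet()));
  }
}
```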
   







[GitHub] [accumulo] EdColeman commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
EdColeman commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r839654152



##########
File path: core/src/test/java/org/apache/accumulo/core/spi/scan/DefaultScanServerDispatcherTest.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import static org.junit.jupiter.api.Assertions.*;
+

Review comment:
       The star import fails checkstyle; it should be:
   
   ```java
   import static org.junit.jupiter.api.Assertions.assertEquals;
   import static org.junit.jupiter.api.Assertions.assertThrows;
   import static org.junit.jupiter.api.Assertions.assertTrue;
   ```







[GitHub] [accumulo] EdColeman commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
EdColeman commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r839686837



##########
File path: core/src/test/java/org/apache/accumulo/core/spi/scan/DefaultScanServerDispatcherTest.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import static org.junit.jupiter.api.Assertions.*;
+

Review comment:
       I was just curious how my prop store changes fared with this PR, and it broke my build. That's not directly related to this PR, but so far the merge was clean and all non-IT tests passed. Running the sunny profile now.







[GitHub] [accumulo] keith-turner commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r838050726



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
##########
@@ -138,6 +175,13 @@ public T get(long timeout, TimeUnit unit)
     return rAsT;
   }
 
+  @Override
+  public T get(long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    // TODO probably no longer makes sense to extend future

Review comment:
       This one does go with this PR.  The changes to add busy timeout changed the class such that it no longer makes sense to extend future.  I'll look at this tomorrow. 
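Here is a rough sketch of busy-timeout semantics that does not extend Future. It assumes "busy" means "the task has not started running within busyTimeout". The caller waits that long for the task to start, then atomically cancels it so it can never run, and reports busy so the client can try another scan server. BusyException is a hypothetical stand-in for the ScanServerBusyException in the diff, and this is not the PR's ScanTask.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class BusyTimeoutSketch {

  // Hypothetical stand-in for ScanServerBusyException.
  public static class BusyException extends Exception {
    public BusyException(String msg) {
      super(msg);
    }
  }

  private static final int QUEUED = 0, RUNNING = 1, CANCELLED = 2;

  // Runs work on the pool. If it has not *started* within busyTimeoutMs, it is
  // cancelled (it will never run) and BusyException is thrown. Exceptions thrown
  // by work are not propagated in this sketch; the caller would get null.
  public static <T> T runWithBusyTimeout(ExecutorService pool, Supplier<T> work,
      long busyTimeoutMs) throws BusyException, InterruptedException {
    AtomicInteger state = new AtomicInteger(QUEUED);
    CountDownLatch started = new CountDownLatch(1);
    CountDownLatch done = new CountDownLatch(1);
    Object[] result = new Object[1];
    pool.execute(() -> {
      // Only start if the caller has not already given up on this task.
      if (!state.compareAndSet(QUEUED, RUNNING)) {
        return;
      }
      started.countDown();
      try {
        result[0] = work.get();
      } finally {
        done.countDown();
      }
    });
    if (!started.await(busyTimeoutMs, TimeUnit.MILLISECONDS)
        && state.compareAndSet(QUEUED, CANCELLED)) {
      throw new BusyException("not started within " + busyTimeoutMs + "ms");
    }
    // The task started (possibly just after the timeout), so wait for its result.
    done.await();
    @SuppressWarnings("unchecked")
    T t = (T) result[0];
    return t;
  }
}
```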







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r837388185



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
##########
@@ -43,11 +45,12 @@
 
   public MultiScanSession(TCredentials credentials, KeyExtent threadPoolExtent,
       Map<KeyExtent,List<Range>> queries, ScanParameters scanParams,
-
-      Map<String,String> executionHints) {
-    super(credentials, scanParams, executionHints);
+      Map<String,String> executionHints, TabletResolver tabletResolver) {
+    super(credentials, scanParams, executionHints, tabletResolver);
     this.queries = queries;
     this.threadPoolExtent = threadPoolExtent;
+    // TODO this is only needed for scan server

Review comment:
       I don't think there is anything to do here. We could clean up ScanSession and MultiScanSession, but that could also be done in another PR.







[GitHub] [accumulo] keith-turner commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r839785887



##########
File path: core/src/test/java/org/apache/accumulo/core/spi/scan/DefaultScanServerDispatcherTest.java
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import static org.junit.jupiter.api.Assertions.*;
+

Review comment:
       > I'm not sure how @keith-turner is building, he might be skipping the formatter. So, when I merge changes into my PR branch from him, then I have to go back and fix the formatting.
   
   I usually don't try to constantly fight with all of the maven checks while working on something.  I just leave that till the end for the most part and resolve them before submitting a PR.







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r840529608



##########
File path: test/src/main/java/org/apache/accumulo/test/functional/AccumuloClientIT.java
##########
@@ -149,6 +149,7 @@ public void testAccumuloClientBuilder() throws Exception {
     props.put(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), zookeepers);
     props.put(ClientProperty.AUTH_PRINCIPAL.getKey(), user1);
     props.put(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.getKey(), "22s");
+    props.put(ClientProperty.SCAN_SERVER_DISPATCHER_OPTS_PREFIX + "enabled", "true");

Review comment:
       This was from an early modification. I'll remove it.







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r840559656



##########
File path: server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
##########
@@ -176,6 +178,25 @@ public void execute(String[] args) throws Exception {
 
       }
 
+      if (opts.zapScanServers) {
+        String sserversPath = Constants.ZROOT + "/" + iid + Constants.ZSSERVERS;
+        try {
+          List<String> children = zoo.getChildren(sserversPath);
+          for (String child : children) {
+            message("Deleting " + sserversPath + "/" + child + " from zookeeper", opts);
+
+            var zLockPath = ServiceLock.path(sserversPath + "/" + child);
+            if (!zoo.getChildren(zLockPath.toString()).isEmpty()) {
+              if (!ServiceLock.deleteLock(zoo, zLockPath, "tserver")) {

Review comment:
       Fixed in 7521ae6







[GitHub] [accumulo] dlmarion edited a comment on pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion edited a comment on pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#issuecomment-1021503545


   @keith-turner and I had a discussion about changing the current design. The key points from the discussion are:
   
   1. Modifying the load balancing logic from the current implementation (which uses ZK to know which scan servers are available) to an implementation that selects candidate scan servers to communicate with when scanning an extent. This new implementation would not require ZK and would use a hashing algorithm so that all clients select the same set of scan servers for an extent, in an attempt to make use of the index and data block caches.
   
   2. We may want to have configuration settings for the scan server for things that are inherited from the tablet server, for example the data block cache size. Should the scan server use the current configuration property name, or should it have its own?
   
   3. ~~Modifying the scan server to perform more than one concurrent scan.~~ Added in [0d553de]
   
   4. ~~Modify the client configuration code such that the name is not tied to the implementation.~~ (This has been completed with the addition of `ScannerBase.setConsistencyLevel(ConsistencyLevel)`.)
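Item 1 asks for a hashing scheme in which every client independently picks the same candidate scan servers for an extent. Rendezvous (highest-random-weight) hashing is one well-known way to get that. The sketch below is illustrative only. It assumes extents and servers are identified by strings and that each client sees the same server list, and it is not the selection logic the PR settled on.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Sketch: rendezvous hashing. Every client that sees the same server list
// picks the same k candidates for an extent, with no coordination via ZK.
public class RendezvousSelectorSketch {

  // 64-bit score taken from the first 8 bytes of SHA-256(extent + '\0' + server).
  static long score(String extent, String server) {
    try {
      MessageDigest md = MessageDigest.getInstance("SHA-256");
      byte[] h = md.digest((extent + '\0' + server).getBytes(StandardCharsets.UTF_8));
      long v = 0;
      for (int i = 0; i < 8; i++) {
        v = (v << 8) | (h[i] & 0xffL);
      }
      return v;
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e); // every JVM is required to provide SHA-256
    }
  }

  // Returns up to k servers, highest score first; ties broken by name.
  public static List<String> candidates(String extent, List<String> servers, int k) {
    Comparator<String> byScoreDesc =
        Comparator.comparingLong((String s) -> score(extent, s)).reversed();
    return servers.stream()
        .sorted(byScoreDesc.thenComparing(Comparator.<String>naturalOrder()))
        .limit(k)
        .collect(Collectors.toList());
  }
}
```

Each server's score depends only on the (extent, server) pair. As a result, adding or removing a server only changes the candidate lists in which that server ranks among the top k, which helps keep the block caches warm.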





[GitHub] [accumulo] dlmarion commented on pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#issuecomment-1055794007


   Items remaining:
   
   1. Completing the client-side load balancing / scan server selection.
   
   2. Look at adding ScanServer properties for the block caches (and maybe other properties) so that they are distinct in the configuration.
   
   @keith-turner is working on (1) now, and I will start on (2) shortly.
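For (2), one option is to give the scan server its own property and fall back to the inherited tablet server property when it is unset. This is a minimal sketch over a plain Map. `tserver.cache.data.size` is an existing tablet server property, while `sserver.cache.data.size` is a hypothetical scan server key used only for illustration.

```java
import java.util.Map;
import java.util.Optional;

// Sketch: resolve a scan server setting, preferring a scan-server-specific
// key, then the inherited tablet server key, then a default value.
public class ScanServerConfigSketch {

  public static String resolve(Map<String,String> config, String scanServerKey,
      String tserverKey, String defaultValue) {
    return Optional.ofNullable(config.get(scanServerKey))
        .or(() -> Optional.ofNullable(config.get(tserverKey)))
        .orElse(defaultValue);
  }
}
```

Whether to fall back at all is the open question in (2). A distinct property with no fallback would keep the two server types fully independent.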
   





[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Initial implementation of my vision for implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r791042959



##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -497,26 +499,44 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
     for (final String tsLocation : locations) {
 
       final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
-      if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
-        QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns);
-        queryTasks.add(queryTask);
+      if (options.isUseScanServer()) {
+        // Ignore the tablets location and find a scan server to use
+        ScanServerLocator ssl = context.getScanServerLocator();
+        tabletsRanges.forEach((k, v) -> {
+          try {
+            String location = ssl.reserveScanServer(new TabletIdImpl(k));

Review comment:
       Another thought w/r/t the load balancer code and the number of RPCs to ZK: we could move the logic into the Manager and reduce the ZK traffic with ZooCache. I'm not tied to the current approach; I was just trying to make a simple reservation system that could be replaced with a user-supplied class that performs more complex logic.
   
   > We could start off implementing the thread pool with syncQ with a hard coded thread of size of one and get this really fast busy exception behavior.
   
   I'm good with that. The Scan Server already returns an error if it's working on a scan for another client. I'm currently working on some more ITs and running down an issue I am seeing in one of them. Let's talk more about potential design changes.
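The "thread pool with syncQ" idea can be sketched with JDK classes. A ThreadPoolExecutor with one thread and a SynchronousQueue has no queue capacity, so a second concurrent execute() is rejected immediately. That maps naturally to a fast busy response. tryStartScan is a hypothetical helper, not Accumulo code.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch: a single-thread pool backed by a SynchronousQueue has no spare
// capacity, so a second concurrent submission is rejected at once.
public class SyncQueueBusySketch {

  public static ThreadPoolExecutor newSingleScanPool() {
    // AbortPolicy (the default) throws RejectedExecutionException when full.
    return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
        new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.AbortPolicy());
  }

  // Returns false ("busy") instead of waiting when the only thread is in use.
  public static boolean tryStartScan(ThreadPoolExecutor pool, Runnable scan) {
    try {
      pool.execute(scan);
      return true;
    } catch (RejectedExecutionException e) {
      return false;
    }
  }
}
```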
   
   
   







[GitHub] [accumulo] keith-turner commented on pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
keith-turner commented on pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#issuecomment-1021542074


   > Modifying the scan server to perform more than one concurrent scan.
   
   When we discussed this, we realized there is an interesting tradeoff between isolation and caching. Running a single concurrent scan per scan server prevents a rogue scan that gobbles up all memory from negatively impacting other scans. On the other hand, running multiple concurrent scans per scan server enables scans to benefit from a larger shared cache (which may be useful if tablets are repeatedly scanned using the same scan server).





[GitHub] [accumulo] keith-turner commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r838051628



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/session/MultiScanSession.java
##########
@@ -43,11 +45,12 @@
 
   public MultiScanSession(TCredentials credentials, KeyExtent threadPoolExtent,
       Map<KeyExtent,List<Range>> queries, ScanParameters scanParams,
-
-      Map<String,String> executionHints) {
-    super(credentials, scanParams, executionHints);
+      Map<String,String> executionHints, TabletResolver tabletResolver) {
+    super(credentials, scanParams, executionHints, tabletResolver);
     this.queries = queries;
     this.threadPoolExtent = threadPoolExtent;
+    // TODO this is only needed for scan server

Review comment:
       Oh, I was thinking of only making the set copy if it's a scan server. I will look at this tomorrow.







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r830240059



##########
File path: test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.clientImpl.ThriftScanner.ScanTimedOutException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ScanServerIT extends SharedMiniClusterBase {
+
+  private static class ScanServerITConfiguration implements MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+        org.apache.hadoop.conf.Configuration coreSite) {
+      cfg.setNumScanServers(1);
+      cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s");
+      cfg.setProperty(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "1");
+    }
+  }
+
+  @BeforeClass
+  public static void start() throws Exception {
+    ScanServerITConfiguration c = new ScanServerITConfiguration();
+    SharedMiniClusterBase.startMiniClusterWithConfig(c);
+    SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER,
+        "localhost");
+
+    String zooRoot = getCluster().getServerContext().getZooKeeperRoot();
+    ZooReaderWriter zrw = getCluster().getServerContext().getZooReaderWriter();
+    String scanServerRoot = zooRoot + Constants.ZSSERVERS;
+
+    while (zrw.getChildren(scanServerRoot).size() == 0) {
+      Thread.sleep(500);
+    }
+  }
+
+  @AfterClass
+  public static void stop() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  @Test
+  public void testScan() throws Exception {
+
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+
+      ReadWriteIT.ingest(client, getClientInfo(), 10, 10, 50, 0, tableName);
+
+      client.tableOperations().flush(tableName, null, null, true);
+
+      try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRange(new Range());
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        int count = 0;
+        for (@SuppressWarnings("unused")
+        Entry<Key,Value> entry : scanner) {
+          count++;
+        }
+        assertEquals(100, count);
+      } // when the scanner is closed, all open sessions should be closed
+    }
+  }
+
+  @Test
+  public void testBatchScan() throws Exception {
+
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+
+      ReadWriteIT.ingest(client, getClientInfo(), 10, 10, 50, 0, tableName);
+
+      client.tableOperations().flush(tableName, null, null, true);
+
+      try (BatchScanner scanner = client.createBatchScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRanges(Collections.singletonList(new Range()));
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        int count = 0;
+        for (@SuppressWarnings("unused")
+        Entry<Key,Value> entry : scanner) {
+          count++;
+        }
+        assertEquals(100, count);
+      } // when the scanner is closed, all open sessions should be closed
+    }
+  }
+
+  // TODO: This test currently fails, but we could change the client code to make it work.
+  @Test
+  public void testScanOfflineTable() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+
+      ReadWriteIT.ingest(client, getClientInfo(), 10, 10, 50, 0, tableName);
+
+      client.tableOperations().flush(tableName, null, null, true);
+      client.tableOperations().offline(tableName, true);
+
+      assertThrows(TableOfflineException.class, () -> {
+        try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+          scanner.setRange(new Range());
+          scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+          int count = 0;
+          for (@SuppressWarnings("unused")
+          Entry<Key,Value> entry : scanner) {
+            count++;
+          }
+          assertEquals(100, count);
+        } // when the scanner is closed, all open sessions should be closed
+      });
+    }
+  }
+
+  @Test
+  public void testScanServerBusy() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+
+      ReadWriteIT.ingest(client, getClientInfo(), 10, 10, 50, 0, tableName);
+
+      client.tableOperations().flush(tableName, null, null, true);
+
+      Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY);
+      scanner.setRange(new Range());
+      scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+      // We only inserted 100 rows, default batch size is 1000. If we don't set the
+      // batch size lower, then the server side code will automatically close the
+      // scanner on the first call to continueScan. We want to keep it open, so lower the batch
+      // size.
+      scanner.setBatchSize(10);
+      Iterator<Entry<Key,Value>> iter = scanner.iterator();
+      iter.next();
+      iter.next();
+      // At this point the tablet server will time out this scan after TSERV_SESSION_MAXIDLE
+      // Start up another scanner and set it to time out in 1s. It should fail because there
+      // is no scan server available to run the scan.
+      Scanner scanner2 = client.createScanner(tableName, Authorizations.EMPTY);
+      scanner2.setRange(new Range());
+      scanner2.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+      scanner2.setTimeout(1, TimeUnit.SECONDS);
+      Iterator<Entry<Key,Value>> iter2 = scanner2.iterator();
+      try {
+        iter2.hasNext();
+        assertNotNull(iter2.next());
+        fail("Expecting ScanTimedOutException");

Review comment:
      This is the only failing test. It starts one scanner and then a second one, expecting the second to throw an exception because SSERV_SCAN_EXECUTORS_DEFAULT_THREADS is set to 1 in the MiniAccumuloCluster (MAC) config above. No exception is thrown. @keith-turner - what's the expected behavior here?
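For discussion, here is a minimal sketch of the "reject when full" behavior the test seems to expect. It assumes the scan server admits a fixed number of scans and rejects extras immediately instead of queueing them. `BusyScanGate` and `BusyException` are hypothetical names, not Accumulo classes.

```java
import java.util.concurrent.Semaphore;

// Hypothetical signal that no scan slot is free (not Accumulo's ScanServerBusyException).
class BusyException extends RuntimeException {
  BusyException(String msg) {
    super(msg);
  }
}

// Hypothetical admission gate: at most `slots` scans hold a slot at once, and any
// additional scan is rejected immediately instead of waiting in a queue.
class BusyScanGate {
  private final Semaphore permits;

  BusyScanGate(int slots) {
    this.permits = new Semaphore(slots);
  }

  // Claim a slot without blocking; throw if every slot is taken.
  void startScan(String scanId) {
    if (!permits.tryAcquire()) {
      throw new BusyException("scan server busy, rejecting " + scanId);
    }
  }

  // Give the slot back when a scan that started successfully is closed or times out.
  // Calling this without a matching successful startScan would inflate the slot count.
  void closeScan() {
    permits.release();
  }
}
```

With `new BusyScanGate(1)`, a second `startScan` throws until the first scan's `closeScan` runs. One design question this raises: a thread-count limit bounds concurrent *reads*, whereas this gate bounds open *sessions*. An idle session between `continueScan` calls would hold a slot here, but possibly not a thread.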







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r836788924



##########
File path: test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.clientImpl.ThriftScanner.ScanTimedOutException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class ScanServerIT extends SharedMiniClusterBase {
+
+  private static class ScanServerITConfiguration implements MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+        org.apache.hadoop.conf.Configuration coreSite) {
+      cfg.setNumScanServers(1);
+      cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s");
+      cfg.setProperty(Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "1");
+    }
+  }
+
+  @BeforeClass
+  public static void start() throws Exception {
+    ScanServerITConfiguration c = new ScanServerITConfiguration();
+    SharedMiniClusterBase.startMiniClusterWithConfig(c);
+    SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER,
+        "localhost");
+
+    String zooRoot = getCluster().getServerContext().getZooKeeperRoot();
+    ZooReaderWriter zrw = getCluster().getServerContext().getZooReaderWriter();
+    String scanServerRoot = zooRoot + Constants.ZSSERVERS;
+
+    while (zrw.getChildren(scanServerRoot).size() == 0) {
+      Thread.sleep(500);
+    }
+  }
+
+  @AfterClass
+  public static void stop() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  @Test
+  public void testScan() throws Exception {
+
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+
+      ReadWriteIT.ingest(client, getClientInfo(), 10, 10, 50, 0, tableName);
+
+      client.tableOperations().flush(tableName, null, null, true);
+
+      try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRange(new Range());
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        int count = 0;
+        for (@SuppressWarnings("unused")
+        Entry<Key,Value> entry : scanner) {
+          count++;
+        }
+        assertEquals(100, count);
+      } // when the scanner is closed, all open sessions should be closed
+    }
+  }
+
+  @Test
+  public void testBatchScan() throws Exception {
+
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+
+      ReadWriteIT.ingest(client, getClientInfo(), 10, 10, 50, 0, tableName);
+
+      client.tableOperations().flush(tableName, null, null, true);
+
+      try (BatchScanner scanner = client.createBatchScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRanges(Collections.singletonList(new Range()));
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        int count = 0;
+        for (@SuppressWarnings("unused")
+        Entry<Key,Value> entry : scanner) {
+          count++;
+        }
+        assertEquals(100, count);
+      } // when the scanner is closed, all open sessions should be closed
+    }
+  }
+
+  // TODO: This test currently fails, but we could change the client code to make it work.
+  @Test
+  public void testScanOfflineTable() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+
+      ReadWriteIT.ingest(client, getClientInfo(), 10, 10, 50, 0, tableName);
+
+      client.tableOperations().flush(tableName, null, null, true);
+      client.tableOperations().offline(tableName, true);
+
+      assertThrows(TableOfflineException.class, () -> {
+        try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+          scanner.setRange(new Range());
+          scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+          int count = 0;
+          for (@SuppressWarnings("unused")
+          Entry<Key,Value> entry : scanner) {
+            count++;
+          }
+          assertEquals(100, count);
+        } // when the scanner is closed, all open sessions should be closed
+      });
+    }
+  }
+
+  @Test
+  public void testScanServerBusy() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+
+      ReadWriteIT.ingest(client, getClientInfo(), 10, 10, 50, 0, tableName);
+
+      client.tableOperations().flush(tableName, null, null, true);
+
+      Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY);
+      scanner.setRange(new Range());
+      scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+      // We only inserted 100 rows, default batch size is 1000. If we don't set the
+      // batch size lower, then the server side code will automatically close the
+      // scanner on the first call to continueScan. We want to keep it open, so lower the batch
+      // size.
+      scanner.setBatchSize(10);
+      Iterator<Entry<Key,Value>> iter = scanner.iterator();
+      iter.next();
+      iter.next();
+      // At this point the tablet server will time out this scan after TSERV_SESSION_MAXIDLE
+      // Start up another scanner and set it to time out in 1s. It should fail because there
+      // is no scan server available to run the scan.
+      Scanner scanner2 = client.createScanner(tableName, Authorizations.EMPTY);
+      scanner2.setRange(new Range());
+      scanner2.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+      scanner2.setTimeout(1, TimeUnit.SECONDS);
+      Iterator<Entry<Key,Value>> iter2 = scanner2.iterator();
+      try {
+        iter2.hasNext();
+        assertNotNull(iter2.next());
+        fail("Expecting ScanTimedOutException");

Review comment:
       This is still an issue.







[GitHub] [accumulo] dlmarion edited a comment on pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion edited a comment on pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#issuecomment-1024522042


   Some annotated shell output:
   ```bash
   root@test> createtable test (1)
   root@test test> insert a b c d (2)
   root@test test> scan (3)
   a b:c []	d
   root@test test> scan -cl immediate (4)
   a b:c []	d
   root@test test> scan -cl eventual (5)
   root@test test> flush (6)
   2022-01-28T18:58:10,693 [shell.Shell] INFO : Flush of table test  initiated...
   root@test test> scan (7)
   a b:c []	d
   root@test test> scan -cl eventual (8)
   a b:c []	d
   ```
   
   I create a table (1) and insert some data (2). When I run a scan (3, 4) with the `immediate` consistency level, which happens to be the default, the scan takes the normal code path and is issued against the tablet server. Data is returned because the tablet server also returns data held in its in-memory map. When I scan with the `eventual` consistency level (5), no data is returned because that code path reads only the tablet's files. After I flush (6) the data to a file in HDFS, subsequent scans with the `immediate` (7) and `eventual` (8) consistency levels both return the data.
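The difference between the two consistency levels can be modeled in a few lines. This is a toy illustration under simplifying assumptions (one tablet, string keys, no timestamps or deletes), not Accumulo code; `ToyTablet` and `Consistency` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical stand-in for the two consistency levels used in the shell session.
enum Consistency { IMMEDIATE, EVENTUAL }

// Toy model (not Accumulo code): recent writes sit in an in-memory map until a
// flush writes them out as an immutable file.
class ToyTablet {
  private final TreeMap<String, String> inMemory = new TreeMap<>();
  private final List<TreeMap<String, String>> files = new ArrayList<>();

  void insert(String key, String value) {
    inMemory.put(key, value);
  }

  // Write the in-memory map out as a new file and clear memory.
  void flush() {
    if (!inMemory.isEmpty()) {
      files.add(new TreeMap<>(inMemory));
      inMemory.clear();
    }
  }

  TreeMap<String, String> scan(Consistency level) {
    TreeMap<String, String> result = new TreeMap<>();
    for (TreeMap<String, String> file : files) {
      result.putAll(file); // later files overwrite earlier ones in this toy model
    }
    if (level == Consistency.IMMEDIATE) {
      result.putAll(inMemory); // only the immediate path also sees unflushed writes
    }
    return result;
  }
}
```

Replaying steps (2) through (8) against this model, the `EVENTUAL` scan is empty before the flush and returns `d` afterwards, while the `IMMEDIATE` scan returns `d` both times.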





[GitHub] [accumulo] dlmarion edited a comment on pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion edited a comment on pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#issuecomment-1055794007


   Items remaining:
   
   1. Completing the client-side load balancing / scan server selection.
   
   2. Look at adding ScanServer properties for the block caches (and maybe other properties) so that they are distinct in the configuration.
   
   @keith-turner is working on (1) now, and I will start on (2) shortly.
   
   Possible future work:
   
   ScanServer pools - provide an argument (pool name, tag, etc.) to the ScanServer process that is placed in its ZooKeeper node's data, so that the client-side load balancer can use it to decide which pool to use for a particular scan.
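A rough sketch of how a client-side selector might use such a pool tag, assuming each scan server advertises its address and a pool name that the client has already read from ZooKeeper. `AdvertisedScanServer` and `PoolAwareSelector` are hypothetical names, and hashing the tablet id is only one possible placement policy.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical view of one scan server as a client might read it from ZooKeeper:
// its address plus the pool name it was started with.
final class AdvertisedScanServer {
  final String address;
  final String pool;

  AdvertisedScanServer(String address, String pool) {
    this.address = address;
    this.pool = pool;
  }
}

// Hypothetical pool-aware selector: restrict candidates to the requested pool, then
// hash the tablet id so the same tablet keeps going to the same server (warm caches).
final class PoolAwareSelector {
  static Optional<String> select(List<AdvertisedScanServer> servers, String pool,
      String tabletId) {
    List<String> candidates = new ArrayList<>();
    for (AdvertisedScanServer s : servers) {
      if (s.pool.equals(pool)) {
        candidates.add(s.address);
      }
    }
    if (candidates.isEmpty()) {
      return Optional.empty(); // no server advertises this pool
    }
    Collections.sort(candidates); // stable order regardless of ZooKeeper listing order
    int idx = Math.floorMod(tabletId.hashCode(), candidates.size());
    return Optional.of(candidates.get(idx));
  }
}
```

A deterministic choice like this keeps a tablet's data in one server's block cache, at the cost of uneven load when a few tablets are hot.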





[GitHub] [accumulo] dlmarion commented on pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#issuecomment-1072760603


   @keith-turner - I made updates to ScanServerIT.testScanServerBusy in 39a9e5e. It's still not working as I would expect. I do see `ScanServerBusyException` being thrown on the server side, and I do see `2022-03-18T19:55:52,100 [clientImpl.ThriftScanner] TRACE: Scan failed, scan server was busy (1<<,ip-127.0.0.1:44375,10001a2e9b20005)` in the client-side log. The client-side error is logged a couple of times, and then both scans appear to run and return data, even though I set `Property.TSERV_SCAN_EXECUTORS_DEFAULT_THREADS` to `1` in the test.
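The log suggests the client retries after a busy response. For reference, a bounded client-side retry policy could look like the following sketch. The names are hypothetical and this is not the ThriftScanner logic; it only illustrates moving to another server on busy and giving up after a fixed number of rounds.

```java
import java.util.List;

// Hypothetical busy signal from a scan server (not Accumulo's ScanServerBusyException).
class ServerBusyException extends RuntimeException {
  ServerBusyException(String server) {
    super(server + " is busy");
  }
}

// Hypothetical client-side policy: try each candidate in turn, move on when one reports
// busy, and give up after a bounded number of rounds rather than retrying indefinitely.
final class BusyAwareScanClient {
  interface ScanCall<T> {
    T run(String server);
  }

  static <T> T scanWithRetry(List<String> candidates, int maxRounds, long backoffMillis,
      ScanCall<T> call) {
    ServerBusyException last = new ServerBusyException("every candidate");
    for (int round = 0; round < maxRounds; round++) {
      for (String server : candidates) {
        try {
          return call.run(server);
        } catch (ServerBusyException e) {
          last = e; // remember the failure and try the next server
        }
      }
      if (round < maxRounds - 1) {
        try {
          Thread.sleep(backoffMillis * (round + 1)); // linear backoff between rounds
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt(); // preserve the interrupt status
          throw new IllegalStateException("interrupted while backing off", ie);
        }
      }
    }
    throw last;
  }
}
```

One design point: whether busy responses count against the scanner's timeout determines whether a caller ever sees an error or simply waits until a slot frees up.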





[GitHub] [accumulo] keith-turner commented on a change in pull request #2422: Initial implementation of my vision for implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r791085150



##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -497,26 +499,44 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
     for (final String tsLocation : locations) {
 
       final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
-      if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
-        QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns);
-        queryTasks.add(queryTask);
+      if (options.isUseScanServer()) {
+        // Ignore the tablets location and find a scan server to use
+        ScanServerLocator ssl = context.getScanServerLocator();
+        tabletsRanges.forEach((k, v) -> {
+          try {
+            String location = ssl.reserveScanServer(new TabletIdImpl(k));

Review comment:
      Not sure if it is achievable, but it would be nice to have information about scan servers from ZK that is cacheable on the client side (so clients do not have to go to ZK frequently), a busy signal from scan servers, and both of those available to a client-side plugin that decides which scan servers to use. If that is workable, it could avoid any single-process bottleneck in the cluster, hopefully allowing this to scale up to many thousands of scan servers and thousands of clients.
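As a sketch of the first piece of this idea, a client could cache the scan server list and only re-read ZooKeeper after a time-to-live expires. `CachedScanServers` is hypothetical; the loader stands in for a ZooKeeper read, and the clock is injected so expiry can be tested.

```java
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical client-side cache of the scan server list. The loader stands in for a
// ZooKeeper read and is only invoked when the cached copy is older than ttlMillis.
final class CachedScanServers {
  private final Supplier<List<String>> loader;
  private final LongSupplier clock; // e.g. System::currentTimeMillis in real use
  private final long ttlMillis;
  private List<String> cached;
  private long loadedAt;

  CachedScanServers(Supplier<List<String>> loader, LongSupplier clock, long ttlMillis) {
    this.loader = loader;
    this.clock = clock;
    this.ttlMillis = ttlMillis;
  }

  // Return the cached list, reloading it only when it is missing or stale.
  synchronized List<String> get() {
    long now = clock.getAsLong();
    if (cached == null || now - loadedAt >= ttlMillis) {
      cached = List.copyOf(loader.get()); // immutable snapshot shared with callers
      loadedAt = now;
    }
    return cached;
  }
}
```

A selector plugin could combine this list with recent busy responses, which cost nothing to track on the client, so that no central process has to mediate every scan.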







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r817083714



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
##########
@@ -0,0 +1,896 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.clientImpl.ScanServerDiscovery;
+import org.apache.accumulo.core.clientImpl.thrift.ConfigurationType;
+import org.apache.accumulo.core.clientImpl.thrift.TDiskUsage;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
+import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
+import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.TCMResult;
+import org.apache.accumulo.core.dataImpl.thrift.TColumn;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalSession;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import org.apache.accumulo.core.dataImpl.thrift.TRowRange;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaries;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaryRequest;
+import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.CompactionServices;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
+import org.apache.accumulo.core.tabletserver.thrift.TDurability;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
+import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.ServerServices;
+import org.apache.accumulo.core.util.ServerServices.Service;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ServiceLock;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.security.SecurityUtil;
+import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.compactions.Compactable;
+import org.apache.accumulo.tserver.compactions.CompactionManager;
+import org.apache.accumulo.tserver.compactions.ExternalCompactionJob;
+import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletData;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ScanServer extends TabletServer implements TabletClientService.Iface {
+
+  /**
+   * A compaction manager that does nothing
+   */
+  private static class ScanServerCompactionManager extends CompactionManager {
+
+    public ScanServerCompactionManager(ServerContext context,
+        CompactionExecutorsMetrics ceMetrics) {
+      super(new ArrayList<>(), context, ceMetrics);
+    }
+
+    @Override
+    public void compactableChanged(Compactable compactable) {}
+
+    @Override
+    public void start() {}
+
+    @Override
+    public CompactionServices getServices() {
+      return null;
+    }
+
+    @Override
+    public boolean isCompactionQueued(KeyExtent extent, Set<CompactionServiceId> servicesUsed) {
+      return false;
+    }
+
+    @Override
+    public int getCompactionsRunning() {
+      return 0;
+    }
+
+    @Override
+    public int getCompactionsQueued() {
+      return 0;
+    }
+
+    @Override
+    public ExternalCompactionJob reserveExternalCompaction(String queueName, long priority,
+        String compactorId, ExternalCompactionId externalCompactionId) {
+      return null;
+    }
+
+    @Override
+    public void registerExternalCompaction(ExternalCompactionId ecid, KeyExtent extent,
+        CompactionExecutorId ceid) {}
+
+    @Override
+    public void commitExternalCompaction(ExternalCompactionId extCompactionId,
+        KeyExtent extentCompacted, Map<KeyExtent,Tablet> currentTablets, long fileSize,
+        long entries) {}
+
+    @Override
+    public void externalCompactionFailed(ExternalCompactionId ecid, KeyExtent extentCompacted,
+        Map<KeyExtent,Tablet> currentTablets) {}
+
+    @Override
+    public List<TCompactionQueueSummary> getCompactionQueueSummaries() {
+      return null;
+    }
+
+    @Override
+    public Collection<ExtCompMetric> getExternalMetrics() {
+      return null;
+    }
+
+    @Override
+    public void compactableClosed(KeyExtent extent, Set<CompactionServiceId> servicesUsed,
+        Set<ExternalCompactionId> ecids) {}
+
+  }
+
+  protected static class CurrentScan {
+    protected KeyExtent extent;
+    protected Tablet tablet;
+    protected Long scanId;
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
+
+  protected ThriftClientHandler handler;
+  protected CurrentScan currentScan = new CurrentScan();
+
+  public ScanServer(ServerOpts opts, String[] args) {
+    super(opts, args, true);
+    handler = getHandler();
+  }
+
+  protected ThriftClientHandler getHandler() {
+    return new ThriftClientHandler(this);
+  }
+
+  private void cleanupTimedOutSession() {
+    synchronized (currentScan) {
+      if (currentScan.scanId != null && !sessionManager.exists(currentScan.scanId)) {
+        LOG.info("{} is no longer active, ending scan", currentScan.scanId);
+        endScan();
+      }
+    }
+  }
+
+  /**
+   * Start the thrift service to handle incoming client requests
+   *
+   * @return address of this client service
+   * @throws UnknownHostException
+   *           host unknown
+   */
+  protected ServerAddress startScanServerClientService() throws UnknownHostException {
+    Iface rpcProxy = TraceUtil.wrapService(this);
+    if (getContext().getThriftServerType() == ThriftServerType.SASL) {
+      rpcProxy = TCredentialsUpdatingWrapper.service(rpcProxy, getClass(), getConfiguration());
+    }
+    final TabletClientService.Processor<Iface> processor =
+        new TabletClientService.Processor<>(rpcProxy);
+
+    Property maxMessageSizeProperty =
+        (getConfiguration().get(Property.SSERV_MAX_MESSAGE_SIZE) != null
+            ? Property.SSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
+    ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
+        Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(),
+        "Thrift Client Server", Property.SSERV_PORTSEARCH, Property.SSERV_MINTHREADS,
+        Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK, maxMessageSizeProperty);
+
+    LOG.info("address = {}", sp.address);
+    return sp;
+  }
+
+  /**
+   * Set up nodes and locks in ZooKeeper for this ScanServer
+   */
+  private ServiceLock announceExistence() {
+    ZooReaderWriter zoo = getContext().getZooReaderWriter();
+    try {
+
+      var zLockPath = ServiceLock.path(
+          getContext().getZooKeeperRoot() + Constants.ZSSERVERS + "/" + getClientAddressString());
+
+      try {
+        zoo.putPersistentData(zLockPath.toString(), new byte[] {}, NodeExistsPolicy.SKIP);
+      } catch (KeeperException e) {
+        if (e.code() == KeeperException.Code.NOAUTH) {
+          LOG.error("Failed to write to ZooKeeper. Ensure that"
+              + " accumulo.properties, specifically instance.secret, is consistent.");
+        }
+        throw e;
+      }
+
+      tabletServerLock = new ServiceLock(zoo.getZooKeeper(), zLockPath, UUID.randomUUID());
+
+      LockWatcher lw = new LockWatcher() {
+
+        @Override
+        public void lostLock(final LockLossReason reason) {
+          Halt.halt(serverStopRequested ? 0 : 1, () -> {
+            if (!serverStopRequested) {
+              LOG.error("Lost scan server lock (reason = {}), exiting.", reason);
+            }
+            gcLogger.logGCInfo(getConfiguration());
+          });
+        }
+
+        @Override
+        public void unableToMonitorLockNode(final Exception e) {
+          Halt.halt(1, () -> LOG.error("Lost ability to monitor scan server lock, exiting.", e));
+        }
+      };
+
+      byte[] lockContent = new ServerServices(getClientAddressString(), Service.SSERV_CLIENT)
+          .toString().getBytes(UTF_8);
+      for (int i = 0; i < 120 / 5; i++) {
+        zoo.putPersistentData(zLockPath.toString(), new byte[0], NodeExistsPolicy.SKIP);
+
+        if (tabletServerLock.tryLock(lw, lockContent)) {
+          LOG.debug("Obtained scan server lock {}", tabletServerLock.getLockPath());
+          return tabletServerLock;
+        }
+        LOG.info("Waiting for scan server lock");
+        sleepUninterruptibly(5, TimeUnit.SECONDS);
+      }
+      String msg = "Too many retries, exiting.";
+      LOG.info(msg);
+      throw new RuntimeException(msg);
+    } catch (Exception e) {
+      LOG.info("Could not obtain scan server lock, exiting.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void run() {
+    SecurityUtil.serverLogin(getConfiguration());
+
+    ServerAddress address = null;
+    try {
+      address = startScanServerClientService();
+      clientAddress = address.getAddress();
+    } catch (UnknownHostException e1) {
+      throw new RuntimeException("Failed to start the scan server client service", e1);
+    }
+
+    ServiceLock lock = announceExistence();
+
+    try {
+      MetricsUtil.initializeMetrics(getContext().getConfiguration(), this.applicationName,
+          clientAddress);
+    } catch (Exception e1) {
+      LOG.error("Error initializing metrics, metrics will not be emitted.", e1);
+    }
+    scanMetrics = new TabletServerScanMetrics();
+    MetricsUtil.initializeProducers(scanMetrics);
+
+    // We need to set the compaction manager so that we don't get an NPE in CompactableImpl.close
+    ceMetrics = new CompactionExecutorsMetrics();
+    this.compactionManager = new ScanServerCompactionManager(getContext(), ceMetrics);
+
+    // SessionManager times out sessions when sessions have been idle longer than
+    // TSERV_SESSION_MAXIDLE. Schedule a background task that looks for sessions that
+    // have timed out and ends their scans.
+    long maxIdle =
+        this.getContext().getConfiguration().getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
+    long cleanupPeriod = Math.max(maxIdle / 2, 1000);
+    LOG.debug("Looking for timed out scan sessions every {}ms", cleanupPeriod);
+    this.getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> cleanupTimedOutSession(),
+        cleanupPeriod, cleanupPeriod, TimeUnit.MILLISECONDS);
+
+    try {
+      try {
+        ScanServerDiscovery.unreserve(this.getContext().getZooKeeperRoot(),
+            getContext().getZooReaderWriter(), getClientAddressString());
+      } catch (Exception e2) {
+        throw new RuntimeException("Error setting initial unreserved state in ZooKeeper", e2);
+      }
+
+      while (!serverStopRequested) {
+        UtilWaitThread.sleep(1000);
+      }
+    } finally {
+      LOG.info("Stopping Thrift Servers");
+      TServerUtils.stopTServer(address.server);
+
+      try {
+        LOG.debug("Closing filesystems");
+        VolumeManager mgr = getContext().getVolumeManager();
+        if (null != mgr) {
+          mgr.close();
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
+      }
+
+      gcLogger.logGCInfo(getConfiguration());
+      LOG.info("stop requested. exiting ... ");
+      try {
+        if (null != lock) {
+          lock.unlock();
+        }
+      } catch (Exception e) {
+        LOG.warn("Failed to release scan server lock", e);
+      }
+
+    }
+  }
+
+  private synchronized void checkInUse() {
+    synchronized (currentScan) {
+      if (currentScan.extent != null || currentScan.tablet != null) {
+        throw new RuntimeException("Scan server in use for another query");
+      }
+    }
+  }
+
+  protected synchronized boolean loadTablet(TKeyExtent textent)
+      throws IllegalArgumentException, IOException, AccumuloException {
+    synchronized (currentScan) {
+      KeyExtent extent = KeyExtent.fromThrift(textent);
+      TabletMetadata tabletMetadata = getContext().getAmple().readTablet(extent);

Review comment:
       This has been added, see `tabletMetadataCache` in ScanServer.
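For context on the cache mentioned here: the ScanServer constructor builds `tabletMetadataCache` with Caffeine's `expireAfterWrite`, so a cached `TabletMetadata` entry is reloaded once it is older than the configured expiration instead of being re-read from the metadata table on every scan. The following is a minimal, dependency-free sketch of that expire-after-write behavior. `TtlCache` and its methods are hypothetical names, the clock is injectable only so expiry can be tested, and this is not how Caffeine is implemented.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical expire-after-write cache; illustrates the behavior only, not Caffeine's internals.
final class TtlCache<K, V> {
  private static final class Timestamped<T> {
    final T value;
    final long writtenAtMillis;

    Timestamped(T value, long writtenAtMillis) {
      this.value = value;
      this.writtenAtMillis = writtenAtMillis;
    }
  }

  private final Map<K, Timestamped<V>> entries = new ConcurrentHashMap<>();
  private final Function<K, V> loader; // e.g. a read of one tablet's metadata
  private final long expireAfterWriteMillis;
  private final LongSupplier clockMillis; // injectable so expiry can be tested

  TtlCache(Function<K, V> loader, long expireAfterWriteMillis, LongSupplier clockMillis) {
    this.loader = loader;
    this.expireAfterWriteMillis = expireAfterWriteMillis;
    this.clockMillis = clockMillis;
  }

  // Returns the cached value, reloading it if it was written at least
  // expireAfterWriteMillis ago. compute() makes the check-and-load atomic per key.
  V get(K key) {
    long now = clockMillis.getAsLong();
    return entries.compute(key, (k, old) ->
        (old == null || now - old.writtenAtMillis >= expireAfterWriteMillis)
            ? new Timestamped<V>(loader.apply(k), now)
            : old).value;
  }
}
```

Caffeine also offers bulk loading (the `loadAll` that `TabletMetadataLoader` overrides), size bounds, and background refresh; the sketch leaves all of that out.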




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [accumulo] dlmarion edited a comment on pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion edited a comment on pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#issuecomment-1055794007


   Items remaining:
   
   1. Completing the client-side load balancing / scan server selection.
   
   2. ~~Look at adding ScanServer properties for the block caches (and maybe other properties) so that they are distinct in the configuration.~~ (Added in 1fb82b1)
   
   @keith-turner is working on (1) now and I will start on (2) shortly.
   
   Possible future work:
   
   ScanServer pools - provide an argument (pool name, tag, etc) to the ScanServer process that will be placed into the ZK node's data that can be used by the client side load balancer for determining which pool to use for a particular scan.
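A sketch of how a client-side selector might consume such a pool tag, under loudly stated assumptions: each scan server's ZK node data is taken to be in a made-up `host:port;group` format, servers without a tag fall into a `default` group, and `ScanServerPools`, `groupOf`, `addressOf`, and `serversInGroup` are hypothetical names. Nothing here reflects what the PR actually writes to ZooKeeper.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical: scan servers publish "host:port;group" as their ZK node data.
final class ScanServerPools {
  static final String DEFAULT_GROUP = "default";

  private ScanServerPools() {}

  // Returns the group encoded in the node data, or DEFAULT_GROUP if none is present.
  static String groupOf(String nodeData) {
    int sep = nodeData.indexOf(';');
    return sep < 0 ? DEFAULT_GROUP : nodeData.substring(sep + 1);
  }

  // Returns the address portion of the node data.
  static String addressOf(String nodeData) {
    int sep = nodeData.indexOf(';');
    return sep < 0 ? nodeData : nodeData.substring(0, sep);
  }

  // Filters the known scan servers down to those in the requested pool.
  static List<String> serversInGroup(List<String> nodeData, String group) {
    List<String> result = new ArrayList<>();
    for (String data : nodeData) {
      if (groupOf(data).equals(group)) {
        result.add(addressOf(data));
      }
    }
    return result;
  }
}
```

The load balancer would then pick among `serversInGroup(...)` for the pool requested by a scan, rather than among all scan servers.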





[GitHub] [accumulo] keith-turner commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r838050726



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
##########
@@ -138,6 +175,13 @@ public T get(long timeout, TimeUnit unit)
     return rAsT;
   }
 
+  @Override
+  public T get(long timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    // TODO probably no longer makes sense to extend future

Review comment:
       This one does go with this PR. The changes that added the busy timeout altered the class such that it no longer makes sense for it to extend `Future`.







[GitHub] [accumulo] keith-turner commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r840098253



##########
File path: server/base/src/main/java/org/apache/accumulo/server/util/ZooZap.java
##########
@@ -176,6 +178,25 @@ public void execute(String[] args) throws Exception {
 
       }
 
+      if (opts.zapScanServers) {
+        String sserversPath = Constants.ZROOT + "/" + iid + Constants.ZSSERVERS;
+        try {
+          List<String> children = zoo.getChildren(sserversPath);
+          for (String child : children) {
+            message("Deleting " + sserversPath + "/" + child + " from zookeeper", opts);
+
+            var zLockPath = ServiceLock.path(sserversPath + "/" + child);
+            if (!zoo.getChildren(zLockPath.toString()).isEmpty()) {
+              if (!ServiceLock.deleteLock(zoo, zLockPath, "tserver")) {

Review comment:
       Is the `"tserver"` string correct?

##########
File path: minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloConfigImpl.java
##########
@@ -56,6 +56,7 @@
   private Map<String,String> configuredSiteConig = new HashMap<>();
   private Map<String,String> clientProps = new HashMap<>();
   private int numTservers = 2;
+  private int numScanServers = 2;

Review comment:
       This would suddenly make Mini Accumulo use more memory for an optional feature. Should this default to zero?

##########
File path: test/src/main/resources/log4j2-test.properties
##########
@@ -79,7 +79,7 @@ logger.16.name = org.apache.accumulo.server.util.ReplicationTableUtil
 logger.16.level = trace
 
 logger.17.name = org.apache.accumulo.core.clientImpl.ThriftScanner
-logger.17.level = info
+logger.17.level = debug

Review comment:
       Do we want to keep this change?

##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
##########
@@ -0,0 +1,1288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.clientImpl.thrift.ConfigurationType;
+import org.apache.accumulo.core.clientImpl.thrift.TDiskUsage;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
+import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
+import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.TCMResult;
+import org.apache.accumulo.core.dataImpl.thrift.TColumn;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalSession;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import org.apache.accumulo.core.dataImpl.thrift.TRowRange;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaries;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaryRequest;
+import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors;
+import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.CompactionServices;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
+import org.apache.accumulo.core.tabletserver.thrift.TDurability;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
+import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ServiceLock;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.security.SecurityUtil;
+import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.compactions.Compactable;
+import org.apache.accumulo.tserver.compactions.CompactionManager;
+import org.apache.accumulo.tserver.compactions.ExternalCompactionJob;
+import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.tserver.session.MultiScanSession;
+import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.ScanSession.TabletResolver;
+import org.apache.accumulo.tserver.session.SingleScanSession;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletData;
+import org.apache.commons.lang3.tuple.MutableTriple;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+public class ScanServer extends TabletServer implements TabletClientService.Iface {
+
+  static class ScanInformation extends MutableTriple<Long,KeyExtent,Tablet> {
+    private static final long serialVersionUID = 1L;
+
+    public Long getScanId() {
+      return getLeft();
+    }
+
+    public void setScanId(Long scanId) {
+      setLeft(scanId);
+    }
+
+    public KeyExtent getExtent() {
+      return getMiddle();
+    }
+
+    public void setExtent(KeyExtent extent) {
+      setMiddle(extent);
+    }
+
+    public Tablet getTablet() {
+      return getRight();
+    }
+
+    public void setTablet(Tablet tablet) {
+      setRight(tablet);
+    }
+  }
+
+  /**
+   * A compaction manager that does nothing
+   */
+  private static class ScanServerCompactionManager extends CompactionManager {
+
+    public ScanServerCompactionManager(ServerContext context,
+        CompactionExecutorsMetrics ceMetrics) {
+      super(new ArrayList<>(), context, ceMetrics);
+    }
+
+    @Override
+    public void compactableChanged(Compactable compactable) {}
+
+    @Override
+    public void start() {}
+
+    @Override
+    public CompactionServices getServices() {
+      return null;
+    }
+
+    @Override
+    public boolean isCompactionQueued(KeyExtent extent, Set<CompactionServiceId> servicesUsed) {
+      return false;
+    }
+
+    @Override
+    public int getCompactionsRunning() {
+      return 0;
+    }
+
+    @Override
+    public int getCompactionsQueued() {
+      return 0;
+    }
+
+    @Override
+    public ExternalCompactionJob reserveExternalCompaction(String queueName, long priority,
+        String compactorId, ExternalCompactionId externalCompactionId) {
+      return null;
+    }
+
+    @Override
+    public void registerExternalCompaction(ExternalCompactionId ecid, KeyExtent extent,
+        CompactionExecutorId ceid) {}
+
+    @Override
+    public void commitExternalCompaction(ExternalCompactionId extCompactionId,
+        KeyExtent extentCompacted, Map<KeyExtent,Tablet> currentTablets, long fileSize,
+        long entries) {}
+
+    @Override
+    public void externalCompactionFailed(ExternalCompactionId ecid, KeyExtent extentCompacted,
+        Map<KeyExtent,Tablet> currentTablets) {}
+
+    @Override
+    public List<TCompactionQueueSummary> getCompactionQueueSummaries() {
+      return null;
+    }
+
+    @Override
+    public Collection<ExtCompMetric> getExternalMetrics() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public void compactableClosed(KeyExtent extent, Set<CompactionServiceId> servicesUsed,
+        Set<ExternalCompactionId> ecids) {}
+
+  }
+
+  public static class ScanServerCompactionExecutorMetrics extends CompactionExecutorsMetrics {
+
+    @Override
+    protected void startUpdateThread() {}
+
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
+
+  protected ThriftClientHandler handler;
+  private UUID serverLockUUID;
+  private final TabletMetadataLoader tabletMetadataLoader;
+  private final LoadingCache<KeyExtent,TabletMetadata> tabletMetadataCache;
+  protected Set<StoredTabletFile> lockedFiles = new HashSet<>();
+  protected Map<StoredTabletFile,ReservedFile> reservedFiles = new ConcurrentHashMap<>();
+  protected AtomicLong nextScanReservationId = new AtomicLong();
+
+  private static class TabletMetadataLoader implements CacheLoader<KeyExtent,TabletMetadata> {
+
+    private final Ample ample;
+
+    private TabletMetadataLoader(Ample ample) {
+      this.ample = ample;
+    }
+
+    @Override
+    public @Nullable TabletMetadata load(KeyExtent keyExtent) {
+      long t1 = System.currentTimeMillis();
+      var tm = ample.readTablet(keyExtent);
+      long t2 = System.currentTimeMillis();
+      LOG.trace("Read metadata for 1 tablet in {} ms", t2 - t1);
+      return tm;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Map<? extends KeyExtent,? extends TabletMetadata>
+        loadAll(Set<? extends KeyExtent> keys) {
+      long t1 = System.currentTimeMillis();
+      var tms = ample.readTablets().forTablets((Collection<KeyExtent>) keys).build().stream()
+          .collect(Collectors.toMap(tm -> tm.getExtent(), tm -> tm));
+      long t2 = System.currentTimeMillis();
+      LOG.trace("Read metadata for {} tablets in {} ms", keys.size(), t2 - t1);
+      return tms;
+    }
+  }
+
+  public ScanServer(ServerOpts opts, String[] args) {
+    super(opts, args, true);
+
+    // Note: The way to control the number of concurrent scans that a ScanServer will
+    // perform is by using Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS or the number
+    // of threads in Property.SSERV_SCAN_EXECUTORS_PREFIX.
+
+    long cacheExpiration =
+        getConfiguration().getTimeInMillis(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION);
+
+    long scanServerReservationExpiration =
+        getConfiguration().getTimeInMillis(Property.SSERVER_SCAN_REFERENCE_EXPIRATION_TIME);
+
+    tabletMetadataLoader = new TabletMetadataLoader(getContext().getAmple());
+
+    if (cacheExpiration == 0L) {
+      LOG.warn("Tablet metadata caching disabled, may cause excessive scans on metadata table.");
+      tabletMetadataCache = null;
+    } else {
+      if (cacheExpiration < 60000) {
+        LOG.warn(
+            "Tablet metadata caching less than one minute, may cause excessive scans on metadata table.");
+      }
+      tabletMetadataCache =
+          Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS)

Review comment:
       Curious why you chose Caffeine over the Guava cache?

##########
File path: test/src/main/java/org/apache/accumulo/test/functional/AccumuloClientIT.java
##########
@@ -149,6 +149,7 @@ public void testAccumuloClientBuilder() throws Exception {
     props.put(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), zookeepers);
     props.put(ClientProperty.AUTH_PRINCIPAL.getKey(), user1);
     props.put(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.getKey(), "22s");
+    props.put(ClientProperty.SCAN_SERVER_DISPATCHER_OPTS_PREFIX + "enabled", "true");

Review comment:
       What is the purpose of this?







[GitHub] [accumulo] dlmarion edited a comment on pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion edited a comment on pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#issuecomment-1021503545


   @keith-turner and I had a discussion about changing the current design. The key points from the discussion are:
   
   1. Modifying the load balancing logic from the current implementation (which uses ZK to know which scan servers are available) to an implementation that selects candidate scan servers to communicate with for scanning an extent. This new implementation would not require ZK and would use a hashing algorithm so that all clients select the same set of scan servers for an extent, in an attempt to make use of the index and data block caches.
   
   2. We may want to have configuration settings for the scan server for things that are inherited from the tablet server, for example, the data block cache size. Should the scan server use the current configuration property name, or should it have its own?
   
   3. ~~Modifying the scan server to perform more than one concurrent scan.~~ Added in [0d553de]
   
   4. Modify the client configuration code such that the name is not tied to the implementation.
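Item 1 asks for every client to independently pick the same candidate scan servers for an extent. One well-known technique with that property is rendezvous (highest-random-weight) hashing. The sketch below uses it, but the discussion does not say which hashing algorithm was chosen. Extents are represented as plain strings such as the hypothetical `"t1;m;a"`, and `RendezvousSelector` and `candidates` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of rendezvous (highest-random-weight) hashing; not necessarily the PR's algorithm.
final class RendezvousSelector {
  private RendezvousSelector() {}

  // Deterministic score for an (extent, server) pair: first 8 bytes of SHA-256.
  static long score(String extent, String server) {
    try {
      MessageDigest md = MessageDigest.getInstance("SHA-256");
      byte[] digest = md.digest((extent + "|" + server).getBytes(StandardCharsets.UTF_8));
      long value = 0;
      for (int i = 0; i < 8; i++) {
        value = (value << 8) | (digest[i] & 0xffL);
      }
      return value;
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 is required by the JDK", e);
    }
  }

  // Every client ranks servers the same way for a given extent, so all clients
  // pick the same n candidates regardless of the order in which they list servers.
  static List<String> candidates(String extent, List<String> servers, int n) {
    List<String> ranked = new ArrayList<>(servers);
    ranked.sort(Comparator.comparingLong((String s) -> score(extent, s)).reversed()
        .thenComparing(Comparator.<String>naturalOrder()));
    return new ArrayList<>(ranked.subList(0, Math.min(n, ranked.size())));
  }
}
```

Because a server's score depends only on the extent and that server, adding or removing a server that was not selected leaves an extent's candidates unchanged, which helps keep the block caches on those servers warm.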





[GitHub] [accumulo] ctubbsii commented on pull request #2422: Initial implementation of my vision for implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
ctubbsii commented on pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#issuecomment-1017869501


   > @dlmarion curious if this feature is something you would like to see in 2.1.0?
   
   Please no! I'd really like to get 2.1.0 released this century, and I think trying to include something this substantial would set us back too far for testing/code review.





[GitHub] [accumulo] keith-turner commented on a change in pull request #2422: Initial implementation of my vision for implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r791032803



##########
File path: core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java
##########
@@ -497,26 +499,44 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
     for (final String tsLocation : locations) {
 
       final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
-      if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
-        QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns);
-        queryTasks.add(queryTask);
+      if (options.isUseScanServer()) {
+        // Ignore the tablets location and find a scan server to use
+        ScanServerLocator ssl = context.getScanServerLocator();
+        tabletsRanges.forEach((k, v) -> {
+          try {
+            String location = ssl.reserveScanServer(new TabletIdImpl(k));

Review comment:
       >  I figured that if a ScanServer had many threads and performed more than one scan at a time, then we would potentially run into the same situation we have in the TabletServer w/r/t memory usage.
   
   I can see the benefit of a single thread. We could start off by implementing the thread pool with a syncQ and a hard-coded size of one thread, which would give us really fast busy-exception behavior. Later on, as we gain experience, we could refine the scan server config and make what causes busy exceptions on a scan server configurable.
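A minimal sketch of that single-thread, fail-fast pool, assuming "syncQ" refers to `java.util.concurrent.SynchronousQueue`. A `SynchronousQueue` buffers nothing, so a `ThreadPoolExecutor` with one thread rejects a second concurrent submission immediately, and the caller can turn that rejection into a busy response. `BusyAwareScanPool` and `trySubmit` are hypothetical names, not code from the PR.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical fail-fast scan pool: one thread and a SynchronousQueue (no buffering).
final class BusyAwareScanPool implements AutoCloseable {
  private final ThreadPoolExecutor pool = new ThreadPoolExecutor(
      1, 1, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(),
      new ThreadPoolExecutor.AbortPolicy());

  // Returns Optional.empty() when the single thread is already running a scan,
  // which a server could report to the client as "busy" so it tries elsewhere.
  <T> Optional<Future<T>> trySubmit(Callable<T> scan) {
    try {
      return Optional.of(pool.submit(scan));
    } catch (RejectedExecutionException e) {
      return Optional.empty();
    }
  }

  @Override
  public void close() {
    pool.shutdownNow();
  }
}
```

One subtlety: after a scan finishes, the worker thread must get back to waiting on the queue before it can accept the next task, so a submission that races with completion may also be reported as busy.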







[GitHub] [accumulo] dlmarion commented on pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#issuecomment-1024522042


   Some annotated shell output:
   ```bash
   root@test> createtable test (1)
   root@test test> insert a b c d (2)
   root@test test> scan (3)
   a b:c []	d
   root@test test> scan -cl immediate (4)
   a b:c []	d
   root@test test> scan -cl eventual (5)
   root@test test> flush (6)
   2022-01-28T18:58:10,693 [shell.Shell] INFO : Flush of table test  initiated...
   root@test test> scan (7)
   a b:c []	d
   root@test test> scan -cl eventual (8)
   a b:c []	d
   ```
   
   I create a table (1) and insert some data (2). When I run a scan (3, 4) with the `immediate` consistency level, which happens to be the default, the scan uses the normal code path and issues the scan command against the tablet server. Data is returned because the tablet server code path also returns data that is in the in-memory map. When I scan with the `eventual` consistency level (5), no data is returned because that code path in the scanner only uses the data in the tablet's files. After I flush (6) the data, which writes a file in HDFS, the subsequent scans with the `immediate` (7) and `eventual` (8) consistency levels both return the data.
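The shell session can be modeled with a small toy, assuming, as the comment describes, that an `immediate` scan reads both the in-memory map and the tablet's files, while an `eventual` scan reads only the files. `ToyTablet` is hypothetical and is not Accumulo code. Its "files" are just immutable snapshots of the in-memory map taken at flush time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical toy model of the two read paths; not Accumulo code.
final class ToyTablet {
  enum ConsistencyLevel { IMMEDIATE, EVENTUAL }

  private final TreeMap<String, String> inMemoryMap = new TreeMap<>();
  private final List<TreeMap<String, String>> files = new ArrayList<>();

  void insert(String key, String value) {
    inMemoryMap.put(key, value);
  }

  // Writes the in-memory map out as a new immutable "file" and clears it.
  void flush() {
    if (!inMemoryMap.isEmpty()) {
      files.add(new TreeMap<>(inMemoryMap));
      inMemoryMap.clear();
    }
  }

  // IMMEDIATE (tablet server path) merges files and memory;
  // EVENTUAL (scan server path) reads files only.
  TreeMap<String, String> scan(ConsistencyLevel level) {
    TreeMap<String, String> result = new TreeMap<>();
    for (TreeMap<String, String> file : files) {
      result.putAll(file); // later files win, a stand-in for newer timestamps
    }
    if (level == ConsistencyLevel.IMMEDIATE) {
      result.putAll(inMemoryMap);
    }
    return result;
  }
}
```

The toy omits a further source of staleness visible elsewhere in this PR: a scan server caches tablet metadata for `SSERV_CACHED_TABLET_METADATA_EXPIRATION`, so an `eventual` scan may not see a freshly flushed file until the cached entry expires.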





[GitHub] [accumulo] dlmarion commented on pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#issuecomment-1021503545


   @keith-turner and I had a discussion about changing the current design. The key points from the discussion are:
   
   1. Modifying the load balancing logic from the current implementation (which uses ZK to know which scan servers are available) to an implementation that selects candidate scan servers to communicate with for scanning an extent. This new implementation would not require ZK and would use a hashing algorithm so that all clients select the same set of scan servers for an extent, in an attempt to make use of the index and data block caches.
   
   2. We may want to have configuration settings for the scan server for things that are inherited from the tablet server, for example, the data block cache size. Should the scan server use the current configuration property name, or should it have its own?
   
   3. Modifying the scan server to perform more than one concurrent scan.
   
   4. Modify the client configuration code such that the name is not tied to the implementation.
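For item 2, one option is a scan-server-specific property that falls back to the inherited tablet server property when it is unset. The sketch resolves values from a plain map and uses the illustrative keys `sserver.cache.data.size` and `tserver.cache.data.size`. It does not show how Accumulo's `Property` enum actually resolves configuration, and `ScanServerConfig` is a hypothetical name.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical lookup: prefer a scan-server key, else inherit the tablet server key.
final class ScanServerConfig {
  private final Map<String, String> siteConfig;

  ScanServerConfig(Map<String, String> siteConfig) {
    this.siteConfig = siteConfig;
  }

  // Returns the scan-server-specific value if set, otherwise the tablet server value,
  // otherwise empty.
  Optional<String> resolve(String sserverKey, String tserverKey) {
    String value = siteConfig.get(sserverKey);
    if (value == null) {
      value = siteConfig.get(tserverKey);
    }
    return Optional.ofNullable(value);
  }
}
```

Falling back keeps existing deployments working unchanged, while a separate key lets scan servers, which serve only file data, size their caches independently of tablet servers.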





[GitHub] [accumulo] keith-turner commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
keith-turner commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r838049934



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/scan/ScanTask.java
##########
@@ -127,6 +161,9 @@ public T get(long timeout, TimeUnit unit)
     // returned
     resultQueue = null;
 
+    // TODO by not wrapping the error, stack information that could be important for debugging is
+    // lost: the error is from a background thread, and the stack trace from this foreground
+    // thread is lost.

Review comment:
       So then maybe we can remove this?







[GitHub] [accumulo] dlmarion commented on a change in pull request #2422: Implementation of Scan Servers.

Posted by GitBox <gi...@apache.org>.
dlmarion commented on a change in pull request #2422:
URL: https://github.com/apache/accumulo/pull/2422#discussion_r840531099



##########
File path: server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
##########
@@ -0,0 +1,1288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.tserver;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.clientImpl.thrift.ConfigurationType;
+import org.apache.accumulo.core.clientImpl.thrift.TDiskUsage;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
+import org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
+import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
+import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MapFileInfo;
+import org.apache.accumulo.core.dataImpl.thrift.MultiScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.ScanResult;
+import org.apache.accumulo.core.dataImpl.thrift.TCMResult;
+import org.apache.accumulo.core.dataImpl.thrift.TColumn;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TConditionalSession;
+import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
+import org.apache.accumulo.core.dataImpl.thrift.TMutation;
+import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import org.apache.accumulo.core.dataImpl.thrift.TRowRange;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaries;
+import org.apache.accumulo.core.dataImpl.thrift.TSummaryRequest;
+import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors;
+import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheConfiguration;
+import org.apache.accumulo.core.master.thrift.TabletServerStatus;
+import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metrics.MetricsUtil;
+import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
+import org.apache.accumulo.core.spi.compaction.CompactionExecutorId;
+import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
+import org.apache.accumulo.core.spi.compaction.CompactionServices;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveCompaction;
+import org.apache.accumulo.core.tabletserver.thrift.ActiveScan;
+import org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
+import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException;
+import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException;
+import org.apache.accumulo.core.tabletserver.thrift.TCompactionQueueSummary;
+import org.apache.accumulo.core.tabletserver.thrift.TDurability;
+import org.apache.accumulo.core.tabletserver.thrift.TExternalCompactionJob;
+import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException;
+import org.apache.accumulo.core.tabletserver.thrift.TSamplerConfiguration;
+import org.apache.accumulo.core.tabletserver.thrift.TUnloadTabletGoal;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
+import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Iface;
+import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
+import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException;
+import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.trace.thrift.TInfo;
+import org.apache.accumulo.core.util.Halt;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.fate.zookeeper.ServiceLock;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockLossReason;
+import org.apache.accumulo.fate.zookeeper.ServiceLock.LockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.server.ServerOpts;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.rpc.ServerAddress;
+import org.apache.accumulo.server.rpc.TCredentialsUpdatingWrapper;
+import org.apache.accumulo.server.rpc.TServerUtils;
+import org.apache.accumulo.server.rpc.ThriftServerType;
+import org.apache.accumulo.server.security.SecurityUtil;
+import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
+import org.apache.accumulo.tserver.compactions.Compactable;
+import org.apache.accumulo.tserver.compactions.CompactionManager;
+import org.apache.accumulo.tserver.compactions.ExternalCompactionJob;
+import org.apache.accumulo.tserver.metrics.CompactionExecutorsMetrics;
+import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
+import org.apache.accumulo.tserver.session.MultiScanSession;
+import org.apache.accumulo.tserver.session.ScanSession;
+import org.apache.accumulo.tserver.session.ScanSession.TabletResolver;
+import org.apache.accumulo.tserver.session.SingleScanSession;
+import org.apache.accumulo.tserver.tablet.Tablet;
+import org.apache.accumulo.tserver.tablet.TabletData;
+import org.apache.commons.lang3.tuple.MutableTriple;
+import org.apache.thrift.TException;
+import org.apache.zookeeper.KeeperException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Scheduler;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+
+public class ScanServer extends TabletServer implements TabletClientService.Iface {
+
+  static class ScanInformation extends MutableTriple<Long,KeyExtent,Tablet> {
+    private static final long serialVersionUID = 1L;
+
+    public Long getScanId() {
+      return getLeft();
+    }
+
+    public void setScanId(Long scanId) {
+      setLeft(scanId);
+    }
+
+    public KeyExtent getExtent() {
+      return getMiddle();
+    }
+
+    public void setExtent(KeyExtent extent) {
+      setMiddle(extent);
+    }
+
+    public Tablet getTablet() {
+      return getRight();
+    }
+
+    public void setTablet(Tablet tablet) {
+      setRight(tablet);
+    }
+  }
+
+  /**
+   * A compaction manager that does nothing
+   */
+  private static class ScanServerCompactionManager extends CompactionManager {
+
+    public ScanServerCompactionManager(ServerContext context,
+        CompactionExecutorsMetrics ceMetrics) {
+      super(new ArrayList<>(), context, ceMetrics);
+    }
+
+    @Override
+    public void compactableChanged(Compactable compactable) {}
+
+    @Override
+    public void start() {}
+
+    @Override
+    public CompactionServices getServices() {
+      return null;
+    }
+
+    @Override
+    public boolean isCompactionQueued(KeyExtent extent, Set<CompactionServiceId> servicesUsed) {
+      return false;
+    }
+
+    @Override
+    public int getCompactionsRunning() {
+      return 0;
+    }
+
+    @Override
+    public int getCompactionsQueued() {
+      return 0;
+    }
+
+    @Override
+    public ExternalCompactionJob reserveExternalCompaction(String queueName, long priority,
+        String compactorId, ExternalCompactionId externalCompactionId) {
+      return null;
+    }
+
+    @Override
+    public void registerExternalCompaction(ExternalCompactionId ecid, KeyExtent extent,
+        CompactionExecutorId ceid) {}
+
+    @Override
+    public void commitExternalCompaction(ExternalCompactionId extCompactionId,
+        KeyExtent extentCompacted, Map<KeyExtent,Tablet> currentTablets, long fileSize,
+        long entries) {}
+
+    @Override
+    public void externalCompactionFailed(ExternalCompactionId ecid, KeyExtent extentCompacted,
+        Map<KeyExtent,Tablet> currentTablets) {}
+
+    @Override
+    public List<TCompactionQueueSummary> getCompactionQueueSummaries() {
+      return null;
+    }
+
+    @Override
+    public Collection<ExtCompMetric> getExternalMetrics() {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public void compactableClosed(KeyExtent extent, Set<CompactionServiceId> servicesUsed,
+        Set<ExternalCompactionId> ecids) {}
+
+  }
+
+  public static class ScanServerCompactionExecutorMetrics extends CompactionExecutorsMetrics {
+
+    @Override
+    protected void startUpdateThread() {}
+
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
+
+  protected ThriftClientHandler handler;
+  private UUID serverLockUUID;
+  private final TabletMetadataLoader tabletMetadataLoader;
+  private final LoadingCache<KeyExtent,TabletMetadata> tabletMetadataCache;
+  protected Set<StoredTabletFile> lockedFiles = new HashSet<>();
+  protected Map<StoredTabletFile,ReservedFile> reservedFiles = new ConcurrentHashMap<>();
+  protected AtomicLong nextScanReservationId = new AtomicLong();
+
+  private static class TabletMetadataLoader implements CacheLoader<KeyExtent,TabletMetadata> {
+
+    private final Ample ample;
+
+    private TabletMetadataLoader(Ample ample) {
+      this.ample = ample;
+    }
+
+    @Override
+    public @Nullable TabletMetadata load(KeyExtent keyExtent) {
+      long t1 = System.currentTimeMillis();
+      var tm = ample.readTablet(keyExtent);
+      long t2 = System.currentTimeMillis();
+      LOG.trace("Read metadata for 1 tablet in {} ms", t2 - t1);
+      return tm;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Map<? extends KeyExtent,? extends TabletMetadata>
+        loadAll(Set<? extends KeyExtent> keys) {
+      long t1 = System.currentTimeMillis();
+      var tms = ample.readTablets().forTablets((Collection<KeyExtent>) keys).build().stream()
+          .collect(Collectors.toMap(tm -> tm.getExtent(), tm -> tm));
+      long t2 = System.currentTimeMillis();
+      LOG.trace("Read metadata for {} tablets in {} ms", keys.size(), t2 - t1);
+      return tms;
+    }
+  }
+
+  public ScanServer(ServerOpts opts, String[] args) {
+    super(opts, args, true);
+
+    // Note: The way to control the number of concurrent scans that a ScanServer will
+    // perform is by using Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS or the number
+    // of threads in Property.SSERV_SCAN_EXECUTORS_PREFIX.
+
+    long cacheExpiration =
+        getConfiguration().getTimeInMillis(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION);
+
+    long scanServerReservationExpiration =
+        getConfiguration().getTimeInMillis(Property.SSERVER_SCAN_REFERENCE_EXPIRATION_TIME);
+
+    tabletMetadataLoader = new TabletMetadataLoader(getContext().getAmple());
+
+    if (cacheExpiration == 0L) {
+      LOG.warn("Tablet metadata caching disabled, may cause excessive scans on metadata table.");
+      tabletMetadataCache = null;
+    } else {
+      if (cacheExpiration < 60000) {
+        LOG.warn(
+            "Tablet metadata caching less than one minute, may cause excessive scans on metadata table.");
+      }
+      tabletMetadataCache =
+          Caffeine.newBuilder().expireAfterWrite(cacheExpiration, TimeUnit.MILLISECONDS)

Review comment:
       I went with something I was familiar with.
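       The constructor quoted above builds the tablet metadata cache with Caffeine's `expireAfterWrite`, so an entry ages from the moment it was loaded, not from its last read. That means a scan server may serve from tablet metadata that is up to `SSERV_CACHED_TABLET_METADATA_EXPIRATION` old. The sketch below shows those semantics using only the standard library and an injectable clock. It is not Caffeine's implementation; `ExpiringLoadingCache` is a hypothetical name, and calling the loader inside `ConcurrentHashMap.compute` is a simplification, since it blocks other updates to the same bin while a slow metadata read runs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical, minimal loading cache with expire-after-write semantics (illustration only).
public class ExpiringLoadingCache<K,V> {

  private static final class Timestamped<V> {
    final V value;
    final long writeTimeMillis;

    Timestamped(V value, long writeTimeMillis) {
      this.value = value;
      this.writeTimeMillis = writeTimeMillis;
    }
  }

  private final Map<K,Timestamped<V>> entries = new ConcurrentHashMap<>();
  private final Function<K,V> loader;
  private final long expireAfterWriteMillis;
  private final LongSupplier clock;

  public ExpiringLoadingCache(Function<K,V> loader, long expireAfterWriteMillis,
      LongSupplier clock) {
    this.loader = loader;
    this.expireAfterWriteMillis = expireAfterWriteMillis;
    this.clock = clock;
  }

  public V get(K key) {
    long now = clock.getAsLong();
    Timestamped<V> entry = entries.compute(key, (k, existing) -> {
      // Reads do not refresh the timestamp: age is measured from the last load.
      if (existing != null && now - existing.writeTimeMillis < expireAfterWriteMillis) {
        return existing;
      }
      return new Timestamped<>(loader.apply(k), now);
    });
    return entry.value;
  }
}
```

       With an expiration of 1000 ms, a key loaded at time 0 is served from the cache at 999 ms and reloaded at 1000 ms, no matter how often it was read in between.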



