You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/05/24 18:34:30 UTC

[GitHub] [accumulo] ctubbsii commented on a diff in pull request #2665: Eventually Consistent scans / ScanServer feature

ctubbsii commented on code in PR #2665:
URL: https://github.com/apache/accumulo/pull/2665#discussion_r880783668


##########
test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java:
##########
@@ -131,12 +131,12 @@ public void closeMultiScan(TInfo tinfo, long scanID) {}
     public void closeScan(TInfo tinfo, long scanID) {}
 
     @Override
-    public MultiScanResult continueMultiScan(TInfo tinfo, long scanID) {
+    public MultiScanResult continueMultiScan(TInfo tinfo, long scanID, long busyTimeout) {

Review Comment:
   Could this addition of a `busyTimeout` parameter be done in a separate distinct smaller change first?



##########
test/src/main/java/org/apache/accumulo/test/ScanServerMultipleScansIT.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.apache.hadoop.io.Text;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.Iterables;
+
+@Tag(MINI_CLUSTER_ONLY)
+public class ScanServerMultipleScansIT extends SharedMiniClusterBase {
+
+  private static class ScanServerITConfiguration implements MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+        org.apache.hadoop.conf.Configuration coreSite) {
+      cfg.setNumScanServers(1);
+      cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s");
+    }
+  }
+
+  private static final int NUM_SCANS = 4;
+
+  @BeforeAll
+  public static void start() throws Exception {
+    ScanServerITConfiguration c = new ScanServerITConfiguration();
+    SharedMiniClusterBase.startMiniClusterWithConfig(c);
+    SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER,
+        "localhost");
+
+    String zooRoot = getCluster().getServerContext().getZooKeeperRoot();
+    ZooReaderWriter zrw = getCluster().getServerContext().getZooReaderWriter();
+    String scanServerRoot = zooRoot + Constants.ZSSERVERS;
+
+    while (zrw.getChildren(scanServerRoot).size() == 0) {
+      Thread.sleep(500);
+    }
+  }
+
+  @AfterAll
+  public static void stop() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  @Test
+  public void testMutipleScansSameTablet() throws Exception {
+
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+
+      ReadWriteIT.ingest(client, 10, 10, 50, 0, tableName);
+
+      client.tableOperations().flush(tableName, null, null, true);
+
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      var executor = Executors.newCachedThreadPool();
+
+      List<Future<?>> futures = new ArrayList<>(NUM_SCANS);
+      for (int i = 0; i < NUM_SCANS; i++) {
+        var future = executor.submit(() -> {
+          try {
+            latch.await();
+          } catch (InterruptedException e1) {
+            fail("InterruptedException waiting for latch");
+          }
+          try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+            scanner.setRange(new Range());
+            scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+            assertEquals(100, Iterables.size(scanner));
+          } catch (TableNotFoundException e) {
+            fail("Table not found");
+          }
+        });
+
+        futures.add(future);
+      }
+      latch.countDown();
+      for (Future<?> future : futures) {
+        future.get();
+      }
+
+      executor.shutdown();
+    }
+  }
+
+  @Test
+  public void testSingleScanDifferentTablets() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+      SortedSet<Text> splitPoints = new TreeSet<>();
+      splitPoints.add(new Text("row_0000000002\\0"));
+      splitPoints.add(new Text("row_0000000005\\0"));
+      splitPoints.add(new Text("row_0000000008\\0"));
+      client.tableOperations().addSplits(tableName, splitPoints);
+
+      @SuppressWarnings("deprecation")
+      Collection<Text> splits = client.tableOperations().getSplits(tableName);
+      assertEquals(3, splits.size());
+
+      ReadWriteIT.ingest(client, 10, 10, 50, 0, tableName);
+
+      client.tableOperations().flush(tableName, null, null, true);
+
+      try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRange(new Range());
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        assertEquals(100, Iterables.size(scanner));
+      }
+    }
+  }
+
+  @Test
+  public void testMultipleScansDifferentTablets() throws Exception {
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+      SortedSet<Text> splitPoints = new TreeSet<>();
+      splitPoints.add(new Text("row_0000000002\\0"));
+      splitPoints.add(new Text("row_0000000005\\0"));
+      splitPoints.add(new Text("row_0000000008\\0"));
+      client.tableOperations().addSplits(tableName, splitPoints);
+
+      @SuppressWarnings("deprecation")
+      Collection<Text> splits = client.tableOperations().getSplits(tableName);
+      assertEquals(3, splits.size());
+      System.out.println(splits);
+
+      ReadWriteIT.ingest(client, 10, 10, 50, 0, tableName);
+
+      client.tableOperations().flush(tableName, null, null, true);
+
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      final AtomicInteger counter = new AtomicInteger(0);
+
+      var executor = Executors.newCachedThreadPool();

Review Comment:
   Creating this threadpool could be done in a `@BeforeEach` method, and shutting it down could be done in an `@AfterEach`. If there's any other common setup/teardown code, moving it into a before/after method will make the test smaller and easier to understand what is being tested.



##########
test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.Reference;
+import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.gc.GCRun;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.LoggerFactory;
+
+@Tag(MINI_CLUSTER_ONLY)
+public class ScanServerMetadataEntriesIT extends SharedMiniClusterBase {
+
+  private static class ScanServerMetadataEntriesITConfiguration
+      implements MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+        org.apache.hadoop.conf.Configuration coreSite) {
+      cfg.setNumScanServers(1);
+      cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s");
+      cfg.setProperty(Property.SSERVER_SCAN_REFERENCE_EXPIRATION_TIME, "5s");
+    }
+  }
+
+  @BeforeAll
+  public static void start() throws Exception {
+    ScanServerMetadataEntriesITConfiguration c = new ScanServerMetadataEntriesITConfiguration();
+    SharedMiniClusterBase.startMiniClusterWithConfig(c);
+    SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER,
+        "localhost");
+
+    String zooRoot = getCluster().getServerContext().getZooKeeperRoot();
+    ZooReaderWriter zrw = getCluster().getServerContext().getZooReaderWriter();
+    String scanServerRoot = zooRoot + Constants.ZSSERVERS;
+
+    while (zrw.getChildren(scanServerRoot).size() == 0) {
+      Thread.sleep(500);
+    }
+  }
+
+  @AfterAll
+  public static void stop() throws Exception {
+    stopMiniCluster();
+  }
+
+  @Test
+  public void testServerContextMethods() throws Exception {
+
+    try (AccumuloClient ac = Accumulo.newClient().from(getClientProps()).build()) {
+      HostAndPort server = HostAndPort.fromParts("127.0.0.1", 1234);
+      UUID serverLockUUID = UUID.randomUUID();
+
+      String[] files =
+          new String[] {"hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf",
+              "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf"};
+
+      Set<ScanServerRefTabletFile> scanRefs = new HashSet<>();
+      for (String file : files) {
+        scanRefs.add(new ScanServerRefTabletFile(file, server.toString(), serverLockUUID));
+      }
+
+      ServerContext ctx = getCluster().getServerContext();
+
+      ctx.getAmple().putScanServerFileReferences(scanRefs);
+      assertEquals(2, ctx.getAmple().getScanServerFileReferences().count());
+
+      Set<ScanServerRefTabletFile> scanRefs2 =
+          ctx.getAmple().getScanServerFileReferences().collect(Collectors.toSet());
+
+      assertEquals(scanRefs, scanRefs2);
+
+      ctx.getAmple().deleteScanServerFileReferences("127.0.0.1:1234", serverLockUUID);
+      assertEquals(0, ctx.getAmple().getScanServerFileReferences().count());
+
+      ctx.getAmple().putScanServerFileReferences(scanRefs);
+      assertEquals(2, ctx.getAmple().getScanServerFileReferences().count());
+
+      ctx.getAmple().deleteScanServerFileReferences(scanRefs);
+      assertEquals(0, ctx.getAmple().getScanServerFileReferences().count());
+
+    }
+  }
+
+  @Test
+  public void testScanServerMetadataEntries() throws Exception {
+
+    ServerContext ctx = getCluster().getServerContext();
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);

Review Comment:
   ServerContext is a ClientContext is an AccumuloClient. So, constructing a separate client is a bit unnecessary.
   
   ```suggestion
       ServerContext client = getCluster().getServerContext();
       String tableName = getUniqueNames(1)[0];
   
       client.tableOperations().create(tableName);
   ```



##########
test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.metadata.Reference;
+import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
+import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.gc.GCRun;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.LoggerFactory;
+
+@Tag(MINI_CLUSTER_ONLY)
+public class ScanServerMetadataEntriesIT extends SharedMiniClusterBase {
+
+  private static class ScanServerMetadataEntriesITConfiguration
+      implements MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+        org.apache.hadoop.conf.Configuration coreSite) {
+      cfg.setNumScanServers(1);
+      cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s");
+      cfg.setProperty(Property.SSERVER_SCAN_REFERENCE_EXPIRATION_TIME, "5s");
+    }
+  }
+
+  @BeforeAll
+  public static void start() throws Exception {
+    ScanServerMetadataEntriesITConfiguration c = new ScanServerMetadataEntriesITConfiguration();
+    SharedMiniClusterBase.startMiniClusterWithConfig(c);
+    SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER,
+        "localhost");
+
+    String zooRoot = getCluster().getServerContext().getZooKeeperRoot();
+    ZooReaderWriter zrw = getCluster().getServerContext().getZooReaderWriter();
+    String scanServerRoot = zooRoot + Constants.ZSSERVERS;
+
+    while (zrw.getChildren(scanServerRoot).size() == 0) {
+      Thread.sleep(500);
+    }
+  }
+
+  @AfterAll
+  public static void stop() throws Exception {
+    stopMiniCluster();
+  }
+
+  @Test
+  public void testServerContextMethods() throws Exception {
+
+    try (AccumuloClient ac = Accumulo.newClient().from(getClientProps()).build()) {
+      HostAndPort server = HostAndPort.fromParts("127.0.0.1", 1234);
+      UUID serverLockUUID = UUID.randomUUID();
+
+      String[] files =
+          new String[] {"hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000070.rf",
+              "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/F0000071.rf"};
+
+      Set<ScanServerRefTabletFile> scanRefs = new HashSet<>();
+      for (String file : files) {
+        scanRefs.add(new ScanServerRefTabletFile(file, server.toString(), serverLockUUID));
+      }

Review Comment:
   This code would be much smaller if you used streams:
   ```java
     Set<ScanServerRefTabletFile> scanRefs = Stream.of("F0000070.rf", "F0000071.rf")
         .map(f -> "hdfs://localhost:8020/accumulo/tables/2a/default_tablet/" + f)
         .map(f -> new ScanServerRefTabletFile(f, server.toString(), serverLockUUID))
         .collect(Collectors.toSet());
   ```
   



##########
shell/src/main/java/org/apache/accumulo/shell/commands/ScanCommand.java:
##########
@@ -419,6 +438,7 @@ public Options getOptions() {
     outputFileOpt.setArgName("file");
     contextOpt.setArgName("context");
     executionHintsOpt.setArgName("<key>=<value>{,<key>=<value>}");
+    scanServerOpt.setArgName("immediate|eventual");

Review Comment:
   I'm not a fan of using "immediate" and "eventual" as the user-facing control knob. Those characteristics are [*necessary* conditions of using one server type vs. another, but they are not *sufficient* conditions to communicate which resources you're relying on to execute the scan](https://en.wikipedia.org/wiki/Necessity_and_sufficiency).
   
   In general, I think API terminology should be *sufficient* to communicate to the interfacing user/component what they are interfacing with.
   
   So, I think it would be better to explicitly ask for a scan server, and include in the docs that scan servers have an eventual consistency.



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImpl.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.accumulo.core.data.TabletId;
+import org.apache.accumulo.core.spi.scan.ScanServerDispatcher.ScanAttempt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Maps;
+
+public class ScanAttemptsImpl {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanAttemptsImpl.class);
+
+  static class ScanAttemptImpl
+      implements org.apache.accumulo.core.spi.scan.ScanServerDispatcher.ScanAttempt {

Review Comment:
   This class is imported, yet it's still using a fully qualified class name here.



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java:
##########
@@ -453,26 +482,40 @@ public void run() {
   private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
       final ResultReceiver receiver, List<Column> columns) {
 
-    if (timedoutServers.containsAll(binnedRanges.keySet())) {
-      // all servers have timed out
-      throw new TimedOutException(timedoutServers);
-    }
-
-    // when there are lots of threads and a few tablet servers
-    // it is good to break request to tablet servers up, the
-    // following code determines if this is the case
     int maxTabletsPerRequest = Integer.MAX_VALUE;
-    if (numThreads / binnedRanges.size() > 1) {
-      int totalNumberOfTablets = 0;
-      for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
-        totalNumberOfTablets += entry.getValue().size();
-      }
 
-      maxTabletsPerRequest = totalNumberOfTablets / numThreads;
-      if (maxTabletsPerRequest == 0) {
-        maxTabletsPerRequest = 1;
+    long busyTimeout = 0;
+    Duration scanServerDispatcherDelay = null;
+    Map<String,ScanAttemptsImpl.ScanAttemptReporter> reporters = Map.of();
+
+    if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) {

Review Comment:
   It's weird to ask for eventual consistency. Nobody *wants* eventual consistency. Eventual consistency is always tolerated, never desired.



##########
core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java:
##########
@@ -373,6 +384,23 @@ default void forEach(BiConsumer<? super Key,? super Value> keyValueConsumer) {
     }
   }
 
+  /**
+   * Get the configured consistency level
+   *
+   * @return consistency level
+   * @since 2.1.0
+   */
+  public ConsistencyLevel getConsistencyLevel();
+
+  /**
+   * Set the desired consistency level for this scanner.
+   *
+   * @param level
+   *          consistency level
+   * @since 2.1.0
+   */
+  public void setConsistencyLevel(ConsistencyLevel level);
+

Review Comment:
   See my previous comment about *necessary* and *sufficient* API terminology. I think interfacing with a "consistency level" is insufficient terminology for the API.



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java:
##########
@@ -332,6 +340,75 @@ public synchronized BatchWriterConfig getBatchWriterConfig() {
     return batchWriterConfig;
   }
 
+  /**
+   * @return map of live scan server addresses to lock uuids.
+   */
+  public Map<String,UUID> getScanServers() {
+    Map<String,UUID> liveScanServers = new HashMap<>();
+    String root = this.getZooKeeperRoot() + Constants.ZSSERVERS;
+    var addrs = this.getZooCache().getChildren(root);
+    for (String addr : addrs) {
+      try {
+        final var zLockPath = ServiceLock.path(root + "/" + addr);
+        ZcStat stat = new ZcStat();
+        byte[] lockData = ServiceLock.getLockData(getZooCache(), zLockPath, stat);
+        if (lockData != null) {
+          UUID uuid = UUID.fromString(new String(lockData, UTF_8));
+          liveScanServers.put(addr, uuid);
+        }
+      } catch (Exception e) {
+        log.error("Error validating zookeeper scan server node: " + addr, e);
+      }
+    }
+    return liveScanServers;
+  }
+
+  /**
+   * @return the scan server dispatcher implementation used for determining which scan servers will
+   *         be used when performing an eventually consistent scan
+   */
+  public synchronized ScanServerDispatcher getScanServerDispatcher() {
+    ensureOpen();
+    if (scanServerDispatcher == null) {
+      String clazz = ClientProperty.SCAN_SERVER_DISPATCHER.getValue(info.getProperties());

Review Comment:
   You should avoid synchronization and lazily instantiate this by following the memoized Supplier mechanism that was done elsewhere in this class for similar objects.



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImpl.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.clientImpl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.accumulo.core.data.TabletId;
+import org.apache.accumulo.core.spi.scan.ScanServerDispatcher.ScanAttempt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Maps;
+
+public class ScanAttemptsImpl {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ScanAttemptsImpl.class);
+
+  static class ScanAttemptImpl
+      implements org.apache.accumulo.core.spi.scan.ScanServerDispatcher.ScanAttempt {
+
+    private final String server;
+    private final long time;
+    private final Result result;
+    private volatile long mutationCount = Long.MAX_VALUE;
+
+    ScanAttemptImpl(Result result, String server, long time) {
+      this.result = result;
+      this.server = Objects.requireNonNull(server);
+      this.time = time;
+    }
+
+    @Override
+    public String getServer() {
+      return server;
+    }
+
+    @Override
+    public long getEndTime() {
+      return time;
+    }
+
+    @Override
+    public Result getResult() {
+      return result;
+    }
+
+    private void setMutationCount(long mc) {
+      this.mutationCount = mc;
+    }
+
+    public long getMutationCount() {
+      return mutationCount;
+    }
+  }
+
+  private Map<TabletId,Collection<ScanAttemptImpl>> attempts = new ConcurrentHashMap<>();
+  private long mutationCounter = 0;
+
+  private void add(TabletId tablet, ScanAttempt.Result result, String server, long endTime) {
+
+    ScanAttemptImpl sa = new ScanAttemptImpl(result, server, endTime);
+
+    attempts.computeIfAbsent(tablet, k -> ConcurrentHashMap.newKeySet()).add(sa);
+
+    synchronized (this) {
+      // now that the scan attempt obj is added to all concurrent data structs, make it visible
+      // need to atomically increment the counter AND set the counter on the object
+      sa.setMutationCount(mutationCounter++);
+    }
+
+  }
+
+  public static interface ScanAttemptReporter {
+    void report(ScanAttempt.Result result);
+  }
+
+  ScanAttemptReporter createReporter(String server, TabletId tablet) {
+    return new ScanAttemptReporter() {
+      @Override
+      public void report(ScanAttempt.Result result) {
+        LOG.trace("Received result: {}", result);
+        add(tablet, result, server, System.currentTimeMillis());
+      }
+    };
+  }
+
+  Map<TabletId,Collection<ScanAttemptImpl>> snapshot() {
+    // allows only seeing scan attempt objs that were added before this call
+
+    long snapMC;
+    synchronized (ScanAttemptsImpl.this) {
+      snapMC = mutationCounter;
+    }
+    var tmp = Maps.transformValues(attempts, tabletAttemptList -> Collections2
+        .filter(tabletAttemptList, sai -> sai.getMutationCount() < snapMC));
+
+    return Maps.filterEntries(tmp, entry -> !entry.getValue().isEmpty());

Review Comment:
   This appears to be using Guava-specific Functional APIs. It's better to use Java built-in Functional APIs/Streams instead.



##########
test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
+import org.apache.accumulo.core.conf.ClientProperty;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.apache.zookeeper.KeeperException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import com.google.common.collect.Iterables;
+
+@Tag(MINI_CLUSTER_ONLY)
+public class ScanServerConcurrentTabletScanIT extends SharedMiniClusterBase {
+
+  private static class ScanServerConcurrentTabletScanITConfiguration
+      implements MiniClusterConfigurationCallback {
+
+    @Override
+    public void configureMiniCluster(MiniAccumuloConfigImpl cfg,
+        org.apache.hadoop.conf.Configuration coreSite) {
+      cfg.setNumScanServers(1);
+      cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s");
+      cfg.setProperty(Property.SSERV_MINTHREADS, "4");
+    }
+  }
+
+  @BeforeAll
+  public static void start() throws Exception {
+    ScanServerConcurrentTabletScanITConfiguration c =
+        new ScanServerConcurrentTabletScanITConfiguration();
+    SharedMiniClusterBase.startMiniClusterWithConfig(c);
+  }
+
+  @AfterAll
+  public static void stop() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+  }
+
+  private void startScanServer(boolean cacheEnabled)
+      throws IOException, KeeperException, InterruptedException {
+
+    String zooRoot = getCluster().getServerContext().getZooKeeperRoot();
+    ZooReaderWriter zrw = getCluster().getServerContext().getZooReaderWriter();
+    String scanServerRoot = zooRoot + Constants.ZSSERVERS;
+
+    SharedMiniClusterBase.getCluster().getClusterControl().stop(ServerType.SCAN_SERVER);
+
+    Map<String,String> overrides = new HashMap<>();
+    overrides.put(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION.getKey(),
+        cacheEnabled ? "300m" : "0m");
+    SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER, overrides,
+        1);
+    while (zrw.getChildren(scanServerRoot).size() == 0) {
+      Thread.sleep(500);
+    }
+
+  }
+
+  @Test
+  public void testScanSameTabletDifferentDataTabletMetadataCacheEnabled() throws Exception {
+
+    startScanServer(true);
+
+    Properties clientProperties = getClientProps();
+    clientProperties.put(ClientProperty.SCANNER_BATCH_SIZE.getKey(), "100");
+
+    try (AccumuloClient client = Accumulo.newClient().from(clientProperties).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+
+      // Load 1000 k/v
+      ReadWriteIT.ingest(client, 10, 100, 50, 0, "COLA", tableName);
+      client.tableOperations().flush(tableName, null, null, true);
+
+      Scanner scanner1 = client.createScanner(tableName, Authorizations.EMPTY);
+      scanner1.setRange(new Range());
+      scanner1.setBatchSize(100);
+      scanner1.setReadaheadThreshold(0);
+      scanner1.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+
+      // iter1 should read 1000 k/v
+      Iterator<Entry<Key,Value>> iter1 = scanner1.iterator();
+
+      // Partially read the data and then start a 2nd scan
+      int count1 = 0;
+      while (iter1.hasNext() && count1 < 10) {
+        iter1.next();
+        count1++;
+      }
+
+      // Load another 100 k/v
+      ReadWriteIT.ingest(client, 10, 10, 50, 0, "COLB", tableName);
+      client.tableOperations().flush(tableName, null, null, true);
+
+      // iter2 should read 1000 k/v because the tablet metadata is cached.
+      Iterator<Entry<Key,Value>> iter2 = scanner1.iterator();
+
+      while (iter1.hasNext()) {
+        iter1.next();
+        count1++;
+      }
+      assertEquals(1000, count1);
+
+      int count2 = 0;
+      while (iter2.hasNext()) {
+        iter2.next();
+        count2++;
+      }
+      assertEquals(1000, count2);
+
+      scanner1.close();
+
+      try (Scanner scanner2 = client.createScanner(tableName, Authorizations.EMPTY)) {
+        assertEquals(1100, Iterables.size(scanner2));
+      }
+    }
+  }
+
+  @Test
+  public void testScanSameTabletDifferentDataTabletMetadataCacheDisabled() throws Exception {
+
+    startScanServer(false);
+
+    Properties clientProperties = getClientProps();
+    clientProperties.put(ClientProperty.SCANNER_BATCH_SIZE.getKey(), "100");
+
+    try (AccumuloClient client = Accumulo.newClient().from(clientProperties).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      client.tableOperations().create(tableName);
+
+      // Load 1000 k/v
+      ReadWriteIT.ingest(client, 10, 100, 50, 0, "COLA", tableName);
+      client.tableOperations().flush(tableName, null, null, true);
+
+      Scanner scanner1 = client.createScanner(tableName, Authorizations.EMPTY);
+      scanner1.setRange(new Range());
+      scanner1.setBatchSize(100);
+      scanner1.setReadaheadThreshold(0);
+      scanner1.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+
+      // iter1 should read 1000 k/v
+      Iterator<Entry<Key,Value>> iter1 = scanner1.iterator();
+
+      // Partially read the data and then start a 2nd scan
+      int count1 = 0;
+      while (iter1.hasNext() && count1 < 10) {
+        iter1.next();
+        count1++;
+      }
+
+      // Load another 100 k/v
+      ReadWriteIT.ingest(client, 10, 10, 50, 0, "COLB", tableName);
+      client.tableOperations().flush(tableName, null, null, true);
+
+      // iter2 should read 1100 k/v because the tablet metadata is not cached.
+      Iterator<Entry<Key,Value>> iter2 = scanner1.iterator();
+
+      while (iter1.hasNext()) {
+        iter1.next();
+        count1++;
+      }
+      assertEquals(1000, count1);

Review Comment:
   Pretty sure you could just do the following one-liner instead of dealing with looping over an iterator.
   
   ```java
   assertEquals(1000, scanner1.stream().count());
   ```



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java:
##########
@@ -612,6 +741,7 @@ void errorOccured() {
     }
 
     /**
+     *
      */

Review Comment:
   :+1:  :smiley_cat: 
   
   This was probably an editor thing. But, this empty javadoc can be deleted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org