Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2022/11/24 10:57:54 UTC

[GitHub] [accumulo] keith-turner commented on a diff in pull request #3091: Added code and tests to confirm ScanServers can scan offline tables

keith-turner commented on code in PR #3091:
URL: https://github.com/apache/accumulo/pull/3091#discussion_r1031344021


##########
test/src/main/java/org/apache/accumulo/test/ScanServerIT.java:
##########
@@ -148,16 +212,56 @@ public void testBatchScan() throws Exception {
   }
 
   @Test
-  public void testScanOfflineTable() throws Exception {
+  public void testBatchScanTableCreatedOffline() throws Exception {
+
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+      client.tableOperations().create(tableName, new NewTableConfiguration().createOffline());
+
+      try (BatchScanner scanner = client.createBatchScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRanges(Collections.singletonList(new Range()));
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+      } // when the scanner is closed, all open sessions should be closed

Review Comment:
   ```suggestion
           assertEquals(0, Iterables.size(scanner), "Unexpected entries seen when scanning empty table");
         } // when the scanner is closed, all open sessions should be closed
         assertFalse(client.tableOperations().isOnline(tableName));
   ```



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java:
##########
@@ -241,38 +247,59 @@ private void binRanges(TabletLocator tabletLocator, List<Range> ranges,
 
     int lastFailureSize = Integer.MAX_VALUE;
 
-    while (true) {
+    if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL) && !this.tableOnline) {
 
+      // The TabletLocator only caches the location of tablets for online tables. The ScanServer
+      // can scan online and offline tables, but does not require the location of the tablet. It
+      // only requires the KeyExtent. Find the first extent for the table that matches the
+      // startRow provided in the scanState. If no startRow is provided, use the first KeyExtent
+      // for the table
       binnedRanges.clear();
-      List<Range> failures = tabletLocator.binRanges(context, ranges, binnedRanges);
-
-      if (failures.isEmpty()) {
-        break;
-      } else {
-        // tried to only do table state checks when failures.size() == ranges.size(), however this
-        // did
-        // not work because nothing ever invalidated entries in the tabletLocator cache... so even
-        // though
-        // the table was deleted the tablet locator entries for the deleted table were not
-        // cleared... so
-        // need to always do the check when failures occur
-        if (failures.size() >= lastFailureSize) {
-          context.requireNotDeleted(tableId);
-          context.requireNotOffline(tableId, tableName);
+      Map<KeyExtent,List<Range>> map = new HashMap<>();
+      ranges.forEach(r -> {
+        TabletsMetadata tm = context.getAmple().readTablets().forTable(this.tableId)
+            .overlapping(r.getStartKey() == null ? null : r.getStartKey().getRow(),
+                r.isStartKeyInclusive(), r.getEndKey() == null ? null : r.getEndKey().getRow())
+            .build();

Review Comment:
   Doing a metadata lookup for every range in the batch scanner could be a performance problem. It would be better to cache extents. Previously seen extents can be cached as long as the cache entry is invalidated when the scan server reports back that the extent does not exist.
   
   There may be code in bulk import that we could reuse, from the case where bulk import works with an offline table. I will look around for that.
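
The caching idea in the comment above could look roughly like the following. This is a minimal, hypothetical sketch and not Accumulo code: `Extent` is a simplified stand-in for `KeyExtent` (string rows instead of `Text`), and `ExtentCache`, `find`, `invalidate`, and the `metadataLookup` function are names invented for illustration. The lookup function stands in for whatever metadata read the client would otherwise perform per range.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

/** Hypothetical sketch of a client-side extent cache; all names are illustrative. */
public class ExtentCacheSketch {

  /** Simplified stand-in for KeyExtent: covers rows in (prevEndRow, endRow]; null is unbounded. */
  static final class Extent {
    final String prevEndRow;
    final String endRow;

    Extent(String prevEndRow, String endRow) {
      this.prevEndRow = prevEndRow;
      this.endRow = endRow;
    }

    boolean contains(String row) {
      return (prevEndRow == null || row.compareTo(prevEndRow) > 0)
          && (endRow == null || row.compareTo(endRow) <= 0);
    }
  }

  /** Caches extents by end row and falls back to a (costly) metadata lookup on a miss. */
  static final class ExtentCache {
    // A null end row means "last tablet", so null keys sort after every real row.
    private final TreeMap<String, Extent> byEndRow =
        new TreeMap<>(Comparator.nullsLast(Comparator.<String>naturalOrder()));
    private final Function<String, Extent> metadataLookup;

    ExtentCache(Function<String, Extent> metadataLookup) {
      this.metadataLookup = metadataLookup;
    }

    /** Returns the extent containing the row, reading metadata only on a cache miss. */
    synchronized Extent find(String row) {
      Map.Entry<String, Extent> candidate = byEndRow.ceilingEntry(row);
      if (candidate != null && candidate.getValue().contains(row)) {
        return candidate.getValue();
      }
      Extent loaded = metadataLookup.apply(row);
      byEndRow.put(loaded.endRow, loaded);
      return loaded;
    }

    /** Called when the server reports that a cached extent no longer exists. */
    synchronized void invalidate(Extent extent) {
      byEndRow.remove(extent.endRow);
    }
  }

  public static void main(String[] args) {
    int[] lookups = {0};
    // Pretend the table has two tablets: (-inf, "m"] and ("m", +inf).
    ExtentCache cache = new ExtentCache(row -> {
      lookups[0]++;
      return row.compareTo("m") <= 0 ? new Extent(null, "m") : new Extent("m", null);
    });

    Extent first = cache.find("a"); // miss: reads metadata
    cache.find("b");                // hit: same extent, no metadata read
    cache.find("z");                // miss: second tablet
    cache.invalidate(first);        // server said the extent is gone
    cache.find("a");                // miss again after invalidation
    System.out.println("metadata lookups: " + lookups[0]);
  }
}
```

In this sketch a stale entry is only dropped when it is explicitly invalidated, so an entry left over from a tablet split or merge stays in the map until some scan hits it and fails. A real implementation would also need to decide how to evict entries that overlap a newly loaded extent.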



##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java:
##########
@@ -288,63 +290,88 @@ public static List<KeyValue> scan(ClientContext context, ScanState scanState, lo
         if ((System.currentTimeMillis() - startTime) / 1000.0 > timeOut)
           throw new ScanTimedOutException();
 
-        while (loc == null) {
-          long currentTime = System.currentTimeMillis();
-          if ((currentTime - startTime) / 1000.0 > timeOut)
-            throw new ScanTimedOutException();
-
-          Span child1 = TraceUtil.startSpan(ThriftScanner.class, "scan::locateTablet");
-          try (Scope locateSpan = child1.makeCurrent()) {
-            loc = TabletLocator.getLocator(context, scanState.tableId).locateTablet(context,
-                scanState.startRow, scanState.skipStartRow, false);
-
-            if (loc == null) {
-              context.requireNotDeleted(scanState.tableId);
-              context.requireNotOffline(scanState.tableId, null);
-
-              error = "Failed to locate tablet for table : " + scanState.tableId + " row : "
-                  + scanState.startRow;
+        boolean tableOnline =
+            context.tableOperations().isOnline(context.getTableName(scanState.tableId));
+
+        if (scanState.runOnScanServer && !tableOnline) {
+          // The TabletLocator only caches the location of tablets for online tables. The ScanServer
+          // can scan online and offline tables, but does not require the location of the tablet. It
+          // only requires the KeyExtent. Find the first extent for the table that matches the
+          // startRow provided in the scanState. If no startRow is provided, use the first KeyExtent
+          // for the table
+          Text endRow = scanState.range != null && scanState.range.getEndKey() != null
+              ? scanState.range.getEndKey().getRow() : null;
+          TabletsMetadata tm = context.getAmple().readTablets().forTable(scanState.tableId)

Review Comment:
   It would be better to cache extents here as well, rather than reading tablet metadata on every scan.



##########
test/src/main/java/org/apache/accumulo/test/ScanServerIT.java:
##########
@@ -123,21 +127,81 @@ public void testScan() throws Exception {
     }
   }
 
+  @Test
+  public void testScanOfflineSingleTablet() throws Exception {
+
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      final int ingestedEntryCount =
+          createTableAndIngest(client, tableName, null, 10, 10, "colf", false);
+      client.tableOperations().offline(tableName, true);
+      assertFalse(client.tableOperations().isOnline(tableName));
+
+      TabletLocator.getLocator((ClientContext) client,
+          TableId.of(client.tableOperations().tableIdMap().get(tableName))).invalidateCache();
+
+      try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRange(new Range());
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        assertEquals(ingestedEntryCount, Iterables.size(scanner),
+            "The scan server scanner should have seen all ingested and flushed entries");
+      } // when the scanner is closed, all open sessions should be closed
+    }
+  }
+
+  @Test
+  public void testScanOfflineTableManyTablets() throws Exception {
+
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+
+      final int ingestedEntryCount =
+          createTableAndIngest(client, tableName, null, 10, 10, "colf", true);
+      client.tableOperations().offline(tableName, true);
+      assertFalse(client.tableOperations().isOnline(tableName));
+
+      TabletLocator.getLocator((ClientContext) client,
+          TableId.of(client.tableOperations().tableIdMap().get(tableName))).invalidateCache();
+
+      try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRange(new Range());
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        assertEquals(ingestedEntryCount, Iterables.size(scanner),
+            "The scan server scanner should have seen all ingested and flushed entries");
+      } // when the scanner is closed, all open sessions should be closed
+    }
+  }
+
+  @Test
+  public void testScanTableCreatedOffline() throws Exception {
+
+    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+      String tableName = getUniqueNames(1)[0];
+      client.tableOperations().create(tableName, new NewTableConfiguration().createOffline());
+      try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRange(new Range());
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+      } // when the scanner is closed, all open sessions should be closed

Review Comment:
   ```suggestion
           assertEquals(0, Iterables.size(scanner), "Unexpected entries seen when scanning empty table");
         } // when the scanner is closed, all open sessions should be closed
         assertFalse(client.tableOperations().isOnline(tableName));
   ```


