You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2022/12/01 18:41:56 UTC

[accumulo] branch revert-3102-2.1 created (now cf0018e590)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a change to branch revert-3102-2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


      at cf0018e590 Revert "Revert "Allow ScanServers to scan offline tables (#3082)" (#3101) (#3102)"

This branch includes the following new commits:

     new cf0018e590 Revert "Revert "Allow ScanServers to scan offline tables (#3082)" (#3101) (#3102)"

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[accumulo] 01/01: Revert "Revert "Allow ScanServers to scan offline tables (#3082)" (#3101) (#3102)"

Posted by dl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch revert-3102-2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit cf0018e5909f3c19d566c7bb06a118d815608351
Author: Dave Marion <dl...@apache.org>
AuthorDate: Thu Dec 1 13:41:51 2022 -0500

    Revert "Revert "Allow ScanServers to scan offline tables (#3082)" (#3101) (#3102)"
    
    This reverts commit 02bd367577b8aa8d8a0bea629fd548ebab04611e.
---
 .../apache/accumulo/core/clientImpl/ClientContext.java  |  7 +++----
 .../apache/accumulo/core/clientImpl/ScannerImpl.java    | 11 +++++++++++
 .../core/clientImpl/TabletServerBatchReader.java        |  4 ++++
 .../java/org/apache/accumulo/test/ScanServerIT.java     | 17 ++++++++---------
 4 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index 25eba7ecc4..cdad4d673e 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -689,8 +689,8 @@ public class ClientContext implements AccumuloClient {
       int numQueryThreads) throws TableNotFoundException {
     ensureOpen();
     checkArgument(authorizations != null, "authorizations is null");
-    return new TabletServerBatchReader(this, requireNotOffline(getTableId(tableName), tableName),
-        tableName, authorizations, numQueryThreads);
+    return new TabletServerBatchReader(this, getTableId(tableName), tableName, authorizations,
+        numQueryThreads);
   }
 
   @Override
@@ -777,8 +777,7 @@ public class ClientContext implements AccumuloClient {
       throws TableNotFoundException {
     ensureOpen();
     checkArgument(authorizations != null, "authorizations is null");
-    Scanner scanner =
-        new ScannerImpl(this, requireNotOffline(getTableId(tableName), tableName), authorizations);
+    Scanner scanner = new ScannerImpl(this, getTableId(tableName), authorizations);
     Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE.getInteger(getProperties());
     if (batchSize != null) {
       scanner.setBatchSize(batchSize);
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java
index c418e5a412..5fbacd7f7f 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
@@ -152,6 +153,16 @@ public class ScannerImpl extends ScannerOptions implements Scanner {
   @Override
   public synchronized Iterator<Entry<Key,Value>> iterator() {
     ensureOpen();
+
+    if (getConsistencyLevel() == ConsistencyLevel.IMMEDIATE) {
+      try {
+        String tableName = context.getTableName(tableId);
+        context.requireNotOffline(tableId, tableName);
+      } catch (TableNotFoundException e) {
+        throw new RuntimeException("Table not found", e);
+      }
+    }
+
     ScannerIterator iter = new ScannerIterator(context, tableId, authorizations, range, size,
         getTimeout(SECONDS), this, isolated, readaheadThreshold, new Reporter());
 
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
index c0de759516..9508f06aa6 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
@@ -115,6 +115,10 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan
       throw new IllegalStateException("batch reader closed");
     }
 
+    if (getConsistencyLevel() == ConsistencyLevel.IMMEDIATE) {
+      context.requireNotOffline(tableId, tableName);
+    }
+
     return new TabletServerBatchReaderIterator(context, tableId, tableName, authorizations, ranges,
         numThreads, queryThreadPool, this, timeOut);
   }
diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
index 528fd7f441..f9557859a9 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java
@@ -36,7 +36,6 @@ import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.IteratorSetting;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel;
-import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.TimedOutException;
 import org.apache.accumulo.core.client.admin.NewTableConfiguration;
 import org.apache.accumulo.core.conf.ClientProperty;
@@ -150,19 +149,19 @@ public class ScanServerIT extends SharedMiniClusterBase {
 
   @Test
   public void testScanOfflineTable() throws Exception {
+
     try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
       String tableName = getUniqueNames(1)[0];
 
-      createTableAndIngest(client, tableName, null, 10, 10, "colf");
+      final int ingestedEntryCount = createTableAndIngest(client, tableName, null, 10, 10, "colf");
       client.tableOperations().offline(tableName, true);
 
-      assertThrows(TableOfflineException.class, () -> {
-        try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
-          scanner.setRange(new Range());
-          scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
-          assertEquals(100, Iterables.size(scanner));
-        } // when the scanner is closed, all open sessions should be closed
-      });
+      try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+        scanner.setRange(new Range());
+        scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+        assertEquals(ingestedEntryCount, Iterables.size(scanner),
+            "The scan server scanner should have seen all ingested and flushed entries");
+      } // when the scanner is closed, all open sessions should be closed
     }
   }