You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ed...@apache.org on 2023/11/24 23:41:47 UTC

(accumulo) branch main updated: Use single mutation during no chop fence upgrade (#3954)

This is an automated email from the ASF dual-hosted git repository.

edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 5a7e6eba0c Use single mutation during no chop fence upgrade (#3954)
5a7e6eba0c is described below

commit 5a7e6eba0c390ddbdecef95a2dc2cad6f64a83f3
Author: EdColeman <de...@etcoleman.com>
AuthorDate: Fri Nov 24 18:41:41 2023 -0500

    Use single mutation during no chop fence upgrade (#3954)
    
    - Uses single mutation during no chop fence upgrade
    - Add guard to prevent trying empty mutation
---
 .../accumulo/manager/upgrade/Upgrader11to12.java   | 158 ++++----
 .../manager/upgrade/Upgrader11to12Test.java        | 406 ++++++++++++++-------
 2 files changed, 351 insertions(+), 213 deletions(-)

diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
index 2d9f6b4a07..72127a4d4c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
@@ -20,27 +20,31 @@ package org.apache.accumulo.manager.upgrade;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
-import static org.apache.accumulo.core.metadata.schema.UpgraderDeprecatedConstants.ChoppedColumnFamily;
 import static org.apache.accumulo.server.AccumuloDataVersion.METADATA_FILE_JSON_ENCODING;
 
-import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.client.AccumuloClient;
+import java.util.Arrays;
+import java.util.Map;
+
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
 import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
 import org.apache.accumulo.core.metadata.schema.RootTabletMetadata;
+import org.apache.accumulo.core.metadata.schema.UpgraderDeprecatedConstants.ChoppedColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Stat;
 import org.checkerframework.checker.nullness.qual.NonNull;
@@ -48,6 +52,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 public class Upgrader11to12 implements Upgrader {
 
@@ -84,111 +89,100 @@ public class Upgrader11to12 implements Upgrader {
   public void upgradeRoot(@NonNull ServerContext context) {
     log.debug("Upgrade root: upgrading to data version {}", METADATA_FILE_JSON_ENCODING);
     var rootName = Ample.DataLevel.METADATA.metaTable();
-    processReferences(context, rootName);
+    // not using ample to avoid StoredTabletFile because old file ref is incompatible
+    try (BatchWriter batchWriter = context.createBatchWriter(rootName); Scanner scanner =
+        new IsolatedScanner(context.createScanner(rootName, Authorizations.EMPTY))) {
+      processReferences(batchWriter, scanner, rootName);
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("Failed to find table " + rootName, ex);
+    } catch (MutationsRejectedException mex) {
+      log.warn("Failed to update reference for table: " + rootName);
+      log.warn("Constraint violations: {}", mex.getConstraintViolationSummaries());
+      throw new IllegalStateException("Failed to process table: " + rootName, mex);
+    }
   }
 
   @Override
   public void upgradeMetadata(@NonNull ServerContext context) {
     log.debug("Upgrade metadata: upgrading to data version {}", METADATA_FILE_JSON_ENCODING);
     var metaName = Ample.DataLevel.USER.metaTable();
-    processReferences(context, metaName);
+    try (BatchWriter batchWriter = context.createBatchWriter(metaName); Scanner scanner =
+        new IsolatedScanner(context.createScanner(metaName, Authorizations.EMPTY))) {
+      processReferences(batchWriter, scanner, metaName);
+    } catch (TableNotFoundException ex) {
+      throw new IllegalStateException("Failed to find table " + metaName, ex);
+    } catch (MutationsRejectedException mex) {
+      log.warn("Failed to update reference for table: " + metaName);
+      log.warn("Constraint violations: {}", mex.getConstraintViolationSummaries());
+      throw new IllegalStateException("Failed to process table: " + metaName, mex);
+    }
   }
 
-  private void processReferences(ServerContext context, String tableName) {
-    // not using ample to avoid StoredTabletFile because old file ref is incompatible
-    try (AccumuloClient c = Accumulo.newClient().from(context.getProperties()).build();
-        BatchWriter batchWriter = c.createBatchWriter(tableName); Scanner scanner =
-            new IsolatedScanner(context.createScanner(tableName, Authorizations.EMPTY))) {
+  void processReferences(BatchWriter batchWriter, Scanner scanner, String tableName) {
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    try {
+      Mutation update = null;
+      for (Map.Entry<Key,Value> entry : scanner) {
+        Key key = entry.getKey();
+        Value value = entry.getValue();
+        Preconditions.checkState(key.getColumnVisibilityData().length() == 0,
+            "Expected empty visibility, saw %s ", key.getColumnVisibilityData());
+        // on new row, write current mutation and prepare a new one.
+        Text r = key.getRow();
+        if (update == null) {
+          update = new Mutation(r);
+        } else if (!Arrays.equals(update.getRow(), TextUtil.getBytes(r))) {
+          if (log.isTraceEnabled()) {
+            log.trace("table: {}, update: {}", tableName, update.prettyPrint());
+          }
+          if (!update.getUpdates().isEmpty()) {
+            batchWriter.addMutation(update);
+          }
+          update = new Mutation(r);
+        }
 
-      scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
-      scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
-      scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
-      scanner.forEach((k, v) -> {
-        var family = k.getColumnFamily();
+        var family = key.getColumnFamily();
         if (family.equals(DataFileColumnFamily.NAME)) {
-          upgradeDataFileCF(k, v, batchWriter, tableName);
+          upgradeDataFileCF(key, value, update);
         } else if (family.equals(ChoppedColumnFamily.NAME)) {
-          removeChoppedCF(k, batchWriter, tableName);
+          log.warn(
+              "Deleting chopped reference from:{}. Previous split or delete may not have completed cleanly. Ref: {}",
+              tableName, key.getRow());
+          update.at().family(ChoppedColumnFamily.STR_NAME).qualifier(ChoppedColumnFamily.STR_NAME)
+              .delete();
         } else if (family.equals(ExternalCompactionColumnFamily.NAME)) {
-          removeExternalCompactionCF(k, batchWriter, tableName);
+          log.debug(
+              "Deleting external compaction reference from:{}. Previous compaction may not have completed. Ref: {}",
+              tableName, key.getRow());
+          update.at().family(ExternalCompactionColumnFamily.NAME)
+              .qualifier(key.getColumnQualifier()).delete();
         } else {
           throw new IllegalStateException("Processing: " + tableName
               + " Received unexpected column family processing references: " + family);
         }
-      });
+      }
+      // send last mutation
+      if (update != null && !update.getUpdates().isEmpty()) {
+        log.trace("table: {}, update: {}", tableName, update.prettyPrint());
+        batchWriter.addMutation(update);
+      }
     } catch (MutationsRejectedException mex) {
       log.warn("Failed to update reference for table: " + tableName);
       log.warn("Constraint violations: {}", mex.getConstraintViolationSummaries());
       throw new IllegalStateException("Failed to process table: " + tableName, mex);
-    } catch (Exception ex) {
-      throw new IllegalStateException("Failed to process table: " + tableName, ex);
     }
   }
 
   @VisibleForTesting
-  void upgradeDataFileCF(final Key key, final Value value, final BatchWriter batchWriter,
-      final String tableName) {
+  void upgradeDataFileCF(final Key key, final Value value, final Mutation m) {
     String file = key.getColumnQualifier().toString();
     // filter out references if they are in the correct format already.
     if (fileNeedsConversion(file)) {
       var fileJson = StoredTabletFile.of(new Path(file)).getMetadataText();
-      try {
-        Mutation update = new Mutation(key.getRow());
-        update.at().family(DataFileColumnFamily.STR_NAME).qualifier(fileJson).put(value);
-        log.trace("table: {}, adding: {}", tableName, update.prettyPrint());
-        batchWriter.addMutation(update);
-
-        Mutation delete = new Mutation(key.getRow());
-        delete.at().family(DataFileColumnFamily.STR_NAME).qualifier(file).delete();
-        log.trace("table {}: deleting: {}", tableName, delete.prettyPrint());
-        batchWriter.addMutation(delete);
-      } catch (MutationsRejectedException ex) {
-        // include constraint violation info in log - but stop upgrade
-        log.warn(
-            "Failed to update file reference for table: " + tableName + ". row: " + key.getRow());
-        log.warn("Constraint violations: {}", ex.getConstraintViolationSummaries());
-        throw new IllegalStateException("File conversion failed. Aborting upgrade", ex);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  void removeChoppedCF(final Key key, final BatchWriter batchWriter, final String tableName) {
-    Mutation delete = null;
-    try {
-      delete = new Mutation(key.getRow()).at().family(ChoppedColumnFamily.STR_NAME)
-          .qualifier(ChoppedColumnFamily.STR_NAME).delete();
-      log.warn(
-          "Deleting chopped reference from:{}. Previous split or delete may not have completed cleanly. Ref: {}",
-          tableName, delete.prettyPrint());
-      batchWriter.addMutation(delete);
-    } catch (MutationsRejectedException ex) {
-      log.warn("Failed to delete obsolete chopped CF reference for table: " + tableName + ". Ref: "
-          + delete.prettyPrint() + ". Will try to continue. Ref may need to be manually removed");
-      log.warn("Constraint violations: {}", ex.getConstraintViolationSummaries());
-      throw new IllegalStateException(
-          "Failed to delete obsolete chopped CF reference for table: " + tableName, ex);
-    }
-  }
-
-  @VisibleForTesting
-  void removeExternalCompactionCF(final Key key, final BatchWriter batchWriter,
-      final String tableName) {
-    Mutation delete = null;
-    try {
-      delete = new Mutation(key.getRow()).at().family(ExternalCompactionColumnFamily.NAME)
-          .qualifier(key.getColumnQualifier()).delete();
-      log.debug(
-          "Deleting external compaction reference from:{}. Previous compaction may not have completed. Ref: {}",
-          tableName, delete.prettyPrint());
-      batchWriter.addMutation(delete);
-    } catch (MutationsRejectedException ex) {
-      log.warn("Failed to delete obsolete external compaction CF reference for table: " + tableName
-          + ". Ref: " + delete.prettyPrint()
-          + ". Will try to continue. Ref may need to be manually removed");
-      log.warn("Constraint violations: {}", ex.getConstraintViolationSummaries());
-      throw new IllegalStateException(
-          "Failed to delete obsolete external compaction CF reference for table: " + tableName, ex);
+      m.at().family(DataFileColumnFamily.STR_NAME).qualifier(fileJson).put(value);
+      m.at().family(DataFileColumnFamily.STR_NAME).qualifier(file).delete();
     }
   }
 
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java
index da12f4ede2..ee45534dab 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java
@@ -19,14 +19,12 @@
 package org.apache.accumulo.manager.upgrade;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
-import static org.apache.accumulo.core.metadata.schema.UpgraderDeprecatedConstants.ChoppedColumnFamily;
 import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.eq;
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.newCapture;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
@@ -35,12 +33,15 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.UUID;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.data.ColumnUpdate;
 import org.apache.accumulo.core.data.InstanceId;
 import org.apache.accumulo.core.data.Key;
@@ -48,6 +49,10 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.UpgraderDeprecatedConstants.ChoppedColumnFamily;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -62,198 +67,337 @@ public class Upgrader11to12Test {
   private static final Logger LOG = LoggerFactory.getLogger(Upgrader11to12Test.class);
 
   @Test
-  void upgradeDataFileCFTest() throws Exception {
+  void upgradeDataFileCF2Test() {
     Upgrader11to12 upgrader = new Upgrader11to12();
 
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> capturedAdd = newCapture();
-    bw.addMutation(capture(capturedAdd));
-    expectLastCall();
-
-    Capture<Mutation> capturedDelete = newCapture();
-    bw.addMutation(capture(capturedDelete));
-    expectLastCall();
-
-    replay(bw);
-
     String fileName = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
     Key k = Key.builder().row(new Text("12;")).family(DataFileColumnFamily.NAME)
         .qualifier(new Text(fileName)).build();
     Value v = new Value("1234,5678");
 
-    upgrader.upgradeDataFileCF(k, v, bw, "aTable");
+    Mutation upgrade = new Mutation(k.getRow());
+    upgrader.upgradeDataFileCF(k, v, upgrade);
 
-    StoredTabletFile stf = StoredTabletFile.of(new Path(fileName));
-    Mutation add = new Mutation(k.getRow()).at().family(DataFileColumnFamily.NAME)
-        .qualifier(stf.getMetadataText()).put(v);
-    LOG.debug("add mutation to be expected: {}", add.prettyPrint());
+    var pending = upgrade.getUpdates();
+    assertEquals(2, pending.size());
+    // leverage sort order for "expected" values
+    // check that the converted file entry is in the mutation
+    Iterator<ColumnUpdate> m = pending.iterator();
+    var cu1 = m.next();
+    assertEquals("file", new Text(cu1.getColumnFamily()).toString());
 
-    Mutation delete = new Mutation(k.getRow()).at().family(DataFileColumnFamily.NAME)
-        .qualifier(new Text(fileName)).delete();
-    LOG.debug("delete mutation to be expected: {}", delete.prettyPrint());
+    StoredTabletFile oldFileEntry = StoredTabletFile.of(new Path(fileName));
+    StoredTabletFile updateEnry = StoredTabletFile.of(new String(cu1.getColumnQualifier(), UTF_8));
 
-    assertEquals(add, capturedAdd.getValue());
-    assertEquals(delete, capturedDelete.getValue());
+    assertEquals(oldFileEntry, updateEnry);
+    assertFalse(cu1.isDeleted());
 
-    verify(bw);
-  }
+    // check that the delete of the old file entry is in the mutation
 
-  @Test
-  void upgradeDataFileCFSkipConvertedTest() {
-    Upgrader11to12 upgrader = new Upgrader11to12();
+    var cu2 = m.next();
+    assertEquals("file", new Text(cu1.getColumnFamily()).toString());
+    assertEquals(fileName, new String(cu2.getColumnQualifier(), UTF_8));
+    assertTrue(cu2.isDeleted());
 
-    BatchWriter bw = createMock(BatchWriter.class);
+  }
 
-    replay(bw);
+  @Test
+  public void processReferencesTest() throws Exception {
+    BatchWriter batchWriter = mock(BatchWriter.class);
+    Capture<Mutation> capturedUpdate1 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate1));
+    expectLastCall().once();
+
+    Capture<Mutation> capturedUpdate2 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate2));
+    expectLastCall().once();
+
+    // create sample data "served" by the mocked scanner
+    TreeMap<Key,Value> scanData = new TreeMap<>();
+    Text row1 = new Text("123");
+
+    String fileName1 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
+    Key key1 =
+        Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+    Value value1 = new Value("123,456");
+    scanData.put(key1, value1);
+
+    String fileName2 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/B000000v.rf";
+    Key key2 =
+        Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName2).build();
+    Value value2 = new Value("321,654");
+    scanData.put(key2, value2);
+
+    Key chop1 = Key.builder(false).row(row1).family(ChoppedColumnFamily.NAME)
+        .qualifier(ChoppedColumnFamily.NAME).build();
+    scanData.put(chop1, null);
+
+    Key extern1 = Key.builder(false).row(row1).family(ExternalCompactionColumnFamily.NAME)
+        .qualifier(ExternalCompactionColumnFamily.NAME).build();
+    scanData.put(extern1, null);
 
-    String fileName = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
-    StoredTabletFile stf = StoredTabletFile.of(new Path(fileName));
+    Text row2 = new Text("234");
 
-    Key k = Key.builder().row(new Text("12;")).family(DataFileColumnFamily.NAME)
-        .qualifier(stf.getMetadataText()).build();
-    Value v = new Value("1234,5678");
+    String fileName3 = "hdfs://localhost:8020/accumulo/tables/13/default_tablet/C000000v.rf";
+    Key key3 =
+        Key.builder(false).row(row2).family(DataFileColumnFamily.NAME).qualifier(fileName3).build();
+    Value value3 = new Value("1,2");
+    scanData.put(key3, value3);
 
-    upgrader.upgradeDataFileCF(k, v, bw, "aTable");
+    Scanner scanner = mock(Scanner.class);
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    expectLastCall();
 
-    // with file entry in correct formation, no mutations are expected.
-    verify(bw);
-  }
+    expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+    replay(batchWriter, scanner);
 
-  @Test
-  void upgradeDataFileCFInvalidMutationTest() throws Exception {
     Upgrader11to12 upgrader = new Upgrader11to12();
+    upgrader.processReferences(batchWriter, scanner, "accumulo.metadata");
 
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> capturedAdd = newCapture();
-    bw.addMutation(capture(capturedAdd));
-    expectLastCall().andThrow(new MutationsRejectedException(null, List.of(), Map.of(), List.of(),
-        0, new NullPointerException()));
+    LOG.info("c:{}", capturedUpdate1.getValue().prettyPrint());
+    var u1 = capturedUpdate1.getValue();
+    // 2 file add, 2 file delete. 1 chop delete, 1 ext comp delete
+    assertEquals(6, u1.getUpdates().size());
 
-    replay(bw);
+    LOG.info("c:{}", capturedUpdate2.getValue().prettyPrint());
+    var u2 = capturedUpdate2.getValue();
+    // 1 add, 1 delete
+    assertEquals(2, u2.getUpdates().size());
+    assertEquals(1, u2.getUpdates().stream().filter(ColumnUpdate::isDeleted).count());
 
-    String fileName = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
-    Key k = Key.builder().row(new Text("12;")).family(DataFileColumnFamily.NAME)
-        .qualifier(new Text(fileName)).build();
-    Value v = new Value("1234,5678");
-
-    assertThrows(IllegalStateException.class, () -> upgrader.upgradeDataFileCF(k, v, bw, "aTable"));
+    verify(batchWriter, scanner);
 
-    verify(bw);
   }
 
   @Test
-  void upgradeDataFileCFInvalidPathTest() {
-    Upgrader11to12 upgrader = new Upgrader11to12();
-
-    BatchWriter bw = createMock(BatchWriter.class);
-
-    replay(bw);
+  public void skipConvertedFileTest() throws Exception {
+    BatchWriter batchWriter = mock(BatchWriter.class);
+    Capture<Mutation> capturedUpdate1 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate1));
+    expectLastCall().once();
+    // create sample data "served" by the mocked scanner
+    TreeMap<Key,Value> scanData = new TreeMap<>();
+    Text row1 = new Text("123");
+
+    // reference already in expected form with fence info.
+    String fileName1 =
+        "{\"path\":\"hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf\",\"startRow\":\"\",\"endRow\":\"\"}";
+    Key key1 =
+        Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+    Value value1 = new Value("123,456");
+    scanData.put(key1, value1);
+
+    String fileName2 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/B000000v.rf";
+    Key key2 =
+        Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName2).build();
+    Value value2 = new Value("321,654");
+    scanData.put(key2, value2);
+
+    Scanner scanner = mock(Scanner.class);
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    expectLastCall();
 
-    String invalidPath = "badPath";
+    expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+    replay(batchWriter, scanner);
 
-    Key k = Key.builder().row(new Text("12;")).family(DataFileColumnFamily.NAME)
-        .qualifier(new Text(invalidPath)).build();
-    Value v = new Value("1234,5678");
+    Upgrader11to12 upgrader = new Upgrader11to12();
+    upgrader.processReferences(batchWriter, scanner, "accumulo.metadata");
 
-    assertThrows(IllegalArgumentException.class,
-        () -> upgrader.upgradeDataFileCF(k, v, bw, "aTable"));
+    LOG.info("c:{}", capturedUpdate1.getValue().prettyPrint());
+    var u1 = capturedUpdate1.getValue();
+    // 1 add, 1 delete
+    assertEquals(2, u1.getUpdates().size());
+    assertEquals(1, u1.getUpdates().stream().filter(ColumnUpdate::isDeleted).count());
 
-    verify(bw);
+    verify(batchWriter, scanner);
   }
 
   @Test
-  void removeChoppedCFTest() throws Exception {
-    Upgrader11to12 upgrader = new Upgrader11to12();
+  void failOnMutationErrorTest() throws Exception {
 
-    Key k = Key.builder().row(new Text("12;")).family(ExternalCompactionColumnFamily.NAME)
-        .qualifier(ExternalCompactionColumnFamily.NAME).build();
+    BatchWriter batchWriter = mock(BatchWriter.class);
+    Capture<Mutation> capturedUpdate1 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate1));
+    expectLastCall().andThrow(new MutationsRejectedException(null, List.of(), Map.of(), List.of(),
+        0, new NullPointerException())).once();
 
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> captured = newCapture();
-    bw.addMutation(capture(captured));
-    expectLastCall();
+    TreeMap<Key,Value> scanData = new TreeMap<>();
+    Text row1 = new Text("123");
 
-    replay(bw);
+    // reference in the old form (no fence info), so a mutation will be generated.
+    String fileName1 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
+    Key key1 =
+        Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+    Value value1 = new Value("123,456");
+    scanData.put(key1, value1);
 
-    upgrader.removeChoppedCF(k, bw, "aTable");
+    Scanner scanner = mock(Scanner.class);
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    expectLastCall();
 
-    Mutation delete = new Mutation(k.getRow()).at().family(ChoppedColumnFamily.NAME)
-        .qualifier(ChoppedColumnFamily.NAME).delete();
+    expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+    replay(batchWriter, scanner);
+    Upgrader11to12 upgrader = new Upgrader11to12();
 
-    assertEquals(delete, captured.getValue());
+    assertThrows(IllegalStateException.class,
+        () -> upgrader.processReferences(batchWriter, scanner, "accumulo.metadata"));
 
-    verify(bw);
+    verify(batchWriter, scanner);
   }
 
   @Test
-  void removeChoppedCFContinuesTest() throws Exception {
-    Upgrader11to12 upgrader = new Upgrader11to12();
-
-    Key k = Key.builder().row(new Text("12;")).family(ExternalCompactionColumnFamily.NAME)
-        .qualifier(ExternalCompactionColumnFamily.NAME).build();
-
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> captured = newCapture();
-    bw.addMutation(capture(captured));
-    expectLastCall().andThrow(new MutationsRejectedException(null, List.of(), Map.of(), List.of(),
-        0, new NullPointerException()));
+  void upgradeDataFileCFInvalidPathTest() throws Exception {
+
+    BatchWriter batchWriter = mock(BatchWriter.class);
+    Capture<Mutation> capturedUpdate1 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate1));
+    // expecting that the exception will be thrown before any mutation is added.
+    expectLastCall().andThrow(new UnsupportedOperationException()).anyTimes();
+
+    // create sample data "served" by the mocked scanner
+    TreeMap<Key,Value> scanData = new TreeMap<>();
+    Text row1 = new Text("123");
+
+    String fileName1 = "bad path";
+    Key key1 =
+        Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+    Value value1 = new Value("123,456");
+    scanData.put(key1, value1);
+
+    String fileName2 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/B000000v.rf";
+    Key key2 =
+        Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName2).build();
+    Value value2 = new Value("321,654");
+    scanData.put(key2, value2);
+
+    Scanner scanner = mock(Scanner.class);
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    expectLastCall();
 
-    replay(bw);
+    expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+    replay(batchWriter, scanner);
 
-    assertThrows(IllegalStateException.class, () -> upgrader.removeChoppedCF(k, bw, "aTable"));
+    Upgrader11to12 upgrader = new Upgrader11to12();
+    assertThrows(IllegalArgumentException.class,
+        () -> upgrader.processReferences(batchWriter, scanner, "accumulo.metadata"));
 
-    verify(bw);
+    verify(batchWriter, scanner);
   }
 
   @Test
-  void removeExternalCompactionCFTest() throws Exception {
-    Upgrader11to12 upgrader = new Upgrader11to12();
+  void unexpectedColFailsTest() throws Exception {
 
-    Key k = Key.builder().row(new Text("12;")).family(ExternalCompactionColumnFamily.NAME)
-        .qualifier(new Text("ECID:1234")).build();
+    BatchWriter batchWriter = mock(BatchWriter.class);
+    Capture<Mutation> capturedUpdate1 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate1));
+    // expecting that the exception will be thrown before any mutation is added.
+    expectLastCall().andThrow(new UnsupportedOperationException()).anyTimes();
 
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> captured = newCapture();
-    bw.addMutation(capture(captured));
-    expectLastCall();
+    // create sample data "served" by the mocked scanner
+    TreeMap<Key,Value> scanData = new TreeMap<>();
+    Text row1 = new Text("123");
 
-    replay(bw);
+    Key key1 = Key.builder(false).row(row1).family(LastLocationColumnFamily.NAME).qualifier("srv1")
+        .build();
+    Value value1 = new Value("123,456");
+    scanData.put(key1, value1);
 
-    upgrader.removeExternalCompactionCF(k, bw, "aTable");
+    Scanner scanner = mock(Scanner.class);
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    expectLastCall();
 
-    Mutation delete = new Mutation(k.getRow()).at().family(ExternalCompactionColumnFamily.NAME)
-        .qualifier(new Text("ECID:1234")).delete();
+    expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+    replay(batchWriter, scanner);
 
-    assertEquals(delete, captured.getValue());
+    Upgrader11to12 upgrader = new Upgrader11to12();
+    assertThrows(IllegalStateException.class,
+        () -> upgrader.processReferences(batchWriter, scanner, "accumulo.metadata"));
 
-    for (ColumnUpdate update : captured.getValue().getUpdates()) {
-      assertEquals(ExternalCompactionColumnFamily.STR_NAME,
-          new String(update.getColumnFamily(), UTF_8));
-      assertEquals("ECID:1234", new String(update.getColumnQualifier(), UTF_8));
-      assertTrue(update.isDeleted());
-    }
-    verify(bw);
+    verify(batchWriter, scanner);
   }
 
+  /**
+   * Process 3 rows; 2 should result in no mutations, and the batch writer addMutation should
+   * not be called for those rows.
+   */
   @Test
-  void removeExternalCompactionCFContinuesTest() throws Exception {
-    Upgrader11to12 upgrader = new Upgrader11to12();
-
-    Key k = Key.builder().row(new Text("12;")).family(ExternalCompactionColumnFamily.NAME)
-        .qualifier(new Text("ECID:1234")).build();
+  public void verifyEmptyMutation() throws Exception {
+    BatchWriter batchWriter = mock(BatchWriter.class);
+    Capture<Mutation> capturedUpdate1 = newCapture();
+    batchWriter.addMutation(capture(capturedUpdate1));
+    expectLastCall().once();
+    // create sample data "served" by the mocked scanner
+    TreeMap<Key,Value> scanData = new TreeMap<>();
+
+    Text row1 = new Text("1");
+
+    String fileName1 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/1111000v.rf";
+    Key key1 =
+        Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+    Value value1 = new Value("111,222");
+    scanData.put(key1, value1);
+
+    Text row2 = new Text("a");
+
+    // reference already in expected form with fence info.
+    String fileName2 =
+        "{\"path\":\"hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf\",\"startRow\":\"\",\"endRow\":\"\"}";
+    Key key2 =
+        Key.builder(false).row(row2).family(DataFileColumnFamily.NAME).qualifier(fileName2).build();
+    Value value2 = new Value("222,333");
+    scanData.put(key2, value2);
+
+    Text row3 = new Text("b");
+
+    // reference already in expected form with fence info.
+    String fileName3 =
+        "{\"path\":\"hdfs://localhost:8020/accumulo/tables/12/default_tablet/BBBB000v.rf\",\"startRow\":\"\",\"endRow\":\"\"}";
+    Key key3 =
+        Key.builder(false).row(row3).family(DataFileColumnFamily.NAME).qualifier(fileName3).build();
+    Value value3 = new Value("333,444");
+    scanData.put(key3, value3);
+
+    Scanner scanner = mock(Scanner.class);
+    scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+    expectLastCall();
+    scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+    expectLastCall();
 
-    BatchWriter bw = createMock(BatchWriter.class);
-    Capture<Mutation> captured = newCapture();
-    bw.addMutation(capture(captured));
-    expectLastCall().andThrow(new MutationsRejectedException(null, List.of(), Map.of(), List.of(),
-        0, new NullPointerException()));
+    expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+    replay(batchWriter, scanner);
 
-    replay(bw);
+    Upgrader11to12 upgrader = new Upgrader11to12();
+    upgrader.processReferences(batchWriter, scanner, "accumulo.metadata");
 
-    assertThrows(IllegalStateException.class,
-        () -> upgrader.removeExternalCompactionCF(k, bw, "aTable"));
+    LOG.info("c:{}", capturedUpdate1.getValue().prettyPrint());
+    var u1 = capturedUpdate1.getValue();
+    // 1 add, 1 delete
+    assertEquals(2, u1.getUpdates().size());
+    assertEquals(1, u1.getUpdates().stream().filter(ColumnUpdate::isDeleted).count());
 
-    verify(bw);
+    verify(batchWriter, scanner);
   }
 
   @Test