You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ed...@apache.org on 2023/11/24 23:41:47 UTC
(accumulo) branch main updated: Use single mutation during no chop fence upgrade (#3954)
This is an automated email from the ASF dual-hosted git repository.
edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 5a7e6eba0c Use single mutation during no chop fence upgrade (#3954)
5a7e6eba0c is described below
commit 5a7e6eba0c390ddbdecef95a2dc2cad6f64a83f3
Author: EdColeman <de...@etcoleman.com>
AuthorDate: Fri Nov 24 18:41:41 2023 -0500
Use single mutation during no chop fence upgrade (#3954)
- Uses single mutation during no chop fence upgrade
- Add guard to prevent trying empty mutation
---
.../accumulo/manager/upgrade/Upgrader11to12.java | 158 ++++----
.../manager/upgrade/Upgrader11to12Test.java | 406 ++++++++++++++-------
2 files changed, 351 insertions(+), 213 deletions(-)
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
index 2d9f6b4a07..72127a4d4c 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/Upgrader11to12.java
@@ -20,27 +20,31 @@ package org.apache.accumulo.manager.upgrade;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
-import static org.apache.accumulo.core.metadata.schema.UpgraderDeprecatedConstants.ChoppedColumnFamily;
import static org.apache.accumulo.server.AccumuloDataVersion.METADATA_FILE_JSON_ENCODING;
-import org.apache.accumulo.core.client.Accumulo;
-import org.apache.accumulo.core.client.AccumuloClient;
+import java.util.Arrays;
+import java.util.Map;
+
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IsolatedScanner;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
import org.apache.accumulo.core.metadata.schema.RootTabletMetadata;
+import org.apache.accumulo.core.metadata.schema.UpgraderDeprecatedConstants.ChoppedColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.TextUtil;
import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.checkerframework.checker.nullness.qual.NonNull;
@@ -48,6 +52,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
public class Upgrader11to12 implements Upgrader {
@@ -84,111 +89,100 @@ public class Upgrader11to12 implements Upgrader {
public void upgradeRoot(@NonNull ServerContext context) {
log.debug("Upgrade root: upgrading to data version {}", METADATA_FILE_JSON_ENCODING);
var rootName = Ample.DataLevel.METADATA.metaTable();
- processReferences(context, rootName);
+ // not using ample to avoid StoredTabletFile because old file ref is incompatible
+ try (BatchWriter batchWriter = context.createBatchWriter(rootName); Scanner scanner =
+ new IsolatedScanner(context.createScanner(rootName, Authorizations.EMPTY))) {
+ processReferences(batchWriter, scanner, rootName);
+ } catch (TableNotFoundException ex) {
+ throw new IllegalStateException("Failed to find table " + rootName, ex);
+ } catch (MutationsRejectedException mex) {
+ log.warn("Failed to update reference for table: " + rootName);
+ log.warn("Constraint violations: {}", mex.getConstraintViolationSummaries());
+ throw new IllegalStateException("Failed to process table: " + rootName, mex);
+ }
}
@Override
public void upgradeMetadata(@NonNull ServerContext context) {
log.debug("Upgrade metadata: upgrading to data version {}", METADATA_FILE_JSON_ENCODING);
var metaName = Ample.DataLevel.USER.metaTable();
- processReferences(context, metaName);
+ try (BatchWriter batchWriter = context.createBatchWriter(metaName); Scanner scanner =
+ new IsolatedScanner(context.createScanner(metaName, Authorizations.EMPTY))) {
+ processReferences(batchWriter, scanner, metaName);
+ } catch (TableNotFoundException ex) {
+ throw new IllegalStateException("Failed to find table " + metaName, ex);
+ } catch (MutationsRejectedException mex) {
+ log.warn("Failed to update reference for table: " + metaName);
+ log.warn("Constraint violations: {}", mex.getConstraintViolationSummaries());
+ throw new IllegalStateException("Failed to process table: " + metaName, mex);
+ }
}
- private void processReferences(ServerContext context, String tableName) {
- // not using ample to avoid StoredTabletFile because old file ref is incompatible
- try (AccumuloClient c = Accumulo.newClient().from(context.getProperties()).build();
- BatchWriter batchWriter = c.createBatchWriter(tableName); Scanner scanner =
- new IsolatedScanner(context.createScanner(tableName, Authorizations.EMPTY))) {
+ void processReferences(BatchWriter batchWriter, Scanner scanner, String tableName) {
+ scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+ scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+ try {
+ Mutation update = null;
+ for (Map.Entry<Key,Value> entry : scanner) {
+ Key key = entry.getKey();
+ Value value = entry.getValue();
+ Preconditions.checkState(key.getColumnVisibilityData().length() == 0,
+ "Expected empty visibility, saw %s ", key.getColumnVisibilityData());
+ // on new row, write current mutation and prepare a new one.
+ Text r = key.getRow();
+ if (update == null) {
+ update = new Mutation(r);
+ } else if (!Arrays.equals(update.getRow(), TextUtil.getBytes(r))) {
+ if (log.isTraceEnabled()) {
+ log.trace("table: {}, update: {}", tableName, update.prettyPrint());
+ }
+ if (!update.getUpdates().isEmpty()) {
+ batchWriter.addMutation(update);
+ }
+ update = new Mutation(r);
+ }
- scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
- scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
- scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
- scanner.forEach((k, v) -> {
- var family = k.getColumnFamily();
+ var family = key.getColumnFamily();
if (family.equals(DataFileColumnFamily.NAME)) {
- upgradeDataFileCF(k, v, batchWriter, tableName);
+ upgradeDataFileCF(key, value, update);
} else if (family.equals(ChoppedColumnFamily.NAME)) {
- removeChoppedCF(k, batchWriter, tableName);
+ log.warn(
+ "Deleting chopped reference from:{}. Previous split or delete may not have completed cleanly. Ref: {}",
+ tableName, key.getRow());
+ update.at().family(ChoppedColumnFamily.STR_NAME).qualifier(ChoppedColumnFamily.STR_NAME)
+ .delete();
} else if (family.equals(ExternalCompactionColumnFamily.NAME)) {
- removeExternalCompactionCF(k, batchWriter, tableName);
+ log.debug(
+ "Deleting external compaction reference from:{}. Previous compaction may not have completed. Ref: {}",
+ tableName, key.getRow());
+ update.at().family(ExternalCompactionColumnFamily.NAME)
+ .qualifier(key.getColumnQualifier()).delete();
} else {
throw new IllegalStateException("Processing: " + tableName
+ " Received unexpected column family processing references: " + family);
}
- });
+ }
+ // send last mutation
+ if (update != null && !update.getUpdates().isEmpty()) {
+ log.trace("table: {}, update: {}", tableName, update.prettyPrint());
+ batchWriter.addMutation(update);
+ }
} catch (MutationsRejectedException mex) {
log.warn("Failed to update reference for table: " + tableName);
log.warn("Constraint violations: {}", mex.getConstraintViolationSummaries());
throw new IllegalStateException("Failed to process table: " + tableName, mex);
- } catch (Exception ex) {
- throw new IllegalStateException("Failed to process table: " + tableName, ex);
}
}
@VisibleForTesting
- void upgradeDataFileCF(final Key key, final Value value, final BatchWriter batchWriter,
- final String tableName) {
+ void upgradeDataFileCF(final Key key, final Value value, final Mutation m) {
String file = key.getColumnQualifier().toString();
// filter out references if they are in the correct format already.
if (fileNeedsConversion(file)) {
var fileJson = StoredTabletFile.of(new Path(file)).getMetadataText();
- try {
- Mutation update = new Mutation(key.getRow());
- update.at().family(DataFileColumnFamily.STR_NAME).qualifier(fileJson).put(value);
- log.trace("table: {}, adding: {}", tableName, update.prettyPrint());
- batchWriter.addMutation(update);
-
- Mutation delete = new Mutation(key.getRow());
- delete.at().family(DataFileColumnFamily.STR_NAME).qualifier(file).delete();
- log.trace("table {}: deleting: {}", tableName, delete.prettyPrint());
- batchWriter.addMutation(delete);
- } catch (MutationsRejectedException ex) {
- // include constraint violation info in log - but stop upgrade
- log.warn(
- "Failed to update file reference for table: " + tableName + ". row: " + key.getRow());
- log.warn("Constraint violations: {}", ex.getConstraintViolationSummaries());
- throw new IllegalStateException("File conversion failed. Aborting upgrade", ex);
- }
- }
- }
-
- @VisibleForTesting
- void removeChoppedCF(final Key key, final BatchWriter batchWriter, final String tableName) {
- Mutation delete = null;
- try {
- delete = new Mutation(key.getRow()).at().family(ChoppedColumnFamily.STR_NAME)
- .qualifier(ChoppedColumnFamily.STR_NAME).delete();
- log.warn(
- "Deleting chopped reference from:{}. Previous split or delete may not have completed cleanly. Ref: {}",
- tableName, delete.prettyPrint());
- batchWriter.addMutation(delete);
- } catch (MutationsRejectedException ex) {
- log.warn("Failed to delete obsolete chopped CF reference for table: " + tableName + ". Ref: "
- + delete.prettyPrint() + ". Will try to continue. Ref may need to be manually removed");
- log.warn("Constraint violations: {}", ex.getConstraintViolationSummaries());
- throw new IllegalStateException(
- "Failed to delete obsolete chopped CF reference for table: " + tableName, ex);
- }
- }
-
- @VisibleForTesting
- void removeExternalCompactionCF(final Key key, final BatchWriter batchWriter,
- final String tableName) {
- Mutation delete = null;
- try {
- delete = new Mutation(key.getRow()).at().family(ExternalCompactionColumnFamily.NAME)
- .qualifier(key.getColumnQualifier()).delete();
- log.debug(
- "Deleting external compaction reference from:{}. Previous compaction may not have completed. Ref: {}",
- tableName, delete.prettyPrint());
- batchWriter.addMutation(delete);
- } catch (MutationsRejectedException ex) {
- log.warn("Failed to delete obsolete external compaction CF reference for table: " + tableName
- + ". Ref: " + delete.prettyPrint()
- + ". Will try to continue. Ref may need to be manually removed");
- log.warn("Constraint violations: {}", ex.getConstraintViolationSummaries());
- throw new IllegalStateException(
- "Failed to delete obsolete external compaction CF reference for table: " + tableName, ex);
+ m.at().family(DataFileColumnFamily.STR_NAME).qualifier(fileJson).put(value);
+ m.at().family(DataFileColumnFamily.STR_NAME).qualifier(file).delete();
}
}
diff --git a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java
index da12f4ede2..ee45534dab 100644
--- a/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java
+++ b/server/manager/src/test/java/org/apache/accumulo/manager/upgrade/Upgrader11to12Test.java
@@ -19,14 +19,12 @@
package org.apache.accumulo.manager.upgrade;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
-import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
-import static org.apache.accumulo.core.metadata.schema.UpgraderDeprecatedConstants.ChoppedColumnFamily;
import static org.easymock.EasyMock.capture;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
import static org.easymock.EasyMock.newCapture;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
@@ -35,12 +33,15 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.UUID;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.ColumnUpdate;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.data.Key;
@@ -48,6 +49,10 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.metadata.StoredTabletFile;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.DataFileColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ExternalCompactionColumnFamily;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.LastLocationColumnFamily;
+import org.apache.accumulo.core.metadata.schema.UpgraderDeprecatedConstants.ChoppedColumnFamily;
import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -62,198 +67,337 @@ public class Upgrader11to12Test {
private static final Logger LOG = LoggerFactory.getLogger(Upgrader11to12Test.class);
@Test
- void upgradeDataFileCFTest() throws Exception {
+ void upgradeDataFileCF2Test() {
Upgrader11to12 upgrader = new Upgrader11to12();
- BatchWriter bw = createMock(BatchWriter.class);
- Capture<Mutation> capturedAdd = newCapture();
- bw.addMutation(capture(capturedAdd));
- expectLastCall();
-
- Capture<Mutation> capturedDelete = newCapture();
- bw.addMutation(capture(capturedDelete));
- expectLastCall();
-
- replay(bw);
-
String fileName = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
Key k = Key.builder().row(new Text("12;")).family(DataFileColumnFamily.NAME)
.qualifier(new Text(fileName)).build();
Value v = new Value("1234,5678");
- upgrader.upgradeDataFileCF(k, v, bw, "aTable");
+ Mutation upgrade = new Mutation(k.getRow());
+ upgrader.upgradeDataFileCF(k, v, upgrade);
- StoredTabletFile stf = StoredTabletFile.of(new Path(fileName));
- Mutation add = new Mutation(k.getRow()).at().family(DataFileColumnFamily.NAME)
- .qualifier(stf.getMetadataText()).put(v);
- LOG.debug("add mutation to be expected: {}", add.prettyPrint());
+ var pending = upgrade.getUpdates();
+ assertEquals(2, pending.size());
+ // leverage sort order for "expected" values
+ // check file entry converted is in the mutation
+ Iterator<ColumnUpdate> m = pending.iterator();
+ var cu1 = m.next();
+ assertEquals("file", new Text(cu1.getColumnFamily()).toString());
- Mutation delete = new Mutation(k.getRow()).at().family(DataFileColumnFamily.NAME)
- .qualifier(new Text(fileName)).delete();
- LOG.debug("delete mutation to be expected: {}", delete.prettyPrint());
+ StoredTabletFile oldFileEntry = StoredTabletFile.of(new Path(fileName));
+ StoredTabletFile updateEnry = StoredTabletFile.of(new String(cu1.getColumnQualifier(), UTF_8));
- assertEquals(add, capturedAdd.getValue());
- assertEquals(delete, capturedDelete.getValue());
+ assertEquals(oldFileEntry, updateEnry);
+ assertFalse(cu1.isDeleted());
- verify(bw);
- }
+ // check old file entry is deleted is in the mutation
- @Test
- void upgradeDataFileCFSkipConvertedTest() {
- Upgrader11to12 upgrader = new Upgrader11to12();
+ var cu2 = m.next();
+ assertEquals("file", new Text(cu1.getColumnFamily()).toString());
+ assertEquals(fileName, new String(cu2.getColumnQualifier(), UTF_8));
+ assertTrue(cu2.isDeleted());
- BatchWriter bw = createMock(BatchWriter.class);
+ }
- replay(bw);
+ @Test
+ public void processReferencesTest() throws Exception {
+ BatchWriter batchWriter = mock(BatchWriter.class);
+ Capture<Mutation> capturedUpdate1 = newCapture();
+ batchWriter.addMutation(capture(capturedUpdate1));
+ expectLastCall().once();
+
+ Capture<Mutation> capturedUpdate2 = newCapture();
+ batchWriter.addMutation(capture(capturedUpdate2));
+ expectLastCall().once();
+
+ // create sample data "served" by the mocked scanner
+ TreeMap<Key,Value> scanData = new TreeMap<>();
+ Text row1 = new Text("123");
+
+ String fileName1 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
+ Key key1 =
+ Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+ Value value1 = new Value("123,456");
+ scanData.put(key1, value1);
+
+ String fileName2 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/B000000v.rf";
+ Key key2 =
+ Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName2).build();
+ Value value2 = new Value("321,654");
+ scanData.put(key2, value2);
+
+ Key chop1 = Key.builder(false).row(row1).family(ChoppedColumnFamily.NAME)
+ .qualifier(ChoppedColumnFamily.NAME).build();
+ scanData.put(chop1, null);
+
+ Key extern1 = Key.builder(false).row(row1).family(ExternalCompactionColumnFamily.NAME)
+ .qualifier(ExternalCompactionColumnFamily.NAME).build();
+ scanData.put(extern1, null);
- String fileName = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
- StoredTabletFile stf = StoredTabletFile.of(new Path(fileName));
+ Text row2 = new Text("234");
- Key k = Key.builder().row(new Text("12;")).family(DataFileColumnFamily.NAME)
- .qualifier(stf.getMetadataText()).build();
- Value v = new Value("1234,5678");
+ String fileName3 = "hdfs://localhost:8020/accumulo/tables/13/default_tablet/C000000v.rf";
+ Key key3 =
+ Key.builder(false).row(row2).family(DataFileColumnFamily.NAME).qualifier(fileName3).build();
+ Value value3 = new Value("1,2");
+ scanData.put(key3, value3);
- upgrader.upgradeDataFileCF(k, v, bw, "aTable");
+ Scanner scanner = mock(Scanner.class);
+ scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ expectLastCall();
+ scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+ expectLastCall();
+ scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+ expectLastCall();
- // with file entry in correct formation, no mutations are expected.
- verify(bw);
- }
+ expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+ replay(batchWriter, scanner);
- @Test
- void upgradeDataFileCFInvalidMutationTest() throws Exception {
Upgrader11to12 upgrader = new Upgrader11to12();
+ upgrader.processReferences(batchWriter, scanner, "accumulo.metadata");
- BatchWriter bw = createMock(BatchWriter.class);
- Capture<Mutation> capturedAdd = newCapture();
- bw.addMutation(capture(capturedAdd));
- expectLastCall().andThrow(new MutationsRejectedException(null, List.of(), Map.of(), List.of(),
- 0, new NullPointerException()));
+ LOG.info("c:{}", capturedUpdate1.getValue().prettyPrint());
+ var u1 = capturedUpdate1.getValue();
+ // 2 file add, 2 file delete. 1 chop delete, 1 ext comp delete
+ assertEquals(6, u1.getUpdates().size());
- replay(bw);
+ LOG.info("c:{}", capturedUpdate2.getValue().prettyPrint());
+ var u2 = capturedUpdate2.getValue();
+ // 1 add, 1 delete
+ assertEquals(2, u2.getUpdates().size());
+ assertEquals(1, u2.getUpdates().stream().filter(ColumnUpdate::isDeleted).count());
- String fileName = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
- Key k = Key.builder().row(new Text("12;")).family(DataFileColumnFamily.NAME)
- .qualifier(new Text(fileName)).build();
- Value v = new Value("1234,5678");
-
- assertThrows(IllegalStateException.class, () -> upgrader.upgradeDataFileCF(k, v, bw, "aTable"));
+ verify(batchWriter, scanner);
- verify(bw);
}
@Test
- void upgradeDataFileCFInvalidPathTest() {
- Upgrader11to12 upgrader = new Upgrader11to12();
-
- BatchWriter bw = createMock(BatchWriter.class);
-
- replay(bw);
+ public void skipConvertedFileTest() throws Exception {
+ BatchWriter batchWriter = mock(BatchWriter.class);
+ Capture<Mutation> capturedUpdate1 = newCapture();
+ batchWriter.addMutation(capture(capturedUpdate1));
+ expectLastCall().once();
+ // create sample data "served" by the mocked scanner
+ TreeMap<Key,Value> scanData = new TreeMap<>();
+ Text row1 = new Text("123");
+
+ // reference already in expected form with fence info.
+ String fileName1 =
+ "{\"path\":\"hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf\",\"startRow\":\"\",\"endRow\":\"\"}";
+ Key key1 =
+ Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+ Value value1 = new Value("123,456");
+ scanData.put(key1, value1);
+
+ String fileName2 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/B000000v.rf";
+ Key key2 =
+ Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName2).build();
+ Value value2 = new Value("321,654");
+ scanData.put(key2, value2);
+
+ Scanner scanner = mock(Scanner.class);
+ scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ expectLastCall();
+ scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+ expectLastCall();
+ scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+ expectLastCall();
- String invalidPath = "badPath";
+ expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+ replay(batchWriter, scanner);
- Key k = Key.builder().row(new Text("12;")).family(DataFileColumnFamily.NAME)
- .qualifier(new Text(invalidPath)).build();
- Value v = new Value("1234,5678");
+ Upgrader11to12 upgrader = new Upgrader11to12();
+ upgrader.processReferences(batchWriter, scanner, "accumulo.metadata");
- assertThrows(IllegalArgumentException.class,
- () -> upgrader.upgradeDataFileCF(k, v, bw, "aTable"));
+ LOG.info("c:{}", capturedUpdate1.getValue().prettyPrint());
+ var u1 = capturedUpdate1.getValue();
+ // 1 add, 1 delete
+ assertEquals(2, u1.getUpdates().size());
+ assertEquals(1, u1.getUpdates().stream().filter(ColumnUpdate::isDeleted).count());
- verify(bw);
+ verify(batchWriter, scanner);
}
@Test
- void removeChoppedCFTest() throws Exception {
- Upgrader11to12 upgrader = new Upgrader11to12();
+ void failOnMutationErrorTest() throws Exception {
- Key k = Key.builder().row(new Text("12;")).family(ExternalCompactionColumnFamily.NAME)
- .qualifier(ExternalCompactionColumnFamily.NAME).build();
+ BatchWriter batchWriter = mock(BatchWriter.class);
+ Capture<Mutation> capturedUpdate1 = newCapture();
+ batchWriter.addMutation(capture(capturedUpdate1));
+ expectLastCall().andThrow(new MutationsRejectedException(null, List.of(), Map.of(), List.of(),
+ 0, new NullPointerException())).once();
- BatchWriter bw = createMock(BatchWriter.class);
- Capture<Mutation> captured = newCapture();
- bw.addMutation(capture(captured));
- expectLastCall();
+ TreeMap<Key,Value> scanData = new TreeMap<>();
+ Text row1 = new Text("123");
- replay(bw);
+ // reference already in expected form with fence info.
+ String fileName1 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf";
+ Key key1 =
+ Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+ Value value1 = new Value("123,456");
+ scanData.put(key1, value1);
- upgrader.removeChoppedCF(k, bw, "aTable");
+ Scanner scanner = mock(Scanner.class);
+ scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ expectLastCall();
+ scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+ expectLastCall();
+ scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+ expectLastCall();
- Mutation delete = new Mutation(k.getRow()).at().family(ChoppedColumnFamily.NAME)
- .qualifier(ChoppedColumnFamily.NAME).delete();
+ expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+ replay(batchWriter, scanner);
+ Upgrader11to12 upgrader = new Upgrader11to12();
- assertEquals(delete, captured.getValue());
+ assertThrows(IllegalStateException.class,
+ () -> upgrader.processReferences(batchWriter, scanner, "accumulo.metadata"));
- verify(bw);
+ verify(batchWriter, scanner);
}
@Test
- void removeChoppedCFContinuesTest() throws Exception {
- Upgrader11to12 upgrader = new Upgrader11to12();
-
- Key k = Key.builder().row(new Text("12;")).family(ExternalCompactionColumnFamily.NAME)
- .qualifier(ExternalCompactionColumnFamily.NAME).build();
-
- BatchWriter bw = createMock(BatchWriter.class);
- Capture<Mutation> captured = newCapture();
- bw.addMutation(capture(captured));
- expectLastCall().andThrow(new MutationsRejectedException(null, List.of(), Map.of(), List.of(),
- 0, new NullPointerException()));
+ void upgradeDataFileCFInvalidPathTest() throws Exception {
+
+ BatchWriter batchWriter = mock(BatchWriter.class);
+ Capture<Mutation> capturedUpdate1 = newCapture();
+ batchWriter.addMutation(capture(capturedUpdate1));
+ // expecting that exception will be called before mutation is updated.
+ expectLastCall().andThrow(new UnsupportedOperationException()).anyTimes();
+
+ // create sample data "served" by the mocked scanner
+ TreeMap<Key,Value> scanData = new TreeMap<>();
+ Text row1 = new Text("123");
+
+ String fileName1 = "bad path";
+ Key key1 =
+ Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+ Value value1 = new Value("123,456");
+ scanData.put(key1, value1);
+
+ String fileName2 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/B000000v.rf";
+ Key key2 =
+ Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName2).build();
+ Value value2 = new Value("321,654");
+ scanData.put(key2, value2);
+
+ Scanner scanner = mock(Scanner.class);
+ scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ expectLastCall();
+ scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+ expectLastCall();
+ scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+ expectLastCall();
- replay(bw);
+ expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+ replay(batchWriter, scanner);
- assertThrows(IllegalStateException.class, () -> upgrader.removeChoppedCF(k, bw, "aTable"));
+ Upgrader11to12 upgrader = new Upgrader11to12();
+ assertThrows(IllegalArgumentException.class,
+ () -> upgrader.processReferences(batchWriter, scanner, "accumulo.metadata"));
- verify(bw);
+ verify(batchWriter, scanner);
}
@Test
- void removeExternalCompactionCFTest() throws Exception {
- Upgrader11to12 upgrader = new Upgrader11to12();
+ void unexpectedColFailsTest() throws Exception {
- Key k = Key.builder().row(new Text("12;")).family(ExternalCompactionColumnFamily.NAME)
- .qualifier(new Text("ECID:1234")).build();
+ BatchWriter batchWriter = mock(BatchWriter.class);
+ Capture<Mutation> capturedUpdate1 = newCapture();
+ batchWriter.addMutation(capture(capturedUpdate1));
+ // expecting that exception will be called before mutation is updated.
+ expectLastCall().andThrow(new UnsupportedOperationException()).anyTimes();
- BatchWriter bw = createMock(BatchWriter.class);
- Capture<Mutation> captured = newCapture();
- bw.addMutation(capture(captured));
- expectLastCall();
+ // create sample data "served" by the mocked scanner
+ TreeMap<Key,Value> scanData = new TreeMap<>();
+ Text row1 = new Text("123");
- replay(bw);
+ Key key1 = Key.builder(false).row(row1).family(LastLocationColumnFamily.NAME).qualifier("srv1")
+ .build();
+ Value value1 = new Value("123,456");
+ scanData.put(key1, value1);
- upgrader.removeExternalCompactionCF(k, bw, "aTable");
+ Scanner scanner = mock(Scanner.class);
+ scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ expectLastCall();
+ scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+ expectLastCall();
+ scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+ expectLastCall();
- Mutation delete = new Mutation(k.getRow()).at().family(ExternalCompactionColumnFamily.NAME)
- .qualifier(new Text("ECID:1234")).delete();
+ expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+ replay(batchWriter, scanner);
- assertEquals(delete, captured.getValue());
+ Upgrader11to12 upgrader = new Upgrader11to12();
+ assertThrows(IllegalStateException.class,
+ () -> upgrader.processReferences(batchWriter, scanner, "accumulo.metadata"));
- for (ColumnUpdate update : captured.getValue().getUpdates()) {
- assertEquals(ExternalCompactionColumnFamily.STR_NAME,
- new String(update.getColumnFamily(), UTF_8));
- assertEquals("ECID:1234", new String(update.getColumnQualifier(), UTF_8));
- assertTrue(update.isDeleted());
- }
- verify(bw);
+ verify(batchWriter, scanner);
}
+ /**
+ * process 3 rows, 2 should result in no mutations and batch writer addMutation should not be
+ * called for those rows
+ */
@Test
- void removeExternalCompactionCFContinuesTest() throws Exception {
- Upgrader11to12 upgrader = new Upgrader11to12();
-
- Key k = Key.builder().row(new Text("12;")).family(ExternalCompactionColumnFamily.NAME)
- .qualifier(new Text("ECID:1234")).build();
+ public void verifyEmptyMutation() throws Exception {
+ BatchWriter batchWriter = mock(BatchWriter.class);
+ Capture<Mutation> capturedUpdate1 = newCapture();
+ batchWriter.addMutation(capture(capturedUpdate1));
+ expectLastCall().once();
+ // create sample data "served" by the mocked scanner
+ TreeMap<Key,Value> scanData = new TreeMap<>();
+
+ Text row1 = new Text("1");
+
+ String fileName1 = "hdfs://localhost:8020/accumulo/tables/12/default_tablet/1111000v.rf";
+ Key key1 =
+ Key.builder(false).row(row1).family(DataFileColumnFamily.NAME).qualifier(fileName1).build();
+ Value value1 = new Value("111,222");
+ scanData.put(key1, value1);
+
+ Text row2 = new Text("a");
+
+ // reference already in expected form with fence info.
+ String fileName2 =
+ "{\"path\":\"hdfs://localhost:8020/accumulo/tables/12/default_tablet/A000000v.rf\",\"startRow\":\"\",\"endRow\":\"\"}";
+ Key key2 =
+ Key.builder(false).row(row2).family(DataFileColumnFamily.NAME).qualifier(fileName2).build();
+ Value value2 = new Value("222,333");
+ scanData.put(key2, value2);
+
+ Text row3 = new Text("b");
+
+ // reference already in expected form with fence info.
+ String fileName3 =
+ "{\"path\":\"hdfs://localhost:8020/accumulo/tables/12/default_tablet/BBBB000v.rf\",\"startRow\":\"\",\"endRow\":\"\"}";
+ Key key3 =
+ Key.builder(false).row(row3).family(DataFileColumnFamily.NAME).qualifier(fileName3).build();
+ Value value3 = new Value("333,444");
+ scanData.put(key3, value3);
+
+ Scanner scanner = mock(Scanner.class);
+ scanner.fetchColumnFamily(DataFileColumnFamily.NAME);
+ expectLastCall();
+ scanner.fetchColumnFamily(ChoppedColumnFamily.NAME);
+ expectLastCall();
+ scanner.fetchColumnFamily(ExternalCompactionColumnFamily.NAME);
+ expectLastCall();
- BatchWriter bw = createMock(BatchWriter.class);
- Capture<Mutation> captured = newCapture();
- bw.addMutation(capture(captured));
- expectLastCall().andThrow(new MutationsRejectedException(null, List.of(), Map.of(), List.of(),
- 0, new NullPointerException()));
+ expect(scanner.iterator()).andReturn(scanData.entrySet().iterator()).once();
+ replay(batchWriter, scanner);
- replay(bw);
+ Upgrader11to12 upgrader = new Upgrader11to12();
+ upgrader.processReferences(batchWriter, scanner, "accumulo.metadata");
- assertThrows(IllegalStateException.class,
- () -> upgrader.removeExternalCompactionCF(k, bw, "aTable"));
+ LOG.info("c:{}", capturedUpdate1.getValue().prettyPrint());
+ var u1 = capturedUpdate1.getValue();
+ // 1 add, 1 delete
+ assertEquals(2, u1.getUpdates().size());
+ assertEquals(1, u1.getUpdates().stream().filter(ColumnUpdate::isDeleted).count());
- verify(bw);
+ verify(batchWriter, scanner);
}
@Test