Posted to commits@accumulo.apache.org by kt...@apache.org on 2019/06/18 18:59:43 UTC

[accumulo-testing] branch master updated: Add new garbage collection simulation test (#81)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git


The following commit(s) were added to refs/heads/master by this push:
     new a8fe43e  Add new garbage collection simulation test (#81)
a8fe43e is described below

commit a8fe43ea77c2fc8457a2e0073e84a135940b5c6c
Author: Keith Turner <kt...@apache.org>
AuthorDate: Tue Jun 18 14:59:39 2019 -0400

    Add new garbage collection simulation test (#81)
---
 README.md                                          |   4 +
 bin/gcs                                            |  71 +++++
 conf/accumulo-testing.properties.example           |  17 ++
 docs/gcs.md                                        |  71 +++++
 .../org/apache/accumulo/testing/gcs/Candidate.java |  73 +++++
 .../org/apache/accumulo/testing/gcs/Collector.java |  92 ++++++
 .../org/apache/accumulo/testing/gcs/GcsEnv.java    |  34 +++
 .../org/apache/accumulo/testing/gcs/Generator.java | 263 +++++++++++++++++
 .../org/apache/accumulo/testing/gcs/GroupRef.java  |  28 ++
 .../java/org/apache/accumulo/testing/gcs/Item.java |  82 ++++++
 .../org/apache/accumulo/testing/gcs/ItemRef.java   |  55 ++++
 .../org/apache/accumulo/testing/gcs/ItemState.java |   5 +
 .../org/apache/accumulo/testing/gcs/Mutator.java   |  22 ++
 .../apache/accumulo/testing/gcs/Persistence.java   | 326 +++++++++++++++++++++
 .../org/apache/accumulo/testing/gcs/Setup.java     |  24 ++
 .../org/apache/accumulo/testing/gcs/Verifier.java  |  69 +++++
 16 files changed, 1236 insertions(+)

diff --git a/README.md b/README.md
index a2eea4c..002cca5 100644
--- a/README.md
+++ b/README.md
@@ -140,6 +140,10 @@ than one instance of this MapReduce job concurrently against a table.
 Checkout [ingest-test.md](docs/ingest-test.md) for pointers on running a long
 running ingest and verification test.
 
+## Garbage Collection Simulator
+
+See [gcs.md](docs/gcs.md).
+
 ## Agitator
 
 The agitator will periodically kill the Accumulo master, tablet server, and Hadoop data node
diff --git a/bin/gcs b/bin/gcs
new file mode 100755
index 0000000..611dad6
--- /dev/null
+++ b/bin/gcs
@@ -0,0 +1,71 @@
+#! /usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+bin_dir=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
+at_home=$( cd "$( dirname "$bin_dir" )" && pwd )
+
+function print_usage() {
+  cat <<EOF
+
+Usage: gcs <application> {-o test.<prop>=<value>}
+
+Available applications:
+
+    setup        Create and configure the table.
+    generate     Run a data generator.  Multiple generators can run concurrently.
+    collect      Run a collector.  Only one should run at a time.
+    verify       Verify data.  Stop collect and generate before running.
+EOF
+}
+
+if [ -f "$at_home/conf/env.sh" ]; then
+  . "$at_home"/conf/env.sh
+else
+  . "$at_home"/conf/env.sh.example
+fi
+
+if [ -z "$1" ]; then
+  echo "ERROR: <application> needs to be set"
+  print_usage
+  exit 1
+fi
+
+ci_package="org.apache.accumulo.testing.gcs"
+case "$1" in
+  generate)
+    ci_main="${ci_package}.Generator"
+    ;;
+  collect)
+    ci_main="${ci_package}.Collector"
+    ;;
+  verify)
+    ci_main="${ci_package}.Verifier"
+    ;;
+  setup)
+    ci_main="${ci_package}.Setup"
+    ;;
+
+  *)
+    echo "Unknown application: $1"
+    print_usage
+    exit 1
+esac
+
+export CLASSPATH="$TEST_JAR_PATH:$HADOOP_API_JAR:$HADOOP_RUNTIME_JAR:$CLASSPATH"
+
+java -Dlog4j.configuration="file:$TEST_LOG4J" "$ci_main" "${@:2}" "$TEST_PROPS" "$ACCUMULO_CLIENT_PROPS"
+
diff --git a/conf/accumulo-testing.properties.example b/conf/accumulo-testing.properties.example
index 04801b1..760e82f 100644
--- a/conf/accumulo-testing.properties.example
+++ b/conf/accumulo-testing.properties.example
@@ -119,6 +119,23 @@ test.ci.bulk.map.nodes=1000000
 # produce a bulk import file.
 test.ci.bulk.reducers.max=1024
 
+###############################
+# Garbage Collection Simulation
+###############################
+
+# Name of Accumulo table to use for test
+test.gcs.table=gcs
+# Max number of buckets for references.  Buckets correspond to tablets in the Accumulo GC.
+test.gcs.maxBuckets=100000
+# Split each data section with this many tablets when creating table.
+test.gcs.tablets=10
+# Total number of work lists the generator should create before it exits.
+test.gcs.maxWork=100000000
+# Max number of work lists a generator should be concurrently working on.
+test.gcs.maxActiveWork=10000
+# Number of entries the collector and verifier will read into memory at a time.
+test.gcs.batchSize=100000
+
 #################
 # MapReduce Tests
 #################
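The `bin/gcs` usage text above advertises `-o test.<prop>=<value>` overrides, so the settings in this block can also be changed per run without editing the properties file. A minimal sketch, assuming `conf/env.sh` (or the shipped example) points at a working Accumulo client configuration:

```bash
# Run one generator with less in-flight work and a collector that reads
# smaller batches; property names come from the block above.
./bin/gcs setup
./bin/gcs generate -o test.gcs.maxActiveWork=1000 &
./bin/gcs collect -o test.gcs.batchSize=10000 &
```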
diff --git a/docs/gcs.md b/docs/gcs.md
new file mode 100644
index 0000000..6c46943
--- /dev/null
+++ b/docs/gcs.md
@@ -0,0 +1,71 @@
+# Garbage Collection Simulation (GCS)
+
+GCS is a test suite that generates random data with a reference structure
+similar to the data the Accumulo garbage collector processes.  This test has a
+few interesting properties.  First, it generates data at a much higher rate
+than the garbage collector would on a small system, simulating a much larger
+system.  Second, it has a much more complex read and write pattern than
+continuous ingest, involving multiple processes writing, reading, and deleting
+data.  Third, the random data is verifiable, like continuous ingest: at any
+point the test can be stopped and the data verified.  This test will not
+generate as much data as continuous ingest.  Instead it reaches a steady state
+in the number of entries stored in Accumulo.  The size of that steady state is
+determined by the number of generators running and the setting
+`test.gcs.maxActiveWork`; increasing either increases the steady-state size.
+
+## Data Types
+
+This test stores the following types of data in a single Accumulo table.
+
+ * **Item** : An item is something that should be deleted, unless it is referenced.
+   Each item is part of a group.  Items correspond to files and groups
+   correspond to bulk imports in the Accumulo GC.
+ * **Item reference** : A reference to an item that should prevent it from
+   being deleted.  An item can have multiple item references.
+ * **Group reference** : A reference to a group that should prevent the
+   deletion of any items in the group.  This corresponds to blip markers in the
+   Accumulo GC.
+ * **Deletion candidate** : An entry that signifies an item is a candidate for deletion.
+
+## Invariants
+
+The test data should never violate the following rules:
+
+ * An item should always be referenced by an item reference, a group reference,
+   or a deletion candidate.  There is one exception: it is ok for items with a
+   value of `NEW` to be unreferenced.
+ * An item reference should always have a corresponding item.
+
+## Executable components
+
+The test has the following executable components.
+
+ * **setup** : creates and configures the table
+ * **generator** : continually generates items, references, and candidates.
+   These are generated randomly and spaced out over time, interleaving
+   unrelated entries. The generator should never create data that violates the
+   test invariants.  Multiple generators can be run concurrently.
+ * **collector** : continually scans the data looking for unreferenced
+   candidates to delete.  Only one collector should run at a time.
+ * **verifier** : This process checks the table to ensure the test invariants
+   have not been violated.  Before running it, the generator and collector
+   processes should be stopped.
+
+Running `./bin/gcs` will print help that shows how to run these processes.
+
+Below is a simple script that runs a test scenario.
+
+```bash
+./bin/gcs setup
+
+for i in $(seq 1 10); do
+  ./bin/gcs generate &
+done
+
+./bin/gcs collect &
+
+sleep 12h
+
+pkill -f gcs
+./bin/gcs verify
+```
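The invariants above are exactly what the Verifier (added later in this commit) checks against the table. As a standalone illustration, here is a minimal in-memory sketch of the same rule using the data classes added below; the class name `InvariantSketch` is hypothetical and not part of the commit:

```java
package org.apache.accumulo.testing.gcs;

import java.util.Set;

// Minimal sketch of the first invariant: every item that is not NEW must be
// covered by an item reference, a group reference, or a deletion candidate.
// The Verifier performs the equivalent check against the Accumulo table.
public class InvariantSketch {
  static boolean isCovered(Item item, Set<ItemRef> itemRefs, Set<GroupRef> groupRefs,
      Set<Candidate> candidates) {
    boolean hasItemRef = itemRefs.stream().anyMatch(ir -> ir.item().equals(item));
    boolean hasGroupRef = groupRefs.stream()
        .anyMatch(gr -> gr.clientId == item.clientId && gr.groupId == item.groupId);
    boolean isCandidate = candidates.stream().anyMatch(c -> c.item().equals(item));
    return hasItemRef || hasGroupRef || isCandidate;
  }
}
```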
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/Candidate.java b/src/main/java/org/apache/accumulo/testing/gcs/Candidate.java
new file mode 100644
index 0000000..068e78e
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/Candidate.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.gcs;
+
+import java.util.Comparator;
+
+import com.google.common.base.Objects;
+
+public class Candidate implements Comparable<Candidate> {
+  public final long clientId;
+  public final long groupId;
+  public final long itemId;
+
+  Candidate(long clientId, long groupId, long itemId) {
+    this.clientId = clientId;
+    this.groupId = groupId;
+    this.itemId = itemId;
+  }
+
+  public long getClientId() {
+    return clientId;
+  }
+
+  public long getGroupId() {
+    return groupId;
+  }
+
+  public long getItemId() {
+    return itemId;
+  }
+
+  private static Comparator<Candidate> COMPARATOR = Comparator.comparingLong(Candidate::getClientId)
+      .thenComparingLong(Candidate::getGroupId).thenComparingLong(Candidate::getItemId);
+
+  @Override
+  public int compareTo(Candidate o) {
+    return COMPARATOR.compare(this, o);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(clientId, groupId, itemId);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof Candidate) {
+      Candidate oi = (Candidate) o;
+      return clientId == oi.clientId && groupId == oi.groupId && itemId == oi.itemId;
+    }
+
+    return false;
+  }
+
+  public Item item() {
+    return new Item(clientId, groupId, itemId);
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/Collector.java b/src/main/java/org/apache/accumulo/testing/gcs/Collector.java
new file mode 100644
index 0000000..628f286
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/Collector.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.gcs;
+
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+
+public class Collector {
+
+  Persistence persistence;
+  private int batchSize;
+
+  public Collector(GcsEnv gcsEnv) {
+    this.persistence = new Persistence(gcsEnv);
+    this.batchSize = gcsEnv.getBatchSize();
+  }
+
+  public static void main(String[] args) throws Exception {
+    new Collector(new GcsEnv(args)).run();
+  }
+
+  public static <T> void forEachBatch(Iterable<T> iter, int batchSize,
+      Consumer<TreeSet<T>> batchProcessor) {
+    TreeSet<T> batch = new TreeSet<>();
+
+    for (T element : iter) {
+      if (batch.size() >= batchSize) {
+        batchProcessor.accept(batch);
+        batch.clear();
+      }
+      batch.add(element);
+    }
+
+    if (!batch.isEmpty()) {
+      batchProcessor.accept(batch);
+    }
+  }
+
+  public void run() throws Exception {
+    Random rand = new Random();
+
+    while (true) {
+      forEachBatch(persistence.candidates(), batchSize, batch -> collect(batch));
+      Thread.sleep(13000);
+
+      if (rand.nextInt(10) == 0) {
+        persistence.flushTable();
+      }
+    }
+  }
+
+  private void collect(TreeSet<Candidate> candidates) {
+
+    int initialSize = candidates.size();
+
+    for (GroupRef gr : persistence.groupRefs()) {
+      candidates.subSet(new Candidate(gr.clientId, gr.groupId, 0),
+          new Candidate(gr.clientId, gr.groupId + 1, 0)).clear();
+    }
+
+    for (ItemRef ir : persistence.itemRefs()) {
+      candidates.remove(new Candidate(ir.clientId, ir.groupId, ir.itemId));
+    }
+
+    System.out.println("Deleting " + candidates.size() + " of " + initialSize);
+
+    for (Candidate c : candidates) {
+      persistence.delete(c.item());
+    }
+    persistence.flush();
+
+    for (Candidate c : candidates) {
+      persistence.delete(c);
+    }
+    persistence.flush();
+  }
+}
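`Collector.forEachBatch` is a small reusable helper: it accumulates elements into a `TreeSet` of at most `batchSize` entries, hands each sorted batch to the consumer, and flushes whatever remains at the end. A usage sketch (the class below is illustrative only, not part of the commit):

```java
package org.apache.accumulo.testing.gcs;

import java.util.Arrays;

// Prints [1, 3, 5], then [2, 4, 6], then [7]: elements are grouped into
// batches of at most three, and each batch is sorted independently because it
// is collected into a TreeSet.
public class ForEachBatchExample {
  public static void main(String[] args) {
    Collector.forEachBatch(Arrays.asList(5, 3, 1, 6, 4, 2, 7), 3,
        batch -> System.out.println(batch));
  }
}
```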
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/GcsEnv.java b/src/main/java/org/apache/accumulo/testing/gcs/GcsEnv.java
new file mode 100644
index 0000000..b39727b
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/GcsEnv.java
@@ -0,0 +1,34 @@
+package org.apache.accumulo.testing.gcs;
+
+import org.apache.accumulo.testing.TestEnv;
+
+public class GcsEnv extends TestEnv {
+
+  public GcsEnv(String[] args) {
+    super(args);
+  }
+
+  public String getTableName() {
+    return testProps.getProperty("test.gcs.table", "gcs");
+  }
+
+  public int getMaxBuckets() {
+    return Integer.parseInt(testProps.getProperty("test.gcs.maxBuckets", "100000"));
+  }
+
+  public int getInitialTablets() {
+    return Integer.parseInt(testProps.getProperty("test.gcs.tablets", "10"));
+  }
+
+  public int getMaxWork() {
+    return Integer.parseInt(testProps.getProperty("test.gcs.maxWork", "100000000"));
+  }
+
+  public int getMaxActiveWork() {
+    return Integer.parseInt(testProps.getProperty("test.gcs.maxActiveWork", "10000"));
+  }
+
+  public int getBatchSize() {
+    return Integer.parseInt(testProps.getProperty("test.gcs.batchSize", "100000"));
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/Generator.java b/src/main/java/org/apache/accumulo/testing/gcs/Generator.java
new file mode 100644
index 0000000..e5c6a12
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/Generator.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.gcs;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+
+public class Generator {
+
+  private long clientId;
+  private long nextGroupId;
+  private long nextItemId;
+
+  private final int maxBuckets;
+
+  // The total number of work chains to execute
+  private final int maxWork;
+
+  // The max number of work chains that should be active at any one time.
+  private final int maxActiveWork;
+
+  Random rand = new Random();
+
+  private Persistence persistence;
+
+  public Generator(GcsEnv gcsEnv) {
+    this.persistence = new Persistence(gcsEnv);
+    this.maxBuckets = gcsEnv.getMaxBuckets();
+
+    this.maxWork = gcsEnv.getMaxWork();
+    this.maxActiveWork = gcsEnv.getMaxActiveWork();
+  }
+
+  private void run() {
+    // This holds future work to do.
+    List<Queue<Mutator>> allActions = new ArrayList<>();
+
+    int workCreated = 0;
+
+    clientId = Math.abs(rand.nextLong());
+
+    while (workCreated == 0 || !allActions.isEmpty()) {
+      while (workCreated < maxWork && allActions.size() < maxActiveWork) {
+        allActions.add(createWork());
+        workCreated++;
+      }
+
+      int index = rand.nextInt(allActions.size());
+
+      Queue<Mutator> queue = allActions.get(index);
+
+      int numToRun = Math.max(1, rand.nextInt(queue.size()));
+
+      // By selecting a random queue of work to do and taking a random number of steps off the
+      // queue, we randomly interleave unrelated work over time.
+      for (int i = 0; i < numToRun; i++) {
+        queue.remove().run(persistence);
+      }
+
+      if (queue.isEmpty()) {
+        allActions.set(index, allActions.get(allActions.size() - 1));
+        allActions.remove(allActions.size() - 1);
+      }
+    }
+  }
+
+  private Queue<Mutator> createWork() {
+
+    switch (rand.nextInt(2)) {
+      case 0:
+        return createGroupWork();
+      case 1:
+        return createSimpleWork();
+      default:
+        throw new IllegalStateException();
+    }
+
+  }
+
+  private Queue<Mutator> createSimpleWork() {
+    long groupId = nextGroupId++;
+    int bucket = rand.nextInt(maxBuckets);
+
+    ArrayDeque<Mutator> work = new ArrayDeque<>();
+
+    List<Item> newItems = new ArrayList<>();
+    int numItems = rand.nextInt(10) + 1;
+    for (int i = 0; i < numItems; i++) {
+      newItems.add(new Item(clientId, groupId, nextItemId++));
+    }
+
+    // copy because lambda will execute later
+    List<Item> itemsToAdd = new ArrayList<>(newItems);
+    work.add(p -> {
+      for (Item item : itemsToAdd) {
+        p.save(item, ItemState.NEW);
+      }
+      p.flush();
+    });
+
+    List<ItemRef> referencedItems = new ArrayList<>();
+
+    while (!newItems.isEmpty() || !referencedItems.isEmpty()) {
+      if (newItems.isEmpty()) {
+        int size = referencedItems.size();
+        List<ItemRef> subList = referencedItems.subList(rand.nextInt(size), size);
+        List<ItemRef> refsToDelete = new ArrayList<>(subList);
+        subList.clear();
+
+        work.add(p -> {
+          for (ItemRef ir : refsToDelete) {
+            p.save(new Candidate(ir.clientId, ir.groupId, ir.itemId));
+          }
+          p.flush();
+        });
+
+        work.add(p -> {
+          p.delete(refsToDelete);
+          p.flush();
+        });
+      } else if (referencedItems.isEmpty()) {
+        int size = newItems.size();
+        List<Item> subList = newItems.subList(rand.nextInt(size), size);
+        List<Item> itemsToRef = new ArrayList<>(subList);
+        subList.clear();
+
+        List<ItemRef> refsToAdd = new ArrayList<>();
+        itemsToRef.forEach(item -> refsToAdd.add(new ItemRef(bucket, item)));
+        referencedItems.addAll(refsToAdd);
+
+        work.add(p -> {
+          p.save(refsToAdd);
+          p.flush();
+        });
+
+        work.add(p -> {
+          for (Item item : itemsToRef) {
+            p.save(item, ItemState.REFERENCED);
+          }
+          p.flush();
+        });
+
+      } else {
+        int size = referencedItems.size();
+        List<ItemRef> subList = referencedItems.subList(rand.nextInt(size), size);
+        List<ItemRef> refsToDelete = new ArrayList<>(subList);
+        subList.clear();
+
+        Item itemToRef = newItems.remove(newItems.size() - 1);
+        referencedItems.add(new ItemRef(bucket, itemToRef));
+
+        work.add(p -> {
+          for (ItemRef ir : refsToDelete) {
+            p.save(new Candidate(ir.clientId, ir.groupId, ir.itemId));
+          }
+          p.flush();
+        });
+
+        work.add(p -> {
+          p.replace(refsToDelete, new ItemRef(bucket, itemToRef));
+          p.flush();
+        });
+
+        work.add(p -> {
+          p.save(itemToRef, ItemState.REFERENCED);
+          p.flush();
+        });
+      }
+    }
+
+    return work;
+  }
+
+  private Queue<Mutator> createGroupWork() {
+    long groupId = nextGroupId++;
+
+    long[] items = new long[rand.nextInt(10) + 1];
+    for (int i = 0; i < items.length; i++) {
+      items[i] = nextItemId++;
+    }
+
+    ArrayDeque<Mutator> work = new ArrayDeque<>();
+
+    work.add(p -> {
+      p.save(new GroupRef(clientId, groupId));
+      p.flush();
+    });
+
+    work.add(p -> {
+      for (long itemId : items) {
+        p.save(new Item(clientId, groupId, itemId), ItemState.REFERENCED);
+      }
+      p.flush();
+    });
+
+    List<ItemRef> refsToAdd = new ArrayList<>();
+    List<ItemRef> refsToDel = new ArrayList<>();
+
+    for (long itemId : items) {
+      for (int i = 0; i < rand.nextInt(3) + 1; i++) {
+        int bucket = rand.nextInt(maxBuckets);
+        refsToAdd.add(new ItemRef(bucket, clientId, groupId, itemId));
+      }
+    }
+
+    Collections.shuffle(refsToAdd, rand);
+
+    boolean deletedGroupRef = false;
+
+    while (!refsToAdd.isEmpty() || !refsToDel.isEmpty() || !deletedGroupRef) {
+      if (!refsToAdd.isEmpty() && rand.nextBoolean()) {
+        ItemRef ref = refsToAdd.remove(refsToAdd.size() - 1);
+        refsToDel.add(ref);
+        work.add(p -> {
+          p.save(ref);
+          p.flush();
+        });
+      }
+
+      if (refsToAdd.isEmpty() && !deletedGroupRef && rand.nextBoolean()) {
+        work.add(p -> {
+          p.delete(new GroupRef(clientId, groupId));
+          p.flush();
+        });
+        deletedGroupRef = true;
+      }
+
+      if (!refsToDel.isEmpty() && rand.nextBoolean()) {
+        ItemRef ref = refsToDel.remove(refsToDel.size() - 1);
+        work.add(p -> {
+          p.save(new Candidate(clientId, groupId, ref.itemId));
+          p.delete(ref);
+          p.flush();
+        });
+      }
+    }
+
+    return work;
+  }
+
+  public static void main(String[] args) {
+    new Generator(new GcsEnv(args)).run();
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/GroupRef.java b/src/main/java/org/apache/accumulo/testing/gcs/GroupRef.java
new file mode 100644
index 0000000..46c5b14
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/GroupRef.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.gcs;
+
+public class GroupRef {
+  public final long clientId;
+  public final long groupId;
+
+  GroupRef(long clientId, long groupId) {
+    this.clientId = clientId;
+    this.groupId = groupId;
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/Item.java b/src/main/java/org/apache/accumulo/testing/gcs/Item.java
new file mode 100644
index 0000000..14e79d5
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/Item.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.gcs;
+
+import static org.apache.accumulo.testing.gcs.Persistence.toHex;
+
+import java.util.Comparator;
+
+import com.google.common.base.Objects;
+
+public class Item implements Comparable<Item> {
+
+  public final long clientId;
+  public final long groupId;
+  public final long itemId;
+
+  Item(long clientId, long groupId, long itemId) {
+    this.clientId = clientId;
+    this.groupId = groupId;
+    this.itemId = itemId;
+  }
+
+  Item(Candidate c) {
+    this(c.clientId, c.groupId, c.itemId);
+  }
+
+  public long getClientId() {
+    return clientId;
+  }
+
+  public long getGroupId() {
+    return groupId;
+  }
+
+  public long getItemId() {
+    return itemId;
+  }
+
+  private static Comparator<Item> COMPARATOR = Comparator.comparingLong(Item::getClientId)
+      .thenComparingLong(Item::getGroupId).thenComparingLong(Item::getItemId);
+
+  @Override
+  public int compareTo(Item o) {
+    return COMPARATOR.compare(this, o);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(clientId, groupId, itemId);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof Item) {
+      Item oi = (Item) o;
+      return clientId == oi.clientId && groupId == oi.groupId && itemId == oi.itemId;
+    }
+
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return "clientId:" + toHex(clientId) + "  groupId:" + toHex(groupId) + "  itemId:"
+        + toHex(itemId);
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/ItemRef.java b/src/main/java/org/apache/accumulo/testing/gcs/ItemRef.java
new file mode 100644
index 0000000..9a324af
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/ItemRef.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.gcs;
+
+public class ItemRef implements Comparable<ItemRef> {
+  public final int bucket;
+  public final long clientId;
+  public final long groupId;
+  public final long itemId;
+
+  ItemRef(int bucket, long clientId, long groupId, long itemId) {
+    this.bucket = bucket;
+    this.clientId = clientId;
+    this.groupId = groupId;
+    this.itemId = itemId;
+  }
+
+  ItemRef(int bucket, Item item) {
+    this(bucket, item.clientId, item.groupId, item.itemId);
+  }
+
+  public Item item() {
+    return new Item(clientId, groupId, itemId);
+  }
+
+  @Override
+  public int compareTo(ItemRef o) {
+    int cmp = Long.compare(bucket, o.bucket);
+    if (cmp == 0) {
+      cmp = item().compareTo(o.item());
+    }
+
+    return cmp;
+  }
+
+  @Override
+  public String toString() {
+    return "bucket:" + Persistence.toHex(bucket) + " " + item();
+  }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/ItemState.java b/src/main/java/org/apache/accumulo/testing/gcs/ItemState.java
new file mode 100644
index 0000000..c418556
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/ItemState.java
@@ -0,0 +1,5 @@
+package org.apache.accumulo.testing.gcs;
+
+public enum ItemState {
+  NEW, REFERENCED
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/Mutator.java b/src/main/java/org/apache/accumulo/testing/gcs/Mutator.java
new file mode 100644
index 0000000..d4cb3e3
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/Mutator.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.gcs;
+
+public interface Mutator {
+  public void run(Persistence p);
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/Persistence.java b/src/main/java/org/apache/accumulo/testing/gcs/Persistence.java
new file mode 100644
index 0000000..7638c6f
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/Persistence.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.gcs;
+
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeSet;
+import java.util.function.Function;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.hash.Hashing;
+
+public class Persistence {
+
+  private BatchWriter writer;
+  private String table;
+  private AccumuloClient client;
+
+  Persistence(GcsEnv env) {
+    this.client = env.getAccumuloClient();
+    this.table = env.getTableName();
+    try {
+      this.writer = client.createBatchWriter(table);
+    } catch (TableNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  static String toHex(int i) {
+    return Strings.padStart(Integer.toHexString(i), 8, '0');
+  }
+
+  static String toHex(long l) {
+    return Strings.padStart(Long.toHexString(l), 16, '0');
+  }
+
+  private void write(Mutation m) {
+    try {
+      writer.addMutation(m);
+    } catch (MutationsRejectedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private String toHex(long l1, long l2, long l3) {
+    return toHex(l1) + ":" + toHex(l2) + ":" + toHex(l3);
+  }
+
+  private String toHexWithHash(long l1, long l2) {
+    int hc = Hashing.murmur3_32().newHasher().putLong(l1).putLong(l2).hash().asInt();
+    return toHex(hc) + ":" + toHex(l1) + ":" + toHex(l2);
+  }
+
+  private String toHexWithHash(long l1, long l2, long l3) {
+    int hc = Hashing.murmur3_32().newHasher().putLong(l1).putLong(l2).putLong(l3).hash().asInt();
+    return toHex(hc) + ":" + toHex(l1) + ":" + toHex(l2) + ":" + toHex(l3);
+  }
+
+  void save(Item item, ItemState state) {
+    Mutation m = new Mutation("I:" + toHexWithHash(item.clientId, item.groupId));
+    m.put("item", toHex(item.itemId), state.name());
+    write(m);
+  }
+
+  void save(ItemRef itemRef) {
+    Mutation m = new Mutation("R:" + toHex(itemRef.bucket));
+    m.put("ref", toHex(itemRef.clientId, itemRef.groupId, itemRef.itemId), "");
+    write(m);
+  }
+
+  public void save(Collection<ItemRef> refsToAdd) {
+    Map<Integer,Mutation> mutations = new HashMap<>();
+
+    for (ItemRef itemRef : refsToAdd) {
+      Mutation m = mutations.computeIfAbsent(itemRef.bucket,
+          bucket -> new Mutation("R:" + toHex(bucket)));
+      m.put("ref", toHex(itemRef.clientId, itemRef.groupId, itemRef.itemId), "");
+    }
+
+    mutations.values().forEach(m -> write(m));
+
+  }
+
+  public void replace(Collection<ItemRef> refsToDelete, ItemRef refToAdd) {
+    Mutation m = new Mutation("R:" + toHex(refToAdd.bucket));
+    m.put("ref", toHex(refToAdd.clientId, refToAdd.groupId, refToAdd.itemId), "");
+
+    for (ItemRef ir : refsToDelete) {
+      Preconditions.checkArgument(refToAdd.bucket == ir.bucket);
+      m.putDelete("ref", toHex(ir.clientId, ir.groupId, ir.itemId));
+    }
+
+    write(m);
+  }
+
+  void save(GroupRef groupRef) {
+    Mutation m = new Mutation("G:" + toHexWithHash(groupRef.clientId, groupRef.groupId));
+    m.put("", "", "");
+    write(m);
+  }
+
+  void save(Candidate c) {
+    Mutation m = new Mutation("C:" + toHexWithHash(c.clientId, c.groupId, c.itemId));
+    m.put("", "", "");
+    write(m);
+  }
+
+  public void flush() {
+    try {
+      writer.flush();
+    } catch (MutationsRejectedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void delete(ItemRef itemRef) {
+    Mutation m = new Mutation("R:" + toHex(itemRef.bucket));
+    m.putDelete("ref", toHex(itemRef.clientId, itemRef.groupId, itemRef.itemId));
+    write(m);
+  }
+
+  public void delete(Collection<ItemRef> refsToDelete) {
+    Map<Integer,Mutation> mutations = new HashMap<>();
+
+    for (ItemRef itemRef : refsToDelete) {
+      Mutation m = mutations.computeIfAbsent(itemRef.bucket,
+          bucket -> new Mutation("R:" + toHex(bucket)));
+      m.putDelete("ref", toHex(itemRef.clientId, itemRef.groupId, itemRef.itemId));
+    }
+
+    mutations.values().forEach(m -> write(m));
+  }
+
+  public void delete(GroupRef groupRef) {
+    Mutation m = new Mutation("G:" + toHexWithHash(groupRef.clientId, groupRef.groupId));
+    m.putDelete("", "");
+    write(m);
+  }
+
+  public void delete(Item item) {
+    Mutation m = new Mutation("I:" + toHexWithHash(item.clientId, item.groupId));
+    m.putDelete("item", toHex(item.itemId));
+    write(m);
+  }
+
+  public void delete(Candidate c) {
+    Mutation m = new Mutation("C:" + toHexWithHash(c.clientId, c.groupId, c.itemId));
+    m.putDelete("", "");
+    write(m);
+  }
+
+  private Scanner createScanner(String prefix) {
+    Scanner scanner;
+    try {
+      scanner = new IsolatedScanner(client.createScanner(table));
+    } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException e) {
+      throw new RuntimeException(e);
+    }
+    scanner.setRange(Range.prefix(prefix));
+
+    return scanner;
+  }
+
+  private <T> Iterable<T> transformRange(String prefix, Function<Key,T> func) {
+    return Iterables.transform(createScanner(prefix), entry -> func.apply(entry.getKey()));
+  }
+
+  Iterable<Candidate> candidates() {
+    return transformRange("C:", k -> {
+      String row = k.getRowData().toString();
+      String[] fields = row.substring(11).split(":");
+
+      Preconditions.checkState(fields.length == 3, "Bad candidate row %s", row);
+
+      long clientId = Long.parseLong(fields[0], 16);
+      long groupId = Long.parseLong(fields[1], 16);
+      long itemId = Long.parseLong(fields[2], 16);
+
+      return new Candidate(clientId, groupId, itemId);
+    });
+  }
+
+  Iterable<ItemRef> itemRefs() {
+    return transformRange("R:", k -> {
+      String row = k.getRowData().toString();
+      String qual = k.getColumnQualifierData().toString();
+
+      int bucket = Integer.parseInt(row.substring(2), 16);
+
+      String[] fields = qual.split(":");
+
+      Preconditions.checkState(fields.length == 3, "Bad item ref %s", k);
+
+      long clientId = Long.parseLong(fields[0], 16);
+      long groupId = Long.parseLong(fields[1], 16);
+      long itemId = Long.parseLong(fields[2], 16);
+
+      return new ItemRef(bucket, clientId, groupId, itemId);
+    });
+  }
+
+  Iterable<GroupRef> groupRefs() {
+    return transformRange("G:", k -> {
+      String row = k.getRowData().toString();
+      String[] fields = row.substring(11).split(":");
+
+      Preconditions.checkState(fields.length == 2, "Bad group ref row %s", row);
+
+      long clientId = Long.parseLong(fields[0], 16);
+      long groupId = Long.parseLong(fields[1], 16);
+
+      return new GroupRef(clientId, groupId);
+    });
+  }
+
+  Iterable<Item> items(ItemState... states) {
+    EnumSet<ItemState> stateSet = EnumSet.of(states[0]);
+    for (int i = 1; i < states.length; i++) {
+      stateSet.add(states[i]);
+    }
+
+    Iterable<Entry<Key,Value>> itemIter = Iterables.filter(createScanner("I:"),
+        entry -> stateSet.contains(ItemState.valueOf(entry.getValue().toString())));
+
+    return Iterables.transform(itemIter, entry -> {
+      Key k = entry.getKey();
+      String row = k.getRowData().toString();
+      String qual = k.getColumnQualifierData().toString();
+
+      String[] fields = row.substring(11).split(":");
+
+      Preconditions.checkState(fields.length == 2, "Bad group ref row %s", row);
+
+      long clientId = Long.parseLong(fields[0], 16);
+      long groupId = Long.parseLong(fields[1], 16);
+      long itemId = Long.parseLong(qual, 16);
+
+      return new Item(clientId, groupId, itemId);
+    });
+  }
+
+  private static TreeSet<Text> initialSplits(GcsEnv env) {
+    TreeSet<Text> splits = new TreeSet<>();
+
+    int tabletsPerSection = env.getInitialTablets();
+
+    for (String prefix : new String[] {"G:", "C:", "I:", "R:"}) {
+      int numSplits = tabletsPerSection - 1;
+
+      long max;
+      if (prefix.equals("R:")) {
+        max = env.getMaxBuckets();
+      } else {
+        // 1L << 32: these sections use a 32-bit hash prefix (8 hex characters) in the row
+        max = 1L << 32;
+      }
+
+      long distance = (max / tabletsPerSection) + 1;
+      long split = distance;
+
+      for (int i = 0; i < numSplits; i++) {
+        String s = String.format("%08x", split);
+        while (s.charAt(s.length() - 1) == '0') {
+          s = s.substring(0, s.length() - 1);
+        }
+        splits.add(new Text(prefix + s));
+        split += distance;
+      }
+
+      splits.add(new Text(prefix + "~"));
+    }
+
+    return splits;
+  }
+
+  public static void init(GcsEnv env) throws Exception {
+    NewTableConfiguration ntc = new NewTableConfiguration();
+    ntc.withSplits(initialSplits(env));
+    ntc.setProperties(ImmutableMap.of("table.compaction.major.ratio", "1"));
+    env.getAccumuloClient().tableOperations().create(env.getTableName(), ntc);
+  }
+
+  public void flushTable() {
+    try {
+      client.tableOperations().flush(table);
+    } catch (AccumuloException | AccumuloSecurityException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
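Persistence keeps each kind of entry in its own row-prefix section (`I:`, `R:`, `G:`, `C:`) and, except for item references, prepends an 8-hex-character murmur3 hash so rows spread evenly across the section; that is also why `initialSplits` divides a 32-bit space for those sections. A minimal sketch that reproduces the item row format with the same Guava calls (`RowLayoutSketch` is illustrative only, not part of the commit):

```java
package org.apache.accumulo.testing.gcs;

import com.google.common.base.Strings;
import com.google.common.hash.Hashing;

// Prints an example of the row written by Persistence.save(Item, ItemState):
// "I:<8-hex murmur3 of clientId,groupId>:<clientId hex>:<groupId hex>"
// with the itemId stored as the qualifier under the "item" column family.
public class RowLayoutSketch {
  public static void main(String[] args) {
    long clientId = 0x1234L, groupId = 0x42L, itemId = 7L;

    int hash = Hashing.murmur3_32().newHasher().putLong(clientId).putLong(groupId).hash().asInt();

    String row = "I:" + Strings.padStart(Integer.toHexString(hash), 8, '0') + ":"
        + Strings.padStart(Long.toHexString(clientId), 16, '0') + ":"
        + Strings.padStart(Long.toHexString(groupId), 16, '0');
    String qualifier = Strings.padStart(Long.toHexString(itemId), 16, '0');

    System.out.println(row + "  item:" + qualifier + " -> " + ItemState.NEW);
  }
}
```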
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/Setup.java b/src/main/java/org/apache/accumulo/testing/gcs/Setup.java
new file mode 100644
index 0000000..31c9647
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/Setup.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.gcs;
+
+public class Setup {
+  public static void main(String[] args) throws Exception {
+    Persistence.init(new GcsEnv(args));
+  }
+
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/Verifier.java b/src/main/java/org/apache/accumulo/testing/gcs/Verifier.java
new file mode 100644
index 0000000..285fa2c
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/Verifier.java
@@ -0,0 +1,69 @@
+package org.apache.accumulo.testing.gcs;
+
+import static org.apache.accumulo.testing.gcs.Collector.forEachBatch;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+public class Verifier {
+
+  Persistence persistence;
+  private int batchSize;
+
+  public Verifier(GcsEnv gcsEnv) {
+    this.persistence = new Persistence(gcsEnv);
+    this.batchSize = gcsEnv.getBatchSize();
+  }
+
+  public static void main(String[] args) {
+    new Verifier(new GcsEnv(args)).run();
+  }
+
+  private void run() {
+    forEachBatch(persistence.items(ItemState.REFERENCED), batchSize, batch -> checkItems(batch));
+    forEachBatch(persistence.itemRefs(), batchSize, batch -> checkItemRefs(batch));
+  }
+
+  // Ensure there is an item for each item ref
+  private void checkItemRefs(TreeSet<ItemRef> itemRefs) {
+    Map<Item,List<ItemRef>> refMap = new HashMap<>();
+
+    itemRefs.forEach(ir -> {
+      refMap.computeIfAbsent(ir.item(), i -> new ArrayList<>()).add(ir);
+    });
+
+    persistence.items(ItemState.NEW, ItemState.REFERENCED).forEach(item -> refMap.remove(item));
+
+    if (refMap.size() > 0) {
+      System.err.println("References without items : ");
+      refMap.values().stream().flatMap(List::stream).forEach(ir -> System.err.println("\t" + ir));
+    }
+
+    System.out.printf("Checked %,d item refs\n", itemRefs.size());
+  }
+
+  // Ensure all items are referenced by something.
+  private void checkItems(TreeSet<Item> items) {
+
+    int initialSize = items.size();
+
+    persistence.itemRefs().forEach(ir -> items.remove(ir.item()));
+
+    persistence.candidates().forEach(c -> items.remove(new Item(c)));
+
+    persistence.groupRefs()
+        .forEach(gr -> items
+            .subSet(new Item(gr.clientId, gr.groupId, 0), new Item(gr.clientId, gr.groupId + 1, 0))
+            .clear());
+
+    if (items.size() > 0) {
+      System.err.println("Unreferenced items : ");
+      items.forEach(i -> System.err.println("\t" + i));
+    }
+
+    System.out.printf("Checked %,d items\n", initialSize);
+  }
+}