You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2019/06/18 18:59:43 UTC
[accumulo-testing] branch master updated: Add new garbage
collection simulation test (#81)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/master by this push:
new a8fe43e Add new garbage collection simulation test (#81)
a8fe43e is described below
commit a8fe43ea77c2fc8457a2e0073e84a135940b5c6c
Author: Keith Turner <kt...@apache.org>
AuthorDate: Tue Jun 18 14:59:39 2019 -0400
Add new garbage collection simulation test (#81)
---
README.md | 4 +
bin/gcs | 71 +++++
conf/accumulo-testing.properties.example | 17 ++
docs/gcs.md | 71 +++++
.../org/apache/accumulo/testing/gcs/Candidate.java | 73 +++++
.../org/apache/accumulo/testing/gcs/Collector.java | 92 ++++++
.../org/apache/accumulo/testing/gcs/GcsEnv.java | 34 +++
.../org/apache/accumulo/testing/gcs/Generator.java | 263 +++++++++++++++++
.../org/apache/accumulo/testing/gcs/GroupRef.java | 28 ++
.../java/org/apache/accumulo/testing/gcs/Item.java | 82 ++++++
.../org/apache/accumulo/testing/gcs/ItemRef.java | 55 ++++
.../org/apache/accumulo/testing/gcs/ItemState.java | 5 +
.../org/apache/accumulo/testing/gcs/Mutator.java | 22 ++
.../apache/accumulo/testing/gcs/Persistence.java | 326 +++++++++++++++++++++
.../org/apache/accumulo/testing/gcs/Setup.java | 24 ++
.../org/apache/accumulo/testing/gcs/Verifier.java | 69 +++++
16 files changed, 1236 insertions(+)
diff --git a/README.md b/README.md
index a2eea4c..002cca5 100644
--- a/README.md
+++ b/README.md
@@ -140,6 +140,10 @@ than one instance of this MapReduce job concurrently against a table.
Checkout [ingest-test.md](docs/ingest-test.md) for pointers on running a long
running ingest and verification test.
+## Garbage Collection Simulator
+
+See [gcs.md](docs/gcs.md).
+
## Agitator
The agitator will periodically kill the Accumulo master, tablet server, and Hadoop data node
diff --git a/bin/gcs b/bin/gcs
new file mode 100755
index 0000000..611dad6
--- /dev/null
+++ b/bin/gcs
@@ -0,0 +1,71 @@
+#! /usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
bin_dir=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
at_home=$( cd "$( dirname "$bin_dir" )" && pwd )

# Print command line help. Callers reporting an error redirect this to stderr.
function print_usage() {
  cat <<EOF

Usage: gcs <application> {-o test.<prop>=<value>}

Available applications:

    setup      Create and configure table.
    generate   Run a data generator. Can run multiple.
    collect    Run a collector, should only run one.
    verify     Verify data. Stop collect and generate before running.
EOF
}

# Load environment (TEST_JAR_PATH, TEST_PROPS, ...), falling back to the example file.
if [ -f "$at_home/conf/env.sh" ]; then
  . "$at_home"/conf/env.sh
else
  . "$at_home"/conf/env.sh.example
fi

if [ -z "$1" ]; then
  # Diagnostics go to stderr so they do not mix with program output.
  echo "ERROR: <application> needs to be set" >&2
  print_usage >&2
  exit 1
fi

ci_package="org.apache.accumulo.testing.gcs"
case "$1" in
  generate)
    ci_main="${ci_package}.Generator"
    ;;
  collect)
    ci_main="${ci_package}.Collector"
    ;;
  verify)
    ci_main="${ci_package}.Verifier"
    ;;
  setup)
    ci_main="${ci_package}.Setup"
    ;;
  *)
    echo "Unknown application: $1" >&2
    print_usage >&2
    exit 1
    ;;
esac

export CLASSPATH="$TEST_JAR_PATH:$HADOOP_API_JAR:$HADOOP_RUNTIME_JAR:$CLASSPATH"

# Remaining command line args (e.g. -o overrides) are passed through, followed by the two
# properties files.
java -Dlog4j.configuration="file:$TEST_LOG4J" "$ci_main" "${@:2}" "$TEST_PROPS" "$ACCUMULO_CLIENT_PROPS"
+
diff --git a/conf/accumulo-testing.properties.example b/conf/accumulo-testing.properties.example
index 04801b1..760e82f 100644
--- a/conf/accumulo-testing.properties.example
+++ b/conf/accumulo-testing.properties.example
@@ -119,6 +119,23 @@ test.ci.bulk.map.nodes=1000000
# produce a bulk import file.
test.ci.bulk.reducers.max=1024
+###############################
+# Garbage Collection Simulation
+###############################
+
+# Name of Accumulo table to use for test
+test.gcs.table=gcs
+# Max number of buckets for references. Buckets correspond to tablets in the Accumulo GC.
+test.gcs.maxBuckets=100000
+# Split each data section with this many tablets when creating table.
+test.gcs.tablets=10
+# Total number of work chains the generator should create before it exits.
+test.gcs.maxWork=100000000
+# Max number of work chains a generator should be concurrently working on.
+test.gcs.maxActiveWork=10000
+# Number of entries the collector and verifier will read into memory at once.
+test.gcs.batchSize=100000
+
#################
# MapReduce Tests
#################
diff --git a/docs/gcs.md b/docs/gcs.md
new file mode 100644
index 0000000..6c46943
--- /dev/null
+++ b/docs/gcs.md
@@ -0,0 +1,71 @@
+# Garbage Collection Simulation (GCS)
+
+GCS is a test suite that generates random data with a structure similar to the
+data managed by the Accumulo garbage collector. This test has a few interesting
+properties. First, it generates data at a much higher rate than the garbage
+collector would on a small system, simulating a much larger system. Second, it
+has a much more complex read and write pattern than continuous ingest, involving
+multiple processes writing, reading, and deleting data. Third, like continuous
+ingest, the random data is verifiable: at any point the test can be stopped and
+the data verified. This test will not generate as much data as continuous
+ingest. The test will reach a steady state in terms of the number of entries
+stored in Accumulo. The size of this steady state is determined by the number
+of generators running and the setting `test.gcs.maxActiveWork`; increasing
+either will increase the steady state size.
+
+## Data Types
+
+This test has the following types of data that are stored in a single Accumulo table.
+
+ * **Item** : An item is something that should be deleted, unless it is referenced.
+ Each item is part of a group. Items correspond to files and groups
+ correspond to bulk imports, in the Accumulo GC.
+ * **Item reference** : A reference to an item that should prevent it from
+ being deleted. An item can have multiple item references.
+ * **Group reference** : A reference to a group that should prevent the
+ deletion of any items in a group. This corresponds to blip markers in the
+ Accumulo GC.
+ * **Deletion candidate** : An entry that signifies an item is a candidate for deletion.
+
+## Invariants
+
+The test data should never violate the following rules:
+
+ * An item should always be referenced by an item reference, a group reference,
+ or a deletion candidate. There is one exception to this: items with a value of
+ `NEW`. It's OK for new items to be unreferenced.
+ * An Item reference should always have a corresponding item.
+
+## Executable components
+
+The test has the following executable components.
+
+ * **setup** : creates and configures table
+ * **generator** : continually generates items, references, and candidates.
+ These are generated randomly and spaced out over time, interleaving
+ unrelated entries. The generator should never create data that violates the
+ test invariants. Multiple generators can be run concurrently.
+ * **collector** : continually scans the data looking for unreferenced
+ candidates to delete. Should only run one at a time.
+ * **verifier** : This process checks the table to ensure the test
+ invariants have not been violated. Before running this, the generator and
+ collector processes should be stopped.
+
+Running `./bin/gcs` will print help that shows how to run these processes.
+
+Below is a simple script that runs a test scenario.
+
+```bash
+./bin/gcs setup
+
+for i in $(seq 1 10); do
+ ./bin/gcs generate &
+done
+
+./bin/gcs collect &
+
+sleep 12h
+
+pkill -f gcs
+./bin/gcs verify
+```
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/Candidate.java b/src/main/java/org/apache/accumulo/testing/gcs/Candidate.java
new file mode 100644
index 0000000..068e78e
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/Candidate.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.gcs;
+
+import java.util.Comparator;
+
+import com.google.common.base.Objects;
+
+public class Candidate implements Comparable<Candidate> {
+ public final long clientId;
+ public final long groupId;
+ public final long itemId;
+
+ Candidate(long clientId, long groupId, long itemId) {
+ this.clientId = clientId;
+ this.groupId = groupId;
+ this.itemId = itemId;
+ }
+
+ public long getClientId() {
+ return clientId;
+ }
+
+ public long getGroupId() {
+ return groupId;
+ }
+
+ public long getItemId() {
+ return itemId;
+ }
+
+ private static Comparator<Candidate> COMPARATOR = Comparator.comparingLong(Candidate::getClientId)
+ .thenComparingLong(Candidate::getGroupId).thenComparingLong(Candidate::getItemId);
+
+ @Override
+ public int compareTo(Candidate o) {
+ return COMPARATOR.compare(this, o);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(clientId, groupId, itemId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof Candidate) {
+ Item oi = (Item) o;
+ return clientId == oi.clientId && groupId == oi.groupId && itemId == oi.itemId;
+ }
+
+ return false;
+ }
+
+ public Item item() {
+ return new Item(clientId, groupId, itemId);
+ }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/Collector.java b/src/main/java/org/apache/accumulo/testing/gcs/Collector.java
new file mode 100644
index 0000000..628f286
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/Collector.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.gcs;
+
+import java.util.Random;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+
+public class Collector {
+
+ Persistence persistence;
+ private int batchSize;
+
+ public Collector(GcsEnv gcsEnv) {
+ this.persistence = new Persistence(gcsEnv);
+ this.batchSize = gcsEnv.getBatchSize();
+ }
+
+ public static void main(String[] args) throws Exception {
+ new Collector(new GcsEnv(args)).run();
+ }
+
+ public static <T> void forEachBatch(Iterable<T> iter, int batchSize,
+ Consumer<TreeSet<T>> batchProcessor) {
+ TreeSet<T> batch = new TreeSet<>();
+
+ for (T element : iter) {
+ if (batch.size() >= batchSize) {
+ batchProcessor.accept(batch);
+ batch.clear();
+ }
+ batch.add(element);
+ }
+
+ if (!batch.isEmpty()) {
+ batchProcessor.accept(batch);
+ }
+ }
+
+ public void run() throws Exception {
+ Random rand = new Random();
+
+ while (true) {
+ forEachBatch(persistence.candidates(), batchSize, batch -> collect(batch));
+ Thread.sleep(13000);
+
+ if (rand.nextInt(10) == 0) {
+ persistence.flushTable();
+ }
+ }
+ }
+
+ private void collect(TreeSet<Candidate> candidates) {
+
+ int initialSize = candidates.size();
+
+ for (GroupRef gr : persistence.groupRefs()) {
+ candidates.subSet(new Candidate(gr.clientId, gr.groupId, 0),
+ new Candidate(gr.clientId, gr.groupId + 1, 0)).clear();
+ }
+
+ for (ItemRef ir : persistence.itemRefs()) {
+ candidates.remove(new Candidate(ir.clientId, ir.groupId, ir.itemId));
+ }
+
+ System.out.println("Deleting " + candidates.size() + " of " + initialSize);
+
+ for (Candidate c : candidates) {
+ persistence.delete(c.item());
+ }
+ persistence.flush();
+
+ for (Candidate c : candidates) {
+ persistence.delete(c);
+ }
+ persistence.flush();
+ }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/GcsEnv.java b/src/main/java/org/apache/accumulo/testing/gcs/GcsEnv.java
new file mode 100644
index 0000000..b39727b
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/GcsEnv.java
@@ -0,0 +1,34 @@
+package org.apache.accumulo.testing.gcs;
+
+import org.apache.accumulo.testing.TestEnv;
+
+public class GcsEnv extends TestEnv {
+
+ public GcsEnv(String[] args) {
+ super(args);
+ }
+
+ public String getTableName() {
+ return testProps.getProperty("test.gcs.table", "gcs");
+ }
+
+ public int getMaxBuckets() {
+ return Integer.parseInt(testProps.getProperty("test.gcs.maxBuckets", "100000"));
+ }
+
+ public int getInitialTablets() {
+ return Integer.parseInt(testProps.getProperty("test.gcs.tablets", "10"));
+ }
+
+ public int getMaxWork() {
+ return Integer.parseInt(testProps.getProperty("test.gcs.maxWork", "100000000"));
+ }
+
+ public int getMaxActiveWork() {
+ return Integer.parseInt(testProps.getProperty("test.gcs.maxActiveWork", "10000"));
+ }
+
+ public int getBatchSize() {
+ return Integer.parseInt(testProps.getProperty("test.gcs.batchSize", "100000"));
+ }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/Generator.java b/src/main/java/org/apache/accumulo/testing/gcs/Generator.java
new file mode 100644
index 0000000..e5c6a12
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/Generator.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.gcs;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+
+public class Generator {
+
+ private long clientId;
+ private long nextGroupId;
+ private long nextItemId;
+
+ private final int maxBuckets;
+
+ // The total number of work chains to execute
+ private final int maxWork;
+
+ // The max number of work chains that should be active at any one time.
+ private final int maxActiveWork;
+
+ Random rand = new Random();
+
+ private Persistence persistence;
+
+ public Generator(GcsEnv gcsEnv) {
+ this.persistence = new Persistence(gcsEnv);
+ this.maxBuckets = gcsEnv.getMaxBuckets();
+
+ this.maxWork = gcsEnv.getMaxWork();
+ this.maxActiveWork = gcsEnv.getMaxActiveWork();
+ }
+
+ private void run() {
+ // This holds future work to do.
+ List<Queue<Mutator>> allActions = new ArrayList<>();
+
+ int workCreated = 0;
+
+ clientId = Math.abs(rand.nextLong());
+
+ while (workCreated == 0 || !allActions.isEmpty()) {
+ while (workCreated < maxWork && allActions.size() < maxActiveWork) {
+ allActions.add(createWork());
+ workCreated++;
+ }
+
+ int index = rand.nextInt(allActions.size());
+
+ Queue<Mutator> queue = allActions.get(index);
+
+ int numToRun = Math.max(1, rand.nextInt(queue.size()));
+
+ // By selecting a random queue of work do to do and taking a random number of steps off the
+ // queue we are randomly interleaving unrelated work over time.
+ for (int i = 0; i < numToRun; i++) {
+ queue.remove().run(persistence);
+ }
+
+ if (queue.isEmpty()) {
+ allActions.set(index, allActions.get(allActions.size() - 1));
+ allActions.remove(allActions.size() - 1);
+ }
+ }
+ }
+
+ private Queue<Mutator> createWork() {
+
+ switch (rand.nextInt(2)) {
+ case 0:
+ return createGroupWork();
+ case 1:
+ return createSimpleWork();
+ default:
+ throw new IllegalStateException();
+ }
+
+ }
+
+ private Queue<Mutator> createSimpleWork() {
+ long groupId = nextGroupId++;
+ int bucket = rand.nextInt(maxBuckets);
+
+ ArrayDeque<Mutator> work = new ArrayDeque<>();
+
+ List<Item> newItems = new ArrayList<>();
+ int numItems = rand.nextInt(10) + 1;
+ for (int i = 0; i < numItems; i++) {
+ newItems.add(new Item(clientId, groupId, nextItemId++));
+ }
+
+ // copy because lambda will execute later
+ List<Item> itemsToAdd = new ArrayList<>(newItems);
+ work.add(p -> {
+ for (Item item : itemsToAdd) {
+ p.save(item, ItemState.NEW);
+ }
+ p.flush();
+ });
+
+ List<ItemRef> referencedItems = new ArrayList<>();
+
+ while (!newItems.isEmpty() || !referencedItems.isEmpty()) {
+ if (newItems.isEmpty()) {
+ int size = referencedItems.size();
+ List<ItemRef> subList = referencedItems.subList(rand.nextInt(size), size);
+ List<ItemRef> refsToDelete = new ArrayList<>(subList);
+ subList.clear();
+
+ work.add(p -> {
+ for (ItemRef ir : refsToDelete) {
+ p.save(new Candidate(ir.clientId, ir.groupId, ir.itemId));
+ }
+ p.flush();
+ });
+
+ work.add(p -> {
+ p.delete(refsToDelete);
+ p.flush();
+ });
+ } else if (referencedItems.isEmpty()) {
+ int size = newItems.size();
+ List<Item> subList = newItems.subList(rand.nextInt(size), size);
+ List<Item> itemsToRef = new ArrayList<>(subList);
+ subList.clear();
+
+ List<ItemRef> refsToAdd = new ArrayList<>();
+ itemsToRef.forEach(item -> refsToAdd.add(new ItemRef(bucket, item)));
+ referencedItems.addAll(refsToAdd);
+
+ work.add(p -> {
+ p.save(refsToAdd);
+ p.flush();
+ });
+
+ work.add(p -> {
+ for (Item item : itemsToRef) {
+ p.save(item, ItemState.REFERENCED);
+ }
+ p.flush();
+ });
+
+ } else {
+ int size = referencedItems.size();
+ List<ItemRef> subList = referencedItems.subList(rand.nextInt(size), size);
+ List<ItemRef> refsToDelete = new ArrayList<>(subList);
+ subList.clear();
+
+ Item itemToRef = newItems.remove(newItems.size() - 1);
+ referencedItems.add(new ItemRef(bucket, itemToRef));
+
+ work.add(p -> {
+ for (ItemRef ir : refsToDelete) {
+ p.save(new Candidate(ir.clientId, ir.groupId, ir.itemId));
+ }
+ p.flush();
+ });
+
+ work.add(p -> {
+ p.replace(refsToDelete, new ItemRef(bucket, itemToRef));
+ p.flush();
+ });
+
+ work.add(p -> {
+ p.save(itemToRef, ItemState.REFERENCED);
+ p.flush();
+ });
+ }
+ }
+
+ return work;
+ }
+
+ private Queue<Mutator> createGroupWork() {
+ long groupId = nextGroupId++;
+
+ long items[] = new long[rand.nextInt(10) + 1];
+ for (int i = 0; i < items.length; i++) {
+ items[i] = nextItemId++;
+ }
+
+ ArrayDeque<Mutator> work = new ArrayDeque<>();
+
+ work.add(p -> {
+ p.save(new GroupRef(clientId, groupId));
+ p.flush();
+ });
+
+ work.add(p -> {
+ for (long itemId : items) {
+ p.save(new Item(clientId, groupId, itemId), ItemState.REFERENCED);
+ }
+ p.flush();
+ });
+
+ List<ItemRef> refsToAdd = new ArrayList<>();
+ List<ItemRef> refsToDel = new ArrayList<>();
+
+ for (long itemId : items) {
+ for (int i = 0; i < rand.nextInt(3) + 1; i++) {
+ int bucket = rand.nextInt(maxBuckets);
+ refsToAdd.add(new ItemRef(bucket, clientId, groupId, itemId));
+ }
+ }
+
+ Collections.shuffle(refsToAdd, rand);
+
+ boolean deletedGroupRef = false;
+
+ while (!refsToAdd.isEmpty() || !refsToDel.isEmpty() || !deletedGroupRef) {
+ if (!refsToAdd.isEmpty() && rand.nextBoolean()) {
+ ItemRef ref = refsToAdd.remove(refsToAdd.size() - 1);
+ refsToDel.add(ref);
+ work.add(p -> {
+ p.save(ref);
+ p.flush();
+ });
+ }
+
+ if (refsToAdd.isEmpty() && !deletedGroupRef && rand.nextBoolean()) {
+ work.add(p -> {
+ p.delete(new GroupRef(clientId, groupId));
+ p.flush();
+ });
+ deletedGroupRef = true;
+ }
+
+ if (!refsToDel.isEmpty() && rand.nextBoolean()) {
+ ItemRef ref = refsToDel.remove(refsToDel.size() - 1);
+ work.add(p -> {
+ p.save(new Candidate(clientId, groupId, ref.itemId));
+ p.delete(ref);
+ p.flush();
+ });
+ }
+ }
+
+ return work;
+ }
+
+ public static void main(String[] args) {
+ new Generator(new GcsEnv(args)).run();
+ }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/GroupRef.java b/src/main/java/org/apache/accumulo/testing/gcs/GroupRef.java
new file mode 100644
index 0000000..46c5b14
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/GroupRef.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.gcs;
+
/**
 * A reference to an entire group. While it exists, none of the group's items (same client id and
 * group id) may be deleted. Corresponds to a blip marker in the Accumulo GC.
 */
public class GroupRef {
  /** Id of the generator client that owns the group. */
  public final long clientId;
  /** Id of the referenced group within that client. */
  public final long groupId;

  GroupRef(long clientId, long groupId) {
    this.groupId = groupId;
    this.clientId = clientId;
  }
}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/Item.java b/src/main/java/org/apache/accumulo/testing/gcs/Item.java
new file mode 100644
index 0000000..14e79d5
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/Item.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.gcs;
+
+import static org.apache.accumulo.testing.gcs.Persistence.toHex;
+
+import java.util.Comparator;
+
+import com.google.common.base.Objects;
+
+public class Item implements Comparable<Item> {
+
+ public final long clientId;
+ public final long groupId;
+ public final long itemId;
+
+ Item(long clientId, long groupId, long itemId) {
+ this.clientId = clientId;
+ this.groupId = groupId;
+ this.itemId = itemId;
+ }
+
+ Item(Candidate c) {
+ this(c.clientId, c.groupId, c.itemId);
+ }
+
+ public long getClientId() {
+ return clientId;
+ }
+
+ public long getGroupId() {
+ return groupId;
+ }
+
+ public long getItemId() {
+ return itemId;
+ }
+
+ private static Comparator<Item> COMPARATOR = Comparator.comparingLong(Item::getClientId)
+ .thenComparingLong(Item::getGroupId).thenComparingLong(Item::getItemId);
+
+ @Override
+ public int compareTo(Item o) {
+ return COMPARATOR.compare(this, o);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(clientId, groupId, itemId);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof Item) {
+ Item oi = (Item) o;
+ return clientId == oi.clientId && groupId == oi.groupId && itemId == oi.itemId;
+ }
+
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "clientId:" + toHex(clientId) + " groupId:" + toHex(groupId) + " itemId:"
+ + toHex(itemId);
+ }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/ItemRef.java b/src/main/java/org/apache/accumulo/testing/gcs/ItemRef.java
new file mode 100644
index 0000000..9a324af
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/ItemRef.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.gcs;
+
+public class ItemRef implements Comparable<ItemRef> {
+ public final int bucket;
+ public final long clientId;
+ public final long groupId;
+ public final long itemId;
+
+ ItemRef(int bucket, long clientId, long groupId, long itemId) {
+ this.bucket = bucket;
+ this.clientId = clientId;
+ this.groupId = groupId;
+ this.itemId = itemId;
+ }
+
+ ItemRef(int bucket, Item item) {
+ this(bucket, item.clientId, item.groupId, item.itemId);
+ }
+
+ public Item item() {
+ return new Item(clientId, groupId, itemId);
+ }
+
+ @Override
+ public int compareTo(ItemRef o) {
+ int cmp = Long.compare(bucket, o.bucket);
+ if (cmp == 0) {
+ cmp = item().compareTo(o.item());
+ }
+
+ return cmp;
+ }
+
+ @Override
+ public String toString() {
+ return "bucket:" + Persistence.toHex(bucket) + " " + item();
+ }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/ItemState.java b/src/main/java/org/apache/accumulo/testing/gcs/ItemState.java
new file mode 100644
index 0000000..c418556
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/ItemState.java
@@ -0,0 +1,5 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.testing.gcs;

/**
 * State stored with an item.
 *
 * <p>{@code NEW}: the item has been created but may not be referenced yet. This is the only state
 * in which an item is allowed to be unreferenced.
 *
 * <p>{@code REFERENCED}: the item is expected to be protected by an item reference, a group
 * reference, or a deletion candidate.
 */
public enum ItemState {
  NEW, REFERENCED
}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/Mutator.java b/src/main/java/org/apache/accumulo/testing/gcs/Mutator.java
new file mode 100644
index 0000000..d4cb3e3
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/Mutator.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.gcs;
+
+public interface Mutator {
+ public void run(Persistence p);
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/Persistence.java b/src/main/java/org/apache/accumulo/testing/gcs/Persistence.java
new file mode 100644
index 0000000..7638c6f
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/Persistence.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.accumulo.testing.gcs;
+
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeSet;
+import java.util.function.Function;
+
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.admin.NewTableConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.hash.Hashing;
+
+public class Persistence {
+
+ private BatchWriter writer;
+ private String table;
+ private AccumuloClient client;
+
+ Persistence(GcsEnv env) {
+ this.client = env.getAccumuloClient();
+ this.table = env.getTableName();
+ try {
+ this.writer = client.createBatchWriter(table);
+ } catch (TableNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static String toHex(int i) {
+ return Strings.padStart(Integer.toHexString(i), 8, '0');
+ }
+
+ static String toHex(long l) {
+ return Strings.padStart(Long.toHexString(l), 16, '0');
+ }
+
+ private void write(Mutation m) {
+ try {
+ writer.addMutation(m);
+ } catch (MutationsRejectedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private String toHex(long l1, long l2, long l3) {
+ return toHex(l1) + ":" + toHex(l2) + ":" + toHex(l3);
+ }
+
+ private String toHexWithHash(long l1, long l2) {
+ int hc = Hashing.murmur3_32().newHasher().putLong(l1).putLong(l2).hash().asInt();
+ return toHex(hc) + ":" + toHex(l1) + ":" + toHex(l2);
+ }
+
+ private String toHexWithHash(long l1, long l2, long l3) {
+ int hc = Hashing.murmur3_32().newHasher().putLong(l1).putLong(l2).putLong(l3).hash().asInt();
+ return toHex(hc) + ":" + toHex(l1) + ":" + toHex(l2) + ":" + toHex(l3);
+ }
+
+ void save(Item item, ItemState state) {
+ Mutation m = new Mutation("I:" + toHexWithHash(item.clientId, item.groupId));
+ m.put("item", toHex(item.itemId), state.name());
+ write(m);
+ }
+
+ void save(ItemRef itemRef) {
+ Mutation m = new Mutation("R:" + toHex(itemRef.bucket));
+ m.put("ref", toHex(itemRef.clientId, itemRef.groupId, itemRef.itemId), "");
+ write(m);
+ }
+
+ public void save(Collection<ItemRef> refsToAdd) {
+ Map<Integer,Mutation> mutations = new HashMap<>();
+
+ for (ItemRef itemRef : refsToAdd) {
+ Mutation m = mutations.computeIfAbsent(itemRef.bucket,
+ bucket -> new Mutation("R:" + toHex(bucket)));
+ m.put("ref", toHex(itemRef.clientId, itemRef.groupId, itemRef.itemId), "");
+ }
+
+ mutations.values().forEach(m -> write(m));
+
+ }
+
+ public void replace(Collection<ItemRef> refsToDelete, ItemRef refToAdd) {
+ Mutation m = new Mutation("R:" + toHex(refToAdd.bucket));
+ m.put("ref", toHex(refToAdd.clientId, refToAdd.groupId, refToAdd.itemId), "");
+
+ for (ItemRef ir : refsToDelete) {
+ Preconditions.checkArgument(refToAdd.bucket == ir.bucket);
+ m.putDelete("ref", toHex(ir.clientId, ir.groupId, ir.itemId));
+ }
+
+ write(m);
+ }
+
+ void save(GroupRef groupRef) {
+ Mutation m = new Mutation("G:" + toHexWithHash(groupRef.clientId, groupRef.groupId));
+ m.put("", "", "");
+ write(m);
+ }
+
+ void save(Candidate c) {
+ Mutation m = new Mutation("C:" + toHexWithHash(c.clientId, c.groupId, c.itemId));
+ m.put("", "", "");
+ write(m);
+ }
+
+ public void flush() {
+ try {
+ writer.flush();
+ } catch (MutationsRejectedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ public void delete(ItemRef itemRef) {
+ Mutation m = new Mutation("R:" + toHex(itemRef.bucket));
+ m.putDelete("ref", toHex(itemRef.clientId, itemRef.groupId, itemRef.itemId));
+ write(m);
+ }
+
+ public void delete(Collection<ItemRef> refsToDelete) {
+ Map<Integer,Mutation> mutations = new HashMap<>();
+
+ for (ItemRef itemRef : refsToDelete) {
+ Mutation m = mutations.computeIfAbsent(itemRef.bucket,
+ bucket -> new Mutation("R:" + toHex(bucket)));
+ m.putDelete("ref", toHex(itemRef.clientId, itemRef.groupId, itemRef.itemId));
+ }
+
+ mutations.values().forEach(m -> write(m));
+ }
+
+ public void delete(GroupRef groupRef) {
+ Mutation m = new Mutation("G:" + toHexWithHash(groupRef.clientId, groupRef.groupId));
+ m.putDelete("", "");
+ write(m);
+ }
+
+ public void delete(Item item) {
+ Mutation m = new Mutation("I:" + toHexWithHash(item.clientId, item.groupId));
+ m.putDelete("item", toHex(item.itemId));
+ write(m);
+ }
+
+ public void delete(Candidate c) {
+ Mutation m = new Mutation("C:" + toHexWithHash(c.clientId, c.groupId, c.itemId));
+ m.putDelete("", "");
+ write(m);
+ }
+
+ private Scanner createScanner(String prefix) {
+ Scanner scanner;
+ try {
+ scanner = new IsolatedScanner(client.createScanner(table));
+ } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException e) {
+ throw new RuntimeException(e);
+ }
+ scanner.setRange(Range.prefix(prefix));
+
+ return scanner;
+ }
+
+ private <T> Iterable<T> transformRange(String prefix, Function<Key,T> func) {
+ return Iterables.transform(createScanner(prefix), entry -> func.apply(entry.getKey()));
+ }
+
+ Iterable<Candidate> candidates() {
+ return transformRange("C:", k -> {
+ String row = k.getRowData().toString();
+ String[] fields = row.substring(11).split(":");
+
+ Preconditions.checkState(fields.length == 3, "Bad candidate row %s", row);
+
+ long clientId = Long.parseLong(fields[0], 16);
+ long groupId = Long.parseLong(fields[1], 16);
+ long itemId = Long.parseLong(fields[2], 16);
+
+ return new Candidate(clientId, groupId, itemId);
+ });
+ }
+
+ Iterable<ItemRef> itemRefs() {
+ return transformRange("R:", k -> {
+ String row = k.getRowData().toString();
+ String qual = k.getColumnQualifierData().toString();
+
+ int bucket = Integer.parseInt(row.substring(2), 16);
+
+ String[] fields = qual.split(":");
+
+ Preconditions.checkState(fields.length == 3, "Bad item ref %s", k);
+
+ long clientId = Long.parseLong(fields[0], 16);
+ long groupId = Long.parseLong(fields[1], 16);
+ long itemId = Long.parseLong(fields[2], 16);
+
+ return new ItemRef(bucket, clientId, groupId, itemId);
+ });
+ }
+
+ Iterable<GroupRef> groupRefs() {
+ return transformRange("G:", k -> {
+ String row = k.getRowData().toString();
+ String[] fields = row.substring(11).split(":");
+
+ Preconditions.checkState(fields.length == 2, "Bad group ref row %s", row);
+
+ long clientId = Long.parseLong(fields[0], 16);
+ long groupId = Long.parseLong(fields[1], 16);
+
+ return new GroupRef(clientId, groupId);
+ });
+ }
+
+ Iterable<Item> items(ItemState... states) {
+ EnumSet<ItemState> stateSet = EnumSet.of(states[0]);
+ for (int i = 1; i < states.length; i++) {
+ stateSet.add(states[i]);
+ }
+
+ Iterable<Entry<Key,Value>> itemIter = Iterables.filter(createScanner("I:"),
+ entry -> stateSet.contains(ItemState.valueOf(entry.getValue().toString())));
+
+ return Iterables.transform(itemIter, entry -> {
+ Key k = entry.getKey();
+ String row = k.getRowData().toString();
+ String qual = k.getColumnQualifierData().toString();
+
+ String[] fields = row.substring(11).split(":");
+
+ Preconditions.checkState(fields.length == 2, "Bad group ref row %s", row);
+
+ long clientId = Long.parseLong(fields[0], 16);
+ long groupId = Long.parseLong(fields[1], 16);
+ long itemId = Long.parseLong(qual, 16);
+
+ return new Item(clientId, groupId, itemId);
+ });
+ }
+
+ private static TreeSet<Text> initialSplits(GcsEnv env) {
+ TreeSet<Text> splits = new TreeSet<>();
+
+ int tabletsPerSection = env.getInitialTablets();
+
+ for (String prefix : new String[] {"G:", "C:", "I:", "R:"}) {
+ int numSplits = tabletsPerSection - 1;
+
+ long max;
+ if (prefix.equals("R:")) {
+ max = env.getMaxBuckets();
+ } else {
+ max = 1L << 33 - 1;
+ }
+
+ long distance = (max / tabletsPerSection) + 1;
+ long split = distance;
+
+ for (int i = 0; i < numSplits; i++) {
+ String s = String.format("%08x", split);
+ while (s.charAt(s.length() - 1) == '0') {
+ s = s.substring(0, s.length() - 1);
+ }
+ splits.add(new Text(prefix + s));
+ split += distance;
+ }
+
+ splits.add(new Text(prefix + "~"));
+ }
+
+ return splits;
+ }
+
+ public static void init(GcsEnv env) throws Exception {
+ NewTableConfiguration ntc = new NewTableConfiguration();
+ ntc.withSplits(initialSplits(env));
+ ntc.setProperties(ImmutableMap.of("table.compaction.major.ratio", "1"));
+ env.getAccumuloClient().tableOperations().create(env.getTableName(), ntc);
+ }
+
+ public void flushTable() {
+ try {
+ client.tableOperations().flush(table);
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/Setup.java b/src/main/java/org/apache/accumulo/testing/gcs/Setup.java
new file mode 100644
index 0000000..31c9647
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/Setup.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.testing.gcs;
+
+public class Setup {
+ public static void main(String[] args) throws Exception {
+ Persistence.init(new GcsEnv(args));
+ }
+
+}
diff --git a/src/main/java/org/apache/accumulo/testing/gcs/Verifier.java b/src/main/java/org/apache/accumulo/testing/gcs/Verifier.java
new file mode 100644
index 0000000..285fa2c
--- /dev/null
+++ b/src/main/java/org/apache/accumulo/testing/gcs/Verifier.java
@@ -0,0 +1,69 @@
+package org.apache.accumulo.testing.gcs;
+
+import static org.apache.accumulo.testing.gcs.Collector.forEachBatch;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+public class Verifier {
+
+ Persistence persistence;
+ private int batchSize;
+
+ public Verifier(GcsEnv gcsEnv) {
+ this.persistence = new Persistence(gcsEnv);
+ this.batchSize = gcsEnv.getBatchSize();
+ }
+
+ public static void main(String[] args) {
+ new Verifier(new GcsEnv(args)).run();
+ }
+
+ private void run() {
+ forEachBatch(persistence.items(ItemState.REFERENCED), batchSize, batch -> checkItems(batch));
+ forEachBatch(persistence.itemRefs(), batchSize, batch -> checkItemRefs(batch));
+ }
+
+ // Ensure there is an item for each item ref
+ private void checkItemRefs(TreeSet<ItemRef> itemRefs) {
+ Map<Item,List<ItemRef>> refMap = new HashMap<>();
+
+ itemRefs.forEach(ir -> {
+ refMap.computeIfAbsent(ir.item(), i -> new ArrayList<>()).add(ir);
+ });
+
+ persistence.items(ItemState.NEW, ItemState.REFERENCED).forEach(item -> refMap.remove(item));
+
+ if (refMap.size() > 0) {
+ System.err.println("References without items : ");
+ refMap.values().stream().flatMap(List::stream).forEach(ir -> System.err.println("\t" + ir));
+ }
+
+ System.out.printf("Checked %,d item refs\n", itemRefs.size());
+ }
+
+ // Ensure all items are referenced by something.
+ private void checkItems(TreeSet<Item> items) {
+
+ int initialSize = items.size();
+
+ persistence.itemRefs().forEach(ir -> items.remove(ir.item()));
+
+ persistence.candidates().forEach(c -> items.remove(new Item(c)));
+
+ persistence.groupRefs()
+ .forEach(gr -> items
+ .subSet(new Item(gr.clientId, gr.groupId, 0), new Item(gr.clientId, gr.groupId + 1, 0))
+ .clear());
+
+ if (items.size() > 0) {
+ System.err.println("Unreferenced items : ");
+ items.forEach(i -> System.err.println("\t" + i));
+ }
+
+ System.out.printf("Checked %,d items\n", initialSize);
+ }
+}