You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2021/01/06 20:29:16 UTC
[accumulo] branch main updated: fixes #1320 adds a stress test for
ZooReaderWriter.mutateOrCreate() (#1851)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new e51a611 fixes #1320 adds a stress test for ZooReaderWriter.mutateOrCreate() (#1851)
e51a611 is described below
commit e51a611f6cc0faebd7261bb2eb197ee86f71e429
Author: Keith Turner <kt...@apache.org>
AuthorDate: Wed Jan 6 15:29:07 2021 -0500
fixes #1320 adds a stress test for ZooReaderWriter.mutateOrCreate() (#1851)
---
.../accumulo/test/functional/ZooMutatorIT.java | 158 +++++++++++++++++++++
1 file changed, 158 insertions(+)
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZooMutatorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ZooMutatorIT.java
new file mode 100644
index 0000000..114dc27
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZooMutatorIT.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.functional;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.test.categories.MiniClusterOnlyTests;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.hash.Hashing;
+
+@Category(MiniClusterOnlyTests.class)
+public class ZooMutatorIT extends AccumuloClusterHarness {
+ /**
+ * This test uses multiple threads to update the data in a single zookeeper node using
+ * {@link ZooReaderWriter#mutateOrCreate(String, byte[], org.apache.accumulo.fate.zookeeper.ZooReaderWriter.Mutator)}
+ * and tries to detect errors and race conditions in that code. Each thread uses
+ * {@link #nextValue(String)} to compute a new value for the ZK node based on the current value,
+ * producing a new unique value. Its expected that multiple threads calling
+ * {@link #nextValue(String)} as previously described should yield the same final value as a
+ * single thread repeatedly calling {@link #nextValue(String)} the same number of times. There are
+ * many things that can go wrong in the multithreaded case. This test tries to ensure the
+ * following are true for the multithreaded case.
+ *
+ * <ul>
+ * <li>All expected updates are made, none were skipped.
+ * <li>No updates are made twice. For example if two threads wrote the exact same value to the
+ * node this should be detected by the test. Would expect each update to be unique.
+ * <li>The updates are made in the same order as a single thread repeatedly calling
+ * {@link #nextValue(String)}.
+ * </ul>
+ *
+ * <p>
+ * If any of the expectations above are not met it should cause the hash, count, and/or count
+ * tracking done in the test to not match the what is computed by the single threaded code at the
+ * end of the test.
+ *
+ * <p>
+ * A hash and a counter are stored in ZK. The hashes form a chain of hashes as each new value is
+ * written because its a hash of the previous value. The chain of hashes is useful for detecting
+ * missing and out of order updates, but not duplicates. The counter and associated map that
+ * tracks which counts were seen is useful for detecting missing and duplicate updates. The
+ * counter is also used to weakly check for out of order updates, but the chain of hashes provides
+ * a much strong check for this.
+ */
+ @Test
+ public void concurrentMutatorTest() throws Exception {
+ try (var client = Accumulo.newClient().from(getClientProps()).build();
+ var context = (ClientContext) client) {
+ String secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET);
+
+ ZooReaderWriter zk = new ZooReaderWriter(context.getZooKeepers(),
+ context.getZooKeepersSessionTimeOut(), secret);
+
+ var executor = Executors.newFixedThreadPool(16);
+
+ String initialData = hash("Accumulo Zookeeper Mutator test data") + " 0";
+
+ List<Future<?>> futures = new ArrayList<>();
+
+ // This map is used to ensure multiple threads do not successfully write the same value and no
+ // values are skipped. The hash in the value also verifies similar things in a different way.
+ ConcurrentHashMap<Integer,Integer> countCounts = new ConcurrentHashMap<>();
+
+ for (int i = 0; i < 16; i++) {
+ futures.add(executor.submit(() -> {
+ try {
+
+ int count = -1;
+ while (count < 200) {
+ byte[] val =
+ zk.mutateOrCreate("/test-zm", initialData.getBytes(UTF_8), this::nextValue);
+ int nextCount = getCount(val);
+ assertTrue("nextCount <= count " + nextCount + " " + count, nextCount > count);
+ count = nextCount;
+ countCounts.merge(count, 1, Integer::sum);
+ }
+
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }));
+ }
+
+ // wait and check for errors in background threads
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ executor.shutdown();
+
+ byte[] actual = zk.getData("/test-zm");
+ int settledCount = getCount(actual);
+
+ assertTrue(settledCount >= 200);
+
+ String expected = initialData;
+
+ assertEquals(1, (int) countCounts.get(0));
+
+ for (int i = 1; i <= settledCount; i++) {
+ assertEquals(1, (int) countCounts.get(i));
+ expected = nextValue(expected);
+ }
+
+ assertEquals(settledCount + 1, countCounts.size());
+ assertEquals(expected, new String(actual, UTF_8));
+ }
+ }
+
+ private String hash(String data) {
+ return Hashing.sha256().hashString(data, UTF_8).toString();
+ }
+
+ private String nextValue(String currString) {
+ String[] tokens = currString.split(" ");
+ String currHash = tokens[0];
+ int count = Integer.parseInt(tokens[1]);
+ return (hash(currHash) + " " + (count + 1));
+ }
+
+ private byte[] nextValue(byte[] curr) {
+ return nextValue(new String(curr, UTF_8)).getBytes(UTF_8);
+ }
+
+ private int getCount(byte[] val) {
+ return Integer.parseInt(new String(val, UTF_8).split(" ")[1]);
+ }
+}