You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ed...@apache.org on 2022/09/15 11:46:08 UTC
[accumulo] branch main updated: Add version check to ZooPropStore (#2778)
This is an automated email from the ASF dual-hosted git repository.
edcoleman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new fac2d4dd4d Add version check to ZooPropStore (#2778)
fac2d4dd4d is described below
commit fac2d4dd4d87803204836d936ac97e99ba5bde4b
Author: EdColeman <de...@etcoleman.com>
AuthorDate: Thu Sep 15 11:46:02 2022 +0000
Add version check to ZooPropStore (#2778)
* Add version check to ZooPropStore
* remove refreshAfterWrite usage in cache
* merge update - add delay for export import check config
---
.../server/conf/ServerConfigurationFactory.java | 125 +++++++++-
.../accumulo/server/conf/store/PropStore.java | 11 +
.../conf/store/impl/PropCacheCaffeineImpl.java | 14 +-
.../server/conf/store/impl/ZooPropStore.java | 33 ++-
.../conf/store/impl/PropCacheCaffeineImplTest.java | 23 --
.../server/conf/store/impl/ZooPropLoaderTest.java | 266 +--------------------
.../apache/accumulo/test/shell/ShellServerIT.java | 1 +
7 files changed, 179 insertions(+), 294 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
index 718cc480f9..f3ea0d8f0c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
@@ -18,8 +18,14 @@
*/
package org.apache.accumulo.server.conf;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -28,16 +34,22 @@ import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.store.NamespacePropKey;
import org.apache.accumulo.server.conf.store.PropChangeListener;
+import org.apache.accumulo.server.conf.store.PropStore;
import org.apache.accumulo.server.conf.store.PropStoreKey;
+import org.apache.accumulo.server.conf.store.SystemPropKey;
import org.apache.accumulo.server.conf.store.TablePropKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
/**
- * A factor for configurations used by a server process. Instance of this class are thread-safe.
+ * A factory for configurations used by a server process. Instance of this class are thread-safe.
*/
public class ServerConfigurationFactory extends ServerConfiguration {
private final static Logger log = LoggerFactory.getLogger(ServerConfigurationFactory.class);
@@ -51,9 +63,17 @@ public class ServerConfigurationFactory extends ServerConfiguration {
private final SiteConfiguration siteConfig;
private final DeleteWatcher deleteWatcher = new DeleteWatcher();
+ private static final int REFRESH_PERIOD_MINUTES = 15;
+
+ private final ConfigRefreshRunner refresher;
+
public ServerConfigurationFactory(ServerContext context, SiteConfiguration siteConfig) {
this.context = context;
this.siteConfig = siteConfig;
+
+ refresher = new ConfigRefreshRunner();
+ Runtime.getRuntime()
+ .addShutdownHook(Threads.createThread("config-refresh-shutdownHook", refresher::shutdown));
}
public ServerContext getServerContext() {
@@ -140,4 +160,107 @@ public class ServerConfigurationFactory extends ServerConfiguration {
// no-op. changes handled by prop store impl
}
}
+
+  /**
+   * Runs a periodic background task that compares the data version of every configuration held
+   * by the enclosing factory (system, namespace and table) with the version reported by the prop
+   * store. Namespace and table configurations that fail validation are dropped from the local
+   * maps so that they are rebuilt on next use.
+   */
+  private class ConfigRefreshRunner {
+    // bounds handed to jitter(min, max) for the pause between two consecutive version checks.
+    // The value is passed to Thread.sleep, so the unit is milliseconds.
+    private static final long MIN_JITTER_DELAY = 1;
+    private static final long MAX_JITTER_DELAY = 23;
+    private final ScheduledFuture<?> refreshTaskFuture;
+
+    /**
+     * Creates the "config-refresh" scheduled executor and schedules the version check to repeat
+     * every REFRESH_PERIOD_MINUTES after a randomized initial delay.
+     */
+    ConfigRefreshRunner() {
+
+      Runnable refreshTask = this::verifySnapshotVersions;
+
+      ScheduledThreadPoolExecutor executor = ThreadPools.getServerThreadPools()
+          .createScheduledExecutorService(1, "config-refresh", false);
+
+      // scheduleWithFixedDelay - used so only one task will run concurrently.
+      // staggering the initial delay prevents synchronization of Accumulo servers communicating
+      // with ZooKeeper for the sync process. The lower bound uses integer division, so with a
+      // 15 minute period the initial delay is drawn from [3, 15) minutes.
+      long randDelay = jitter(REFRESH_PERIOD_MINUTES / 4, REFRESH_PERIOD_MINUTES);
+      refreshTaskFuture =
+          executor.scheduleWithFixedDelay(refreshTask, randDelay, REFRESH_PERIOD_MINUTES, MINUTES);
+    }
+
+    /**
+     * Check that the stored version in ZooKeeper matches the version held in the local snapshot.
+     * When a mismatch is detected, a change event is sent to the prop store which will cause a
+     * re-load. If the ZooKeeper node has been deleted, the local cache entries are removed.
+     * <p>
+     * This method is designed to be called as a scheduled task, so it does not propagate
+     * exceptions other than the IllegalStateException raised when the thread is interrupted, so
+     * the scheduled tasks will continue to run.
+     */
+    private void verifySnapshotVersions() {
+
+      long refreshStart = System.nanoTime();
+      int keyCount = 0;
+      int keyChangedCount = 0;
+
+      PropStore propStore = context.getPropStore();
+
+      // rely on store to propagate change event if different
+      PropStoreKey<?> sysKey = SystemPropKey.of(context);
+      keyCount++;
+      if (!propStore.validateDataVersion(sysKey,
+          ((ZooBasedConfiguration) getSystemConfiguration()).getDataVersion())) {
+        // count the system configuration too, otherwise the summary below under-reports
+        keyChangedCount++;
+        log.debug("data version sync: difference found. forcing configuration update for {}",
+            sysKey);
+      }
+      // small yield - spread out ZooKeeper calls
+      jitterDelay();
+
+      for (Map.Entry<NamespaceId,NamespaceConfiguration> entry : namespaceConfigs.entrySet()) {
+        keyCount++;
+        NamespaceId nid = entry.getKey();
+        PropStoreKey<?> propKey = NamespacePropKey.of(context, nid);
+        if (!propStore.validateDataVersion(propKey, entry.getValue().getDataVersion())) {
+          keyChangedCount++;
+          // drop the stale configuration object so it is rebuilt on next access
+          namespaceConfigs.remove(nid);
+          log.debug("data version sync: difference found. forcing configuration update for {}",
+              propKey);
+        }
+        // small yield - spread out ZooKeeper calls between namespace config checks
+        jitterDelay();
+      }
+
+      for (Map.Entry<TableId,TableConfiguration> entry : tableConfigs.entrySet()) {
+        keyCount++;
+        TableId tid = entry.getKey();
+        PropStoreKey<?> propKey = TablePropKey.of(context, tid);
+        if (!propStore.validateDataVersion(propKey, entry.getValue().getDataVersion())) {
+          keyChangedCount++;
+          // drop both the table configuration and its cached parent configuration
+          tableConfigs.remove(tid);
+          tableParentConfigs.remove(tid);
+          log.debug("data version sync: difference found. forcing configuration update for {}",
+              propKey);
+        }
+        // small yield - spread out ZooKeeper calls between table config checks
+        jitterDelay();
+      }
+
+      log.debug("data version sync: Total runtime {} ms for {} entries, changes detected: {}",
+          NANOSECONDS.toMillis(System.nanoTime() - refreshStart), keyCount, keyChangedCount);
+    }
+
+    /**
+     * Generate a random value for jitter in the range [min,max).
+     *
+     * @param min
+     *          inclusive lower bound
+     * @param max
+     *          exclusive upper bound
+     * @return a pseudo-random value, not suitable for security-sensitive use
+     */
+    @SuppressFBWarnings(value = "PREDICTABLE_RANDOM",
+        justification = "random number not used in secure context")
+    private long jitter(final long min, final long max) {
+      return ThreadLocalRandom.current().nextLong(min, max);
+    }
+
+    /**
+     * Sleep for a random jitter interval defined by MIN_JITTER_DELAY and MAX_JITTER_DELAY.
+     * <p>
+     * Used to spread out operations so that Server config sync communications don't overwhelm
+     * ZooKeeper and are not synchronized across the cluster.
+     *
+     * @throws IllegalStateException
+     *           if the sleep is interrupted; the thread's interrupt flag is restored first
+     */
+    private void jitterDelay() {
+      try {
+        Thread.sleep(jitter(MIN_JITTER_DELAY, MAX_JITTER_DELAY));
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+        throw new IllegalStateException(ex);
+      }
+    }
+
+    /**
+     * Cancels the scheduled version check, interrupting it if it is currently running.
+     */
+    public void shutdown() {
+      refreshTaskFuture.cancel(true);
+    }
+  }
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/store/PropStore.java b/server/base/src/main/java/org/apache/accumulo/server/conf/store/PropStore.java
index 0dbb18f2a6..b73a887390 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/store/PropStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/store/PropStore.java
@@ -120,4 +120,15 @@ public interface PropStore {
@Nullable
VersionedProperties getWithoutCaching(PropStoreKey<?> propStoreKey);
+ /**
+ * Compare the data version currently held by the backend store for the given key with the
+ * version the caller expects. Subscribers are notified when a change is detected.
+ *
+ * @param storeKey
+ * specifies key for backend store
+ * @param expectedVersion
+ * the data version the caller currently holds and expects the store to have
+ * @return true if the stored version matches the provided expected version, false otherwise.
+ * Callers should treat false as "not validated" rather than strictly "changed":
+ * an implementation may also return false when the stored version could not be read.
+ */
+ boolean validateDataVersion(PropStoreKey<?> storeKey, long expectedVersion);
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java
index 3a952a13c0..0cf901511d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImpl.java
@@ -39,11 +39,11 @@ import com.github.benmanes.caffeine.cache.Ticker;
public class PropCacheCaffeineImpl implements PropCache {
public static final TimeUnit BASE_TIME_UNITS = TimeUnit.MINUTES;
- public static final int REFRESH_MIN = 15;
+
public static final int EXPIRE_MIN = 60;
private static final Logger log = LoggerFactory.getLogger(PropCacheCaffeineImpl.class);
private static final Executor executor = ThreadPools.getServerThreadPools().createThreadPool(1,
- 20, 60, TimeUnit.SECONDS, "cache-refresh", false);
+ 20, 60, TimeUnit.SECONDS, "caffeine-tasks", false);
private final PropStoreMetrics metrics;
@@ -52,15 +52,15 @@ public class PropCacheCaffeineImpl implements PropCache {
private PropCacheCaffeineImpl(final CacheLoader<PropStoreKey<?>,VersionedProperties> cacheLoader,
final PropStoreMetrics metrics, final Ticker ticker, boolean runTasksInline) {
this.metrics = metrics;
- var builder = Caffeine.newBuilder().refreshAfterWrite(REFRESH_MIN, BASE_TIME_UNITS)
- .expireAfterAccess(EXPIRE_MIN, BASE_TIME_UNITS).evictionListener(this::evictionNotifier);
+ var builder = Caffeine.newBuilder().expireAfterAccess(EXPIRE_MIN, BASE_TIME_UNITS)
+ .evictionListener(this::evictionNotifier);
if (runTasksInline) {
- builder = builder.executor(Runnable::run);
+ builder.executor(Runnable::run);
} else {
- builder = builder.executor(executor);
+ builder.executor(executor);
}
if (ticker != null) {
- builder = builder.ticker(ticker);
+ builder.ticker(ticker);
}
cache = builder.build(cacheLoader);
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java
index d360b6117b..ef62ad0a68 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/store/impl/ZooPropStore.java
@@ -48,6 +48,8 @@ import org.slf4j.LoggerFactory;
import com.github.benmanes.caffeine.cache.Ticker;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+
public class ZooPropStore implements PropStore, PropChangeListener {
private final static Logger log = LoggerFactory.getLogger(ZooPropStore.class);
@@ -87,6 +89,8 @@ public class ZooPropStore implements PropStore, PropChangeListener {
* @param ticker
* a synthetic clock used for testing. Optional, if null, one is created.
*/
+ @SuppressFBWarnings(value = "PREDICTABLE_RANDOM",
+ justification = "random number not used in secure context")
ZooPropStore(final InstanceId instanceId, final ZooReaderWriter zrw, final ReadyMonitor monitor,
final PropStoreWatcher watcher, final Ticker ticker) {
@@ -117,7 +121,11 @@ public class ZooPropStore implements PropStore, PropChangeListener {
throw new IllegalStateException("Instance may not have been initialized, root node: " + path
+ " does not exist in ZooKeeper");
}
- } catch (InterruptedException | KeeperException ex) {
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(
+ "Interrupted trying to read root node " + instanceId + " from ZooKeeper", ex);
+ } catch (KeeperException ex) {
throw new IllegalStateException("Failed to read root node " + instanceId + " from ZooKeeper",
ex);
}
@@ -391,4 +399,27 @@ public class ZooPropStore implements PropStore, PropChangeListener {
public @Nullable VersionedProperties getWithoutCaching(PropStoreKey<?> propStoreKey) {
return cache.getWithoutCaching(propStoreKey);
}
+
+  /**
+   * Reads the ZooKeeper Stat of the node for the given key and compares its version with the
+   * expected version.
+   * <p>
+   * A ZooKeeper change event is signalled through the prop store watcher when the node is
+   * missing (null Stat or NoNodeException) or when the versions differ. Any other
+   * KeeperException is logged and reported as not validated without signalling a change.
+   *
+   * @param storeKey
+   *          the key whose ZooKeeper node is checked
+   * @param expectedVersion
+   *          the data version the caller currently holds
+   * @return true only if the node exists and its version equals expectedVersion
+   * @throws IllegalStateException
+   *           if interrupted while reading from ZooKeeper; the interrupt flag is restored first
+   */
+  @Override
+  public boolean validateDataVersion(PropStoreKey<?> storeKey, long expectedVersion) {
+    try {
+      Stat stat = zrw.getStatus(storeKey.getPath());
+      log.trace("data version sync: stat returned: {} for {}", stat, storeKey);
+      if (stat == null || expectedVersion != stat.getVersion()) {
+        propStoreWatcher.signalZkChangeEvent(storeKey);
+        return false;
+      }
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new IllegalStateException(ex);
+    } catch (KeeperException.NoNodeException ex) {
+      // node was deleted - handled like a version mismatch so listeners are informed
+      propStoreWatcher.signalZkChangeEvent(storeKey);
+      return false;
+    } catch (KeeperException ex) {
+      // pass the exception as the last argument so the cause and stack trace are not lost
+      log.debug("exception occurred verifying data version for {}", storeKey, ex);
+      return false;
+    }
+    return true;
+  }
+
}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImplTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImplTest.java
index fda445bac2..409b10719c 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImplTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/PropCacheCaffeineImplTest.java
@@ -20,7 +20,6 @@ package org.apache.accumulo.server.conf.store.impl;
import static org.apache.accumulo.core.conf.Property.TABLE_BULK_MAX_TABLETS;
import static org.apache.accumulo.core.conf.Property.TABLE_FILE_BLOCK_SIZE;
-import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
@@ -31,7 +30,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -155,27 +153,6 @@ public class PropCacheCaffeineImplTest {
assertNull(cache.getWithoutCaching(table2PropKey));
}
- VersionedProperties asyncProps() {
- return vProps;
- }
-
- @Test
- public void refreshTest() throws Exception {
-
- expect(zooPropLoader.load(eq(tablePropKey))).andReturn(vProps).once();
-
- var future = CompletableFuture.supplyAsync(this::asyncProps);
-
- expect(zooPropLoader.asyncReload(eq(tablePropKey), eq(vProps), anyObject())).andReturn(future)
- .once();
-
- replay(context, propStoreWatcher, zooPropLoader);
- assertNotNull(cache.get(tablePropKey)); // will call load and place into cache
-
- ticker.advance(30, TimeUnit.MINUTES);
- assertNotNull(cache.get(tablePropKey)); // will async check stat and then reload
- }
-
@Test
public void expireTest() {
expect(zooPropLoader.load(eq(tablePropKey))).andReturn(vProps).times(2);
diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoaderTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoaderTest.java
index 7fafa26fe6..9dd18d7a3c 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoaderTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/conf/store/impl/ZooPropLoaderTest.java
@@ -23,7 +23,6 @@ import static org.apache.accumulo.core.conf.Property.MANAGER_CLIENTPORT;
import static org.apache.accumulo.core.conf.Property.TSERV_CLIENTPORT;
import static org.apache.accumulo.core.conf.Property.TSERV_NATIVEMAP_ENABLED;
import static org.apache.accumulo.core.conf.Property.TSERV_SCAN_MAX_OPENFILES;
-import static org.apache.accumulo.server.conf.store.impl.PropCacheCaffeineImpl.REFRESH_MIN;
import static org.easymock.EasyMock.anyLong;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.capture;
@@ -34,7 +33,6 @@ import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.newCapture;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
-import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -44,7 +42,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
@@ -234,10 +231,6 @@ public class ZooPropLoaderTest {
expect(zrw.getData(eq(propStoreKey.getPath()), anyObject(), anyObject()))
.andReturn(propCodec.toBytes(defaultProps)).once();
- Stat stat = new Stat();
- stat.setVersion(123); // set different version so reload triggered
- expect(zrw.getStatus(propStoreKey.getPath())).andReturn(stat).once();
-
expect(zrw.getData(eq(propStoreKey.getPath()), anyObject(), anyObject()))
.andThrow(new KeeperException.NoNodeException("forced no node")).anyTimes();
@@ -249,12 +242,10 @@ public class ZooPropLoaderTest {
cacheMetrics.addLoadTime(anyLong());
expectLastCall().times(1);
- cacheMetrics.incrRefresh();
- expectLastCall().times(1);
- cacheMetrics.incrRefreshLoad();
+ cacheMetrics.incrEviction();
expectLastCall().times(1);
cacheMetrics.incrZkError();
- expectLastCall().times(2);
+ expectLastCall().times(1);
replay(context, zrw, propStoreWatcher, cacheMetrics);
@@ -269,10 +260,8 @@ public class ZooPropLoaderTest {
// read cached value
assertNotNull(cache.get(propStoreKey));
- // advance so refresh called.
- ticker.advance(20, TimeUnit.MINUTES);
-
- assertNotNull(cache.get(propStoreKey));
+ // advance so expire called.
+ ticker.advance(120, TimeUnit.MINUTES);
assertNull(cache.get(propStoreKey));
}
@@ -362,253 +351,6 @@ public class ZooPropLoaderTest {
assertNull(cache.getWithoutCaching(propStoreKey));
}
- @Test
- public void refreshTest() throws Exception {
-
- VersionedProperties defaultProps = new VersionedProperties();
-
- // first call loads cache
- Capture<Stat> stat = newCapture();
- expect(zrw.getData(eq(propStoreKey.getPath()), anyObject(), capture(stat))).andAnswer(() -> {
- Stat s = stat.getValue();
- s.setCtime(System.currentTimeMillis());
- s.setMtime(System.currentTimeMillis());
- s.setCzxid(1234);
- s.setVersion(0);
- stat.setValue(s);
- return propCodec.toBytes(defaultProps);
- }).times(1);
-
- Stat expectedStat = new Stat();
- expectedStat.setVersion(0);
- expect(zrw.getStatus(propStoreKey.getPath())).andReturn(expectedStat).times(2);
-
- cacheMetrics.addLoadTime(anyLong());
- expectLastCall().times(1);
- cacheMetrics.incrRefresh();
- expectLastCall().times(2);
-
- replay(context, zrw, propStoreWatcher, cacheMetrics);
-
- PropCacheCaffeineImpl cache =
- new PropCacheCaffeineImpl.Builder(loader, cacheMetrics).forTests(ticker).build();
-
- // load cache
- log.debug("received: {}", cache.get(propStoreKey));
-
- ticker.advance(REFRESH_MIN + 1, TimeUnit.MINUTES);
-
- assertNotNull(cache.get(propStoreKey));
-
- ticker.advance(REFRESH_MIN / 2, TimeUnit.MINUTES);
-
- assertNotNull(cache.get(propStoreKey));
-
- ticker.advance(REFRESH_MIN + 1, TimeUnit.MINUTES);
-
- assertNotNull(cache.get(propStoreKey));
-
- ticker.advance(1, TimeUnit.MINUTES);
-
- assertNotNull(cache.get(propStoreKey));
-
- }
-
- /**
- * Test that when the refreshAfterWrite period expires that the data version is checked against
- * stored value - and on mismatch, rereads the values from ZooKeeper.
- */
- @Test
- public void refreshDifferentVersionTest() throws Exception {
-
- final int initialVersion = 123;
- Capture<PropStoreWatcher> propStoreWatcherCapture = newCapture();
-
- Capture<Stat> stat = newCapture();
-
- expect(zrw.getData(eq(propStoreKey.getPath()), capture(propStoreWatcherCapture), capture(stat)))
- .andAnswer(() -> {
- Stat s = stat.getValue();
- s.setCtime(System.currentTimeMillis());
- s.setMtime(System.currentTimeMillis());
- s.setVersion(initialVersion + 1);
- stat.setValue(s);
- return propCodec.toBytes(new VersionedProperties(initialVersion + 1, Instant.now(),
- Map.of(Property.TABLE_SPLIT_THRESHOLD.getKey(), "7G")));
- }).once();
-
- // make it look like version on ZK has advanced.
- Stat stat2 = new Stat();
- stat2.setVersion(initialVersion + 3); // initSysProps 123, on write 124
- expect(zrw.getStatus(propStoreKey.getPath())).andReturn(stat2).once();
-
- Capture<Stat> stat3 = newCapture();
-
- expect(
- zrw.getData(eq(propStoreKey.getPath()), capture(propStoreWatcherCapture), capture(stat3)))
- .andAnswer(() -> {
- Stat s = stat3.getValue();
- s.setCtime(System.currentTimeMillis());
- s.setMtime(System.currentTimeMillis());
- s.setVersion(initialVersion + 4);
- stat3.setValue(s);
- return propCodec.toBytes(new VersionedProperties(initialVersion + 3, Instant.now(),
- Map.of(Property.TABLE_SPLIT_THRESHOLD.getKey(), "12G")));
- }).once();
-
- propStoreWatcher.signalCacheChangeEvent(eq(propStoreKey));
- expectLastCall();
-
- cacheMetrics.addLoadTime(anyLong());
- expectLastCall().times(2);
-
- cacheMetrics.incrRefresh();
- expectLastCall().times(1);
-
- cacheMetrics.incrRefreshLoad();
- expectLastCall().times(1);
-
- replay(context, zrw, propStoreWatcher, cacheMetrics);
-
- PropCacheCaffeineImpl cache =
- new PropCacheCaffeineImpl.Builder(loader, cacheMetrics).forTests(ticker).build();
-
- // prime cache
- var origProps = cache.get(propStoreKey);
- assertNotNull(origProps);
- assertEquals("7G", origProps.asMap().get(Property.TABLE_SPLIT_THRESHOLD.getKey()));
-
- ticker.advance(REFRESH_MIN + 1, TimeUnit.MINUTES);
- // first call after refresh return original and schedules update
- var originalProps = cache.get(propStoreKey);
- assertNotNull(originalProps);
- assertNotNull(originalProps.asMap().get(Property.TABLE_SPLIT_THRESHOLD.getKey()));
-
- // refresh should have loaded updated value;
- var updatedProps = cache.get(propStoreKey);
- log.debug("Updated props: {}", updatedProps == null ? "null" : updatedProps.print(true));
-
- assertNotNull(updatedProps);
-
- assertEquals("12G", updatedProps.asMap().get(Property.TABLE_SPLIT_THRESHOLD.getKey()));
- }
-
- /**
- * Test that when the refreshAfterWrite period expires that the data version is checked against
- * stored value - and on match, returns the current value without rereading the values from
- * ZooKeeper.
- *
- * @throws Exception
- * any exception is a test failure
- */
- @Test
- public void refreshSameVersionTest() throws Exception {
-
- final int expectedVersion = 123;
-
- VersionedProperties mockProps = createMock(VersionedProperties.class);
- expect(mockProps.getTimestamp()).andReturn(Instant.now()).once();
- expect(mockProps.asMap()).andReturn(Map.of());
-
- Capture<Stat> stat = newCapture();
-
- // first call loads cache
- expect(zrw.getData(eq(propStoreKey.getPath()), anyObject(), capture(stat))).andAnswer(() -> {
- Stat s = stat.getValue();
- s.setCtime(System.currentTimeMillis());
- s.setMtime(System.currentTimeMillis());
- s.setVersion(expectedVersion);
- stat.setValue(s);
- return propCodec.toBytes(mockProps);
- }).times(1);
-
- Stat stat2 = new Stat();
- stat2.setCtime(System.currentTimeMillis());
- stat2.setMtime(System.currentTimeMillis());
- stat2.setVersion(expectedVersion);
-
- expect(zrw.getStatus(propStoreKey.getPath())).andReturn(stat2).once();
-
- cacheMetrics.addLoadTime(anyLong());
- expectLastCall().times(1);
- cacheMetrics.incrRefresh();
- expectLastCall().times(1);
-
- replay(context, zrw, propStoreWatcher, cacheMetrics, mockProps);
-
- PropCacheCaffeineImpl cache =
- new PropCacheCaffeineImpl.Builder(loader, cacheMetrics).forTests(ticker).build();
-
- // prime cache
- cache.get(propStoreKey);
-
- ticker.advance(30, TimeUnit.MINUTES);
-
- VersionedProperties vPropsRead = cache.get(propStoreKey);
-
- assertNotNull(vPropsRead);
-
- cache.get(propStoreKey);
-
- verify(mockProps);
- }
-
- /**
- * reload exception - exception thrown reading Stat to check version.
- *
- * @throws Exception
- * any exception is a test failure.
- */
- @Test
- public void refreshExceptionTest() throws Exception {
- VersionedProperties defaultProps = new VersionedProperties();
-
- // first call loads cache
- Capture<Stat> stat = newCapture();
- expect(zrw.getData(eq(propStoreKey.getPath()), anyObject(), capture(stat))).andAnswer(() -> {
- Stat s = stat.getValue();
- s.setCtime(System.currentTimeMillis());
- s.setMtime(System.currentTimeMillis());
- s.setCzxid(1234);
- s.setVersion(0);
- stat.setValue(s);
- return propCodec.toBytes(defaultProps);
- }).times(1);
-
- Stat expectedStat = new Stat();
- expectedStat.setVersion(0);
- expect(zrw.getStatus(propStoreKey.getPath()))
- .andThrow(new KeeperException.NoNodeException("force no node exception")).once();
-
- propStoreWatcher.signalZkChangeEvent(eq(propStoreKey));
- expectLastCall().anyTimes();
-
- expect(zrw.getData(eq(propStoreKey.getPath()), anyObject(), anyObject()))
- .andThrow(new KeeperException.NoNodeException("force no node exception")).once();
-
- cacheMetrics.addLoadTime(anyLong());
- expectLastCall().times(1);
- cacheMetrics.incrRefresh();
- expectLastCall().times(1);
- cacheMetrics.incrZkError();
- expectLastCall().times(2);
-
- replay(context, zrw, propStoreWatcher, cacheMetrics);
-
- PropCacheCaffeineImpl cache =
- new PropCacheCaffeineImpl.Builder(loader, cacheMetrics).forTests(ticker).build();
-
- // load cache
- log.debug("received: {}", cache.get(propStoreKey));
-
- ticker.advance(REFRESH_MIN + 1, TimeUnit.MINUTES);
-
- assertNotNull(cache.get(propStoreKey)); // returns current and queues async refresh
-
- assertNull(cache.get(propStoreKey)); // on exception, the loader should return null
-
- }
-
@Test
public void captureExampleTest() throws Exception {
diff --git a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
index af18971d48..99f9f9d212 100644
--- a/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/shell/ShellServerIT.java
@@ -211,6 +211,7 @@ public class ShellServerIT extends SharedMiniClusterBase {
assertEquals(0, cp.run(distCpArgs), "Failed to run distcp: " + Arrays.toString(distCpArgs));
}
ts.exec("importtable " + table2 + " " + import_, true);
+ Thread.sleep(100);
ts.exec("config -t " + table2 + " -np", true, "345M", true);
ts.exec("getsplits -t " + table2, true, "row5", true);
ts.exec("constraint --list -t " + table2, true, "VisibilityConstraint=2", true);