You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2019/06/13 20:36:18 UTC
[accumulo] branch master updated: fix #1123 replace observing
config with as needed derivation (#1201)
This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 17e8f6a fix #1123 replace observing config with as needed derivation (#1201)
17e8f6a is described below
commit 17e8f6a0b489caf6208a4be3be66dce8726ef7d2
Author: Keith Turner <kt...@apache.org>
AuthorDate: Thu Jun 13 16:36:13 2019 -0400
fix #1123 replace observing config with as needed derivation (#1201)
---
.../accumulo/core/conf/AccumuloConfiguration.java | 86 ++++++++
.../accumulo/core/conf/ConfigurationObserver.java | 25 ---
.../core/conf/ObservableConfiguration.java | 116 ----------
.../core/conf/ObservableConfigurationTest.java | 99 ---------
.../accumulo/server/conf/NamespaceConfWatcher.java | 121 -----------
.../server/conf/NamespaceConfiguration.java | 31 +--
.../server/conf/ServerConfigurationFactory.java | 10 -
.../accumulo/server/conf/TableConfWatcher.java | 122 -----------
.../accumulo/server/conf/TableConfiguration.java | 142 ++++--------
.../balancer/HostRegexTableLoadBalancer.java | 241 ++++++++++++---------
.../server/conf/NamespaceConfigurationTest.java | 19 +-
.../conf/ServerConfigurationFactoryTest.java | 4 +-
.../server/conf/TableConfigurationTest.java | 20 +-
.../BaseHostRegexTableLoadBalancerTest.java | 13 +-
...tRegexTableLoadBalancerReconfigurationTest.java | 8 +-
.../balancer/HostRegexTableLoadBalancerTest.java | 94 +-------
.../org/apache/accumulo/tserver/TabletServer.java | 8 -
.../tserver/constraints/ConstraintChecker.java | 49 +----
.../org/apache/accumulo/tserver/tablet/Tablet.java | 132 ++---------
.../apache/accumulo/tserver/tablet/TabletTest.java | 19 +-
20 files changed, 331 insertions(+), 1028 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
index 450b2f2..4cd7a91 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/AccumuloConfiguration.java
@@ -28,8 +28,10 @@ import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.accumulo.core.conf.PropertyType.PortRange;
@@ -435,6 +437,90 @@ public abstract class AccumuloConfiguration implements Iterable<Entry<String,Str
return null;
}
+ private static class RefCount<T> {
+ T obj;
+ long count;
+
+ RefCount(long c, T r) {
+ this.count = c;
+ this.obj = r;
+ }
+ }
+
+ private class DeriverImpl<T> implements Deriver<T> {
+
+ private final AtomicReference<RefCount<T>> refref = new AtomicReference<>();
+ private final Function<AccumuloConfiguration,T> converter;
+
+ DeriverImpl(Function<AccumuloConfiguration,T> converter) {
+ this.converter = converter;
+ }
+
+ /**
+ * This method was written with the goal of avoiding thread contention and minimizing
+ * recomputation. Configuration can be accessed frequently by many threads. Ideally, threads
+ * working on unrelated tasks would not impede each other because of accessing config.
+ *
+ * To avoid thread contention, synchronization and needless calls to compare and set were
+ * avoided. For example if 100 threads are all calling compare and set in a loop this could
+ * cause significant contention.
+ */
+ @Override
+ public T derive() {
+
+ // very important to obtain this before possibly recomputing object
+ long uc = getUpdateCount();
+
+ RefCount<T> rc = refref.get();
+
+ if (rc == null || rc.count != uc) {
+ T newObj = converter.apply(AccumuloConfiguration.this);
+
+ // very important to record the update count that was obtained before recomputing.
+ RefCount<T> nrc = new RefCount<>(uc, newObj);
+
+ /*
+ * The return value of compare and set is intentionally ignored here. This code could loop
+ * calling compare and set in order to avoid returning a stale object. However after this
+ * function returns, the object could immediately become stale. So in the big picture stale
+ * objects can not be prevented. Looping here could cause thread contention, but it would
+ * not solve the overall stale object problem. That is why the return value was ignored. The
+ * following line is a least effort attempt to make the result of this recomputation
+ * available to the next caller.
+ */
+ refref.compareAndSet(rc, nrc);
+
+ return nrc.obj;
+ }
+
+ return rc.obj;
+ }
+ }
+
+ /**
+ * Automatically regenerates an object whenever configuration changes. When configuration is not
+ * changing, keeps returning the same object. Implementations should be thread safe and eventually
+ * consistent. See {@link AccumuloConfiguration#newDeriver(Function)}
+ */
+ public static interface Deriver<T> {
+ public T derive();
+ }
+
+ /**
+ * Enables deriving an object from configuration and automatically deriving a new object any time
+ * configuration changes.
+ *
+ * @param converter
+ * This function is used to create an object from configuration. A reference to this
+ * function will be kept and called by the returned deriver.
+ * @return The returned deriver will automatically re-derive the object any time this
+ * configuration changes. When configuration is not changing, the same object is returned.
+ *
+ */
+ public <T> Deriver<T> newDeriver(Function<AccumuloConfiguration,T> converter) {
+ return new DeriverImpl<>(converter);
+ }
+
private static final String SCAN_EXEC_THREADS = "threads";
private static final String SCAN_EXEC_PRIORITY = "priority";
private static final String SCAN_EXEC_PRIORITIZER = "prioritizer";
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationObserver.java b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationObserver.java
deleted file mode 100644
index 8a62d26..0000000
--- a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationObserver.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.conf;
-
-public interface ConfigurationObserver {
- void propertyChanged(String key);
-
- void propertiesChanged();
-
- void sessionExpired();
-}
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ObservableConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/ObservableConfiguration.java
deleted file mode 100644
index fb49230..0000000
--- a/core/src/main/java/org/apache/accumulo/core/conf/ObservableConfiguration.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.conf;
-
-import static java.util.Objects.requireNonNull;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A configuration that can be observed. Handling of observers is thread-safe.
- */
-public abstract class ObservableConfiguration extends AccumuloConfiguration {
-
- private static final Logger log = LoggerFactory.getLogger(ObservableConfiguration.class);
-
- private Set<ConfigurationObserver> observers;
-
- /**
- * Creates a new observable configuration.
- */
- public ObservableConfiguration() {
- observers = Collections.synchronizedSet(new java.util.HashSet<>());
- }
-
- /**
- * Adds an observer.
- *
- * @param co
- * observer
- * @throws NullPointerException
- * if co is null
- */
- public void addObserver(ConfigurationObserver co) {
- requireNonNull(co);
- observers.add(co);
- }
-
- /**
- * Removes an observer.
- *
- * @param co
- * observer
- */
- public void removeObserver(ConfigurationObserver co) {
- observers.remove(co);
- }
-
- /**
- * Gets the current set of observers. The returned collection is a snapshot, and changes to it do
- * not reflect back to the configuration.
- *
- * @return observers
- */
- public Collection<ConfigurationObserver> getObservers() {
- return snapshot(observers);
- }
-
- private static Collection<ConfigurationObserver>
- snapshot(Collection<ConfigurationObserver> observers) {
- Collection<ConfigurationObserver> c = new java.util.ArrayList<>();
- synchronized (observers) {
- c.addAll(observers);
- }
- return c;
- }
-
- /**
- * Expires all observers.
- */
- public void expireAllObservers() {
- Collection<ConfigurationObserver> copy = snapshot(observers);
- log.info("Expiring {} observers", copy.size());
- for (ConfigurationObserver co : copy)
- co.sessionExpired();
- }
-
- /**
- * Notifies all observers that a property changed.
- *
- * @param key
- * configuration property key
- */
- public void propertyChanged(String key) {
- Collection<ConfigurationObserver> copy = snapshot(observers);
- for (ConfigurationObserver co : copy)
- co.propertyChanged(key);
- }
-
- /**
- * Notifies all observers that properties changed.
- */
- public void propertiesChanged() {
- Collection<ConfigurationObserver> copy = snapshot(observers);
- for (ConfigurationObserver co : copy)
- co.propertiesChanged();
- }
-}
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/ObservableConfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/conf/ObservableConfigurationTest.java
deleted file mode 100644
index e6c7169..0000000
--- a/core/src/test/java/org/apache/accumulo/core/conf/ObservableConfigurationTest.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.conf;
-
-import static org.easymock.EasyMock.createMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.function.Predicate;
-
-import org.junit.Before;
-import org.junit.Test;
-
-public class ObservableConfigurationTest {
- private static class TestObservableConfig extends ObservableConfiguration {
- @Override
- public String get(Property property) {
- return null;
- }
-
- @Override
- public void getProperties(Map<String,String> props, Predicate<String> filter) {}
- }
-
- private ObservableConfiguration c;
- private ConfigurationObserver co1;
-
- @Before
- public void setUp() {
- c = new TestObservableConfig();
- co1 = createMock(ConfigurationObserver.class);
- }
-
- @Test
- public void testAddAndRemove() {
- ConfigurationObserver co2 = createMock(ConfigurationObserver.class);
- c.addObserver(co1);
- c.addObserver(co2);
- Collection<ConfigurationObserver> cos = c.getObservers();
- assertEquals(2, cos.size());
- assertTrue(cos.contains(co1));
- assertTrue(cos.contains(co2));
- c.removeObserver(co1);
- cos = c.getObservers();
- assertEquals(1, cos.size());
- assertTrue(cos.contains(co2));
- }
-
- @Test(expected = NullPointerException.class)
- public void testNoNullAdd() {
- c.addObserver(null);
- }
-
- @Test
- public void testSessionExpired() {
- c.addObserver(co1);
- co1.sessionExpired();
- replay(co1);
- c.expireAllObservers();
- verify(co1);
- }
-
- @Test
- public void testPropertyChanged() {
- String key = "key";
- c.addObserver(co1);
- co1.propertyChanged(key);
- replay(co1);
- c.propertyChanged(key);
- verify(co1);
- }
-
- @Test
- public void testPropertiesChanged() {
- c.addObserver(co1);
- co1.propertiesChanged();
- replay(co1);
- c.propertiesChanged();
- verify(co1);
- }
-}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java
deleted file mode 100644
index baaee2a..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.conf;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.data.NamespaceId;
-import org.apache.accumulo.server.ServerContext;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class NamespaceConfWatcher implements Watcher {
-
- private static final Logger log = LoggerFactory.getLogger(NamespaceConfWatcher.class);
- private final ServerContext context;
- private final String namespacesPrefix;
- private final int namespacesPrefixLength;
-
- NamespaceConfWatcher(ServerContext context) {
- this.context = context;
- namespacesPrefix = context.getZooKeeperRoot() + Constants.ZNAMESPACES + "/";
- namespacesPrefixLength = namespacesPrefix.length();
- }
-
- static String toString(WatchedEvent event) {
- return new StringBuilder("{path=").append(event.getPath()).append(",state=")
- .append(event.getState()).append(",type=").append(event.getType()).append("}").toString();
- }
-
- @Override
- public void process(WatchedEvent event) {
- String path = event.getPath();
- if (log.isTraceEnabled()) {
- log.trace("WatchedEvent : {}", toString(event));
- }
-
- String namespaceIdStr = null;
- String key = null;
-
- if (path != null) {
- if (path.startsWith(namespacesPrefix)) {
- namespaceIdStr = path.substring(namespacesPrefixLength);
- if (namespaceIdStr.contains("/")) {
- namespaceIdStr = namespaceIdStr.substring(0, namespaceIdStr.indexOf('/'));
- if (path
- .startsWith(namespacesPrefix + namespaceIdStr + Constants.ZNAMESPACE_CONF + "/")) {
- key = path.substring(
- (namespacesPrefix + namespaceIdStr + Constants.ZNAMESPACE_CONF + "/").length());
- }
- }
- }
-
- if (namespaceIdStr == null) {
- log.warn("Zookeeper told me about a path I was not watching: {}, event {}", path,
- toString(event));
- return;
- }
- }
- NamespaceId namespaceId = NamespaceId.of(namespaceIdStr);
-
- switch (event.getType()) {
- case NodeDataChanged:
- if (log.isTraceEnabled()) {
- log.trace("EventNodeDataChanged {}", event.getPath());
- }
- if (key != null) {
- context.getServerConfFactory().getNamespaceConfiguration(namespaceId)
- .propertyChanged(key);
- }
- break;
- case NodeChildrenChanged:
- context.getServerConfFactory().getNamespaceConfiguration(namespaceId).propertiesChanged();
- break;
- case NodeDeleted:
- if (key == null) {
- ServerConfigurationFactory.removeCachedNamespaceConfiguration(context.getInstanceID(),
- namespaceId);
- }
- break;
- case None:
- switch (event.getState()) {
- case Expired:
- log.info("Zookeeper node event type None, state=expired. Expire all table observers");
- ServerConfigurationFactory.expireAllTableObservers();
- break;
- case SyncConnected:
- break;
- case Disconnected:
- break;
- default:
- log.warn("EventNone event not handled {}", toString(event));
- }
- break;
- case NodeCreated:
- switch (event.getState()) {
- case SyncConnected:
- break;
- default:
- log.warn("Event NodeCreated event not handled {}", toString(event));
- }
- break;
- default:
- log.warn("Event not handled {}", toString(event));
- }
- }
-}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java
index eef0248..025a8d8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java
@@ -23,19 +23,14 @@ import java.util.function.Predicate;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.Namespace;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationObserver;
-import org.apache.accumulo.core.conf.ObservableConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.ZooCachePropertyAccessor.PropCacheKey;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class NamespaceConfiguration extends ObservableConfiguration {
- private static final Logger log = LoggerFactory.getLogger(NamespaceConfiguration.class);
+public class NamespaceConfiguration extends AccumuloConfiguration {
private static final Map<PropCacheKey,ZooCache> propCaches = new java.util.HashMap<>();
@@ -74,8 +69,7 @@ public class NamespaceConfiguration extends ObservableConfiguration {
PropCacheKey key = new PropCacheKey(context.getInstanceID(), namespaceId.canonical());
ZooCache propCache = propCaches.get(key);
if (propCache == null) {
- propCache = zcf.getZooCache(context.getZooKeepers(), context.getZooKeepersSessionTimeOut(),
- new NamespaceConfWatcher(context));
+ propCache = zcf.getZooCache(context.getZooKeepers(), context.getZooKeepersSessionTimeOut());
propCaches.put(key, propCache);
}
return propCache;
@@ -136,27 +130,6 @@ public class NamespaceConfiguration extends ObservableConfiguration {
return namespaceId;
}
- @Override
- public void addObserver(ConfigurationObserver co) {
- if (namespaceId == null) {
- String err = "Attempt to add observer for non-namespace configuration";
- log.error(err);
- throw new RuntimeException(err);
- }
- iterator();
- super.addObserver(co);
- }
-
- @Override
- public void removeObserver(ConfigurationObserver co) {
- if (namespaceId == null) {
- String err = "Attempt to remove observer for non-namespace configuration";
- log.error(err);
- throw new RuntimeException(err);
- }
- super.removeObserver(co);
- }
-
static boolean isIteratorOrConstraint(String key) {
return key.startsWith(Property.TABLE_ITERATOR_PREFIX.getKey())
|| key.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey());
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
index 39ae6ec..f2c82f7 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/ServerConfigurationFactory.java
@@ -77,16 +77,6 @@ public class ServerConfigurationFactory extends ServerConfiguration {
}
}
- static void expireAllTableObservers() {
- synchronized (tableConfigs) {
- for (Map<TableId,TableConfiguration> instanceMap : tableConfigs.values()) {
- for (TableConfiguration c : instanceMap.values()) {
- c.expireAllObservers();
- }
- }
- }
- }
-
private final ServerContext context;
private final SiteConfiguration siteConfig;
private final String instanceID;
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
deleted file mode 100644
index ffcb7eb..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.conf;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.server.ServerContext;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-class TableConfWatcher implements Watcher {
-
- private static final Logger log = LoggerFactory.getLogger(TableConfWatcher.class);
- private final ServerContext context;
- private final String tablesPrefix;
- private ServerConfigurationFactory scf;
-
- TableConfWatcher(ServerContext context) {
- this.context = context;
- tablesPrefix = context.getZooKeeperRoot() + Constants.ZTABLES + "/";
- scf = context.getServerConfFactory();
- }
-
- static String toString(WatchedEvent event) {
- return new StringBuilder("{path=").append(event.getPath()).append(",state=")
- .append(event.getState()).append(",type=").append(event.getType()).append("}").toString();
- }
-
- @Override
- public void process(WatchedEvent event) {
- String path = event.getPath();
- if (log.isTraceEnabled()) {
- log.trace("WatchedEvent : {}", toString(event));
- }
-
- String tableIdString = null;
- String key = null;
-
- if (path != null) {
- if (path.startsWith(tablesPrefix)) {
- tableIdString = path.substring(tablesPrefix.length());
- if (tableIdString.contains("/")) {
- tableIdString = tableIdString.substring(0, tableIdString.indexOf('/'));
- if (path.startsWith(tablesPrefix + tableIdString + Constants.ZTABLE_CONF + "/")) {
- key = path
- .substring((tablesPrefix + tableIdString + Constants.ZTABLE_CONF + "/").length());
- }
- }
- }
-
- if (tableIdString == null) {
- log.warn("Zookeeper told me about a path I was not watching: {}, event {}", path,
- toString(event));
- return;
- }
- }
- TableId tableId = TableId.of(tableIdString);
-
- switch (event.getType()) {
- case NodeDataChanged:
- if (log.isTraceEnabled()) {
- log.trace("EventNodeDataChanged {}", event.getPath());
- }
- if (key != null) {
- scf.getTableConfiguration(tableId).propertyChanged(key);
- }
- break;
- case NodeChildrenChanged:
- scf.getTableConfiguration(tableId).propertiesChanged();
- break;
- case NodeDeleted:
- if (key == null) {
- // only remove the AccumuloConfiguration object when a
- // table node is deleted, not when a tables property is
- // deleted.
- ServerConfigurationFactory.removeCachedTableConfiguration(context.getInstanceID(),
- tableId);
- }
- break;
- case None:
- switch (event.getState()) {
- case Expired:
- log.info("Zookeeper node event type None, state=expired. Expire all table observers");
- ServerConfigurationFactory.expireAllTableObservers();
- break;
- case SyncConnected:
- break;
- case Disconnected:
- break;
- default:
- log.warn("EventNone event not handled {}", toString(event));
- }
- break;
- case NodeCreated:
- switch (event.getState()) {
- case SyncConnected:
- break;
- default:
- log.warn("Event NodeCreated event not handled {}", toString(event));
- }
- break;
- default:
- log.warn("Event not handled {}", toString(event));
- }
- }
-}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
index 28d7e25..a1cd54f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@ -28,9 +28,8 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.conf.ConfigurationObserver;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.IterConfigUtil;
-import org.apache.accumulo.core.conf.ObservableConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
@@ -42,15 +41,12 @@ import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.ServiceEnvironmentImpl;
import org.apache.accumulo.server.conf.ZooCachePropertyAccessor.PropCacheKey;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
-public class TableConfiguration extends ObservableConfiguration {
- private static final Logger log = LoggerFactory.getLogger(TableConfiguration.class);
+public class TableConfiguration extends AccumuloConfiguration {
private static final Map<PropCacheKey,ZooCache> propCaches = new java.util.HashMap<>();
@@ -62,7 +58,9 @@ public class TableConfiguration extends ObservableConfiguration {
private final TableId tableId;
- private EnumMap<IteratorScope,AtomicReference<ParsedIteratorConfig>> iteratorConfig;
+ private final EnumMap<IteratorScope,Deriver<ParsedIteratorConfig>> iteratorConfig;
+
+ private final Deriver<ScanDispatcher> scanDispatchDeriver;
public TableConfiguration(ServerContext context, TableId tableId, NamespaceConfiguration parent) {
this.context = requireNonNull(context);
@@ -71,8 +69,16 @@ public class TableConfiguration extends ObservableConfiguration {
iteratorConfig = new EnumMap<>(IteratorScope.class);
for (IteratorScope scope : IteratorScope.values()) {
- iteratorConfig.put(scope, new AtomicReference<>(null));
+ iteratorConfig.put(scope, newDeriver(conf -> {
+ Map<String,Map<String,String>> allOpts = new HashMap<>();
+ List<IterInfo> iters =
+ IterConfigUtil.parseIterConf(scope, Collections.emptyList(), allOpts, conf);
+ return new ParsedIteratorConfig(iters, allOpts, conf.get(Property.TABLE_CLASSPATH));
+
+ }));
}
+
+ scanDispatchDeriver = newDeriver(conf -> createScanDispatcher(conf, context, tableId));
}
void setZooCacheFactory(ZooCacheFactory zcf) {
@@ -84,8 +90,7 @@ public class TableConfiguration extends ObservableConfiguration {
PropCacheKey key = new PropCacheKey(context.getInstanceID(), tableId.canonical());
ZooCache propCache = propCaches.get(key);
if (propCache == null) {
- propCache = zcf.getZooCache(context.getZooKeepers(), context.getZooKeepersSessionTimeOut(),
- new TableConfWatcher(context));
+ propCache = zcf.getZooCache(context.getZooKeepers(), context.getZooKeepersSessionTimeOut());
propCaches.put(key, propCache);
}
return propCache;
@@ -102,27 +107,6 @@ public class TableConfiguration extends ObservableConfiguration {
.updateAndGet(pca -> pca == null ? new ZooCachePropertyAccessor(getZooCache()) : pca);
}
- @Override
- public void addObserver(ConfigurationObserver co) {
- if (tableId == null) {
- String err = "Attempt to add observer for non-table configuration";
- log.error(err);
- throw new RuntimeException(err);
- }
- iterator();
- super.addObserver(co);
- }
-
- @Override
- public void removeObserver(ConfigurationObserver co) {
- if (tableId == null) {
- String err = "Attempt to remove observer for non-table configuration";
- log.error(err);
- throw new RuntimeException(err);
- }
- super.removeObserver(co);
- }
-
private String getPath() {
return context.getZooKeeperRoot() + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_CONF;
}
@@ -195,10 +179,9 @@ public class TableConfiguration extends ObservableConfiguration {
private final List<IterInfo> tableIters;
private final Map<String,Map<String,String>> tableOpts;
private final String context;
- private final long updateCount;
private ParsedIteratorConfig(List<IterInfo> ii, Map<String,Map<String,String>> opts,
- String context, long updateCount) {
+ String context) {
this.tableIters = ImmutableList.copyOf(ii);
Builder<String,Map<String,String>> imb = ImmutableMap.builder();
for (Entry<String,Map<String,String>> entry : opts.entrySet()) {
@@ -206,7 +189,6 @@ public class TableConfiguration extends ObservableConfiguration {
}
tableOpts = imb.build();
this.context = context;
- this.updateCount = updateCount;
}
public List<IterInfo> getIterInfo() {
@@ -223,71 +205,43 @@ public class TableConfiguration extends ObservableConfiguration {
}
public ParsedIteratorConfig getParsedIteratorConfig(IteratorScope scope) {
- long count = getUpdateCount();
- AtomicReference<ParsedIteratorConfig> ref = iteratorConfig.get(scope);
- ParsedIteratorConfig pic = ref.get();
- if (pic == null || pic.updateCount != count) {
- Map<String,Map<String,String>> allOpts = new HashMap<>();
- List<IterInfo> iters =
- IterConfigUtil.parseIterConf(scope, Collections.emptyList(), allOpts, this);
- ParsedIteratorConfig newPic =
- new ParsedIteratorConfig(iters, allOpts, get(Property.TABLE_CLASSPATH), count);
- ref.compareAndSet(pic, newPic);
- pic = newPic;
- }
-
- return pic;
+ return iteratorConfig.get(scope).derive();
}
- public static class TablesScanDispatcher {
- public final ScanDispatcher dispatcher;
- public final long count;
+ private static ScanDispatcher createScanDispatcher(AccumuloConfiguration conf,
+ ServerContext context, TableId tableId) {
+ ScanDispatcher newDispatcher = Property.createTableInstanceFromPropertyName(conf,
+ Property.TABLE_SCAN_DISPATCHER, ScanDispatcher.class, null);
- public TablesScanDispatcher(ScanDispatcher dispatcher, long count) {
- this.dispatcher = dispatcher;
- this.count = count;
- }
- }
+ Builder<String,String> builder = ImmutableMap.builder();
+ conf.getAllPropertiesWithPrefix(Property.TABLE_SCAN_DISPATCHER_OPTS).forEach((k, v) -> {
+ String optKey = k.substring(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey().length());
+ builder.put(optKey, v);
+ });
- private AtomicReference<TablesScanDispatcher> scanDispatcherRef = new AtomicReference<>();
+ Map<String,String> opts = builder.build();
- public ScanDispatcher getScanDispatcher() {
- long count = getUpdateCount();
- TablesScanDispatcher currRef = scanDispatcherRef.get();
- if (currRef == null || currRef.count != count) {
- ScanDispatcher newDispatcher = Property.createTableInstanceFromPropertyName(this,
- Property.TABLE_SCAN_DISPATCHER, ScanDispatcher.class, null);
-
- Builder<String,String> builder = ImmutableMap.builder();
- getAllPropertiesWithPrefix(Property.TABLE_SCAN_DISPATCHER_OPTS).forEach((k, v) -> {
- String optKey = k.substring(Property.TABLE_SCAN_DISPATCHER_OPTS.getKey().length());
- builder.put(optKey, v);
- });
-
- Map<String,String> opts = builder.build();
-
- newDispatcher.init(new ScanDispatcher.InitParameters() {
- @Override
- public TableId getTableId() {
- return tableId;
- }
-
- @Override
- public Map<String,String> getOptions() {
- return opts;
- }
-
- @Override
- public ServiceEnvironment getServiceEnv() {
- return new ServiceEnvironmentImpl(context);
- }
- });
-
- TablesScanDispatcher newRef = new TablesScanDispatcher(newDispatcher, count);
- scanDispatcherRef.compareAndSet(currRef, newRef);
- currRef = newRef;
- }
+ newDispatcher.init(new ScanDispatcher.InitParameters() {
+ @Override
+ public TableId getTableId() {
+ return tableId;
+ }
+
+ @Override
+ public Map<String,String> getOptions() {
+ return opts;
+ }
+
+ @Override
+ public ServiceEnvironment getServiceEnv() {
+ return new ServiceEnvironmentImpl(context);
+ }
+ });
- return currRef.dispatcher;
+ return newDispatcher;
+ }
+
+ public ScanDispatcher getScanDispatcher() {
+ return scanDispatchDeriver.derive();
}
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancer.java b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancer.java
index 905ec71..f4e13e8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancer.java
@@ -31,10 +31,12 @@ import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.accumulo.core.client.admin.TableOperations;
-import org.apache.accumulo.core.conf.ConfigurationObserver;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration.Deriver;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
@@ -43,7 +45,6 @@ import org.apache.accumulo.core.master.thrift.TableInfo;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletMigration;
import org.apache.commons.lang.builder.ToStringBuilder;
@@ -52,6 +53,9 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@@ -82,7 +86,7 @@ import com.google.common.collect.Multimap;
* <b>table.custom.balancer.host.regex.max.outstanding.migrations</b>
*
*/
-public class HostRegexTableLoadBalancer extends TableLoadBalancer implements ConfigurationObserver {
+public class HostRegexTableLoadBalancer extends TableLoadBalancer {
private static final String PROP_PREFIX = Property.TABLE_ARBITRARY_PROP_PREFIX.getKey();
@@ -101,22 +105,83 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer implements Con
public static final String HOST_BALANCER_OUTSTANDING_MIGRATIONS_KEY =
PROP_PREFIX + "balancer.host.regex.max.outstanding.migrations";
- protected long oobCheckMillis =
- ConfigurationTypeHelper.getTimeInMillis(HOST_BALANCER_OOB_DEFAULT);
+ private static Map<String,String> getRegexes(AccumuloConfiguration aconf) {
+ Map<String,String> regexes = new HashMap<>();
+ Map<String,String> customProps =
+ aconf.getAllPropertiesWithPrefix(Property.TABLE_ARBITRARY_PROP_PREFIX);
+
+ if (customProps != null && customProps.size() > 0) {
+ for (Entry<String,String> customProp : customProps.entrySet()) {
+ if (customProp.getKey().startsWith(HOST_BALANCER_PREFIX)) {
+ if (customProp.getKey().equals(HOST_BALANCER_OOB_CHECK_KEY)
+ || customProp.getKey().equals(HOST_BALANCER_REGEX_USING_IPS_KEY)
+ || customProp.getKey().equals(HOST_BALANCER_REGEX_MAX_MIGRATIONS_KEY)
+ || customProp.getKey().equals(HOST_BALANCER_OUTSTANDING_MIGRATIONS_KEY)) {
+ continue;
+ }
+ String tableName = customProp.getKey().substring(HOST_BALANCER_PREFIX.length());
+ String regex = customProp.getValue();
+ regexes.put(tableName, regex);
+ }
+ }
+ }
+
+ return ImmutableMap.copyOf(regexes);
+ }
+
+ /**
+ * Host Regex Table Load Balance Config
+ */
+ static class HrtlbConf {
+
+ protected long oobCheckMillis =
+ ConfigurationTypeHelper.getTimeInMillis(HOST_BALANCER_OOB_DEFAULT);
+ private int maxTServerMigrations = HOST_BALANCER_REGEX_MAX_MIGRATIONS_DEFAULT;
+ private int maxOutstandingMigrations = DEFAULT_OUTSTANDING_MIGRATIONS;
+ private boolean isIpBasedRegex = false;
+ private Map<String,String> regexes;
+ private Map<String,Pattern> poolNameToRegexPattern = null;
+
+ HrtlbConf(AccumuloConfiguration aconf) {
+ System.out.println("building hrtlb conf");
+ String oobProperty = aconf.get(HOST_BALANCER_OOB_CHECK_KEY);
+ if (oobProperty != null) {
+ oobCheckMillis = ConfigurationTypeHelper.getTimeInMillis(oobProperty);
+ }
+ String ipBased = aconf.get(HOST_BALANCER_REGEX_USING_IPS_KEY);
+ if (ipBased != null) {
+ isIpBasedRegex = Boolean.parseBoolean(ipBased);
+ }
+ String migrations = aconf.get(HOST_BALANCER_REGEX_MAX_MIGRATIONS_KEY);
+ if (migrations != null) {
+ maxTServerMigrations = Integer.parseInt(migrations);
+ }
+ String outstanding = aconf.get(HOST_BALANCER_OUTSTANDING_MIGRATIONS_KEY);
+ if (outstanding != null) {
+ maxOutstandingMigrations = Integer.parseInt(outstanding);
+ }
+
+ this.regexes = getRegexes(aconf);
+
+ Map<String,Pattern> poolNameToRegexPatternBuilder = new HashMap<>();
+ regexes.forEach((k, v) -> {
+ poolNameToRegexPatternBuilder.put(k, Pattern.compile(v));
+ });
+
+ poolNameToRegexPattern = ImmutableMap.copyOf(poolNameToRegexPatternBuilder);
+ }
+ }
private static final long ONE_HOUR = 60 * 60 * 1000;
private static final Set<KeyExtent> EMPTY_MIGRATIONS = Collections.emptySet();
-
- private volatile Map<TableId,String> tableIdToTableName = null;
- private volatile Map<String,Pattern> poolNameToRegexPattern = null;
private volatile long lastOOBCheck = System.currentTimeMillis();
- private volatile boolean isIpBasedRegex = false;
private Map<String,SortedMap<TServerInstance,TabletServerStatus>> pools = new HashMap<>();
- private volatile int maxTServerMigrations = HOST_BALANCER_REGEX_MAX_MIGRATIONS_DEFAULT;
- private volatile int maxOutstandingMigrations = DEFAULT_OUTSTANDING_MIGRATIONS;
private final Map<KeyExtent,TabletMigration> migrationsFromLastPass = new HashMap<>();
private final Map<String,Long> tableToTimeSinceNoMigrations = new HashMap<>();
+ private Deriver<HrtlbConf> hrtlbConf;
+ private LoadingCache<TableId,Deriver<Map<String,String>>> tablesRegExCache;
+
/**
* Group the set of current tservers by pool name. Tservers that don't match a regex are put into
* a default pool. This could be expensive in the terms of the amount of time to recompute the
@@ -170,7 +235,7 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer implements Con
*/
protected List<String> getPoolNamesForHost(String host) {
String test = host;
- if (!isIpBasedRegex) {
+ if (!hrtlbConf.derive().isIpBasedRegex) {
try {
test = getNameFromIp(host);
} catch (UnknownHostException e1) {
@@ -180,7 +245,7 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer implements Con
}
}
List<String> pools = new ArrayList<>();
- for (Entry<String,Pattern> e : poolNameToRegexPattern.entrySet()) {
+ for (Entry<String,Pattern> e : hrtlbConf.derive().poolNameToRegexPattern.entrySet()) {
if (e.getValue().matcher(test).matches()) {
pools.add(e.getKey());
}
@@ -195,6 +260,24 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer implements Con
return InetAddress.getByName(hostIp).getHostName();
}
+ private void checkTableConfig(TableId tableId) {
+ Map<String,String> tableRegexes = tablesRegExCache.getUnchecked(tableId).derive();
+
+ if (!hrtlbConf.derive().regexes.equals(tableRegexes)) {
+ LoggerFactory.getLogger(HostRegexTableLoadBalancer.class).warn(
+ "Table id {} has different config than system. The per table config is ignored.",
+ tableId);
+ }
+ }
+
+ private Map<TableId,String> createdTableNameMap(Map<String,String> tableIdMap) {
+ HashMap<TableId,String> tableNameMap = new HashMap<>();
+ tableIdMap.forEach((tableName, tableId) -> {
+ tableNameMap.put(TableId.of(tableId), tableName);
+ });
+ return tableNameMap;
+ }
+
/**
* Matches table name against pool names, returns matching pool name or DEFAULT_POOL.
*
@@ -206,106 +289,58 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer implements Con
if (tableName == null) {
return DEFAULT_POOL;
}
- return poolNameToRegexPattern.containsKey(tableName) ? tableName : DEFAULT_POOL;
- }
-
- /**
- * Parse configuration and extract properties
- *
- * @param conf
- * server configuration
- */
- protected void parseConfiguration(ServerConfiguration conf) {
- TableOperations t = getTableOperations();
- if (t == null) {
- throw new RuntimeException("Table Operations cannot be null");
- }
- Map<TableId,String> tableIdToTableNameBuilder = new HashMap<>();
- Map<String,Pattern> poolNameToRegexPatternBuilder = new HashMap<>();
- for (Entry<String,String> table : t.tableIdMap().entrySet()) {
- TableId tableId = TableId.of(table.getValue());
- tableIdToTableNameBuilder.put(tableId, table.getKey());
- conf.getTableConfiguration(tableId).addObserver(this);
- Map<String,String> customProps = conf.getTableConfiguration(tableId)
- .getAllPropertiesWithPrefix(Property.TABLE_ARBITRARY_PROP_PREFIX);
- if (customProps != null && customProps.size() > 0) {
- for (Entry<String,String> customProp : customProps.entrySet()) {
- if (customProp.getKey().startsWith(HOST_BALANCER_PREFIX)) {
- if (customProp.getKey().equals(HOST_BALANCER_OOB_CHECK_KEY)
- || customProp.getKey().equals(HOST_BALANCER_REGEX_USING_IPS_KEY)
- || customProp.getKey().equals(HOST_BALANCER_REGEX_MAX_MIGRATIONS_KEY)
- || customProp.getKey().equals(HOST_BALANCER_OUTSTANDING_MIGRATIONS_KEY)) {
- continue;
- }
- String tableName = customProp.getKey().substring(HOST_BALANCER_PREFIX.length());
- String regex = customProp.getValue();
- poolNameToRegexPatternBuilder.put(tableName, Pattern.compile(regex));
- }
- }
- }
- }
-
- tableIdToTableName = ImmutableMap.copyOf(tableIdToTableNameBuilder);
- poolNameToRegexPattern = ImmutableMap.copyOf(poolNameToRegexPatternBuilder);
-
- String oobProperty = conf.getSystemConfiguration().get(HOST_BALANCER_OOB_CHECK_KEY);
- if (oobProperty != null) {
- oobCheckMillis = ConfigurationTypeHelper.getTimeInMillis(oobProperty);
- }
- String ipBased = conf.getSystemConfiguration().get(HOST_BALANCER_REGEX_USING_IPS_KEY);
- if (ipBased != null) {
- isIpBasedRegex = Boolean.parseBoolean(ipBased);
- }
- String migrations = conf.getSystemConfiguration().get(HOST_BALANCER_REGEX_MAX_MIGRATIONS_KEY);
- if (migrations != null) {
- maxTServerMigrations = Integer.parseInt(migrations);
- }
- String outstanding =
- conf.getSystemConfiguration().get(HOST_BALANCER_OUTSTANDING_MIGRATIONS_KEY);
- if (outstanding != null) {
- this.maxOutstandingMigrations = Integer.parseInt(outstanding);
- }
- LOG.info("{}", this);
+ return hrtlbConf.derive().poolNameToRegexPattern.containsKey(tableName) ? tableName
+ : DEFAULT_POOL;
}
@Override
public String toString() {
+ HrtlbConf myConf = hrtlbConf.derive();
ToStringBuilder buf = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE);
- buf.append("\nTablet Out Of Bounds Check Interval", this.oobCheckMillis);
- buf.append("\nMax Tablet Server Migrations", this.maxTServerMigrations);
- buf.append("\nRegular Expressions use IPs", this.isIpBasedRegex);
- buf.append("\nPools", this.poolNameToRegexPattern);
+ buf.append("\nTablet Out Of Bounds Check Interval", myConf.oobCheckMillis);
+ buf.append("\nMax Tablet Server Migrations", myConf.maxTServerMigrations);
+ buf.append("\nRegular Expressions use IPs", myConf.isIpBasedRegex);
+ buf.append("\nPools", myConf.poolNameToRegexPattern);
return buf.toString();
}
- public Map<TableId,String> getTableIdToTableName() {
- return tableIdToTableName;
- }
-
public Map<String,Pattern> getPoolNameToRegexPattern() {
- return poolNameToRegexPattern;
+ return hrtlbConf.derive().poolNameToRegexPattern;
}
public int getMaxMigrations() {
- return maxTServerMigrations;
+ return hrtlbConf.derive().maxTServerMigrations;
}
public int getMaxOutstandingMigrations() {
- return maxOutstandingMigrations;
+ return hrtlbConf.derive().maxOutstandingMigrations;
}
public long getOobCheckMillis() {
- return oobCheckMillis;
+ return hrtlbConf.derive().oobCheckMillis;
}
public boolean isIpBasedRegex() {
- return isIpBasedRegex;
+ return hrtlbConf.derive().isIpBasedRegex;
}
@Override
public void init(ServerContext context) {
super.init(context);
- parseConfiguration(context.getServerConfFactory());
+
+ this.hrtlbConf =
+ context.getServerConfFactory().getSystemConfiguration().newDeriver(HrtlbConf::new);
+
+ tablesRegExCache = CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS)
+ .build(new CacheLoader<TableId,Deriver<Map<String,String>>>() {
+ @Override
+ public Deriver<Map<String,String>> load(TableId key) throws Exception {
+ return context.getServerConfFactory().getTableConfiguration(key)
+ .newDeriver(conf -> getRegexes(conf));
+ }
+ });
+
+ LOG.info("{}", this);
}
@Override
@@ -324,6 +359,9 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer implements Con
}
tableUnassigned.put(e.getKey(), e.getValue());
}
+
+ Map<TableId,String> tableIdToTableName = createdTableNameMap(getTableOperations().tableIdMap());
+
// Send a view of the current servers to the tables tablet balancer
for (Entry<TableId,Map<KeyExtent,TServerInstance>> e : groupedUnassigned.entrySet()) {
Map<KeyExtent,TServerInstance> newAssignments = new HashMap<>();
@@ -357,14 +395,19 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer implements Con
return minBalanceTime;
Map<String,String> tableIdMap = t.tableIdMap();
+ Map<TableId,String> tableIdToTableName = createdTableNameMap(tableIdMap);
+ tableIdToTableName.keySet().forEach(tid -> checkTableConfig(tid));
+
long now = System.currentTimeMillis();
+ HrtlbConf myConf = hrtlbConf.derive();
+
Map<String,SortedMap<TServerInstance,TabletServerStatus>> currentGrouped =
splitCurrentByRegex(current);
- if ((now - this.lastOOBCheck) > this.oobCheckMillis) {
+ if ((now - this.lastOOBCheck) > myConf.oobCheckMillis) {
try {
// Check to see if a tablet is assigned outside the bounds of the pool. If so, migrate it.
- for (String table : t.list()) {
+ for (String table : tableIdMap.keySet()) {
LOG.debug("Checking for out of bounds tablets for table {}", table);
String tablePoolName = getPoolNameForTable(table);
for (Entry<TServerInstance,TabletServerStatus> e : current.entrySet()) {
@@ -406,7 +449,7 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer implements Con
LOG.info("Tablet {} is currently outside the bounds of the"
+ " regex, migrating from {} to {}", ke, e.getKey(), nextTS);
migrationsOut.add(new TabletMigration(ke, e.getKey(), nextTS));
- if (migrationsOut.size() >= this.maxTServerMigrations) {
+ if (migrationsOut.size() >= myConf.maxTServerMigrations) {
break;
}
} else {
@@ -433,7 +476,7 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer implements Con
}
if (migrations != null && migrations.size() > 0) {
- if (migrations.size() >= maxOutstandingMigrations) {
+ if (migrations.size() >= myConf.maxOutstandingMigrations) {
LOG.warn("Not balancing tables due to {} outstanding migrations", migrations.size());
if (LOG.isTraceEnabled()) {
LOG.trace("Sample up to 10 outstanding migrations: {}", Iterables.limit(migrations, 10));
@@ -490,7 +533,7 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer implements Con
}
migrationsOut.addAll(newMigrations);
- if (migrationsOut.size() >= this.maxTServerMigrations) {
+ if (migrationsOut.size() >= myConf.maxTServerMigrations) {
break;
}
}
@@ -530,18 +573,4 @@ public class HostRegexTableLoadBalancer extends TableLoadBalancer implements Con
}
return newInfo;
}
-
- @Override
- public void propertyChanged(String key) {
- parseConfiguration(context.getServerConfFactory());
- }
-
- @Override
- public void propertiesChanged() {
- parseConfiguration(context.getServerConfFactory());
- }
-
- @Override
- public void sessionExpired() {}
-
}
diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/NamespaceConfigurationTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/NamespaceConfigurationTest.java
index 7f023e7..1b53027 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/conf/NamespaceConfigurationTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/conf/NamespaceConfigurationTest.java
@@ -17,7 +17,6 @@
package org.apache.accumulo.server.conf;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
@@ -25,9 +24,7 @@ import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -37,7 +34,6 @@ import java.util.function.Predicate;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.clientImpl.Namespace;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationObserver;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.fate.zookeeper.ZooCache;
@@ -78,8 +74,7 @@ public class NamespaceConfigurationTest {
c.setZooCacheFactory(zcf);
zc = createMock(ZooCache.class);
- expect(zcf.getZooCache(eq(ZOOKEEPERS), eq(ZK_SESSION_TIMEOUT),
- anyObject(NamespaceConfWatcher.class))).andReturn(zc);
+ expect(zcf.getZooCache(eq(ZOOKEEPERS), eq(ZK_SESSION_TIMEOUT))).andReturn(zc);
replay(zcf);
}
@@ -144,18 +139,6 @@ public class NamespaceConfigurationTest {
}
@Test
- public void testObserver() {
- ConfigurationObserver o = createMock(ConfigurationObserver.class);
- c.addObserver(o);
- Collection<ConfigurationObserver> os = c.getObservers();
- assertEquals(1, os.size());
- assertTrue(os.contains(o));
- c.removeObserver(o);
- os = c.getObservers();
- assertEquals(0, os.size());
- }
-
- @Test
public void testInvalidateCache() {
// need to do a get so the accessor is created
Property p = Property.INSTANCE_SECRET;
diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/ServerConfigurationFactoryTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/ServerConfigurationFactoryTest.java
index 8f29544..bf9dbb9 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/conf/ServerConfigurationFactoryTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/conf/ServerConfigurationFactoryTest.java
@@ -37,6 +37,7 @@ import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
import org.apache.accumulo.server.ServerContext;
+import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -56,8 +57,7 @@ public class ServerConfigurationFactoryTest {
public static void setUpClass() {
zcf = createMock(ZooCacheFactory.class);
zc = createMock(ZooCache.class);
- expect(zcf.getZooCache(eq(ZK_HOST), eq(ZK_TIMEOUT), anyObject(NamespaceConfWatcher.class)))
- .andReturn(zc);
+ expect(zcf.getZooCache(eq(ZK_HOST), eq(ZK_TIMEOUT), EasyMock.anyObject())).andReturn(zc);
expectLastCall().anyTimes();
expect(zcf.getZooCache(ZK_HOST, ZK_TIMEOUT)).andReturn(zc);
expectLastCall().anyTimes();
diff --git a/server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java b/server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java
index af932aa..aaa0a95 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/conf/TableConfigurationTest.java
@@ -17,16 +17,13 @@
package org.apache.accumulo.server.conf;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.eq;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -34,7 +31,6 @@ import java.util.UUID;
import java.util.function.Predicate;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.conf.ConfigurationObserver;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.fate.zookeeper.ZooCache;
@@ -75,9 +71,7 @@ public class TableConfigurationTest {
c.setZooCacheFactory(zcf);
zc = createMock(ZooCache.class);
- expect(
- zcf.getZooCache(eq(ZOOKEEPERS), eq(ZK_SESSION_TIMEOUT), anyObject(TableConfWatcher.class)))
- .andReturn(zc);
+ expect(zcf.getZooCache(eq(ZOOKEEPERS), eq(ZK_SESSION_TIMEOUT))).andReturn(zc);
replay(zcf);
}
@@ -132,18 +126,6 @@ public class TableConfigurationTest {
}
@Test
- public void testObserver() {
- ConfigurationObserver o = createMock(ConfigurationObserver.class);
- c.addObserver(o);
- Collection<ConfigurationObserver> os = c.getObservers();
- assertEquals(1, os.size());
- assertTrue(os.contains(o));
- c.removeObserver(o);
- os = c.getObservers();
- assertEquals(0, os.size());
- }
-
- @Test
public void testInvalidateCache() {
// need to do a get so the accessor is created
Property p = Property.INSTANCE_SECRET;
diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/BaseHostRegexTableLoadBalancerTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/BaseHostRegexTableLoadBalancerTest.java
index f53cd62..e7944a0 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/BaseHostRegexTableLoadBalancerTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/BaseHostRegexTableLoadBalancerTest.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Properties;
import java.util.SortedMap;
import java.util.SortedSet;
@@ -94,15 +93,17 @@ public abstract class BaseHostRegexTableLoadBalancerTest extends HostRegexTableL
protected static class TestServerConfigurationFactory extends ServerConfigurationFactory {
final ServerContext context;
+ private ConfigurationCopy config;
public TestServerConfigurationFactory(ServerContext context) {
super(context, siteConfg);
this.context = context;
+ this.config = new ConfigurationCopy(DEFAULT_TABLE_PROPERTIES);
}
@Override
public synchronized AccumuloConfiguration getSystemConfiguration() {
- return new ConfigurationCopy(DEFAULT_TABLE_PROPERTIES);
+ return config;
}
@Override
@@ -114,16 +115,12 @@ public abstract class BaseHostRegexTableLoadBalancerTest extends HostRegexTableL
return new TableConfiguration(context, tableId, dummyConf) {
@Override
public String get(Property property) {
- return DEFAULT_TABLE_PROPERTIES.get(property.name());
+ return getSystemConfiguration().get(property.name());
}
@Override
public void getProperties(Map<String,String> props, Predicate<String> filter) {
- for (Entry<String,String> e : DEFAULT_TABLE_PROPERTIES.entrySet()) {
- if (filter.test(e.getKey())) {
- props.put(e.getKey(), e.getValue());
- }
- }
+ getSystemConfiguration().getProperties(props, filter);
}
@Override
diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerReconfigurationTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerReconfigurationTest.java
index 40a28c1..105b6d5 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerReconfigurationTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerReconfigurationTest.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
@@ -89,9 +90,10 @@ public class HostRegexTableLoadBalancerReconfigurationTest
this.balance(Collections.unmodifiableSortedMap(allTabletServers), migrations, migrationsOut);
assertEquals(0, migrationsOut.size());
// Change property, simulate call by TableConfWatcher
- DEFAULT_TABLE_PROPERTIES
- .put(HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + BAR.getTableName(), "r01.*");
- this.propertiesChanged();
+
+ ((ConfigurationCopy) factory.getSystemConfiguration())
+ .set(HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + BAR.getTableName(), "r01.*");
+
// Wait to trigger the out of bounds check and the repool check
UtilWaitThread.sleep(10000);
this.balance(Collections.unmodifiableSortedMap(allTabletServers), migrations, migrationsOut);
diff --git a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerTest.java b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerTest.java
index ab9cad7..7d3dd41 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/master/balancer/HostRegexTableLoadBalancerTest.java
@@ -33,14 +33,10 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedMap;
-import java.util.function.Predicate;
import java.util.regex.Pattern;
-import org.apache.accumulo.core.clientImpl.Namespace;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationCopy;
-import org.apache.accumulo.core.conf.DefaultConfiguration;
-import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
@@ -48,9 +44,7 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.server.ServerContext;
-import org.apache.accumulo.server.conf.NamespaceConfiguration;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
-import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletMigration;
import org.junit.Test;
@@ -84,15 +78,6 @@ public class HostRegexTableLoadBalancerTest extends BaseHostRegexTableLoadBalanc
assertEquals(Pattern.compile("r01.*").pattern(), patterns.get(FOO.getTableName()).pattern());
assertTrue(patterns.containsKey(BAR.getTableName()));
assertEquals(Pattern.compile("r02.*").pattern(), patterns.get(BAR.getTableName()).pattern());
- Map<TableId,String> tids = this.getTableIdToTableName();
- assertEquals(3, tids.size());
- assertTrue(tids.containsKey(FOO.getId()));
- assertEquals(FOO.getTableName(), tids.get(FOO.getId()));
- assertTrue(tids.containsKey(BAR.getId()));
- assertEquals(BAR.getTableName(), tids.get(BAR.getId()));
- assertTrue(tids.containsKey(BAZ.getId()));
- assertEquals(BAZ.getTableName(), tids.get(BAZ.getId()));
- assertFalse(this.isIpBasedRegex());
}
@Test
@@ -193,40 +178,13 @@ public class HostRegexTableLoadBalancerTest extends BaseHostRegexTableLoadBalanc
ServerContext context = createMockContext();
replay(context);
initFactory(new TestServerConfigurationFactory(context) {
-
@Override
- public TableConfiguration getTableConfiguration(TableId tableId) {
- NamespaceConfiguration defaultConf = new NamespaceConfiguration(Namespace.DEFAULT.id(),
- this.context, DefaultConfiguration.getInstance());
- return new TableConfiguration(this.context, tableId, defaultConf) {
- HashMap<String,String> tableProperties = new HashMap<>();
- {
- tableProperties
- .put(HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + FOO.getTableName(), "r.*");
- tableProperties.put(
- HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + BAR.getTableName(),
- "r01.*|r02.*");
- }
-
- @Override
- public String get(Property property) {
- return tableProperties.get(property.name());
- }
-
- @Override
- public void getProperties(Map<String,String> props, Predicate<String> filter) {
- for (Entry<String,String> e : tableProperties.entrySet()) {
- if (filter.test(e.getKey())) {
- props.put(e.getKey(), e.getValue());
- }
- }
- }
-
- @Override
- public long getUpdateCount() {
- return 0;
- }
- };
+ public synchronized AccumuloConfiguration getSystemConfiguration() {
+ HashMap<String,String> props = new HashMap<>(DEFAULT_TABLE_PROPERTIES);
+ props.put(HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + FOO.getTableName(), "r.*");
+ props.put(HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + BAR.getTableName(),
+ "r01.*|r02.*");
+ return new ConfigurationCopy(props);
}
});
Map<String,SortedMap<TServerInstance,TabletServerStatus>> groups =
@@ -281,44 +239,12 @@ public class HostRegexTableLoadBalancerTest extends BaseHostRegexTableLoadBalanc
HashMap<String,String> props = new HashMap<>();
props.put(HostRegexTableLoadBalancer.HOST_BALANCER_OOB_CHECK_KEY, "30s");
props.put(HostRegexTableLoadBalancer.HOST_BALANCER_REGEX_USING_IPS_KEY, "true");
+ props.put(HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + FOO.getTableName(),
+ "192\\.168\\.0\\.[1-5]");
+ props.put(HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + BAR.getTableName(),
+ "192\\.168\\.0\\.[6-9]|192\\.168\\.0\\.10");
return new ConfigurationCopy(props);
}
-
- @Override
- public TableConfiguration getTableConfiguration(TableId tableId) {
- NamespaceConfiguration defaultConf = new NamespaceConfiguration(Namespace.DEFAULT.id(),
- this.context, DefaultConfiguration.getInstance());
- return new TableConfiguration(context, tableId, defaultConf) {
- HashMap<String,String> tableProperties = new HashMap<>();
- {
- tableProperties.put(
- HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + FOO.getTableName(),
- "192\\.168\\.0\\.[1-5]");
- tableProperties.put(
- HostRegexTableLoadBalancer.HOST_BALANCER_PREFIX + BAR.getTableName(),
- "192\\.168\\.0\\.[6-9]|192\\.168\\.0\\.10");
- }
-
- @Override
- public String get(Property property) {
- return tableProperties.get(property.name());
- }
-
- @Override
- public void getProperties(Map<String,String> props, Predicate<String> filter) {
- for (Entry<String,String> e : tableProperties.entrySet()) {
- if (filter.test(e.getKey())) {
- props.put(e.getKey(), e.getValue());
- }
- }
- }
-
- @Override
- public long getUpdateCount() {
- return 0;
- }
- };
- }
});
assertTrue(isIpBasedRegex());
Map<String,SortedMap<TServerInstance,TabletServerStatus>> groups =
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 557c53c..db9623c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3145,14 +3145,6 @@ public class TabletServer extends AbstractServer {
Runnable gcDebugTask = () -> gcLogger.logGCInfo(getConfiguration());
SimpleTimer.getInstance(aconf).schedule(gcDebugTask, 0, TIME_BETWEEN_GC_CHECKS);
-
- Runnable constraintTask = () -> {
- for (Tablet tablet : getOnlineTablets().values()) {
- tablet.checkConstraints();
- }
- };
-
- SimpleTimer.getInstance(aconf).schedule(constraintTask, 0, 1000);
}
public TabletServerStatus getStats(Map<TableId,MapCounter<ScanRunState>> scanCounts) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/ConstraintChecker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/ConstraintChecker.java
index a375d12..c7762b4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/ConstraintChecker.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/constraints/ConstraintChecker.java
@@ -20,8 +20,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.constraints.Constraint;
import org.apache.accumulo.core.constraints.Constraint.Environment;
@@ -41,43 +41,39 @@ public class ConstraintChecker {
private ArrayList<Constraint> constrains;
private static final Logger log = LoggerFactory.getLogger(ConstraintChecker.class);
- private ClassLoader loader;
- private TableConfiguration conf;
-
- private AtomicLong lastCheck = new AtomicLong(0);
-
- public ConstraintChecker(TableConfiguration conf) {
+ public ConstraintChecker(AccumuloConfiguration conf) {
constrains = new ArrayList<>();
- this.conf = conf;
-
try {
String context = conf.get(Property.TABLE_CLASSPATH);
+ ClassLoader loader;
+
if (context != null && !context.equals("")) {
loader = AccumuloVFSClassLoader.getContextManager().getClassLoader(context);
} else {
loader = AccumuloVFSClassLoader.getClassLoader();
}
- for (Entry<String,String> entry : conf) {
+ for (Entry<String,String> entry : conf
+ .getAllPropertiesWithPrefix(Property.TABLE_CONSTRAINT_PREFIX).entrySet()) {
if (entry.getKey().startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey())) {
String className = entry.getValue();
Class<? extends Constraint> clazz =
loader.loadClass(className).asSubclass(Constraint.class);
- log.debug("Loaded constraint {} for {}", clazz.getName(), conf.getTableId());
+
+ log.debug("Loaded constraint {} for {}", clazz.getName(),
+ ((TableConfiguration) conf).getTableId());
constrains.add(clazz.getDeclaredConstructor().newInstance());
}
}
- lastCheck.set(System.currentTimeMillis());
-
} catch (Throwable e) {
constrains.clear();
- loader = null;
constrains.add(new UnsatisfiableConstraint((short) -1,
"Failed to load constraints, not accepting mutations."));
- log.error("Failed to load constraints " + conf.getTableId() + " " + e, e);
+ log.error("Failed to load constraints " + ((TableConfiguration) conf).getTableId() + " " + e,
+ e);
}
}
@@ -86,29 +82,6 @@ public class ConstraintChecker {
return constrains;
}
- public boolean classLoaderChanged() {
-
- if (constrains.size() == 0)
- return false;
-
- try {
- String context = conf.get(Property.TABLE_CLASSPATH);
-
- ClassLoader currentLoader;
-
- if (context != null && !context.equals("")) {
- currentLoader = AccumuloVFSClassLoader.getContextManager().getClassLoader(context);
- } else {
- currentLoader = AccumuloVFSClassLoader.getClassLoader();
- }
-
- return currentLoader != loader;
- } catch (Exception e) {
- log.debug("Failed to check {}", e.getMessage());
- return true;
- }
- }
-
private static Violations addViolation(Violations violations, ConstraintViolationSummary cvs) {
if (violations == null) {
violations = new Violations();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 1f17cef..68eb79f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -42,7 +42,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.accumulo.core.Constants;
@@ -53,8 +52,8 @@ import org.apache.accumulo.core.client.sample.SamplerConfiguration;
import org.apache.accumulo.core.clientImpl.DurabilityImpl;
import org.apache.accumulo.core.clientImpl.Tables;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.AccumuloConfiguration.Deriver;
import org.apache.accumulo.core.conf.ConfigurationCopy;
-import org.apache.accumulo.core.conf.ConfigurationObserver;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.constraints.Violations;
import org.apache.accumulo.core.data.ByteSequence;
@@ -151,7 +150,6 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
@@ -166,6 +164,8 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
public class Tablet {
private static final Logger log = LoggerFactory.getLogger(Tablet.class);
+ private static final byte[] EMPTY_BYTES = new byte[0];
+
private final TabletServer tabletServer;
private final ServerContext context;
private final KeyExtent extent;
@@ -222,7 +222,7 @@ public class Tablet {
private final Set<MajorCompactionReason> majorCompactionQueued =
Collections.synchronizedSet(EnumSet.noneOf(MajorCompactionReason.class));
- private final AtomicReference<ConstraintChecker> constraintChecker = new AtomicReference<>();
+ private final Deriver<ConstraintChecker> constraintChecker;
private int writesInProgress = 0;
@@ -240,7 +240,7 @@ public class Tablet {
private final Rate ingestByteRate = new Rate(0.95);
private long ingestBytes = 0;
- private byte[] defaultSecurityLabel = new byte[0];
+ private final Deriver<byte[]> defaultSecurityLabel;
private long lastMinorCompactionFinishTime = 0;
private long lastMapFileImportTime = 0;
@@ -251,8 +251,6 @@ public class Tablet {
private final Rate scannedRate = new Rate(0.95);
private final AtomicLong scannedCount = new AtomicLong(0);
- private final ConfigurationObserver configObserver;
-
// Files that are currently in the process of bulk importing. Access to this is protected by the
// tablet lock.
private final Set<FileRef> bulkImporting = new HashSet<>();
@@ -302,30 +300,6 @@ public class Tablet {
}
}
- /**
- * Only visible for testing
- */
- @VisibleForTesting
- protected Tablet(TabletTime tabletTime, String tabletDirectory, int logId, Path location,
- DatafileManager datafileManager, TabletServer tabletServer,
- TabletResourceManager tabletResources, TabletMemory tabletMemory,
- TableConfiguration tableConfiguration, KeyExtent extent,
- ConfigurationObserver configObserver) {
- this.tabletTime = tabletTime;
- this.tabletDirectory = tabletDirectory;
- this.logId = logId;
- this.location = location;
- this.datafileManager = datafileManager;
- this.tabletServer = tabletServer;
- this.context = tabletServer.getContext();
- this.tabletResources = tabletResources;
- this.tabletMemory = tabletMemory;
- this.tableConfiguration = tableConfiguration;
- this.extent = extent;
- this.configObserver = configObserver;
- this.splitCreationTime = 0;
- }
-
public Tablet(final TabletServer tabletServer, final KeyExtent extent,
final TabletResourceManager trm, TabletData data) throws IOException {
@@ -372,56 +346,23 @@ public class Tablet {
for (Entry<Long,List<FileRef>> entry : data.getBulkImported().entrySet()) {
this.bulkImported.put(entry.getKey(), new CopyOnWriteArrayList<>(entry.getValue()));
}
- setupDefaultSecurityLabels(extent);
final List<LogEntry> logEntries = tabletPaths.logEntries;
final SortedMap<FileRef,DataFileValue> datafiles = tabletPaths.datafiles;
- tableConfiguration.addObserver(configObserver = new ConfigurationObserver() {
-
- private void reloadConstraints() {
- log.debug("Reloading constraints for extent: " + extent);
- constraintChecker.set(new ConstraintChecker(tableConfiguration));
- }
-
- @Override
- public void propertiesChanged() {
- reloadConstraints();
-
- try {
- setupDefaultSecurityLabels(extent);
- } catch (Exception e) {
- log.error("Failed to reload default security labels for extent: {}", extent);
- }
- }
-
- @Override
- public void propertyChanged(String prop) {
- if (prop.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey())) {
- reloadConstraints();
- } else if (prop.equals(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey())) {
- try {
- log.info("Default security labels changed for extent: {}", extent);
- setupDefaultSecurityLabels(extent);
- } catch (Exception e) {
- log.error("Failed to reload default security labels for extent: {}", extent);
- }
- }
-
- }
-
- @Override
- public void sessionExpired() {
- log.trace("Session expired, no longer updating per table props...");
- }
+ constraintChecker = tableConfiguration.newDeriver(conf -> new ConstraintChecker(conf));
- });
+ if (extent.isMeta()) {
+ defaultSecurityLabel = () -> EMPTY_BYTES;
+ } else {
+ defaultSecurityLabel = tableConfiguration.newDeriver(conf -> {
+ return new ColumnVisibility(conf.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY))
+ .getExpression();
+ });
+ }
- tableConfiguration.getNamespaceConfiguration().addObserver(configObserver);
tabletMemory = new TabletMemory(this);
- // Force a load of any per-table properties
- configObserver.propertiesChanged();
if (!logEntries.isEmpty()) {
log.info("Starting Write-Ahead Log recovery for {}", this.extent);
final AtomicLong entriesUsedOnTablet = new AtomicLong(0);
@@ -556,21 +497,6 @@ public class Tablet {
}
}
- private void setupDefaultSecurityLabels(KeyExtent extent) {
- if (extent.isMeta()) {
- defaultSecurityLabel = new byte[0];
- } else {
- try {
- ColumnVisibility cv = new ColumnVisibility(
- tableConfiguration.get(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY));
- this.defaultSecurityLabel = cv.getExpression();
- } catch (Exception e) {
- log.error("Error setting up default security label {}", e.getMessage(), e);
- this.defaultSecurityLabel = new byte[0];
- }
- }
- }
-
private LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges,
HashSet<Column> columnSet, List<KVEntry> results, long maxResultsSize, long batchTimeOut)
throws IOException {
@@ -722,7 +648,7 @@ public class Tablet {
AtomicBoolean iFlag) throws IOException {
ScanDataSource dataSource =
- new ScanDataSource(this, authorizations, this.defaultSecurityLabel, iFlag);
+ new ScanDataSource(this, authorizations, this.defaultSecurityLabel.derive(), iFlag);
try {
SortedKeyValueIterator<Key,Value> iter = new SourceSwitchingIterator(dataSource);
@@ -759,8 +685,9 @@ public class Tablet {
tabletRange.clip(range);
}
- ScanDataSource dataSource = new ScanDataSource(this, authorizations, this.defaultSecurityLabel,
- columns, ssiList, ssio, interruptFlag, samplerConfig, batchTimeOut, classLoaderContext);
+ ScanDataSource dataSource =
+ new ScanDataSource(this, authorizations, this.defaultSecurityLabel.derive(), columns,
+ ssiList, ssio, interruptFlag, samplerConfig, batchTimeOut, classLoaderContext);
LookupResult result = null;
@@ -894,8 +821,9 @@ public class Tablet {
// then clip will throw an exception
extent.toDataRange().clip(range);
- ScanOptions opts = new ScanOptions(num, authorizations, this.defaultSecurityLabel, columns,
- ssiList, ssio, interruptFlag, isolated, samplerConfig, batchTimeOut, classLoaderContext);
+ ScanOptions opts =
+ new ScanOptions(num, authorizations, this.defaultSecurityLabel.derive(), columns, ssiList,
+ ssio, interruptFlag, isolated, samplerConfig, batchTimeOut, classLoaderContext);
return new Scanner(this, range, opts);
}
@@ -1201,19 +1129,10 @@ public class Tablet {
return commitSession;
}
- public void checkConstraints() {
- ConstraintChecker cc = constraintChecker.get();
-
- if (cc.classLoaderChanged()) {
- ConstraintChecker ncc = new ConstraintChecker(tableConfiguration);
- constraintChecker.compareAndSet(cc, ncc);
- }
- }
-
public CommitSession prepareMutationsForCommit(TservConstraintEnv cenv, List<Mutation> mutations)
throws TConstraintViolationException {
- ConstraintChecker cc = constraintChecker.get();
+ ConstraintChecker cc = constraintChecker.derive();
List<Mutation> violators = null;
Violations violations = new Violations();
@@ -1483,9 +1402,6 @@ public class Tablet {
log.debug("TABLET_HIST {} closed", extent);
- tableConfiguration.getNamespaceConfiguration().removeObserver(configObserver);
- tableConfiguration.removeObserver(configObserver);
-
if (completeClose) {
closeState = CloseState.COMPLETE;
}
@@ -2005,7 +1921,7 @@ public class Tablet {
getNextMapFilename((filesToCompact.size() == 0 && !propogateDeletes) ? "A" : "C");
FileRef compactTmpName = new FileRef(fileName.path() + "_tmp");
- AccumuloConfiguration tableConf = createTableConfiguration(tableConfiguration, plan);
+ AccumuloConfiguration tableConf = createCompactionConfiguration(tableConfiguration, plan);
try (TraceScope span = Trace.startSpan("compactFiles")) {
CompactionEnv cenv = new CompactionEnv() {
@@ -2082,7 +1998,7 @@ public class Tablet {
}
}
- protected AccumuloConfiguration createTableConfiguration(TableConfiguration base,
+ protected static AccumuloConfiguration createCompactionConfiguration(TableConfiguration base,
CompactionPlan plan) {
if (plan == null || plan.writeParameters == null) {
return base;
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/TabletTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/TabletTest.java
index ea9c9ff..2adc9a1 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/TabletTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/TabletTest.java
@@ -21,16 +21,10 @@ import static org.junit.Assert.assertEquals;
import java.util.Collections;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.conf.ConfigurationObserver;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.server.conf.TableConfiguration;
-import org.apache.accumulo.server.tablets.TabletTime;
-import org.apache.accumulo.tserver.TabletServer;
-import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager;
import org.apache.accumulo.tserver.compaction.CompactionPlan;
import org.apache.accumulo.tserver.compaction.WriteParameters;
-import org.apache.hadoop.fs.Path;
import org.easymock.EasyMock;
import org.junit.Test;
@@ -42,16 +36,6 @@ public class TabletTest {
CompactionPlan plan = EasyMock.createMock(CompactionPlan.class);
WriteParameters writeParams = EasyMock.createMock(WriteParameters.class);
plan.writeParameters = writeParams;
- DatafileManager dfm = EasyMock.createMock(DatafileManager.class);
- TabletTime time = EasyMock.createMock(TabletTime.class);
- TabletServer tserver = EasyMock.createMock(TabletServer.class);
- TabletResourceManager tserverResourceManager = EasyMock.createMock(TabletResourceManager.class);
- TabletMemory tabletMemory = EasyMock.createMock(TabletMemory.class);
- KeyExtent extent = EasyMock.createMock(KeyExtent.class);
- ConfigurationObserver obs = EasyMock.createMock(ConfigurationObserver.class);
-
- Tablet tablet = new Tablet(time, "", 0, new Path("/foo"), dfm, tserver, tserverResourceManager,
- tabletMemory, tableConf, extent, obs);
long hdfsBlockSize = 10000L, blockSize = 5000L, indexBlockSize = 500L;
int replication = 5;
@@ -66,7 +50,7 @@ public class TabletTest {
EasyMock.replay(tableConf, plan, writeParams);
- AccumuloConfiguration aConf = tablet.createTableConfiguration(tableConf, plan);
+ AccumuloConfiguration aConf = Tablet.createCompactionConfiguration(tableConf, plan);
EasyMock.verify(tableConf, plan, writeParams);
@@ -77,5 +61,4 @@ public class TabletTest {
assertEquals(compressType, aConf.get(Property.TABLE_FILE_COMPRESSION_TYPE));
assertEquals(replication, Integer.parseInt(aConf.get(Property.TABLE_FILE_REPLICATION)));
}
-
}