You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/03/20 23:44:26 UTC
[1/6] git commit: ACCUMULO-2489 Fixes race condition in TableConfiguration where NPE may occur.
Repository: accumulo
Updated Branches:
refs/heads/1.5.2-SNAPSHOT 63d5e55a0 -> 1d608a81f
refs/heads/1.6.0-SNAPSHOT 44b13c12e -> ef5dc4a1f
refs/heads/master 394fe061f -> 866422d27
ACCUMULO-2489 Fixes race condition in TableConfiguration where NPE may occur.
Make invalidateCache much more efficient by calling clear on ZooCache
instead of creating a new one.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1d608a81
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1d608a81
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1d608a81
Branch: refs/heads/1.5.2-SNAPSHOT
Commit: 1d608a81f488c2bf371fc81f83e0022bd2943a36
Parents: 63d5e55
Author: Josh Elser <el...@apache.org>
Authored: Thu Mar 20 18:08:55 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Mar 20 18:08:55 2014 -0400
----------------------------------------------------------------------
.../server/conf/ServerConfiguration.java | 1 +
.../server/conf/TableConfiguration.java | 80 ++++++----
.../test/TableConfigurationUpdateTest.java | 152 +++++++++++++++++++
3 files changed, 200 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1d608a81/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java b/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
index b2acd1a..8653274 100644
--- a/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
+++ b/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.conf.ConfigSanityCheck;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.client.HdfsZooInstance;
public class ServerConfiguration {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1d608a81/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java b/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
index 59ff1f7..7a3d6e4 100644
--- a/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
+++ b/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@ -37,37 +37,46 @@ import org.apache.log4j.Logger;
public class TableConfiguration extends AccumuloConfiguration {
private static final Logger log = Logger.getLogger(TableConfiguration.class);
-
+
// Need volatile keyword to ensure double-checked locking works as intended
private static volatile ZooCache tablePropCache = null;
+ private static final Object initLock = new Object();
+
private final String instanceId;
+ private final Instance instance;
private final AccumuloConfiguration parent;
-
+
private String table = null;
private Set<ConfigurationObserver> observers;
-
+
public TableConfiguration(String instanceId, String table, AccumuloConfiguration parent) {
+ this(instanceId, HdfsZooInstance.getInstance(), table, parent);
+ }
+
+ public TableConfiguration(String instanceId, Instance instance, String table, AccumuloConfiguration parent) {
this.instanceId = instanceId;
+ this.instance = instance;
this.table = table;
this.parent = parent;
-
+
this.observers = Collections.synchronizedSet(new HashSet<ConfigurationObserver>());
}
-
- /**
- * @deprecated not for client use
- */
- @Deprecated
- private static ZooCache getTablePropCache() {
- Instance inst = HdfsZooInstance.getInstance();
- if (tablePropCache == null)
- synchronized (TableConfiguration.class) {
- if (tablePropCache == null)
- tablePropCache = new ZooCache(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), new TableConfWatcher(inst));
+
+ private void initializeZooCache() {
+ synchronized (initLock) {
+ if (null == tablePropCache) {
+ tablePropCache = new ZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), new TableConfWatcher(instance));
}
+ }
+ }
+
+ private ZooCache getTablePropCache() {
+ if (null == tablePropCache) {
+ initializeZooCache();
+ }
return tablePropCache;
}
-
+
public void addObserver(ConfigurationObserver co) {
if (table == null) {
String err = "Attempt to add observer for non-table configuration";
@@ -77,7 +86,7 @@ public class TableConfiguration extends AccumuloConfiguration {
iterator();
observers.add(co);
}
-
+
public void removeObserver(ConfigurationObserver configObserver) {
if (table == null) {
String err = "Attempt to remove observer for non-table configuration";
@@ -86,29 +95,29 @@ public class TableConfiguration extends AccumuloConfiguration {
}
observers.remove(configObserver);
}
-
+
public void expireAllObservers() {
Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
for (ConfigurationObserver co : copy)
co.sessionExpired();
}
-
+
public void propertyChanged(String key) {
Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
for (ConfigurationObserver co : copy)
co.propertyChanged(key);
}
-
+
public void propertiesChanged(String key) {
Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
for (ConfigurationObserver co : copy)
co.propertiesChanged();
}
-
+
public String get(Property property) {
String key = property.getKey();
String value = get(key);
-
+
if (value == null || !property.getType().isValidFormat(value)) {
if (value != null)
log.error("Using default value for " + key + " due to improperly formatted " + property.getType() + ": " + value);
@@ -116,23 +125,24 @@ public class TableConfiguration extends AccumuloConfiguration {
}
return value;
}
-
+
private String get(String key) {
String zPath = ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF + "/" + key;
+
byte[] v = getTablePropCache().get(zPath);
String value = null;
if (v != null)
value = new String(v, Constants.UTF8);
return value;
}
-
+
@Override
public Iterator<Entry<String,String>> iterator() {
TreeMap<String,String> entries = new TreeMap<String,String>();
-
+
for (Entry<String,String> parentEntry : parent)
entries.put(parentEntry.getKey(), parentEntry.getValue());
-
+
List<String> children = getTablePropCache().getChildren(ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF);
if (children != null) {
for (String child : children) {
@@ -141,10 +151,10 @@ public class TableConfiguration extends AccumuloConfiguration {
entries.put(child, value);
}
}
-
+
return entries.entrySet().iterator();
}
-
+
public String getTableId() {
return table;
}
@@ -152,11 +162,15 @@ public class TableConfiguration extends AccumuloConfiguration {
@Override
public void invalidateCache() {
if (null != tablePropCache) {
- synchronized (TableConfiguration.class) {
- if (null != tablePropCache) {
- tablePropCache = null;
- }
- }
+ tablePropCache.clear();
}
+ // Else, if the cache is null, we could lock and double-check
+ // to see if it happened to be created so we could invalidate it
+ // but I don't see much benefit coming from that extra check.
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName();
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1d608a81/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateTest.java b/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateTest.java
new file mode 100644
index 0000000..30da268
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TableConfigurationUpdateTest {
+ private static final Logger log = Logger.getLogger(TableConfigurationUpdateTest.class);
+
+ public static TemporaryFolder folder = new TemporaryFolder();
+ private MiniAccumuloCluster accumulo;
+ private String secret = "secret";
+
+ @Before
+ public void setUp() throws Exception {
+ folder.create();
+ accumulo = new MiniAccumuloCluster(folder.getRoot(), secret);
+ accumulo.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ accumulo.stop();
+ folder.delete();
+ }
+
+ @Test
+ public void test() throws Exception {
+ ZooKeeperInstance inst = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers(), 60 * 1000);
+ Connector conn = inst.getConnector("root", new PasswordToken(secret));
+
+ String table = "foo";
+ conn.tableOperations().create(table);
+
+ final DefaultConfiguration defaultConf = AccumuloConfiguration.getDefaultConfiguration();
+
+ // Cache invalidates 25% of the time
+ int randomMax = 4;
+ // Number of threads
+ int numThreads = 2;
+ // Number of iterations per thread
+ int iterations = 100000;
+ AccumuloConfiguration tableConf = new TableConfiguration(inst.getInstanceID(), inst, table, defaultConf);
+
+ long start = System.currentTimeMillis();
+ ExecutorService svc = Executors.newFixedThreadPool(numThreads);
+ CountDownLatch countDown = new CountDownLatch(numThreads);
+ ArrayList<Future<Exception>> futures = new ArrayList<Future<Exception>>(numThreads);
+
+ for (int i = 0; i < numThreads; i++) {
+ futures.add(svc.submit(new TableConfRunner(randomMax, iterations, tableConf, countDown)));
+ }
+
+ svc.shutdown();
+ Assert.assertTrue(svc.awaitTermination(60, TimeUnit.MINUTES));
+
+ for (Future<Exception> fut : futures) {
+ Exception e = fut.get();
+ if (null != e) {
+ Assert.fail("Thread failed with exception " + e);
+ }
+ }
+
+ long end = System.currentTimeMillis();
+ log.debug(tableConf + " with " + iterations + " iterations and " + numThreads + " threads and cache invalidates "
+ + ((1. / randomMax) * 100.) + "% took " + (end - start) / 1000 + " second(s)");
+ }
+
+ public static class TableConfRunner implements Callable<Exception> {
+ private static final Property prop = Property.TABLE_SPLIT_THRESHOLD;
+ private AccumuloConfiguration tableConf;
+ private CountDownLatch countDown;
+ private int iterations, randMax;
+
+ public TableConfRunner(int randMax, int iterations, AccumuloConfiguration tableConf, CountDownLatch countDown) {
+ this.randMax = randMax;
+ this.iterations = iterations;
+ this.tableConf = tableConf;
+ this.countDown = countDown;
+ }
+
+ @Override
+ public Exception call() {
+ Random r = new Random();
+ countDown.countDown();
+ try {
+ countDown.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return e;
+ }
+
+ String t = Thread.currentThread().getName() + " ";
+ try {
+ for (int i = 0; i < iterations; i++) {
+ // if (i % 10000 == 0) {
+ // log.info(t + " " + i);
+ // }
+ int choice = r.nextInt(randMax);
+ if (choice < 1) {
+ tableConf.invalidateCache();
+ } else {
+ tableConf.get(prop);
+ }
+ }
+ } catch (Exception e) {
+ log.error(t, e);
+ return e;
+ }
+
+ return null;
+ }
+
+ }
+
+}
[4/6] git commit: Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT
Posted by el...@apache.org.
Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT
Conflicts:
server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ef5dc4a1
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ef5dc4a1
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ef5dc4a1
Branch: refs/heads/master
Commit: ef5dc4a1f6d67f643f80dd821280140fd20ee947
Parents: 44b13c1 1d608a8
Author: Josh Elser <el...@apache.org>
Authored: Thu Mar 20 18:43:55 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Mar 20 18:43:55 2014 -0400
----------------------------------------------------------------------
.../server/conf/NamespaceConfiguration.java | 10 ++
.../server/conf/TableConfiguration.java | 39 +++--
.../test/TableConfigurationUpdateIT.java | 154 +++++++++++++++++++
3 files changed, 193 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef5dc4a1/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java
index c0ac0b8,0000000..d08d45f
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java
@@@ -1,172 -1,0 +1,182 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.Namespaces;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationObserver;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.log4j.Logger;
+
+public class NamespaceConfiguration extends AccumuloConfiguration {
+ private static final Logger log = Logger.getLogger(NamespaceConfiguration.class);
+
+ private final AccumuloConfiguration parent;
+ private static ZooCache propCache = null;
+ protected String namespaceId = null;
+ protected Instance inst = null;
+ private Set<ConfigurationObserver> observers;
+
+ public NamespaceConfiguration(String namespaceId, AccumuloConfiguration parent) {
+ inst = HdfsZooInstance.getInstance();
+ this.parent = parent;
+ this.namespaceId = namespaceId;
+ this.observers = Collections.synchronizedSet(new HashSet<ConfigurationObserver>());
+ }
+
+ @Override
+ public String get(Property property) {
+ String key = property.getKey();
+ String value = get(getPropCache(), key);
+
+ if (value == null || !property.getType().isValidFormat(value)) {
+ if (value != null)
+ log.error("Using default value for " + key + " due to improperly formatted " + property.getType() + ": " + value);
+ if (!(namespaceId.equals(Namespaces.ACCUMULO_NAMESPACE_ID) && isIteratorOrConstraint(property.getKey()))) {
+ // ignore iterators from parent if system namespace
+ value = parent.get(property);
+ }
+ }
+ return value;
+ }
+
+ private String get(ZooCache zc, String key) {
+ String zPath = ZooUtil.getRoot(inst.getInstanceID()) + Constants.ZNAMESPACES + "/" + getNamespaceId() + Constants.ZNAMESPACE_CONF + "/" + key;
+ byte[] v = zc.get(zPath);
+ String value = null;
+ if (v != null)
+ value = new String(v, Constants.UTF8);
+ return value;
+ }
+
+ private synchronized static ZooCache getPropCache() {
+ Instance inst = HdfsZooInstance.getInstance();
+ if (propCache == null)
+ propCache = new ZooCache(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), new NamespaceConfWatcher(inst));
+ return propCache;
+ }
+
+ private class SystemNamespaceFilter implements PropertyFilter {
+
+ private PropertyFilter userFilter;
+
+ SystemNamespaceFilter(PropertyFilter userFilter) {
+ this.userFilter = userFilter;
+ }
+
+ @Override
+ public boolean accept(String key) {
+ if (isIteratorOrConstraint(key))
+ return false;
+ return userFilter.accept(key);
+ }
+
+ }
+
+ @Override
+ public void getProperties(Map<String,String> props, PropertyFilter filter) {
+
+ PropertyFilter parentFilter = filter;
+
+ // exclude system iterators/constraints from the system namespace
+ // so they don't affect the metadata or root tables.
+ if (getNamespaceId().equals(Namespaces.ACCUMULO_NAMESPACE_ID))
+ parentFilter = new SystemNamespaceFilter(filter);
+
+ parent.getProperties(props, parentFilter);
+
+ ZooCache zc = getPropCache();
+
+ List<String> children = zc.getChildren(ZooUtil.getRoot(inst.getInstanceID()) + Constants.ZNAMESPACES + "/" + getNamespaceId() + Constants.ZNAMESPACE_CONF);
+ if (children != null) {
+ for (String child : children) {
+ if (child != null && filter.accept(child)) {
+ String value = get(zc, child);
+ if (value != null)
+ props.put(child, value);
+ }
+ }
+ }
+ }
+
+ protected String getNamespaceId() {
+ return namespaceId;
+ }
+
+ public void addObserver(ConfigurationObserver co) {
+ if (namespaceId == null) {
+ String err = "Attempt to add observer for non-namespace configuration";
+ log.error(err);
+ throw new RuntimeException(err);
+ }
+ iterator();
+ observers.add(co);
+ }
+
+ public void removeObserver(ConfigurationObserver configObserver) {
+ if (namespaceId == null) {
+ String err = "Attempt to remove observer for non-namespace configuration";
+ log.error(err);
+ throw new RuntimeException(err);
+ }
+ observers.remove(configObserver);
+ }
+
+ public void expireAllObservers() {
+ Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
+ for (ConfigurationObserver co : copy)
+ co.sessionExpired();
+ }
+
+ public void propertyChanged(String key) {
+ Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
+ for (ConfigurationObserver co : copy)
+ co.propertyChanged(key);
+ }
+
+ public void propertiesChanged(String key) {
+ Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
+ for (ConfigurationObserver co : copy)
+ co.propertiesChanged();
+ }
+
+ protected boolean isIteratorOrConstraint(String key) {
+ return key.startsWith(Property.TABLE_ITERATOR_PREFIX.getKey()) || key.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey());
+ }
++
++ @Override
++ public void invalidateCache() {
++ if (null != propCache) {
++ propCache.clear();
++ }
++ // Else, if the cache is null, we could lock and double-check
++ // to see if it happened to be created so we could invalidate it
++ // but I don't see much benefit coming from that extra check.
++ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef5dc4a1/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
index fae4167,0000000..c134e31
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@@ -1,168 -1,0 +1,187 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationObserver;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.log4j.Logger;
+
+public class TableConfiguration extends AccumuloConfiguration {
+ private static final Logger log = Logger.getLogger(TableConfiguration.class);
-
++
+ // Need volatile keyword to ensure double-checked locking works as intended
+ private static volatile ZooCache tablePropCache = null;
++ private static final Object initLock = new Object();
+
+ private final String instanceId;
++ private final Instance instance;
+ private final NamespaceConfiguration parent;
+
+ private String table = null;
+ private Set<ConfigurationObserver> observers;
+
+ public TableConfiguration(String instanceId, String table, NamespaceConfiguration parent) {
++ this(instanceId, HdfsZooInstance.getInstance(), table, parent);
++ }
++
++ public TableConfiguration(String instanceId, Instance instance, String table, NamespaceConfiguration parent) {
+ this.instanceId = instanceId;
++ this.instance = instance;
+ this.table = table;
+ this.parent = parent;
+
+ this.observers = Collections.synchronizedSet(new HashSet<ConfigurationObserver>());
+ }
+
- private synchronized static ZooCache getTablePropCache() {
- Instance inst = HdfsZooInstance.getInstance();
- if (tablePropCache == null)
- tablePropCache = new ZooCache(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), new TableConfWatcher(inst));
++ private void initializeZooCache() {
++ synchronized (initLock) {
++ if (null == tablePropCache) {
++ tablePropCache = new ZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), new TableConfWatcher(instance));
++ }
++ }
++ }
++
++ private ZooCache getTablePropCache() {
++ if (null == tablePropCache) {
++ initializeZooCache();
++ }
+ return tablePropCache;
+ }
+
+ public void addObserver(ConfigurationObserver co) {
+ if (table == null) {
+ String err = "Attempt to add observer for non-table configuration";
+ log.error(err);
+ throw new RuntimeException(err);
+ }
+ iterator();
+ observers.add(co);
+ }
+
+ public void removeObserver(ConfigurationObserver configObserver) {
+ if (table == null) {
+ String err = "Attempt to remove observer for non-table configuration";
+ log.error(err);
+ throw new RuntimeException(err);
+ }
+ observers.remove(configObserver);
+ }
+
+ public void expireAllObservers() {
+ Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
+ for (ConfigurationObserver co : copy)
+ co.sessionExpired();
+ }
+
+ public void propertyChanged(String key) {
+ Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
+ for (ConfigurationObserver co : copy)
+ co.propertyChanged(key);
+ }
+
+ public void propertiesChanged(String key) {
+ Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
+ for (ConfigurationObserver co : copy)
+ co.propertiesChanged();
+ }
+
+ @Override
+ public String get(Property property) {
+ String key = property.getKey();
+ String value = get(getTablePropCache(), key);
+
+ if (value == null || !property.getType().isValidFormat(value)) {
+ if (value != null)
+ log.error("Using default value for " + key + " due to improperly formatted " + property.getType() + ": " + value);
+ value = parent.get(property);
+ }
+ return value;
+ }
+
+ private String get(ZooCache zc, String key) {
+ String zPath = ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF + "/" + key;
+ byte[] v = zc.get(zPath);
+ String value = null;
+ if (v != null)
+ value = new String(v, Constants.UTF8);
+ return value;
+ }
+
+ @Override
+ public void getProperties(Map<String,String> props, PropertyFilter filter) {
+ parent.getProperties(props, filter);
+
+ ZooCache zc = getTablePropCache();
+
+ List<String> children = zc.getChildren(ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF);
+ if (children != null) {
+ for (String child : children) {
+ if (child != null && filter.accept(child)) {
+ String value = get(zc, child);
+ if (value != null)
+ props.put(child, value);
+ }
+ }
+ }
+ }
+
+ public String getTableId() {
+ return table;
+ }
+
+ /**
+ * returns the actual NamespaceConfiguration that corresponds to the current parent namespace.
+ */
+ public NamespaceConfiguration getNamespaceConfiguration() {
+ return ServerConfiguration.getNamespaceConfiguration(parent.inst, parent.namespaceId);
+ }
+
+ /**
+ * returns the parent, which is actually a TableParentConfiguration that can change which namespace it references
+ */
+ public NamespaceConfiguration getParentConfiguration() {
+ return parent;
+ }
+
+ @Override
+ public void invalidateCache() {
+ if (null != tablePropCache) {
- synchronized (TableConfiguration.class) {
- if (null != tablePropCache) {
- tablePropCache = null;
- }
- }
++ tablePropCache.clear();
+ }
++ // Else, if the cache is null, we could lock and double-check
++ // to see if it happened to be created so we could invalidate it
++ // but I don't see much benefit coming from that extra check.
++ }
++
++ @Override
++ public String toString() {
++ return this.getClass().getSimpleName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef5dc4a1/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateIT.java
index 0000000,0000000..c3e3342
new file mode 100644
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateIT.java
@@@ -1,0 -1,0 +1,154 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.test;
++
++import java.util.ArrayList;
++import java.util.Random;
++import java.util.concurrent.Callable;
++import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
++import java.util.concurrent.Future;
++import java.util.concurrent.TimeUnit;
++
++import org.apache.accumulo.core.client.Connector;
++import org.apache.accumulo.core.client.Instance;
++import org.apache.accumulo.core.client.ZooKeeperInstance;
++import org.apache.accumulo.core.client.security.tokens.PasswordToken;
++import org.apache.accumulo.core.conf.AccumuloConfiguration;
++import org.apache.accumulo.core.conf.Property;
++import org.apache.accumulo.minicluster.MiniAccumuloCluster;
++import org.apache.accumulo.server.conf.NamespaceConfiguration;
++import org.apache.accumulo.server.conf.TableConfiguration;
++import org.apache.accumulo.server.conf.TableParentConfiguration;
++import org.apache.log4j.Logger;
++import org.junit.After;
++import org.junit.Assert;
++import org.junit.Before;
++import org.junit.Test;
++import org.junit.rules.TemporaryFolder;
++
++public class TableConfigurationUpdateIT {
++ private static final Logger log = Logger.getLogger(TableConfigurationUpdateIT.class);
++
++ public static TemporaryFolder folder = new TemporaryFolder();
++ private MiniAccumuloCluster accumulo;
++ private String secret = "secret";
++
++ @Before
++ public void setUp() throws Exception {
++ folder.create();
++ accumulo = new MiniAccumuloCluster(folder.getRoot(), secret);
++ accumulo.start();
++ }
++
++ @After
++ public void tearDown() throws Exception {
++ accumulo.stop();
++ folder.delete();
++ }
++
++ @Test
++ public void test() throws Exception {
++ Instance inst = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers());
++ Connector conn = inst.getConnector("root", new PasswordToken(secret));
++
++ String table = "foo";
++ conn.tableOperations().create(table);
++
++ final NamespaceConfiguration defaultConf = new TableParentConfiguration(conn.tableOperations().tableIdMap().get(table), AccumuloConfiguration.getDefaultConfiguration());
++
++ // Cache invalidates 25% of the time
++ int randomMax = 4;
++ // Number of threads
++ int numThreads = 2;
++ // Number of iterations per thread
++ int iterations = 100000;
++ AccumuloConfiguration tableConf = new TableConfiguration(inst.getInstanceID(), inst, table, defaultConf);
++
++ long start = System.currentTimeMillis();
++ ExecutorService svc = Executors.newFixedThreadPool(numThreads);
++ CountDownLatch countDown = new CountDownLatch(numThreads);
++ ArrayList<Future<Exception>> futures = new ArrayList<Future<Exception>>(numThreads);
++
++ for (int i = 0; i < numThreads; i++) {
++ futures.add(svc.submit(new TableConfRunner(randomMax, iterations, tableConf, countDown)));
++ }
++
++ svc.shutdown();
++ Assert.assertTrue(svc.awaitTermination(60, TimeUnit.MINUTES));
++
++ for (Future<Exception> fut : futures) {
++ Exception e = fut.get();
++ if (null != e) {
++ Assert.fail("Thread failed with exception " + e);
++ }
++ }
++
++ long end = System.currentTimeMillis();
++ log.debug(tableConf + " with " + iterations + " iterations and " + numThreads + " threads and cache invalidates "
++ + ((1. / randomMax) * 100.) + "% took " + (end - start) / 1000 + " second(s)");
++ }
++
++ public static class TableConfRunner implements Callable<Exception> {
++ private static final Property prop = Property.TABLE_SPLIT_THRESHOLD;
++ private AccumuloConfiguration tableConf;
++ private CountDownLatch countDown;
++ private int iterations, randMax;
++
++ public TableConfRunner(int randMax, int iterations, AccumuloConfiguration tableConf, CountDownLatch countDown) {
++ this.randMax = randMax;
++ this.iterations = iterations;
++ this.tableConf = tableConf;
++ this.countDown = countDown;
++ }
++
++ @Override
++ public Exception call() {
++ Random r = new Random();
++ countDown.countDown();
++ try {
++ countDown.await();
++ } catch (InterruptedException e) {
++ Thread.currentThread().interrupt();
++ return e;
++ }
++
++ String t = Thread.currentThread().getName() + " ";
++ try {
++ for (int i = 0; i < iterations; i++) {
++ // if (i % 10000 == 0) {
++ // log.info(t + " " + i);
++ // }
++ int choice = r.nextInt(randMax);
++ if (choice < 1) {
++ tableConf.invalidateCache();
++ } else {
++ tableConf.get(prop);
++ }
++ }
++ } catch (Exception e) {
++ log.error(t, e);
++ return e;
++ }
++
++ return null;
++ }
++
++ }
++
++}
[3/6] git commit: ACCUMULO-2489 Fixes race condition in
TableConfiguration where NPE may occur.
Posted by el...@apache.org.
ACCUMULO-2489 Fixes race condition in TableConfiguration where NPE may occur.
Make invalidateCache much more efficient by calling clear on ZooCache
instead of creating a new one.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1d608a81
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1d608a81
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1d608a81
Branch: refs/heads/master
Commit: 1d608a81f488c2bf371fc81f83e0022bd2943a36
Parents: 63d5e55
Author: Josh Elser <el...@apache.org>
Authored: Thu Mar 20 18:08:55 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Mar 20 18:08:55 2014 -0400
----------------------------------------------------------------------
.../server/conf/ServerConfiguration.java | 1 +
.../server/conf/TableConfiguration.java | 80 ++++++----
.../test/TableConfigurationUpdateTest.java | 152 +++++++++++++++++++
3 files changed, 200 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1d608a81/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java b/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
index b2acd1a..8653274 100644
--- a/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
+++ b/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.conf.ConfigSanityCheck;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.client.HdfsZooInstance;
public class ServerConfiguration {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1d608a81/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java b/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
index 59ff1f7..7a3d6e4 100644
--- a/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
+++ b/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@ -37,37 +37,46 @@ import org.apache.log4j.Logger;
public class TableConfiguration extends AccumuloConfiguration {
private static final Logger log = Logger.getLogger(TableConfiguration.class);
-
+
// Need volatile keyword to ensure double-checked locking works as intended
private static volatile ZooCache tablePropCache = null;
+ private static final Object initLock = new Object();
+
private final String instanceId;
+ private final Instance instance;
private final AccumuloConfiguration parent;
-
+
private String table = null;
private Set<ConfigurationObserver> observers;
-
+
public TableConfiguration(String instanceId, String table, AccumuloConfiguration parent) {
+ this(instanceId, HdfsZooInstance.getInstance(), table, parent);
+ }
+
+ public TableConfiguration(String instanceId, Instance instance, String table, AccumuloConfiguration parent) {
this.instanceId = instanceId;
+ this.instance = instance;
this.table = table;
this.parent = parent;
-
+
this.observers = Collections.synchronizedSet(new HashSet<ConfigurationObserver>());
}
-
- /**
- * @deprecated not for client use
- */
- @Deprecated
- private static ZooCache getTablePropCache() {
- Instance inst = HdfsZooInstance.getInstance();
- if (tablePropCache == null)
- synchronized (TableConfiguration.class) {
- if (tablePropCache == null)
- tablePropCache = new ZooCache(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), new TableConfWatcher(inst));
+
+ private void initializeZooCache() {
+ synchronized (initLock) {
+ if (null == tablePropCache) {
+ tablePropCache = new ZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), new TableConfWatcher(instance));
}
+ }
+ }
+
+ private ZooCache getTablePropCache() {
+ if (null == tablePropCache) {
+ initializeZooCache();
+ }
return tablePropCache;
}
-
+
public void addObserver(ConfigurationObserver co) {
if (table == null) {
String err = "Attempt to add observer for non-table configuration";
@@ -77,7 +86,7 @@ public class TableConfiguration extends AccumuloConfiguration {
iterator();
observers.add(co);
}
-
+
public void removeObserver(ConfigurationObserver configObserver) {
if (table == null) {
String err = "Attempt to remove observer for non-table configuration";
@@ -86,29 +95,29 @@ public class TableConfiguration extends AccumuloConfiguration {
}
observers.remove(configObserver);
}
-
+
public void expireAllObservers() {
Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
for (ConfigurationObserver co : copy)
co.sessionExpired();
}
-
+
public void propertyChanged(String key) {
Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
for (ConfigurationObserver co : copy)
co.propertyChanged(key);
}
-
+
public void propertiesChanged(String key) {
Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
for (ConfigurationObserver co : copy)
co.propertiesChanged();
}
-
+
public String get(Property property) {
String key = property.getKey();
String value = get(key);
-
+
if (value == null || !property.getType().isValidFormat(value)) {
if (value != null)
log.error("Using default value for " + key + " due to improperly formatted " + property.getType() + ": " + value);
@@ -116,23 +125,24 @@ public class TableConfiguration extends AccumuloConfiguration {
}
return value;
}
-
+
private String get(String key) {
String zPath = ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF + "/" + key;
+
byte[] v = getTablePropCache().get(zPath);
String value = null;
if (v != null)
value = new String(v, Constants.UTF8);
return value;
}
-
+
@Override
public Iterator<Entry<String,String>> iterator() {
TreeMap<String,String> entries = new TreeMap<String,String>();
-
+
for (Entry<String,String> parentEntry : parent)
entries.put(parentEntry.getKey(), parentEntry.getValue());
-
+
List<String> children = getTablePropCache().getChildren(ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF);
if (children != null) {
for (String child : children) {
@@ -141,10 +151,10 @@ public class TableConfiguration extends AccumuloConfiguration {
entries.put(child, value);
}
}
-
+
return entries.entrySet().iterator();
}
-
+
public String getTableId() {
return table;
}
@@ -152,11 +162,15 @@ public class TableConfiguration extends AccumuloConfiguration {
@Override
public void invalidateCache() {
if (null != tablePropCache) {
- synchronized (TableConfiguration.class) {
- if (null != tablePropCache) {
- tablePropCache = null;
- }
- }
+ tablePropCache.clear();
}
+ // Else, if the cache is null, we could lock and double-check
+ // to see if it happened to be created so we could invalidate it
+ // but I don't see much benefit coming from that extra check.
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName();
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1d608a81/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateTest.java b/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateTest.java
new file mode 100644
index 0000000..30da268
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TableConfigurationUpdateTest {
+ private static final Logger log = Logger.getLogger(TableConfigurationUpdateTest.class);
+
+ public static TemporaryFolder folder = new TemporaryFolder();
+ private MiniAccumuloCluster accumulo;
+ private String secret = "secret";
+
+ @Before
+ public void setUp() throws Exception {
+ folder.create();
+ accumulo = new MiniAccumuloCluster(folder.getRoot(), secret);
+ accumulo.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ accumulo.stop();
+ folder.delete();
+ }
+
+ @Test
+ public void test() throws Exception {
+ ZooKeeperInstance inst = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers(), 60 * 1000);
+ Connector conn = inst.getConnector("root", new PasswordToken(secret));
+
+ String table = "foo";
+ conn.tableOperations().create(table);
+
+ final DefaultConfiguration defaultConf = AccumuloConfiguration.getDefaultConfiguration();
+
+ // Cache invalidates 25% of the time
+ int randomMax = 4;
+ // Number of threads
+ int numThreads = 2;
+ // Number of iterations per thread
+ int iterations = 100000;
+ AccumuloConfiguration tableConf = new TableConfiguration(inst.getInstanceID(), inst, table, defaultConf);
+
+ long start = System.currentTimeMillis();
+ ExecutorService svc = Executors.newFixedThreadPool(numThreads);
+ CountDownLatch countDown = new CountDownLatch(numThreads);
+ ArrayList<Future<Exception>> futures = new ArrayList<Future<Exception>>(numThreads);
+
+ for (int i = 0; i < numThreads; i++) {
+ futures.add(svc.submit(new TableConfRunner(randomMax, iterations, tableConf, countDown)));
+ }
+
+ svc.shutdown();
+ Assert.assertTrue(svc.awaitTermination(60, TimeUnit.MINUTES));
+
+ for (Future<Exception> fut : futures) {
+ Exception e = fut.get();
+ if (null != e) {
+ Assert.fail("Thread failed with exception " + e);
+ }
+ }
+
+ long end = System.currentTimeMillis();
+ log.debug(tableConf + " with " + iterations + " iterations and " + numThreads + " threads and cache invalidates "
+ + ((1. / randomMax) * 100.) + "% took " + (end - start) / 1000 + " second(s)");
+ }
+
+ public static class TableConfRunner implements Callable<Exception> {
+ private static final Property prop = Property.TABLE_SPLIT_THRESHOLD;
+ private AccumuloConfiguration tableConf;
+ private CountDownLatch countDown;
+ private int iterations, randMax;
+
+ public TableConfRunner(int randMax, int iterations, AccumuloConfiguration tableConf, CountDownLatch countDown) {
+ this.randMax = randMax;
+ this.iterations = iterations;
+ this.tableConf = tableConf;
+ this.countDown = countDown;
+ }
+
+ @Override
+ public Exception call() {
+ Random r = new Random();
+ countDown.countDown();
+ try {
+ countDown.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return e;
+ }
+
+ String t = Thread.currentThread().getName() + " ";
+ try {
+ for (int i = 0; i < iterations; i++) {
+ // if (i % 10000 == 0) {
+ // log.info(t + " " + i);
+ // }
+ int choice = r.nextInt(randMax);
+ if (choice < 1) {
+ tableConf.invalidateCache();
+ } else {
+ tableConf.get(prop);
+ }
+ }
+ } catch (Exception e) {
+ log.error(t, e);
+ return e;
+ }
+
+ return null;
+ }
+
+ }
+
+}
[6/6] git commit: Merge branch '1.6.0-SNAPSHOT'
Posted by el...@apache.org.
Merge branch '1.6.0-SNAPSHOT'
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/866422d2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/866422d2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/866422d2
Branch: refs/heads/master
Commit: 866422d277ce80f29119af80ea65b23cff08fc3a
Parents: 394fe06 ef5dc4a
Author: Josh Elser <el...@apache.org>
Authored: Thu Mar 20 18:44:06 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Mar 20 18:44:06 2014 -0400
----------------------------------------------------------------------
.../server/conf/NamespaceConfiguration.java | 10 ++
.../server/conf/TableConfiguration.java | 39 +++--
.../test/TableConfigurationUpdateIT.java | 154 +++++++++++++++++++
3 files changed, 193 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
[2/6] git commit: ACCUMULO-2489 Fixes race condition in
TableConfiguration where NPE may occur.
Posted by el...@apache.org.
ACCUMULO-2489 Fixes race condition in TableConfiguration where NPE may occur.
Make invalidateCache much more efficient by calling clear on ZooCache
instead of creating a new one.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/1d608a81
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/1d608a81
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/1d608a81
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: 1d608a81f488c2bf371fc81f83e0022bd2943a36
Parents: 63d5e55
Author: Josh Elser <el...@apache.org>
Authored: Thu Mar 20 18:08:55 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Mar 20 18:08:55 2014 -0400
----------------------------------------------------------------------
.../server/conf/ServerConfiguration.java | 1 +
.../server/conf/TableConfiguration.java | 80 ++++++----
.../test/TableConfigurationUpdateTest.java | 152 +++++++++++++++++++
3 files changed, 200 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1d608a81/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java b/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
index b2acd1a..8653274 100644
--- a/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
+++ b/server/src/main/java/org/apache/accumulo/server/conf/ServerConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.conf.ConfigSanityCheck;
import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.server.client.HdfsZooInstance;
public class ServerConfiguration {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1d608a81/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java b/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
index 59ff1f7..7a3d6e4 100644
--- a/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
+++ b/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@ -37,37 +37,46 @@ import org.apache.log4j.Logger;
public class TableConfiguration extends AccumuloConfiguration {
private static final Logger log = Logger.getLogger(TableConfiguration.class);
-
+
// Need volatile keyword to ensure double-checked locking works as intended
private static volatile ZooCache tablePropCache = null;
+ private static final Object initLock = new Object();
+
private final String instanceId;
+ private final Instance instance;
private final AccumuloConfiguration parent;
-
+
private String table = null;
private Set<ConfigurationObserver> observers;
-
+
public TableConfiguration(String instanceId, String table, AccumuloConfiguration parent) {
+ this(instanceId, HdfsZooInstance.getInstance(), table, parent);
+ }
+
+ public TableConfiguration(String instanceId, Instance instance, String table, AccumuloConfiguration parent) {
this.instanceId = instanceId;
+ this.instance = instance;
this.table = table;
this.parent = parent;
-
+
this.observers = Collections.synchronizedSet(new HashSet<ConfigurationObserver>());
}
-
- /**
- * @deprecated not for client use
- */
- @Deprecated
- private static ZooCache getTablePropCache() {
- Instance inst = HdfsZooInstance.getInstance();
- if (tablePropCache == null)
- synchronized (TableConfiguration.class) {
- if (tablePropCache == null)
- tablePropCache = new ZooCache(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), new TableConfWatcher(inst));
+
+ private void initializeZooCache() {
+ synchronized (initLock) {
+ if (null == tablePropCache) {
+ tablePropCache = new ZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), new TableConfWatcher(instance));
}
+ }
+ }
+
+ private ZooCache getTablePropCache() {
+ if (null == tablePropCache) {
+ initializeZooCache();
+ }
return tablePropCache;
}
-
+
public void addObserver(ConfigurationObserver co) {
if (table == null) {
String err = "Attempt to add observer for non-table configuration";
@@ -77,7 +86,7 @@ public class TableConfiguration extends AccumuloConfiguration {
iterator();
observers.add(co);
}
-
+
public void removeObserver(ConfigurationObserver configObserver) {
if (table == null) {
String err = "Attempt to remove observer for non-table configuration";
@@ -86,29 +95,29 @@ public class TableConfiguration extends AccumuloConfiguration {
}
observers.remove(configObserver);
}
-
+
public void expireAllObservers() {
Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
for (ConfigurationObserver co : copy)
co.sessionExpired();
}
-
+
public void propertyChanged(String key) {
Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
for (ConfigurationObserver co : copy)
co.propertyChanged(key);
}
-
+
public void propertiesChanged(String key) {
Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
for (ConfigurationObserver co : copy)
co.propertiesChanged();
}
-
+
public String get(Property property) {
String key = property.getKey();
String value = get(key);
-
+
if (value == null || !property.getType().isValidFormat(value)) {
if (value != null)
log.error("Using default value for " + key + " due to improperly formatted " + property.getType() + ": " + value);
@@ -116,23 +125,24 @@ public class TableConfiguration extends AccumuloConfiguration {
}
return value;
}
-
+
private String get(String key) {
String zPath = ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF + "/" + key;
+
byte[] v = getTablePropCache().get(zPath);
String value = null;
if (v != null)
value = new String(v, Constants.UTF8);
return value;
}
-
+
@Override
public Iterator<Entry<String,String>> iterator() {
TreeMap<String,String> entries = new TreeMap<String,String>();
-
+
for (Entry<String,String> parentEntry : parent)
entries.put(parentEntry.getKey(), parentEntry.getValue());
-
+
List<String> children = getTablePropCache().getChildren(ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF);
if (children != null) {
for (String child : children) {
@@ -141,10 +151,10 @@ public class TableConfiguration extends AccumuloConfiguration {
entries.put(child, value);
}
}
-
+
return entries.entrySet().iterator();
}
-
+
public String getTableId() {
return table;
}
@@ -152,11 +162,15 @@ public class TableConfiguration extends AccumuloConfiguration {
@Override
public void invalidateCache() {
if (null != tablePropCache) {
- synchronized (TableConfiguration.class) {
- if (null != tablePropCache) {
- tablePropCache = null;
- }
- }
+ tablePropCache.clear();
}
+ // Else, if the cache is null, we could lock and double-check
+ // to see if it happened to be created so we could invalidate it
+ // but I don't see much benefit coming from that extra check.
+ }
+
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName();
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/1d608a81/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateTest.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateTest.java b/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateTest.java
new file mode 100644
index 0000000..30da268
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.accumulo.server.conf.TableConfiguration;
+import org.apache.log4j.Logger;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TableConfigurationUpdateTest {
+ private static final Logger log = Logger.getLogger(TableConfigurationUpdateTest.class);
+
+ public static TemporaryFolder folder = new TemporaryFolder();
+ private MiniAccumuloCluster accumulo;
+ private String secret = "secret";
+
+ @Before
+ public void setUp() throws Exception {
+ folder.create();
+ accumulo = new MiniAccumuloCluster(folder.getRoot(), secret);
+ accumulo.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ accumulo.stop();
+ folder.delete();
+ }
+
+ @Test
+ public void test() throws Exception {
+ ZooKeeperInstance inst = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers(), 60 * 1000);
+ Connector conn = inst.getConnector("root", new PasswordToken(secret));
+
+ String table = "foo";
+ conn.tableOperations().create(table);
+
+ final DefaultConfiguration defaultConf = AccumuloConfiguration.getDefaultConfiguration();
+
+ // Cache invalidates 25% of the time
+ int randomMax = 4;
+ // Number of threads
+ int numThreads = 2;
+ // Number of iterations per thread
+ int iterations = 100000;
+ AccumuloConfiguration tableConf = new TableConfiguration(inst.getInstanceID(), inst, table, defaultConf);
+
+ long start = System.currentTimeMillis();
+ ExecutorService svc = Executors.newFixedThreadPool(numThreads);
+ CountDownLatch countDown = new CountDownLatch(numThreads);
+ ArrayList<Future<Exception>> futures = new ArrayList<Future<Exception>>(numThreads);
+
+ for (int i = 0; i < numThreads; i++) {
+ futures.add(svc.submit(new TableConfRunner(randomMax, iterations, tableConf, countDown)));
+ }
+
+ svc.shutdown();
+ Assert.assertTrue(svc.awaitTermination(60, TimeUnit.MINUTES));
+
+ for (Future<Exception> fut : futures) {
+ Exception e = fut.get();
+ if (null != e) {
+ Assert.fail("Thread failed with exception " + e);
+ }
+ }
+
+ long end = System.currentTimeMillis();
+ log.debug(tableConf + " with " + iterations + " iterations and " + numThreads + " threads and cache invalidates "
+ + ((1. / randomMax) * 100.) + "% took " + (end - start) / 1000 + " second(s)");
+ }
+
+ public static class TableConfRunner implements Callable<Exception> {
+ private static final Property prop = Property.TABLE_SPLIT_THRESHOLD;
+ private AccumuloConfiguration tableConf;
+ private CountDownLatch countDown;
+ private int iterations, randMax;
+
+ public TableConfRunner(int randMax, int iterations, AccumuloConfiguration tableConf, CountDownLatch countDown) {
+ this.randMax = randMax;
+ this.iterations = iterations;
+ this.tableConf = tableConf;
+ this.countDown = countDown;
+ }
+
+ @Override
+ public Exception call() {
+ Random r = new Random();
+ countDown.countDown();
+ try {
+ countDown.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return e;
+ }
+
+ String t = Thread.currentThread().getName() + " ";
+ try {
+ for (int i = 0; i < iterations; i++) {
+ // if (i % 10000 == 0) {
+ // log.info(t + " " + i);
+ // }
+ int choice = r.nextInt(randMax);
+ if (choice < 1) {
+ tableConf.invalidateCache();
+ } else {
+ tableConf.get(prop);
+ }
+ }
+ } catch (Exception e) {
+ log.error(t, e);
+ return e;
+ }
+
+ return null;
+ }
+
+ }
+
+}
[5/6] git commit: Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT
Posted by el...@apache.org.
Merge branch '1.5.2-SNAPSHOT' into 1.6.0-SNAPSHOT
Conflicts:
server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/ef5dc4a1
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/ef5dc4a1
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/ef5dc4a1
Branch: refs/heads/1.6.0-SNAPSHOT
Commit: ef5dc4a1f6d67f643f80dd821280140fd20ee947
Parents: 44b13c1 1d608a8
Author: Josh Elser <el...@apache.org>
Authored: Thu Mar 20 18:43:55 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu Mar 20 18:43:55 2014 -0400
----------------------------------------------------------------------
.../server/conf/NamespaceConfiguration.java | 10 ++
.../server/conf/TableConfiguration.java | 39 +++--
.../test/TableConfigurationUpdateIT.java | 154 +++++++++++++++++++
3 files changed, 193 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef5dc4a1/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java
index c0ac0b8,0000000..d08d45f
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java
@@@ -1,172 -1,0 +1,182 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.impl.Namespaces;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationObserver;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.log4j.Logger;
+
+public class NamespaceConfiguration extends AccumuloConfiguration {
+ private static final Logger log = Logger.getLogger(NamespaceConfiguration.class);
+
+ private final AccumuloConfiguration parent;
+ private static ZooCache propCache = null;
+ protected String namespaceId = null;
+ protected Instance inst = null;
+ private Set<ConfigurationObserver> observers;
+
+ /**
+ * Creates the configuration view for a single namespace.
+ *
+ * @param namespaceId
+ *          id of the namespace whose properties are read
+ * @param parent
+ *          configuration consulted when the namespace itself has no usable value
+ */
+ public NamespaceConfiguration(String namespaceId, AccumuloConfiguration parent) {
+ this.namespaceId = namespaceId;
+ this.parent = parent;
+ this.inst = HdfsZooInstance.getInstance();
+ // Observers are kept in a synchronized set; see addObserver/removeObserver.
+ this.observers = Collections.synchronizedSet(new HashSet<ConfigurationObserver>());
+ }
+
+ @Override
+ public String get(Property property) {
+ String key = property.getKey();
+ String value = get(getPropCache(), key);
+
+ if (value == null || !property.getType().isValidFormat(value)) {
+ if (value != null)
+ log.error("Using default value for " + key + " due to improperly formatted " + property.getType() + ": " + value);
+ if (!(namespaceId.equals(Namespaces.ACCUMULO_NAMESPACE_ID) && isIteratorOrConstraint(property.getKey()))) {
+ // ignore iterators from parent if system namespace
+ value = parent.get(property);
+ }
+ }
+ return value;
+ }
+
+ /**
+ * Reads one property of this namespace from the given cache.
+ *
+ * @param zc
+ *          cache to read from
+ * @param key
+ *          property key, used as the last path element under the namespace's configuration node
+ * @return the value decoded as UTF-8, or {@code null} when the cache returns no data for the path
+ */
+ private String get(ZooCache zc, String key) {
+ String confRoot = ZooUtil.getRoot(inst.getInstanceID()) + Constants.ZNAMESPACES + "/" + getNamespaceId() + Constants.ZNAMESPACE_CONF;
+ byte[] raw = zc.get(confRoot + "/" + key);
+ return raw == null ? null : new String(raw, Constants.UTF8);
+ }
+
+ /**
+ * Returns the ZooCache shared by all NamespaceConfiguration objects, creating it on the first call. The method holds the class lock, so the cache is
+ * created at most once.
+ */
+ private synchronized static ZooCache getPropCache() {
+ Instance zkInstance = HdfsZooInstance.getInstance();
+ if (propCache != null) {
+ return propCache;
+ }
+ propCache = new ZooCache(zkInstance.getZooKeepers(), zkInstance.getZooKeepersSessionTimeOut(), new NamespaceConfWatcher(zkInstance));
+ return propCache;
+ }
+
+ /**
+ * Filter that rejects every iterator or constraint key and defers to a caller-supplied filter for all other keys.
+ */
+ private class SystemNamespaceFilter implements PropertyFilter {
+
+ // Filter supplied by the caller; consulted only for keys that are not iterators or constraints.
+ private PropertyFilter delegate;
+
+ SystemNamespaceFilter(PropertyFilter userFilter) {
+ this.delegate = userFilter;
+ }
+
+ @Override
+ public boolean accept(String key) {
+ return !isIteratorOrConstraint(key) && delegate.accept(key);
+ }
+
+ }
+
+ @Override
+ public void getProperties(Map<String,String> props, PropertyFilter filter) {
+
+ PropertyFilter parentFilter = filter;
+
+ // exclude system iterators/constraints from the system namespace
+ // so they don't affect the metadata or root tables.
+ if (getNamespaceId().equals(Namespaces.ACCUMULO_NAMESPACE_ID))
+ parentFilter = new SystemNamespaceFilter(filter);
+
+ parent.getProperties(props, parentFilter);
+
+ ZooCache zc = getPropCache();
+
+ List<String> children = zc.getChildren(ZooUtil.getRoot(inst.getInstanceID()) + Constants.ZNAMESPACES + "/" + getNamespaceId() + Constants.ZNAMESPACE_CONF);
+ if (children != null) {
+ for (String child : children) {
+ if (child != null && filter.accept(child)) {
+ String value = get(zc, child);
+ if (value != null)
+ props.put(child, value);
+ }
+ }
+ }
+ }
+
+ /**
+ * @return the id of the namespace this configuration reads
+ */
+ protected String getNamespaceId() {
+ return this.namespaceId;
+ }
+
+ public void addObserver(ConfigurationObserver co) {
+ if (namespaceId == null) {
+ String err = "Attempt to add observer for non-namespace configuration";
+ log.error(err);
+ throw new RuntimeException(err);
+ }
+ iterator();
+ observers.add(co);
+ }
+
+ public void removeObserver(ConfigurationObserver configObserver) {
+ if (namespaceId == null) {
+ String err = "Attempt to remove observer for non-namespace configuration";
+ log.error(err);
+ throw new RuntimeException(err);
+ }
+ observers.remove(configObserver);
+ }
+
+ public void expireAllObservers() {
+ Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
+ for (ConfigurationObserver co : copy)
+ co.sessionExpired();
+ }
+
+ public void propertyChanged(String key) {
+ Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
+ for (ConfigurationObserver co : copy)
+ co.propertyChanged(key);
+ }
+
+ public void propertiesChanged(String key) {
+ Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
+ for (ConfigurationObserver co : copy)
+ co.propertiesChanged();
+ }
+
+ /**
+ * Tells whether a property key starts with the table iterator prefix or the table constraint prefix.
+ *
+ * @param key
+ *          property key to test
+ * @return {@code true} if the key starts with either prefix
+ */
+ protected boolean isIteratorOrConstraint(String key) {
+ if (key.startsWith(Property.TABLE_ITERATOR_PREFIX.getKey())) {
+ return true;
+ }
+ return key.startsWith(Property.TABLE_CONSTRAINT_PREFIX.getKey());
+ }
++
++ @Override
++ public void invalidateCache() {
++ if (null != propCache) {
++ propCache.clear();
++ }
++ // Else, if the cache is null, we could lock and double-check
++ // to see if it happened to be created so we could invalidate it
++ // but I don't see much benefit coming from that extra check.
++ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef5dc4a1/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
----------------------------------------------------------------------
diff --cc server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
index fae4167,0000000..c134e31
mode 100644,000000..100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@@ -1,168 -1,0 +1,187 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.conf;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationObserver;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooCache;
+import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.log4j.Logger;
+
+public class TableConfiguration extends AccumuloConfiguration {
+ private static final Logger log = Logger.getLogger(TableConfiguration.class);
-
++
+ // Need volatile keyword to ensure double-checked locking works as intended
+ private static volatile ZooCache tablePropCache = null;
++ private static final Object initLock = new Object();
+
+ private final String instanceId;
++ private final Instance instance;
+ private final NamespaceConfiguration parent;
+
+ private String table = null;
+ private Set<ConfigurationObserver> observers;
+
++ /**
++ * Creates a table configuration that uses the instance returned by {@code HdfsZooInstance.getInstance()}; delegates to the four-argument constructor.
++ */
+ public TableConfiguration(String instanceId, String table, NamespaceConfiguration parent) {
++ this(instanceId, HdfsZooInstance.getInstance(), table, parent);
++ }
++
++ /**
++ * Creates a table configuration.
++ *
++ * @param instanceId
++ *          instance id used to build the ZooKeeper root path for property lookups
++ * @param instance
++ *          instance whose ZooKeeper connection settings are used if this object ends up creating the shared cache
++ * @param table
++ *          table id; used in the ZooKeeper path and returned by {@link #getTableId()}
++ * @param parent
++ *          namespace configuration consulted when the table has no usable value
++ */
++ public TableConfiguration(String instanceId, Instance instance, String table, NamespaceConfiguration parent) {
+ this.instanceId = instanceId;
++ this.instance = instance;
+ this.table = table;
+ this.parent = parent;
+
+ this.observers = Collections.synchronizedSet(new HashSet<ConfigurationObserver>());
+ }
+
- private synchronized static ZooCache getTablePropCache() {
- Instance inst = HdfsZooInstance.getInstance();
- if (tablePropCache == null)
- tablePropCache = new ZooCache(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), new TableConfWatcher(inst));
++ /**
++ * Creates the shared table property cache unless another thread already did. The check is repeated while holding {@code initLock} so that only one
++ * cache is ever constructed.
++ */
++ private void initializeZooCache() {
++ synchronized (initLock) {
++ if (tablePropCache != null) {
++ // Lost the race: somebody else created the cache while we waited for the lock.
++ return;
++ }
++ tablePropCache = new ZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), new TableConfWatcher(instance));
++ }
++ }
++
++ /**
++ * Returns the ZooCache shared by all TableConfiguration objects (it is a static field), creating it through {@code initializeZooCache()} when it
++ * does not exist yet.
++ */
++ private ZooCache getTablePropCache() {
++ if (null == tablePropCache) {
++ initializeZooCache();
++ }
+ return tablePropCache;
+ }
+
+ public void addObserver(ConfigurationObserver co) {
+ if (table == null) {
+ String err = "Attempt to add observer for non-table configuration";
+ log.error(err);
+ throw new RuntimeException(err);
+ }
+ iterator();
+ observers.add(co);
+ }
+
+ public void removeObserver(ConfigurationObserver configObserver) {
+ if (table == null) {
+ String err = "Attempt to remove observer for non-table configuration";
+ log.error(err);
+ throw new RuntimeException(err);
+ }
+ observers.remove(configObserver);
+ }
+
+ public void expireAllObservers() {
+ Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
+ for (ConfigurationObserver co : copy)
+ co.sessionExpired();
+ }
+
+ public void propertyChanged(String key) {
+ Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
+ for (ConfigurationObserver co : copy)
+ co.propertyChanged(key);
+ }
+
+ public void propertiesChanged(String key) {
+ Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
+ for (ConfigurationObserver co : copy)
+ co.propertiesChanged();
+ }
+
+ @Override
+ public String get(Property property) {
+ String key = property.getKey();
+ String value = get(getTablePropCache(), key);
+
+ if (value == null || !property.getType().isValidFormat(value)) {
+ if (value != null)
+ log.error("Using default value for " + key + " due to improperly formatted " + property.getType() + ": " + value);
+ value = parent.get(property);
+ }
+ return value;
+ }
+
+ /**
+ * Reads one property of this table from the given cache.
+ *
+ * @param zc
+ *          cache to read from
+ * @param key
+ *          property key, used as the last path element under the table's configuration node
+ * @return the value decoded as UTF-8, or {@code null} when the cache returns no data for the path
+ */
+ private String get(ZooCache zc, String key) {
+ String confRoot = ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF;
+ byte[] raw = zc.get(confRoot + "/" + key);
+ return raw == null ? null : new String(raw, Constants.UTF8);
+ }
+
+ /**
+ * Collects the properties visible through this table into {@code props}.
+ * <p>
+ * The parent's properties are added first; values stored for this table are put afterwards and therefore replace parent entries with the same key.
+ *
+ * @param props
+ *          map that receives the properties
+ * @param filter
+ *          only keys accepted by this filter are added
+ */
+ @Override
+ public void getProperties(Map<String,String> props, PropertyFilter filter) {
+ parent.getProperties(props, filter);
+
+ ZooCache zc = getTablePropCache();
+ String confPath = ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF;
+ List<String> children = zc.getChildren(confPath);
+ if (children == null) {
+ return;
+ }
+
+ for (String child : children) {
+ if (child == null || !filter.accept(child)) {
+ continue;
+ }
+ String value = get(zc, child);
+ if (value != null) {
+ props.put(child, value);
+ }
+ }
+ }
+
+ /**
+ * @return the table identifier this configuration was constructed with
+ */
+ public String getTableId() {
+ return this.table;
+ }
+
+ /**
+ * returns the actual NamespaceConfiguration that corresponds to the current parent namespace.
+ */
+ public NamespaceConfiguration getNamespaceConfiguration() {
+ return ServerConfiguration.getNamespaceConfiguration(parent.inst, parent.namespaceId);
+ }
+
+ /**
+ * Returns the parent object handed to the constructor.
+ *
+ * @return the parent, which is actually a TableParentConfiguration that can change which namespace it references
+ */
+ public NamespaceConfiguration getParentConfiguration() {
+ return this.parent;
+ }
+
++ /**
++ * Clears the shared table property cache if it has been created; does nothing otherwise. The cache object is kept and emptied in place rather
++ * than being discarded.
++ */
+ @Override
+ public void invalidateCache() {
+ if (null != tablePropCache) {
- synchronized (TableConfiguration.class) {
- if (null != tablePropCache) {
- tablePropCache = null;
- }
- }
++ tablePropCache.clear();
+ }
++ // Else, if the cache is null, we could lock and double-check
++ // to see if it happened to be created so we could invalidate it
++ // but I don't see much benefit coming from that extra check.
++ }
++
++ /**
++ * @return the simple name of this object's runtime class
++ */
++ @Override
++ public String toString() {
++ return this.getClass().getSimpleName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/ef5dc4a1/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateIT.java
index 0000000,0000000..c3e3342
new file mode 100644
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/TableConfigurationUpdateIT.java
@@@ -1,0 -1,0 +1,154 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one or more
++ * contributor license agreements. See the NOTICE file distributed with
++ * this work for additional information regarding copyright ownership.
++ * The ASF licenses this file to You under the Apache License, Version 2.0
++ * (the "License"); you may not use this file except in compliance with
++ * the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.accumulo.test;
++
++import java.util.ArrayList;
++import java.util.Random;
++import java.util.concurrent.Callable;
++import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
++import java.util.concurrent.Future;
++import java.util.concurrent.TimeUnit;
++
++import org.apache.accumulo.core.client.Connector;
++import org.apache.accumulo.core.client.Instance;
++import org.apache.accumulo.core.client.ZooKeeperInstance;
++import org.apache.accumulo.core.client.security.tokens.PasswordToken;
++import org.apache.accumulo.core.conf.AccumuloConfiguration;
++import org.apache.accumulo.core.conf.Property;
++import org.apache.accumulo.minicluster.MiniAccumuloCluster;
++import org.apache.accumulo.server.conf.NamespaceConfiguration;
++import org.apache.accumulo.server.conf.TableConfiguration;
++import org.apache.accumulo.server.conf.TableParentConfiguration;
++import org.apache.log4j.Logger;
++import org.junit.After;
++import org.junit.Assert;
++import org.junit.Before;
++import org.junit.Test;
++import org.junit.rules.TemporaryFolder;
++
++public class TableConfigurationUpdateIT {
++ private static final Logger log = Logger.getLogger(TableConfigurationUpdateIT.class);
++
++ public static TemporaryFolder folder = new TemporaryFolder();
++ private MiniAccumuloCluster accumulo;
++ private String secret = "secret";
++
++ /**
++ * Creates the temporary folder and starts a MiniAccumuloCluster rooted in it, using {@code secret} as the second constructor argument.
++ */
++ @Before
++ public void setUp() throws Exception {
++ folder.create();
++ accumulo = new MiniAccumuloCluster(folder.getRoot(), secret);
++ accumulo.start();
++ }
++
++ /**
++ * Stops the mini cluster and removes the temporary folder.
++ */
++ @After
++ public void tearDown() throws Exception {
++ try {
++ // accumulo is still null when setUp failed before the cluster object was created; skip stop() so the original failure is not masked by a
++ // NullPointerException.
++ if (null != accumulo) {
++ accumulo.stop();
++ }
++ } finally {
++ // Always remove the temporary directory, even when stopping the cluster throws.
++ folder.delete();
++ }
++ }
++
++ /**
++ * Runs several threads that randomly either read a property from, or invalidate the cache of, one shared TableConfiguration and fails if any thread
++ * reports an exception.
++ */
++ @Test
++ public void test() throws Exception {
++ Instance inst = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers());
++ Connector conn = inst.getConnector("root", new PasswordToken(secret));
++
++ String table = "foo";
++ conn.tableOperations().create(table);
++
++ // TableConfiguration uses its table argument as the table id in ZooKeeper paths, so resolve the id once and use it for both configurations
++ // instead of handing the table name to TableConfiguration.
++ String tableId = conn.tableOperations().tableIdMap().get(table);
++ Assert.assertNotNull("No table id found for table " + table, tableId);
++
++ final NamespaceConfiguration defaultConf = new TableParentConfiguration(tableId, AccumuloConfiguration.getDefaultConfiguration());
++
++ // Cache invalidates 25% of the time
++ int randomMax = 4;
++ // Number of threads
++ int numThreads = 2;
++ // Number of iterations per thread
++ int iterations = 100000;
++ AccumuloConfiguration tableConf = new TableConfiguration(inst.getInstanceID(), inst, tableId, defaultConf);
++
++ long start = System.currentTimeMillis();
++ ExecutorService svc = Executors.newFixedThreadPool(numThreads);
++ // Shared latch: every runner counts down and then waits, so all threads start their loops together.
++ CountDownLatch countDown = new CountDownLatch(numThreads);
++ ArrayList<Future<Exception>> futures = new ArrayList<Future<Exception>>(numThreads);
++
++ for (int i = 0; i < numThreads; i++) {
++ futures.add(svc.submit(new TableConfRunner(randomMax, iterations, tableConf, countDown)));
++ }
++
++ svc.shutdown();
++ Assert.assertTrue(svc.awaitTermination(60, TimeUnit.MINUTES));
++
++ // A runner returns the exception it hit, or null on success.
++ for (Future<Exception> fut : futures) {
++ Exception e = fut.get();
++ if (null != e) {
++ Assert.fail("Thread failed with exception " + e);
++ }
++ }
++
++ long end = System.currentTimeMillis();
++ log.debug(tableConf + " with " + iterations + " iterations and " + numThreads + " threads and cache invalidates "
++ + ((1. / randomMax) * 100.) + "% took " + (end - start) / 1000 + " second(s)");
++ }
++
++ /**
++ * Worker that repeatedly either invalidates the configuration's cache or reads a property from it, chosen at random. {@link #call()} returns the
++ * exception that stopped the worker, or {@code null} when all iterations completed.
++ */
++ public static class TableConfRunner implements Callable<Exception> {
++ private static final Property prop = Property.TABLE_SPLIT_THRESHOLD;
++ private AccumuloConfiguration tableConf;
++ private CountDownLatch countDown;
++ private int iterations;
++ private int randMax;
++
++ /**
++ * @param randMax
++ *          exclusive upper bound of the random draw; the cache is invalidated when the draw is 0
++ * @param iterations
++ *          number of loop iterations to perform
++ * @param tableConf
++ *          configuration shared with the other workers
++ * @param countDown
++ *          latch used to line up all workers before they start looping
++ */
++ public TableConfRunner(int randMax, int iterations, AccumuloConfiguration tableConf, CountDownLatch countDown) {
++ this.tableConf = tableConf;
++ this.countDown = countDown;
++ this.iterations = iterations;
++ this.randMax = randMax;
++ }
++
++ @Override
++ public Exception call() {
++ final Random random = new Random();
++
++ // Announce readiness, then block until every worker has done the same.
++ countDown.countDown();
++ try {
++ countDown.await();
++ } catch (InterruptedException interrupted) {
++ Thread.currentThread().interrupt();
++ return interrupted;
++ }
++
++ final String threadLabel = Thread.currentThread().getName() + " ";
++ try {
++ int remaining = iterations;
++ while (remaining-- > 0) {
++ // nextInt never returns a negative number, so "== 0" selects one value out of randMax.
++ if (random.nextInt(randMax) == 0) {
++ tableConf.invalidateCache();
++ } else {
++ tableConf.get(prop);
++ }
++ }
++ return null;
++ } catch (Exception failure) {
++ log.error(threadLabel, failure);
++ return failure;
++ }
++ }
++
++ }
++
++}