You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by bh...@apache.org on 2014/06/11 17:21:02 UTC
[2/3] git commit: ACCUMULO-2887 Add ObservableConfiguration
ACCUMULO-2887 Add ObservableConfiguration
The logic for maintaining and notifying observers in NamespaceConfiguration and
TableConfiguration is refactored into a new ObservableConfiguration class.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d50bb3a2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d50bb3a2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d50bb3a2
Branch: refs/heads/master
Commit: d50bb3a219859d276acc08a126b074e42369b867
Parents: 45b7e29
Author: Bill Havanki <bh...@cloudera.com>
Authored: Fri Apr 4 12:19:28 2014 -0400
Committer: Bill Havanki <bh...@cloudera.com>
Committed: Wed Jun 11 10:21:32 2014 -0400
----------------------------------------------------------------------
.../core/conf/ObservableConfiguration.java | 109 +++++++++++++++++++
.../core/conf/ObservableConfigurationTest.java | 96 ++++++++++++++++
.../server/conf/NamespaceConfWatcher.java | 2 +-
.../server/conf/NamespaceConfiguration.java | 35 ++----
.../accumulo/server/conf/TableConfWatcher.java | 2 +-
.../server/conf/TableConfiguration.java | 36 ++----
6 files changed, 221 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d50bb3a2/core/src/main/java/org/apache/accumulo/core/conf/ObservableConfiguration.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ObservableConfiguration.java b/core/src/main/java/org/apache/accumulo/core/conf/ObservableConfiguration.java
new file mode 100644
index 0000000..7e8cc3a
--- /dev/null
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ObservableConfiguration.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.conf;
+
+import com.google.common.base.Preconditions;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import org.apache.log4j.Logger;
+
+/**
+ * A configuration that notifies registered {@link ConfigurationObserver}s of changes. Handling of observers is thread-safe.
+ */
+public abstract class ObservableConfiguration extends AccumuloConfiguration {
+ private static final Logger log = Logger.getLogger(ObservableConfiguration.class); // NOTE(review): not referenced anywhere in this class
+
+ private Set<ConfigurationObserver> observers; // a synchronized set, assigned once in the constructor
+
+ /**
+ * Creates a new observable configuration with no observers.
+ */
+ public ObservableConfiguration() {
+ observers = Collections.synchronizedSet(new java.util.HashSet<ConfigurationObserver>());
+ }
+
+ /**
+ * Adds an observer. Observers are kept in a set, so adding one that is already present has no effect.
+ *
+ * @param co
+ * observer to add
+ * @throws NullPointerException
+ * if co is null
+ */
+ public void addObserver(ConfigurationObserver co) {
+ Preconditions.checkNotNull(co);
+ observers.add(co);
+ }
+
+ /**
+ * Removes an observer. Removing an observer that is not present has no effect.
+ *
+ * @param co
+ * observer to remove
+ */
+ public void removeObserver(ConfigurationObserver co) {
+ observers.remove(co);
+ }
+
+ /**
+ * Gets the current observers. The returned collection is a snapshot copy: modifying it does not affect this configuration.
+ *
+ * @return a new collection holding the observers present at the time of the call
+ */
+ public Collection<ConfigurationObserver> getObservers() {
+ return snapshot(observers);
+ }
+
+ private static Collection<ConfigurationObserver> snapshot(Collection<ConfigurationObserver> observers) { // copies the observers into a new list
+ Collection<ConfigurationObserver> c = new java.util.ArrayList<ConfigurationObserver>();
+ synchronized (observers) { // hold the collection's monitor during the copy; Collections.synchronizedSet requires this when iterating
+ c.addAll(observers);
+ }
+ return c;
+ }
+
+ /**
+ * Expires all observers by calling sessionExpired() on each one in a snapshot of the current observers.
+ */
+ public void expireAllObservers() {
+ Collection<ConfigurationObserver> copy = snapshot(observers);
+ for (ConfigurationObserver co : copy)
+ co.sessionExpired();
+ }
+
+ /**
+ * Notifies all observers that a property changed. The observers are called from a snapshot, outside the lock.
+ *
+ * @param key
+ * configuration property key
+ */
+ public void propertyChanged(String key) {
+ Collection<ConfigurationObserver> copy = snapshot(observers);
+ for (ConfigurationObserver co : copy)
+ co.propertyChanged(key);
+ }
+
+ /**
+ * Notifies all observers that properties changed. The observers are called from a snapshot, outside the lock.
+ */
+ public void propertiesChanged() {
+ Collection<ConfigurationObserver> copy = snapshot(observers);
+ for (ConfigurationObserver co : copy)
+ co.propertiesChanged();
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d50bb3a2/core/src/test/java/org/apache/accumulo/core/conf/ObservableConfigurationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/ObservableConfigurationTest.java b/core/src/test/java/org/apache/accumulo/core/conf/ObservableConfigurationTest.java
new file mode 100644
index 0000000..3534b6c
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/conf/ObservableConfigurationTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.conf;
+
+import java.util.Collection;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class ObservableConfigurationTest { // unit tests for observer handling in ObservableConfiguration
+ private static class TestObservableConfig extends ObservableConfiguration { // minimal concrete subclass; the overrides below are stubs
+ @Override
+ public String get(Property property) {
+ return null;
+ }
+
+ @Override
+ public void getProperties(Map<String,String> props, PropertyFilter filter) {}
+ }
+
+ private ObservableConfiguration c; // configuration under test, recreated for each test
+ private ConfigurationObserver co1; // mock observer shared by the tests
+
+ @Before
+ public void setUp() {
+ c = new TestObservableConfig();
+ co1 = createMock(ConfigurationObserver.class);
+ }
+
+ @Test
+ public void testAddAndRemove() {
+ ConfigurationObserver co2 = createMock(ConfigurationObserver.class);
+ c.addObserver(co1);
+ c.addObserver(co2);
+ Collection<ConfigurationObserver> cos = c.getObservers();
+ assertEquals(2, cos.size());
+ assertTrue(cos.contains(co1));
+ assertTrue(cos.contains(co2));
+ c.removeObserver(co1);
+ cos = c.getObservers(); // fetch again: the earlier collection is a snapshot and does not see the removal
+ assertEquals(1, cos.size());
+ assertTrue(cos.contains(co2));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testNoNullAdd() {
+ c.addObserver(null);
+ }
+
+ @Test
+ public void testSessionExpired() {
+ c.addObserver(co1);
+ co1.sessionExpired(); // called before replay(): records the expected call on the mock
+ replay(co1);
+ c.expireAllObservers();
+ verify(co1); // fails unless the recorded call was made
+ }
+
+ @Test
+ public void testPropertyChanged() {
+ String key = "key";
+ c.addObserver(co1);
+ co1.propertyChanged(key); // called before replay(): records the expected call with this key
+ replay(co1);
+ c.propertyChanged(key);
+ verify(co1); // fails unless the recorded call was made
+ }
+
+ @Test
+ public void testPropertiesChanged() {
+ c.addObserver(co1);
+ co1.propertiesChanged(); // called before replay(): records the expected call on the mock
+ replay(co1);
+ c.propertiesChanged();
+ verify(co1); // fails unless the recorded call was made
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d50bb3a2/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java
index 7f3d73e..50ab27e 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfWatcher.java
@@ -72,7 +72,7 @@ class NamespaceConfWatcher implements Watcher {
ServerConfiguration.getNamespaceConfiguration(instance, namespaceId).propertyChanged(key);
break;
case NodeChildrenChanged:
- ServerConfiguration.getNamespaceConfiguration(instance, namespaceId).propertiesChanged(key);
+ ServerConfiguration.getNamespaceConfiguration(instance, namespaceId).propertiesChanged();
break;
case NodeDeleted:
if (key == null) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d50bb3a2/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java
index eab198e..ebb098b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/NamespaceConfiguration.java
@@ -16,18 +16,15 @@
*/
package org.apache.accumulo.server.conf;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.Namespaces;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationObserver;
+import org.apache.accumulo.core.conf.ObservableConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooCache;
@@ -35,7 +32,7 @@ import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.log4j.Logger;
-public class NamespaceConfiguration extends AccumuloConfiguration {
+public class NamespaceConfiguration extends ObservableConfiguration {
private static final Logger log = Logger.getLogger(NamespaceConfiguration.class);
private final AccumuloConfiguration parent;
@@ -43,7 +40,6 @@ public class NamespaceConfiguration extends AccumuloConfiguration {
private static final Object lock = new Object();
protected String namespaceId = null;
protected Instance inst = null;
- private Set<ConfigurationObserver> observers;
public NamespaceConfiguration(String namespaceId, AccumuloConfiguration parent) {
this(namespaceId, HdfsZooInstance.getInstance(), parent);
@@ -53,7 +49,6 @@ public class NamespaceConfiguration extends AccumuloConfiguration {
this.inst = inst;
this.parent = parent;
this.namespaceId = namespaceId;
- this.observers = Collections.synchronizedSet(new HashSet<ConfigurationObserver>());
}
@Override
@@ -142,6 +137,7 @@ public class NamespaceConfiguration extends AccumuloConfiguration {
return namespaceId;
}
+ @Override
public void addObserver(ConfigurationObserver co) {
if (namespaceId == null) {
String err = "Attempt to add observer for non-namespace configuration";
@@ -149,34 +145,17 @@ public class NamespaceConfiguration extends AccumuloConfiguration {
throw new RuntimeException(err);
}
iterator();
- observers.add(co);
+ super.addObserver(co);
}
- public void removeObserver(ConfigurationObserver configObserver) {
+ @Override
+ public void removeObserver(ConfigurationObserver co) {
if (namespaceId == null) {
String err = "Attempt to remove observer for non-namespace configuration";
log.error(err);
throw new RuntimeException(err);
}
- observers.remove(configObserver);
- }
-
- public void expireAllObservers() {
- Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
- for (ConfigurationObserver co : copy)
- co.sessionExpired();
- }
-
- public void propertyChanged(String key) {
- Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
- for (ConfigurationObserver co : copy)
- co.propertyChanged(key);
- }
-
- public void propertiesChanged(String key) {
- Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
- for (ConfigurationObserver co : copy)
- co.propertiesChanged();
+ super.removeObserver(co);
}
protected boolean isIteratorOrConstraint(String key) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d50bb3a2/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
index c407309..3c24ec4 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
@@ -72,7 +72,7 @@ class TableConfWatcher implements Watcher {
ServerConfiguration.getTableConfiguration(instance, tableId).propertyChanged(key);
break;
case NodeChildrenChanged:
- ServerConfiguration.getTableConfiguration(instance, tableId).propertiesChanged(key);
+ ServerConfiguration.getTableConfiguration(instance, tableId).propertiesChanged();
break;
case NodeDeleted:
if (key == null) {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/d50bb3a2/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
index 909b450..a9c9411 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
@@ -16,17 +16,14 @@
*/
package org.apache.accumulo.server.conf;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationObserver;
+import org.apache.accumulo.core.conf.ObservableConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.zookeeper.ZooCache;
@@ -34,7 +31,7 @@ import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
import org.apache.accumulo.server.client.HdfsZooInstance;
import org.apache.log4j.Logger;
-public class TableConfiguration extends AccumuloConfiguration {
+public class TableConfiguration extends ObservableConfiguration {
private static final Logger log = Logger.getLogger(TableConfiguration.class);
// Need volatile keyword to ensure double-checked locking works as intended
@@ -46,7 +43,6 @@ public class TableConfiguration extends AccumuloConfiguration {
private final NamespaceConfiguration parent;
private String table = null;
- private Set<ConfigurationObserver> observers;
public TableConfiguration(String instanceId, String table, NamespaceConfiguration parent) {
this(instanceId, HdfsZooInstance.getInstance(), table, parent);
@@ -57,8 +53,6 @@ public class TableConfiguration extends AccumuloConfiguration {
this.instance = instance;
this.table = table;
this.parent = parent;
-
- this.observers = Collections.synchronizedSet(new HashSet<ConfigurationObserver>());
}
private void initializeZooCache() {
@@ -76,6 +70,7 @@ public class TableConfiguration extends AccumuloConfiguration {
return tablePropCache;
}
+ @Override
public void addObserver(ConfigurationObserver co) {
if (table == null) {
String err = "Attempt to add observer for non-table configuration";
@@ -83,34 +78,17 @@ public class TableConfiguration extends AccumuloConfiguration {
throw new RuntimeException(err);
}
iterator();
- observers.add(co);
+ super.addObserver(co);
}
- public void removeObserver(ConfigurationObserver configObserver) {
+ @Override
+ public void removeObserver(ConfigurationObserver co) {
if (table == null) {
String err = "Attempt to remove observer for non-table configuration";
log.error(err);
throw new RuntimeException(err);
}
- observers.remove(configObserver);
- }
-
- public void expireAllObservers() {
- Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
- for (ConfigurationObserver co : copy)
- co.sessionExpired();
- }
-
- public void propertyChanged(String key) {
- Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
- for (ConfigurationObserver co : copy)
- co.propertyChanged(key);
- }
-
- public void propertiesChanged(String key) {
- Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
- for (ConfigurationObserver co : copy)
- co.propertiesChanged();
+ super.removeObserver(co);
}
@Override