You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 20:40:02 UTC
[bookkeeper] 04/04: [TABLE SERVICE] [DLOG] Fix
`ConcurrentModificationException` on accessing component configuration
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 6c881f71554235182c615f324ecb4ed8a592ab38
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu May 3 19:08:35 2018 -0700
[TABLE SERVICE] [DLOG] Fix `ConcurrentModificationException` on accessing component configuration
Descriptions of the changes in this PR:
*Motiviation*
Currently when running table service in standalone mode, it occasionally throws `ConcurrentModificationException`. Because
multiple components are sharing same underlying composite configuration. It causes the contention between copying keys and retrieving keys.
*Solution*
Change the component configuration to use a concurrent hashmap based configuration. Also changed the `loadConf` behavior to copy the keys / sub-keys
into the component configuration. This prevents components sharing same underlying composite configuration.
Author: Sijie Guo <si...@apache.org>
Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
This closes #1378 from sijie/fix_conf
---
.../common/conf/ComponentConfiguration.java | 41 +++++++++++---------
.../common/conf/ConcurrentConfiguration.java | 28 +++++++-------
.../common/config/ConcurrentBaseConfiguration.java | 45 +---------------------
.../DistributedLogConfiguration.java | 14 +++++--
.../bookkeeper/stream/cluster/StreamCluster.java | 24 ++++++------
.../stream/server/conf/DLConfiguration.java | 4 +-
.../server/conf/StorageServerConfiguration.java | 2 +-
.../server/service/DLNamespaceProviderService.java | 6 ++-
.../stream/storage/conf/StorageConfiguration.java | 2 +-
9 files changed, 70 insertions(+), 96 deletions(-)
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ComponentConfiguration.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ComponentConfiguration.java
index 3ebb4a4..7d22443 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ComponentConfiguration.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ComponentConfiguration.java
@@ -37,25 +37,26 @@ public abstract class ComponentConfiguration implements Configuration {
protected static final String DELIMITER = ".";
private final String componentPrefix;
- private final CompositeConfiguration conf;
+ private final CompositeConfiguration underlyingConf;
+ private final Configuration conf;
- protected ComponentConfiguration(CompositeConfiguration conf,
+ protected ComponentConfiguration(CompositeConfiguration underlyingConf,
String componentPrefix) {
super();
- this.conf = conf;
+ this.underlyingConf = underlyingConf;
+ this.conf = new ConcurrentConfiguration();
this.componentPrefix = componentPrefix;
- }
- protected String getKeyName(String name) {
- return this.componentPrefix + name;
+ // load the component keys
+ loadConf(underlyingConf);
}
- public String getComponentPrefix() {
- return componentPrefix;
+ protected String getKeyName(String name) {
+ return name;
}
public CompositeConfiguration getUnderlyingConf() {
- return conf;
+ return underlyingConf;
}
/**
@@ -66,7 +67,16 @@ public abstract class ComponentConfiguration implements Configuration {
*/
public void loadConf(URL confURL) throws ConfigurationException {
Configuration loadedConf = new PropertiesConfiguration(confURL);
- conf.addConfiguration(loadedConf);
+ loadConf(loadedConf);
+ }
+
+ protected void loadConf(Configuration loadedConf) {
+ loadedConf.getKeys().forEachRemaining(fullKey -> {
+ if (fullKey.startsWith(componentPrefix)) {
+ String componentKey = fullKey.substring(componentPrefix.length());
+ setProperty(componentKey, loadedConf.getProperty(fullKey));
+ }
+ });
}
public void validate() throws ConfigurationException {
@@ -80,7 +90,7 @@ public abstract class ComponentConfiguration implements Configuration {
@Override
public boolean isEmpty() {
- return conf.subset(componentPrefix).isEmpty();
+ return conf.isEmpty();
}
@Override
@@ -105,12 +115,7 @@ public abstract class ComponentConfiguration implements Configuration {
@Override
public void clear() {
- Iterator<String> keys = conf.getKeys();
- keys.forEachRemaining(s -> {
- if (s.startsWith(componentPrefix)) {
- conf.clearProperty(s);
- }
- });
+ conf.clear();
}
@Override
@@ -125,7 +130,7 @@ public abstract class ComponentConfiguration implements Configuration {
@Override
public Iterator<String> getKeys() {
- return conf.getKeys(componentPrefix);
+ return conf.getKeys();
}
@Override
diff --git a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ConcurrentConfiguration.java
similarity index 66%
copy from stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ConcurrentConfiguration.java
index b390431..72d1504 100644
--- a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ConcurrentConfiguration.java
@@ -7,36 +7,38 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.distributedlog.common.config;
+package org.apache.bookkeeper.common.conf;
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.configuration.AbstractConfiguration;
/**
* Configuration view built on concurrent hash map for fast thread-safe access.
- * Notes:
- * 1. Multi-property list aggregation will not work in this class. I.e. commons config
+ *
+ * <p>Notes: Multi-property list aggregation will not work in this class. I.e. commons config
* normally combines all properties with the same key into one list property automatically.
* This class simply overwrites any existing mapping.
*/
@SuppressWarnings("unchecked")
-public class ConcurrentBaseConfiguration extends AbstractConfiguration {
+public class ConcurrentConfiguration extends AbstractConfiguration {
- private final ConcurrentHashMap<String, Object> map;
+ private final ConcurrentMap<String, Object> map;
- public ConcurrentBaseConfiguration() {
- this.map = new ConcurrentHashMap<String, Object>();
+ public ConcurrentConfiguration() {
+ this.map = new ConcurrentHashMap<>();
}
@Override
diff --git a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
index b390431..e617db1 100644
--- a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
+++ b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
@@ -17,11 +17,7 @@
*/
package org.apache.distributedlog.common.config;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.configuration.AbstractConfiguration;
+import org.apache.bookkeeper.common.conf.ConcurrentConfiguration;
/**
* Configuration view built on concurrent hash map for fast thread-safe access.
@@ -31,42 +27,5 @@ import org.apache.commons.configuration.AbstractConfiguration;
* This class simply overwrites any existing mapping.
*/
@SuppressWarnings("unchecked")
-public class ConcurrentBaseConfiguration extends AbstractConfiguration {
-
- private final ConcurrentHashMap<String, Object> map;
-
- public ConcurrentBaseConfiguration() {
- this.map = new ConcurrentHashMap<String, Object>();
- }
-
- @Override
- protected void addPropertyDirect(String key, Object value) {
- checkNotNull(value);
- map.put(key, value);
- }
-
- @Override
- public Object getProperty(String key) {
- return map.get(key);
- }
-
- @Override
- public Iterator getKeys() {
- return map.keySet().iterator();
- }
-
- @Override
- public boolean containsKey(String key) {
- return map.containsKey(key);
- }
-
- @Override
- public boolean isEmpty() {
- return map.isEmpty();
- }
-
- @Override
- protected void clearPropertyDirect(String key) {
- map.remove(key);
- }
+public class ConcurrentBaseConfiguration extends ConcurrentConfiguration {
}
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
index 5b16fe0..4052f4d 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
@@ -517,16 +517,22 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
* @param baseConf Other Configuration
*/
public void loadConf(DistributedLogConfiguration baseConf) {
- addConfiguration(baseConf);
+ for (Iterator<String> iter = baseConf.getKeys(); iter.hasNext(); ) {
+ String key = iter.next();
+ setProperty(key, baseConf.getProperty(key));
+ }
}
/**
* Load configuration from other configuration object.
*
- * @param otherConf Other configuration object
+ * @param baseConf Other configuration object
*/
- public void loadConf(Configuration otherConf) {
- addConfiguration(otherConf);
+ public void loadConf(Configuration baseConf) {
+ for (Iterator<String> iter = baseConf.getKeys(); iter.hasNext(); ) {
+ String key = iter.next();
+ setProperty(key, baseConf.getProperty(key));
+ }
}
/**
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
index 719a109..72a41e5 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
@@ -45,7 +45,6 @@ import org.apache.bookkeeper.stream.storage.api.controller.StorageController;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.bookkeeper.stream.storage.impl.sc.helix.HelixStorageController;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LocalDLMEmulator;
@@ -83,9 +82,18 @@ public class StreamCluster
private static final String LEDGERS_AVAILABLE_PATH = "/stream/ledgers/available";
private static final String NAMESPACE = "/stream/storage";
+ private static ServerConfiguration newServerConfiguration(String zkEnsemble) {
+ ServerConfiguration serverConf = new ServerConfiguration();
+ serverConf.setMetadataServiceUri("zk://" + zkEnsemble + LEDGERS_PATH);
+ serverConf.setAllowLoopback(true);
+ serverConf.setGcWaitTime(300000);
+ serverConf.setDiskUsageWarnThreshold(0.9999f);
+ serverConf.setDiskUsageThreshold(0.999999f);
+ return serverConf;
+ }
+
private final StreamClusterSpec spec;
private final List<Endpoint> rpcEndpoints;
- private CompositeConfiguration baseConf;
private String zkEnsemble;
private int zkPort;
private ZooKeeperServerShim zks;
@@ -140,15 +148,6 @@ public class StreamCluster
log.info("Initializing the stream cluster.");
ZooKeeper zkc = null;
try (StorageController controller = new HelixStorageController(zkEnsemble)) {
- // initialize the configuration
- ServerConfiguration serverConf = new ServerConfiguration();
- serverConf.setMetadataServiceUri("zk://" + zkEnsemble + LEDGERS_PATH);
- serverConf.setAllowLoopback(true);
- serverConf.setGcWaitTime(300000);
- serverConf.setDiskUsageWarnThreshold(0.9999f);
- serverConf.setDiskUsageThreshold(0.999999f);
- this.baseConf = serverConf;
-
zkc = ZooKeeperClient.newBuilder()
.connectString(zkEnsemble)
.sessionTimeoutMs(60000)
@@ -194,8 +193,7 @@ public class StreamCluster
}
LifecycleComponent server = null;
try {
- ServerConfiguration serverConf = new ServerConfiguration();
- serverConf.loadConf(baseConf);
+ ServerConfiguration serverConf = newServerConfiguration(zkEnsemble);
serverConf.setBookiePort(bookiePort);
File bkDir = new File(spec.storageRootDir(), "bookie_" + bookiePort);
serverConf.setJournalDirName(bkDir.getPath());
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/DLConfiguration.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/DLConfiguration.java
index db87057..b3cd91d 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/DLConfiguration.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/DLConfiguration.java
@@ -18,11 +18,11 @@ import org.apache.bookkeeper.common.conf.ComponentConfiguration;
import org.apache.commons.configuration.CompositeConfiguration;
/**
- * A configuration delegates distributedlog configuration {@link org.apache.bookkeeper.DistributedLogConfiguration}.
+ * A configuration delegates distributedlog configuration {@link org.apache.distributedlog.DistributedLogConfiguration}.
*/
public class DLConfiguration extends ComponentConfiguration {
- public static final String COMPONENT_PREFIX = "dl" + DELIMITER;
+ public static final String COMPONENT_PREFIX = "dlog" + DELIMITER;
public static DLConfiguration of(CompositeConfiguration conf) {
return new DLConfiguration(conf);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
index d87854a..a0ef963 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
@@ -22,7 +22,7 @@ import org.apache.commons.configuration.CompositeConfiguration;
*/
public class StorageServerConfiguration extends ComponentConfiguration {
- private static final String COMPONENT_PREFIX = "rangeserver" + DELIMITER;
+ private static final String COMPONENT_PREFIX = "storageserver" + DELIMITER;
public static StorageServerConfiguration of(CompositeConfiguration conf) {
return new StorageServerConfiguration(conf);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
index d65c73e..04d030d 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
@@ -26,6 +26,7 @@ import org.apache.bookkeeper.stream.server.conf.DLConfiguration;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.metadata.DLMetadata;
@@ -68,6 +69,7 @@ public class DLNamespaceProviderService
private final ServerConfiguration bkServerConf;
private final DistributedLogConfiguration dlConf;
+ private final DynamicDistributedLogConfiguration dlDynConf;
@Getter
private final URI dlogUri;
private Namespace namespace;
@@ -81,7 +83,7 @@ public class DLNamespaceProviderService
ZKMetadataDriverBase.resolveZkServers(bkServerConf)));
this.bkServerConf = bkServerConf;
this.dlConf = new DistributedLogConfiguration();
- ConfUtils.loadConfiguration(this.dlConf, conf, conf.getComponentPrefix());
+ this.dlConf.loadConf(conf);
// disable write lock
this.dlConf.setWriteLockEnabled(false);
// setting the flush policy
@@ -93,6 +95,7 @@ public class DLNamespaceProviderService
// rolling log segment concurrency is only 1
this.dlConf.setLogSegmentRollingConcurrency(1);
this.dlConf.setMaxLogSegmentBytes(256 * 1024 * 1024); // 256 MB
+ this.dlDynConf = ConfUtils.getConstDynConf(dlConf);
}
@Override
@@ -109,6 +112,7 @@ public class DLNamespaceProviderService
.statsLogger(getStatsLogger())
.clientId("storage-server")
.conf(dlConf)
+ .dynConf(dlDynConf)
.uri(uri)
.build();
} catch (Throwable e) {
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
index 32208d0..b0f8909 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
@@ -24,7 +24,7 @@ public class StorageConfiguration extends ComponentConfiguration {
private static final String COMPONENT_PREFIX = "storage" + DELIMITER;
- private static final String RANGE_STORE_DIRS = "range_store_dirs";
+ private static final String RANGE_STORE_DIRS = "range.store.dirs";
private static final String SERVE_READONLY_TABLES = "serve.readonly.tables";
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.