You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/06/04 20:40:02 UTC

[bookkeeper] 04/04: [TABLE SERVICE] [DLOG] Fix `ConcurrentModificationException` on accessing component configuration

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.7
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 6c881f71554235182c615f324ecb4ed8a592ab38
Author: Sijie Guo <si...@apache.org>
AuthorDate: Thu May 3 19:08:35 2018 -0700

    [TABLE SERVICE] [DLOG] Fix `ConcurrentModificationException` on accessing component configuration
    
    Descriptions of the changes in this PR:
    
    *Motiviation*
    
    Currently when running table service in standalone mode, it occasionally throws `ConcurrentModificationException`. Because
    multiple components are sharing same underlying composite configuration. It causes the contention between copying keys and retrieving keys.
    
    *Solution*
    
    Change the component configuration to use a concurrent hashmap based configuration. Also changed the `loadConf` behavior to copy the keys / sub-keys
    into the component configuration. This prevents components sharing same underlying composite configuration.
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Jia Zhai <None>
    
    This closes #1378 from sijie/fix_conf
---
 .../common/conf/ComponentConfiguration.java        | 41 +++++++++++---------
 .../common/conf/ConcurrentConfiguration.java       | 28 +++++++-------
 .../common/config/ConcurrentBaseConfiguration.java | 45 +---------------------
 .../DistributedLogConfiguration.java               | 14 +++++--
 .../bookkeeper/stream/cluster/StreamCluster.java   | 24 ++++++------
 .../stream/server/conf/DLConfiguration.java        |  4 +-
 .../server/conf/StorageServerConfiguration.java    |  2 +-
 .../server/service/DLNamespaceProviderService.java |  6 ++-
 .../stream/storage/conf/StorageConfiguration.java  |  2 +-
 9 files changed, 70 insertions(+), 96 deletions(-)

diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ComponentConfiguration.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ComponentConfiguration.java
index 3ebb4a4..7d22443 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ComponentConfiguration.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ComponentConfiguration.java
@@ -37,25 +37,26 @@ public abstract class ComponentConfiguration implements Configuration {
     protected static final String DELIMITER = ".";
 
     private final String componentPrefix;
-    private final CompositeConfiguration conf;
+    private final CompositeConfiguration underlyingConf;
+    private final Configuration conf;
 
-    protected ComponentConfiguration(CompositeConfiguration conf,
+    protected ComponentConfiguration(CompositeConfiguration underlyingConf,
                                      String componentPrefix) {
         super();
-        this.conf = conf;
+        this.underlyingConf = underlyingConf;
+        this.conf = new ConcurrentConfiguration();
         this.componentPrefix = componentPrefix;
-    }
 
-    protected String getKeyName(String name) {
-        return this.componentPrefix + name;
+        // load the component keys
+        loadConf(underlyingConf);
     }
 
-    public String getComponentPrefix() {
-        return componentPrefix;
+    protected String getKeyName(String name) {
+        return name;
     }
 
     public CompositeConfiguration getUnderlyingConf() {
-        return conf;
+        return underlyingConf;
     }
 
     /**
@@ -66,7 +67,16 @@ public abstract class ComponentConfiguration implements Configuration {
      */
     public void loadConf(URL confURL) throws ConfigurationException {
         Configuration loadedConf = new PropertiesConfiguration(confURL);
-        conf.addConfiguration(loadedConf);
+        loadConf(loadedConf);
+    }
+
+    protected void loadConf(Configuration loadedConf) {
+        loadedConf.getKeys().forEachRemaining(fullKey -> {
+            if (fullKey.startsWith(componentPrefix)) {
+                String componentKey = fullKey.substring(componentPrefix.length());
+                setProperty(componentKey, loadedConf.getProperty(fullKey));
+            }
+        });
     }
 
     public void validate() throws ConfigurationException {
@@ -80,7 +90,7 @@ public abstract class ComponentConfiguration implements Configuration {
 
     @Override
     public boolean isEmpty() {
-        return conf.subset(componentPrefix).isEmpty();
+        return conf.isEmpty();
     }
 
     @Override
@@ -105,12 +115,7 @@ public abstract class ComponentConfiguration implements Configuration {
 
     @Override
     public void clear() {
-        Iterator<String> keys = conf.getKeys();
-        keys.forEachRemaining(s -> {
-            if (s.startsWith(componentPrefix)) {
-                conf.clearProperty(s);
-            }
-        });
+        conf.clear();
     }
 
     @Override
@@ -125,7 +130,7 @@ public abstract class ComponentConfiguration implements Configuration {
 
     @Override
     public Iterator<String> getKeys() {
-        return conf.getKeys(componentPrefix);
+        return conf.getKeys();
     }
 
     @Override
diff --git a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ConcurrentConfiguration.java
similarity index 66%
copy from stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ConcurrentConfiguration.java
index b390431..72d1504 100644
--- a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ConcurrentConfiguration.java
@@ -7,36 +7,38 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
  */
-package org.apache.distributedlog.common.config;
+package org.apache.bookkeeper.common.conf;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import org.apache.commons.configuration.AbstractConfiguration;
 
 /**
  * Configuration view built on concurrent hash map for fast thread-safe access.
- * Notes:
- * 1. Multi-property list aggregation will not work in this class. I.e. commons config
+ *
+ * <p>Notes: Multi-property list aggregation will not work in this class. I.e. commons config
  * normally combines all properties with the same key into one list property automatically.
  * This class simply overwrites any existing mapping.
  */
 @SuppressWarnings("unchecked")
-public class ConcurrentBaseConfiguration extends AbstractConfiguration {
+public class ConcurrentConfiguration extends AbstractConfiguration {
 
-    private final ConcurrentHashMap<String, Object> map;
+    private final ConcurrentMap<String, Object> map;
 
-    public ConcurrentBaseConfiguration() {
-        this.map = new ConcurrentHashMap<String, Object>();
+    public ConcurrentConfiguration() {
+        this.map = new ConcurrentHashMap<>();
     }
 
     @Override
diff --git a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
index b390431..e617db1 100644
--- a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
+++ b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
@@ -17,11 +17,7 @@
  */
 package org.apache.distributedlog.common.config;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.configuration.AbstractConfiguration;
+import org.apache.bookkeeper.common.conf.ConcurrentConfiguration;
 
 /**
  * Configuration view built on concurrent hash map for fast thread-safe access.
@@ -31,42 +27,5 @@ import org.apache.commons.configuration.AbstractConfiguration;
  * This class simply overwrites any existing mapping.
  */
 @SuppressWarnings("unchecked")
-public class ConcurrentBaseConfiguration extends AbstractConfiguration {
-
-    private final ConcurrentHashMap<String, Object> map;
-
-    public ConcurrentBaseConfiguration() {
-        this.map = new ConcurrentHashMap<String, Object>();
-    }
-
-    @Override
-    protected void addPropertyDirect(String key, Object value) {
-        checkNotNull(value);
-        map.put(key, value);
-    }
-
-    @Override
-    public Object getProperty(String key) {
-        return map.get(key);
-    }
-
-    @Override
-    public Iterator getKeys() {
-        return map.keySet().iterator();
-    }
-
-    @Override
-    public boolean containsKey(String key) {
-        return map.containsKey(key);
-    }
-
-    @Override
-    public boolean isEmpty() {
-        return map.isEmpty();
-    }
-
-    @Override
-    protected void clearPropertyDirect(String key) {
-        map.remove(key);
-    }
+public class ConcurrentBaseConfiguration extends ConcurrentConfiguration {
 }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
index 5b16fe0..4052f4d 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
@@ -517,16 +517,22 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
      * @param baseConf Other Configuration
      */
     public void loadConf(DistributedLogConfiguration baseConf) {
-        addConfiguration(baseConf);
+        for (Iterator<String> iter = baseConf.getKeys(); iter.hasNext(); ) {
+            String key = iter.next();
+            setProperty(key, baseConf.getProperty(key));
+        }
     }
 
     /**
      * Load configuration from other configuration object.
      *
-     * @param otherConf Other configuration object
+     * @param baseConf Other configuration object
      */
-    public void loadConf(Configuration otherConf) {
-        addConfiguration(otherConf);
+    public void loadConf(Configuration baseConf) {
+        for (Iterator<String> iter = baseConf.getKeys(); iter.hasNext(); ) {
+            String key = iter.next();
+            setProperty(key, baseConf.getProperty(key));
+        }
     }
 
     /**
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
index 719a109..72a41e5 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
@@ -45,7 +45,6 @@ import org.apache.bookkeeper.stream.storage.api.controller.StorageController;
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.impl.sc.helix.HelixStorageController;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.LocalDLMEmulator;
@@ -83,9 +82,18 @@ public class StreamCluster
     private static final String LEDGERS_AVAILABLE_PATH = "/stream/ledgers/available";
     private static final String NAMESPACE = "/stream/storage";
 
+    private static ServerConfiguration newServerConfiguration(String zkEnsemble) {
+        ServerConfiguration serverConf = new ServerConfiguration();
+        serverConf.setMetadataServiceUri("zk://" + zkEnsemble + LEDGERS_PATH);
+        serverConf.setAllowLoopback(true);
+        serverConf.setGcWaitTime(300000);
+        serverConf.setDiskUsageWarnThreshold(0.9999f);
+        serverConf.setDiskUsageThreshold(0.999999f);
+        return serverConf;
+    }
+
     private final StreamClusterSpec spec;
     private final List<Endpoint> rpcEndpoints;
-    private CompositeConfiguration baseConf;
     private String zkEnsemble;
     private int zkPort;
     private ZooKeeperServerShim zks;
@@ -140,15 +148,6 @@ public class StreamCluster
         log.info("Initializing the stream cluster.");
         ZooKeeper zkc = null;
         try (StorageController controller = new HelixStorageController(zkEnsemble)) {
-            // initialize the configuration
-            ServerConfiguration serverConf = new ServerConfiguration();
-            serverConf.setMetadataServiceUri("zk://" + zkEnsemble + LEDGERS_PATH);
-            serverConf.setAllowLoopback(true);
-            serverConf.setGcWaitTime(300000);
-            serverConf.setDiskUsageWarnThreshold(0.9999f);
-            serverConf.setDiskUsageThreshold(0.999999f);
-            this.baseConf = serverConf;
-
             zkc = ZooKeeperClient.newBuilder()
                 .connectString(zkEnsemble)
                 .sessionTimeoutMs(60000)
@@ -194,8 +193,7 @@ public class StreamCluster
             }
             LifecycleComponent server = null;
             try {
-                ServerConfiguration serverConf = new ServerConfiguration();
-                serverConf.loadConf(baseConf);
+                ServerConfiguration serverConf = newServerConfiguration(zkEnsemble);
                 serverConf.setBookiePort(bookiePort);
                 File bkDir = new File(spec.storageRootDir(), "bookie_" + bookiePort);
                 serverConf.setJournalDirName(bkDir.getPath());
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/DLConfiguration.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/DLConfiguration.java
index db87057..b3cd91d 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/DLConfiguration.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/DLConfiguration.java
@@ -18,11 +18,11 @@ import org.apache.bookkeeper.common.conf.ComponentConfiguration;
 import org.apache.commons.configuration.CompositeConfiguration;
 
 /**
- * A configuration delegates distributedlog configuration {@link org.apache.bookkeeper.DistributedLogConfiguration}.
+ * A configuration delegates distributedlog configuration {@link org.apache.distributedlog.DistributedLogConfiguration}.
  */
 public class DLConfiguration extends ComponentConfiguration {
 
-    public static final String COMPONENT_PREFIX = "dl" + DELIMITER;
+    public static final String COMPONENT_PREFIX = "dlog" + DELIMITER;
 
     public static DLConfiguration of(CompositeConfiguration conf) {
         return new DLConfiguration(conf);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
index d87854a..a0ef963 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
@@ -22,7 +22,7 @@ import org.apache.commons.configuration.CompositeConfiguration;
  */
 public class StorageServerConfiguration extends ComponentConfiguration {
 
-    private static final String COMPONENT_PREFIX = "rangeserver" + DELIMITER;
+    private static final String COMPONENT_PREFIX = "storageserver" + DELIMITER;
 
     public static StorageServerConfiguration of(CompositeConfiguration conf) {
         return new StorageServerConfiguration(conf);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
index d65c73e..04d030d 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
@@ -26,6 +26,7 @@ import org.apache.bookkeeper.stream.server.conf.DLConfiguration;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.impl.metadata.BKDLConfig;
 import org.apache.distributedlog.metadata.DLMetadata;
@@ -68,6 +69,7 @@ public class DLNamespaceProviderService
 
     private final ServerConfiguration bkServerConf;
     private final DistributedLogConfiguration dlConf;
+    private final DynamicDistributedLogConfiguration dlDynConf;
     @Getter
     private final URI dlogUri;
     private Namespace namespace;
@@ -81,7 +83,7 @@ public class DLNamespaceProviderService
             ZKMetadataDriverBase.resolveZkServers(bkServerConf)));
         this.bkServerConf = bkServerConf;
         this.dlConf = new DistributedLogConfiguration();
-        ConfUtils.loadConfiguration(this.dlConf, conf, conf.getComponentPrefix());
+        this.dlConf.loadConf(conf);
         // disable write lock
         this.dlConf.setWriteLockEnabled(false);
         // setting the flush policy
@@ -93,6 +95,7 @@ public class DLNamespaceProviderService
         // rolling log segment concurrency is only 1
         this.dlConf.setLogSegmentRollingConcurrency(1);
         this.dlConf.setMaxLogSegmentBytes(256 * 1024 * 1024); // 256 MB
+        this.dlDynConf = ConfUtils.getConstDynConf(dlConf);
     }
 
     @Override
@@ -109,6 +112,7 @@ public class DLNamespaceProviderService
                 .statsLogger(getStatsLogger())
                 .clientId("storage-server")
                 .conf(dlConf)
+                .dynConf(dlDynConf)
                 .uri(uri)
                 .build();
         } catch (Throwable e) {
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
index 32208d0..b0f8909 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
@@ -24,7 +24,7 @@ public class StorageConfiguration extends ComponentConfiguration {
 
     private static final String COMPONENT_PREFIX = "storage" + DELIMITER;
 
-    private static final String RANGE_STORE_DIRS = "range_store_dirs";
+    private static final String RANGE_STORE_DIRS = "range.store.dirs";
 
     private static final String SERVE_READONLY_TABLES = "serve.readonly.tables";
 

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.