You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/05/04 02:08:46 UTC

[GitHub] sijie closed pull request #1378: [table service][dlog] Fix `ConcurrentModificationException` on accessing component configuration

sijie closed pull request #1378: [table service][dlog] Fix `ConcurrentModificationException` on accessing component configuration
URL: https://github.com/apache/bookkeeper/pull/1378
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ComponentConfiguration.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ComponentConfiguration.java
index 3ebb4a483..7d2244309 100644
--- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ComponentConfiguration.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ComponentConfiguration.java
@@ -37,25 +37,26 @@
     protected static final String DELIMITER = ".";
 
     private final String componentPrefix;
-    private final CompositeConfiguration conf;
+    private final CompositeConfiguration underlyingConf;
+    private final Configuration conf;
 
-    protected ComponentConfiguration(CompositeConfiguration conf,
+    protected ComponentConfiguration(CompositeConfiguration underlyingConf,
                                      String componentPrefix) {
         super();
-        this.conf = conf;
+        this.underlyingConf = underlyingConf;
+        this.conf = new ConcurrentConfiguration();
         this.componentPrefix = componentPrefix;
-    }
 
-    protected String getKeyName(String name) {
-        return this.componentPrefix + name;
+        // load the component keys
+        loadConf(underlyingConf);
     }
 
-    public String getComponentPrefix() {
-        return componentPrefix;
+    protected String getKeyName(String name) {
+        return name;
     }
 
     public CompositeConfiguration getUnderlyingConf() {
-        return conf;
+        return underlyingConf;
     }
 
     /**
@@ -66,7 +67,16 @@ public CompositeConfiguration getUnderlyingConf() {
      */
     public void loadConf(URL confURL) throws ConfigurationException {
         Configuration loadedConf = new PropertiesConfiguration(confURL);
-        conf.addConfiguration(loadedConf);
+        loadConf(loadedConf);
+    }
+
+    protected void loadConf(Configuration loadedConf) {
+        loadedConf.getKeys().forEachRemaining(fullKey -> {
+            if (fullKey.startsWith(componentPrefix)) {
+                String componentKey = fullKey.substring(componentPrefix.length());
+                setProperty(componentKey, loadedConf.getProperty(fullKey));
+            }
+        });
     }
 
     public void validate() throws ConfigurationException {
@@ -80,7 +90,7 @@ public Configuration subset(String prefix) {
 
     @Override
     public boolean isEmpty() {
-        return conf.subset(componentPrefix).isEmpty();
+        return conf.isEmpty();
     }
 
     @Override
@@ -105,12 +115,7 @@ public void clearProperty(String key) {
 
     @Override
     public void clear() {
-        Iterator<String> keys = conf.getKeys();
-        keys.forEachRemaining(s -> {
-            if (s.startsWith(componentPrefix)) {
-                conf.clearProperty(s);
-            }
-        });
+        conf.clear();
     }
 
     @Override
@@ -125,7 +130,7 @@ public Object getProperty(String key) {
 
     @Override
     public Iterator<String> getKeys() {
-        return conf.getKeys(componentPrefix);
+        return conf.getKeys();
     }
 
     @Override
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ConcurrentConfiguration.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ConcurrentConfiguration.java
new file mode 100644
index 000000000..72d1504aa
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/conf/ConcurrentConfiguration.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.conf;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.commons.configuration.AbstractConfiguration;
+
+/**
+ * Configuration view built on concurrent hash map for fast thread-safe access.
+ *
+ * <p>Notes: Multi-property list aggregation will not work in this class. I.e. commons config
+ * normally combines all properties with the same key into one list property automatically.
+ * This class simply overwrites any existing mapping.
+ */
+@SuppressWarnings("unchecked")
+public class ConcurrentConfiguration extends AbstractConfiguration {
+
+    private final ConcurrentMap<String, Object> map;
+
+    public ConcurrentConfiguration() {
+        this.map = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    protected void addPropertyDirect(String key, Object value) {
+        checkNotNull(value);
+        map.put(key, value);
+    }
+
+    @Override
+    public Object getProperty(String key) {
+        return map.get(key);
+    }
+
+    @Override
+    public Iterator getKeys() {
+        return map.keySet().iterator();
+    }
+
+    @Override
+    public boolean containsKey(String key) {
+        return map.containsKey(key);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return map.isEmpty();
+    }
+
+    @Override
+    protected void clearPropertyDirect(String key) {
+        map.remove(key);
+    }
+}
diff --git a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
index b390431b9..e617db15e 100644
--- a/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
+++ b/stream/distributedlog/common/src/main/java/org/apache/distributedlog/common/config/ConcurrentBaseConfiguration.java
@@ -17,11 +17,7 @@
  */
 package org.apache.distributedlog.common.config;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.configuration.AbstractConfiguration;
+import org.apache.bookkeeper.common.conf.ConcurrentConfiguration;
 
 /**
  * Configuration view built on concurrent hash map for fast thread-safe access.
@@ -31,42 +27,5 @@
  * This class simply overwrites any existing mapping.
  */
 @SuppressWarnings("unchecked")
-public class ConcurrentBaseConfiguration extends AbstractConfiguration {
-
-    private final ConcurrentHashMap<String, Object> map;
-
-    public ConcurrentBaseConfiguration() {
-        this.map = new ConcurrentHashMap<String, Object>();
-    }
-
-    @Override
-    protected void addPropertyDirect(String key, Object value) {
-        checkNotNull(value);
-        map.put(key, value);
-    }
-
-    @Override
-    public Object getProperty(String key) {
-        return map.get(key);
-    }
-
-    @Override
-    public Iterator getKeys() {
-        return map.keySet().iterator();
-    }
-
-    @Override
-    public boolean containsKey(String key) {
-        return map.containsKey(key);
-    }
-
-    @Override
-    public boolean isEmpty() {
-        return map.isEmpty();
-    }
-
-    @Override
-    protected void clearPropertyDirect(String key) {
-        map.remove(key);
-    }
+public class ConcurrentBaseConfiguration extends ConcurrentConfiguration {
 }
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
index 5b16fe000..4052f4d19 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
@@ -517,16 +517,22 @@ public void loadConf(URL confURL) throws ConfigurationException {
      * @param baseConf Other Configuration
      */
     public void loadConf(DistributedLogConfiguration baseConf) {
-        addConfiguration(baseConf);
+        for (Iterator<String> iter = baseConf.getKeys(); iter.hasNext(); ) {
+            String key = iter.next();
+            setProperty(key, baseConf.getProperty(key));
+        }
     }
 
     /**
      * Load configuration from other configuration object.
      *
-     * @param otherConf Other configuration object
+     * @param baseConf Other configuration object
      */
-    public void loadConf(Configuration otherConf) {
-        addConfiguration(otherConf);
+    public void loadConf(Configuration baseConf) {
+        for (Iterator<String> iter = baseConf.getKeys(); iter.hasNext(); ) {
+            String key = iter.next();
+            setProperty(key, baseConf.getProperty(key));
+        }
     }
 
     /**
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
index 719a109ea..72a41e508 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/cluster/StreamCluster.java
@@ -45,7 +45,6 @@
 import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
 import org.apache.bookkeeper.stream.storage.impl.sc.helix.HelixStorageController;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
-import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.LocalDLMEmulator;
@@ -83,9 +82,18 @@ public static StreamCluster build(StreamClusterSpec spec) {
     private static final String LEDGERS_AVAILABLE_PATH = "/stream/ledgers/available";
     private static final String NAMESPACE = "/stream/storage";
 
+    private static ServerConfiguration newServerConfiguration(String zkEnsemble) {
+        ServerConfiguration serverConf = new ServerConfiguration();
+        serverConf.setMetadataServiceUri("zk://" + zkEnsemble + LEDGERS_PATH);
+        serverConf.setAllowLoopback(true);
+        serverConf.setGcWaitTime(300000);
+        serverConf.setDiskUsageWarnThreshold(0.9999f);
+        serverConf.setDiskUsageThreshold(0.999999f);
+        return serverConf;
+    }
+
     private final StreamClusterSpec spec;
     private final List<Endpoint> rpcEndpoints;
-    private CompositeConfiguration baseConf;
     private String zkEnsemble;
     private int zkPort;
     private ZooKeeperServerShim zks;
@@ -140,15 +148,6 @@ private void initializeCluster() throws Exception {
         log.info("Initializing the stream cluster.");
         ZooKeeper zkc = null;
         try (StorageController controller = new HelixStorageController(zkEnsemble)) {
-            // initialize the configuration
-            ServerConfiguration serverConf = new ServerConfiguration();
-            serverConf.setMetadataServiceUri("zk://" + zkEnsemble + LEDGERS_PATH);
-            serverConf.setAllowLoopback(true);
-            serverConf.setGcWaitTime(300000);
-            serverConf.setDiskUsageWarnThreshold(0.9999f);
-            serverConf.setDiskUsageThreshold(0.999999f);
-            this.baseConf = serverConf;
-
             zkc = ZooKeeperClient.newBuilder()
                 .connectString(zkEnsemble)
                 .sessionTimeoutMs(60000)
@@ -194,8 +193,7 @@ private LifecycleComponent startServer() throws Exception {
             }
             LifecycleComponent server = null;
             try {
-                ServerConfiguration serverConf = new ServerConfiguration();
-                serverConf.loadConf(baseConf);
+                ServerConfiguration serverConf = newServerConfiguration(zkEnsemble);
                 serverConf.setBookiePort(bookiePort);
                 File bkDir = new File(spec.storageRootDir(), "bookie_" + bookiePort);
                 serverConf.setJournalDirName(bkDir.getPath());
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/DLConfiguration.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/DLConfiguration.java
index db8705796..b3cd91d7a 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/DLConfiguration.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/DLConfiguration.java
@@ -18,11 +18,11 @@
 import org.apache.commons.configuration.CompositeConfiguration;
 
 /**
- * A configuration delegates distributedlog configuration {@link org.apache.bookkeeper.DistributedLogConfiguration}.
+ * A configuration delegates distributedlog configuration {@link org.apache.distributedlog.DistributedLogConfiguration}.
  */
 public class DLConfiguration extends ComponentConfiguration {
 
-    public static final String COMPONENT_PREFIX = "dl" + DELIMITER;
+    public static final String COMPONENT_PREFIX = "dlog" + DELIMITER;
 
     public static DLConfiguration of(CompositeConfiguration conf) {
         return new DLConfiguration(conf);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
index d87854a59..a0ef963ed 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/conf/StorageServerConfiguration.java
@@ -22,7 +22,7 @@
  */
 public class StorageServerConfiguration extends ComponentConfiguration {
 
-    private static final String COMPONENT_PREFIX = "rangeserver" + DELIMITER;
+    private static final String COMPONENT_PREFIX = "storageserver" + DELIMITER;
 
     public static StorageServerConfiguration of(CompositeConfiguration conf) {
         return new StorageServerConfiguration(conf);
diff --git a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
index d65c73e20..04d030dd2 100644
--- a/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
+++ b/stream/server/src/main/java/org/apache/bookkeeper/stream/server/service/DLNamespaceProviderService.java
@@ -26,6 +26,7 @@
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
 import org.apache.distributedlog.exceptions.ZKException;
 import org.apache.distributedlog.impl.metadata.BKDLConfig;
 import org.apache.distributedlog.metadata.DLMetadata;
@@ -68,6 +69,7 @@ private static URI initializeNamespace(ServerConfiguration bkServerConf,
 
     private final ServerConfiguration bkServerConf;
     private final DistributedLogConfiguration dlConf;
+    private final DynamicDistributedLogConfiguration dlDynConf;
     @Getter
     private final URI dlogUri;
     private Namespace namespace;
@@ -81,7 +83,7 @@ public DLNamespaceProviderService(ServerConfiguration bkServerConf,
             ZKMetadataDriverBase.resolveZkServers(bkServerConf)));
         this.bkServerConf = bkServerConf;
         this.dlConf = new DistributedLogConfiguration();
-        ConfUtils.loadConfiguration(this.dlConf, conf, conf.getComponentPrefix());
+        this.dlConf.loadConf(conf);
         // disable write lock
         this.dlConf.setWriteLockEnabled(false);
         // setting the flush policy
@@ -93,6 +95,7 @@ public DLNamespaceProviderService(ServerConfiguration bkServerConf,
         // rolling log segment concurrency is only 1
         this.dlConf.setLogSegmentRollingConcurrency(1);
         this.dlConf.setMaxLogSegmentBytes(256 * 1024 * 1024); // 256 MB
+        this.dlDynConf = ConfUtils.getConstDynConf(dlConf);
     }
 
     @Override
@@ -109,6 +112,7 @@ protected void doStart() {
                 .statsLogger(getStatsLogger())
                 .clientId("storage-server")
                 .conf(dlConf)
+                .dynConf(dlDynConf)
                 .uri(uri)
                 .build();
         } catch (Throwable e) {
diff --git a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
index 32208d037..b0f8909ad 100644
--- a/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
+++ b/stream/storage/api/src/main/java/org/apache/bookkeeper/stream/storage/conf/StorageConfiguration.java
@@ -24,7 +24,7 @@
 
     private static final String COMPONENT_PREFIX = "storage" + DELIMITER;
 
-    private static final String RANGE_STORE_DIRS = "range_store_dirs";
+    private static final String RANGE_STORE_DIRS = "range.store.dirs";
 
     private static final String SERVE_READONLY_TABLES = "serve.readonly.tables";
 
diff --git a/tests/backward-compat/upgrade/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgrade.groovy b/tests/backward-compat/upgrade/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgrade.groovy
index 8e0d8d71f..940501df6 100644
--- a/tests/backward-compat/upgrade/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgrade.groovy
+++ b/tests/backward-compat/upgrade/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgrade.groovy
@@ -178,12 +178,12 @@ class TestCompatUpgrade {
 
     @Test
     public void test460to461() throws Exception {
-        testUpgrade("4.6.0", "4.6.1")
+        testUpgrade("4.6.0", "4.6.1", false, true)
     }
 
     @Test
     public void test461to462() throws Exception {
-        testUpgrade("4.6.1", "4.6.2")
+        testUpgrade("4.6.1", "4.6.2", false, true)
     }
 
     @Test
@@ -193,6 +193,6 @@ class TestCompatUpgrade {
 
     @Test
     public void test470toCurrentMaster() throws Exception {
-        testUpgrade("4.7.0", System.getProperty("currentVersion"), false, true)
+        testUpgrade("4.7.0", System.getProperty("currentVersion"))
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services