You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2023/04/07 08:33:44 UTC

[shardingsphere] branch master updated: Refactor global clock code (#25045)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e88b927f86 Refactor global clock code (#25045)
7e88b927f86 is described below

commit 7e88b927f866f9b8d81e3cb2867d3ab71f36a51e
Author: ZhangCheng <fl...@outlook.com>
AuthorDate: Fri Apr 7 16:33:31 2023 +0800

    Refactor global clock code (#25045)
    
    * Refactor global clock code
    
    * fix
---
 .../core/executor/GlobalClockTransactionHook.java  |   2 +-
 .../OpenGaussGlobalClockTransactionExecutor.java   |   4 +-
 .../globalclock/core/rule/GlobalClockRule.java     |  33 +------
 .../type/tso/provider/RedisTSOProperties.java      |  90 -----------------
 .../type/tso/provider/RedisTSOPropertyKey.java     |  48 +++++++++
 .../type/tso/provider/RedisTSOProvider.java        | 107 +++++++--------------
 6 files changed, 91 insertions(+), 193 deletions(-)

diff --git a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/GlobalClockTransactionHook.java b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/GlobalClockTransactionHook.java
index a27648feca8..e2c0e3e9d99 100644
--- a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/GlobalClockTransactionHook.java
+++ b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/GlobalClockTransactionHook.java
@@ -75,7 +75,7 @@ public final class GlobalClockTransactionHook extends TransactionHookAdapter {
         if (!enabled) {
             return;
         }
-        if (isolationLevel == null || TransactionIsolationLevel.READ_COMMITTED.equals(isolationLevel)) {
+        if (null == isolationLevel || TransactionIsolationLevel.READ_COMMITTED.equals(isolationLevel)) {
             globalClockTransactionExecutor.sendSnapshotTimestamp(connections, globalClockProvider.getCurrentTimestamp());
         }
     }
diff --git a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/OpenGaussGlobalClockTransactionExecutor.java b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/OpenGaussGlobalClockTransactionExecutor.java
index 680126b0675..c5420cdb6ac 100644
--- a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/OpenGaussGlobalClockTransactionExecutor.java
+++ b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/OpenGaussGlobalClockTransactionExecutor.java
@@ -29,7 +29,7 @@ public final class OpenGaussGlobalClockTransactionExecutor implements GlobalCloc
     
     @Override
     public void sendSnapshotTimestamp(final Collection<Connection> connections, final long globalTimestamp) throws SQLException {
-        String setSnapshotTimestampSQL = String.format("SELECT %d AS SETSNAPSHOTCSN;", globalTimestamp);
+        String setSnapshotTimestampSQL = String.format("SELECT %d AS SETSNAPSHOTCSN", globalTimestamp);
         for (Connection each : connections) {
             try (Statement statement = each.createStatement()) {
                 statement.execute(setSnapshotTimestampSQL);
@@ -39,7 +39,7 @@ public final class OpenGaussGlobalClockTransactionExecutor implements GlobalCloc
     
     @Override
     public void sendCommitTimestamp(final Collection<Connection> connections, final long globalTimestamp) throws SQLException {
-        String setCommitTimestampSQL = String.format("SELECT %d AS SETCOMMITCSN;", globalTimestamp);
+        String setCommitTimestampSQL = String.format("SELECT %d AS SETCOMMITCSN", globalTimestamp);
         for (Connection each : connections) {
             try (Statement statement = each.createStatement()) {
                 statement.execute(setCommitTimestampSQL);
diff --git a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/rule/GlobalClockRule.java b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/rule/GlobalClockRule.java
index a1ac757d981..de7d29a6e8a 100644
--- a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/rule/GlobalClockRule.java
+++ b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/rule/GlobalClockRule.java
@@ -19,12 +19,10 @@ package org.apache.shardingsphere.globalclock.core.rule;
 
 import lombok.Getter;
 import org.apache.shardingsphere.globalclock.api.config.GlobalClockRuleConfiguration;
-import org.apache.shardingsphere.globalclock.core.exception.GlobalClockNotEnabledException;
 import org.apache.shardingsphere.globalclock.core.provider.GlobalClockProvider;
 import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
 import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
 import org.apache.shardingsphere.infra.rule.identifier.scope.GlobalRule;
-import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.transaction.spi.TransactionHook;
 
@@ -42,15 +40,12 @@ public final class GlobalClockRule implements GlobalRule {
     @Getter
     private final GlobalClockRuleConfiguration configuration;
     
-    private final GlobalClockProvider globalClockProvider;
-    
-    private final boolean enabled;
-    
     public GlobalClockRule(final GlobalClockRuleConfiguration ruleConfig, final Map<String, ShardingSphereDatabase> databases) {
         configuration = ruleConfig;
-        enabled = ruleConfig.isEnabled();
-        globalClockProvider = enabled ? TypedSPILoader.getService(GlobalClockProvider.class, String.join(".", ruleConfig.getType(), ruleConfig.getProvider()),
-                null == ruleConfig.getProps() ? new Properties() : ruleConfig.getProps()) : null;
+        if (ruleConfig.isEnabled()) {
+            TypedSPILoader.getService(GlobalClockProvider.class, String.join(".", ruleConfig.getType(), ruleConfig.getProvider()),
+                    null == ruleConfig.getProps() ? new Properties() : ruleConfig.getProps());
+        }
         TypedSPILoader.getService(TransactionHook.class, "GLOBAL_CLOCK", getProps(ruleConfig, databases));
     }
     
@@ -68,26 +63,6 @@ public final class GlobalClockRule implements GlobalRule {
                 .flatMap(each -> each.getResourceMetaData().getDataSources().values().stream()).collect(Collectors.toList());
     }
     
-    /**
-     * Get current timestamp.
-     *
-     * @return current timestamp
-     */
-    public long getCurrentTimestamp() {
-        ShardingSpherePreconditions.checkState(enabled, GlobalClockNotEnabledException::new);
-        return globalClockProvider.getCurrentTimestamp();
-    }
-    
-    /**
-     * Get next timestamp.
-     *
-     * @return next timestamp
-     */
-    public long getNextTimestamp() {
-        ShardingSpherePreconditions.checkState(enabled, GlobalClockNotEnabledException::new);
-        return globalClockProvider.getNextTimestamp();
-    }
-    
     @Override
     public String getType() {
         return GlobalClockRule.class.getSimpleName();
diff --git a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProperties.java b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProperties.java
deleted file mode 100644
index 420d4463626..00000000000
--- a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProperties.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.globalclock.type.tso.provider;
-
-import java.util.Properties;
-
-/**
- * Properties of RedisTSOProvider.
- */
-public enum RedisTSOProperties {
-    
-    HOST("host", "127.0.0.1"),
-    
-    PORT("port", "6379"),
-    
-    PASSWORD("password", ""),
-    
-    TIMEOUT_INTERVAL("timeoutInterval", "40000"),
-    
-    MAX_IDLE("maxIdle", "8"),
-    
-    MAX_TOTAL("maxTotal", "18");
-    
-    private final String name;
-    
-    private final String defaultValue;
-    
-    RedisTSOProperties(final String name, final String defaultValue) {
-        this.name = name;
-        this.defaultValue = defaultValue;
-    }
-    
-    /**
-     * Get name of properties.
-     *
-     * @return name
-     */
-    public String getName() {
-        return name;
-    }
-    
-    /**
-     * Get default value of properties.
-     *
-     * @return default value
-     */
-    public String getDefaultValue() {
-        return defaultValue;
-    }
-    
-    /**
-     * Get value of properties.
-     *
-     * @param properties properties
-     * @return value
-     */
-    public String get(final Properties properties) {
-        return properties.getProperty(name, defaultValue);
-    }
-    
-    /**
-     * Set value of properties if value != null,
-     * remove key of properties if value == null.
-     *
-     * @param properties properties
-     * @param value value
-     */
-    public void set(final Properties properties, final String value) {
-        if (value == null) {
-            properties.remove(name);
-        } else {
-            properties.setProperty(name, value);
-        }
-    }
-}
diff --git a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOPropertyKey.java b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOPropertyKey.java
new file mode 100644
index 00000000000..30fea9dfa18
--- /dev/null
+++ b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOPropertyKey.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.globalclock.type.tso.provider;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.props.TypedPropertyKey;
+
+/**
+ * Property key of redis timestamp oracle provider.
+ */
+@RequiredArgsConstructor
+@Getter
+public enum RedisTSOPropertyKey implements TypedPropertyKey {
+    
+    HOST("host", "127.0.0.1", String.class),
+    
+    PORT("port", "6379", int.class),
+    
+    PASSWORD("password", "", String.class),
+    
+    TIMEOUT_INTERVAL("timeoutInterval", "40000", int.class),
+    
+    MAX_IDLE("maxIdle", "8", int.class),
+    
+    MAX_TOTAL("maxTotal", "18", int.class);
+    
+    private final String key;
+    
+    private final String defaultValue;
+    
+    private final Class<?> type;
+}
diff --git a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProvider.java b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProvider.java
index 3a1c83308fb..0efda5cd594 100644
--- a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProvider.java
+++ b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProvider.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.globalclock.type.tso.provider;
 
+import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
@@ -24,6 +25,7 @@ import redis.clients.jedis.JedisPoolConfig;
 import redis.clients.jedis.exceptions.JedisConnectionException;
 
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Redis timestamp oracle provider.
@@ -37,47 +39,48 @@ public final class RedisTSOProvider implements TSOProvider {
     
     private static final long INIT_CSN = Integer.MAX_VALUE;
     
-    private Properties redisTSOProperties;
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
     
     private JedisPool jedisPool;
     
+    private Properties props;
+    
     @Override
     public void init(final Properties props) {
-        if (jedisPool != null) {
-            return;
-        }
-        if (props == null) {
-            redisTSOProperties = new Properties();
-            RedisTSOProperties.HOST.set(redisTSOProperties, RedisTSOProperties.HOST.getDefaultValue());
-            RedisTSOProperties.PORT.set(redisTSOProperties, RedisTSOProperties.PORT.getDefaultValue());
-            RedisTSOProperties.PASSWORD.set(redisTSOProperties, RedisTSOProperties.PASSWORD.getDefaultValue());
-            RedisTSOProperties.TIMEOUT_INTERVAL.set(redisTSOProperties, RedisTSOProperties.TIMEOUT_INTERVAL.getDefaultValue());
-            RedisTSOProperties.MAX_IDLE.set(redisTSOProperties, RedisTSOProperties.MAX_IDLE.getDefaultValue());
-            RedisTSOProperties.MAX_TOTAL.set(redisTSOProperties, RedisTSOProperties.MAX_TOTAL.getDefaultValue());
-        } else {
-            redisTSOProperties = new Properties(props);
-            RedisTSOProperties.HOST.set(redisTSOProperties, RedisTSOProperties.HOST.get(props));
-            RedisTSOProperties.PORT.set(redisTSOProperties, RedisTSOProperties.PORT.get(props));
-            RedisTSOProperties.PASSWORD.set(redisTSOProperties, RedisTSOProperties.PASSWORD.get(props));
-            RedisTSOProperties.TIMEOUT_INTERVAL.set(redisTSOProperties, RedisTSOProperties.TIMEOUT_INTERVAL.get(props));
-            RedisTSOProperties.MAX_IDLE.set(redisTSOProperties, RedisTSOProperties.MAX_IDLE.get(props));
-            RedisTSOProperties.MAX_TOTAL.set(redisTSOProperties, RedisTSOProperties.MAX_TOTAL.get(props));
+        this.props = props;
+        if (initialized.compareAndSet(false, true)) {
+            createJedisPool();
+            checkJedisPool();
+            initCSN();
         }
+    }
+    
+    private void createJedisPool() {
         JedisPoolConfig config = new JedisPoolConfig();
-        config.setMaxIdle(Integer.parseInt(RedisTSOProperties.MAX_IDLE.get(redisTSOProperties)));
-        config.setMaxTotal(Integer.parseInt(RedisTSOProperties.MAX_TOTAL.get(redisTSOProperties)));
-        if ("".equals(RedisTSOProperties.PASSWORD.get(redisTSOProperties))) {
-            jedisPool = new JedisPool(config, RedisTSOProperties.HOST.get(redisTSOProperties),
-                    Integer.parseInt(RedisTSOProperties.PORT.get(redisTSOProperties)),
-                    Integer.parseInt(RedisTSOProperties.TIMEOUT_INTERVAL.get(redisTSOProperties)));
-        } else {
-            jedisPool = new JedisPool(config, RedisTSOProperties.HOST.get(redisTSOProperties),
-                    Integer.parseInt(RedisTSOProperties.PORT.get(redisTSOProperties)),
-                    Integer.parseInt(RedisTSOProperties.TIMEOUT_INTERVAL.get(redisTSOProperties)),
-                    RedisTSOProperties.PASSWORD.get(redisTSOProperties));
+        config.setMaxIdle(Integer.parseInt(getValue(props, RedisTSOPropertyKey.MAX_IDLE)));
+        config.setMaxTotal(Integer.parseInt(getValue(props, RedisTSOPropertyKey.MAX_TOTAL)));
+        jedisPool = new JedisPool(config, getValue(props, RedisTSOPropertyKey.HOST),
+                Integer.parseInt(getValue(props, RedisTSOPropertyKey.PORT)), Integer.parseInt(getValue(props, RedisTSOPropertyKey.TIMEOUT_INTERVAL)),
+                getValue(props, RedisTSOPropertyKey.PASSWORD));
+    }
+    
+    private void checkJedisPool() throws JedisConnectionException {
+        try (Jedis jedis = jedisPool.getResource()) {
+            jedis.ping();
+        }
+    }
+    
+    private void initCSN() {
+        try (Jedis jedis = jedisPool.getResource()) {
+            String originalCSN = jedis.get(CSN_KEY);
+            if (Strings.isNullOrEmpty(originalCSN) || String.valueOf(ERROR_CSN).equals(originalCSN)) {
+                jedis.set(CSN_KEY, String.valueOf(INIT_CSN));
+            }
         }
-        checkJedisPool();
-        initCSN();
+    }
+    
+    private String getValue(final Properties props, final RedisTSOPropertyKey propertyKey) {
+        return props.containsKey(propertyKey.getKey()) ? props.getProperty(propertyKey.getKey()) : propertyKey.getDefaultValue();
     }
     
     @Override
@@ -98,44 +101,6 @@ public final class RedisTSOProvider implements TSOProvider {
         return result;
     }
     
-    /**
-     * Set csn to INIT_CSN.
-     *
-     * @return csn
-     */
-    public synchronized long initCSN() {
-        String result = "";
-        String oldCsn;
-        Jedis jedis = jedisPool.getResource();
-        try {
-            oldCsn = jedis.get(CSN_KEY);
-            if (oldCsn == null || oldCsn.equals(String.valueOf(ERROR_CSN))) {
-                result = jedis.set(CSN_KEY, String.valueOf(INIT_CSN));
-            }
-        } finally {
-            jedis.close();
-        }
-        if ("OK".equals(result)) {
-            return INIT_CSN;
-        } else {
-            return ERROR_CSN;
-        }
-    }
-    
-    private void checkJedisPool() throws JedisConnectionException {
-        Jedis resource = jedisPool.getResource();
-        resource.ping();
-    }
-    
-    /**
-     * Get properties of redisTSOProvider.
-     *
-     * @return properties
-     */
-    public Properties getRedisTSOProperties() {
-        return redisTSOProperties;
-    }
-    
     @Override
     public String getType() {
         return "TSO.redis";