You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2023/04/07 02:15:43 UTC

[shardingsphere] branch master updated: add RedisTSOProvider of GlobalClock (#24921)

This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 76ed17c8f9a add RedisTSOProvider of GlobalClock (#24921)
76ed17c8f9a is described below

commit 76ed17c8f9a8cd1f9dd4ebb8de9552f1a25d11a5
Author: congzhou2603 <zh...@huawei.com>
AuthorDate: Fri Apr 7 10:15:32 2023 +0800

    add RedisTSOProvider of GlobalClock (#24921)
---
 .../core/executor/GlobalClockTransactionHook.java  |   2 +-
 .../global-clock/type/tso/provider/redis/pom.xml   |   6 ++
 .../type/tso/provider/RedisTSOProperties.java      |  90 +++++++++++++++++
 .../type/tso/provider/RedisTSOProvider.java        | 106 ++++++++++++++++++++-
 4 files changed, 199 insertions(+), 5 deletions(-)

diff --git a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/GlobalClockTransactionHook.java b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/GlobalClockTransactionHook.java
index ee570818595..a27648feca8 100644
--- a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/GlobalClockTransactionHook.java
+++ b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/GlobalClockTransactionHook.java
@@ -75,7 +75,7 @@ public final class GlobalClockTransactionHook extends TransactionHookAdapter {
         if (!enabled) {
             return;
         }
-        if (TransactionIsolationLevel.READ_COMMITTED.equals(isolationLevel)) {
+        if (isolationLevel == null || TransactionIsolationLevel.READ_COMMITTED.equals(isolationLevel)) {
             globalClockTransactionExecutor.sendSnapshotTimestamp(connections, globalClockProvider.getCurrentTimestamp());
         }
     }
diff --git a/kernel/global-clock/type/tso/provider/redis/pom.xml b/kernel/global-clock/type/tso/provider/redis/pom.xml
index 76784a188c8..7a652853083 100644
--- a/kernel/global-clock/type/tso/provider/redis/pom.xml
+++ b/kernel/global-clock/type/tso/provider/redis/pom.xml
@@ -44,5 +44,11 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+            <version>4.3.1</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProperties.java b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProperties.java
new file mode 100644
index 00000000000..420d4463626
--- /dev/null
+++ b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProperties.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.globalclock.type.tso.provider;
+
+import java.util.Properties;
+
+/**
+ * Properties of RedisTSOProvider.
+ */
+public enum RedisTSOProperties {
+    
+    HOST("host", "127.0.0.1"),
+    
+    PORT("port", "6379"),
+    
+    PASSWORD("password", ""),
+    
+    TIMEOUT_INTERVAL("timeoutInterval", "40000"),
+    
+    MAX_IDLE("maxIdle", "8"),
+    
+    MAX_TOTAL("maxTotal", "18");
+    
+    private final String name;
+    
+    private final String defaultValue;
+    
+    RedisTSOProperties(final String name, final String defaultValue) {
+        this.name = name;
+        this.defaultValue = defaultValue;
+    }
+    
+    /**
+     * Get name of properties.
+     *
+     * @return name
+     */
+    public String getName() {
+        return name;
+    }
+    
+    /**
+     * Get default value of properties.
+     *
+     * @return default value
+     */
+    public String getDefaultValue() {
+        return defaultValue;
+    }
+    
+    /**
+     * Get value of properties.
+     *
+     * @param properties properties
+     * @return value
+     */
+    public String get(final Properties properties) {
+        return properties.getProperty(name, defaultValue);
+    }
+    
+    /**
+     * Set value of properties if value != null,
+     * remove key of properties if value == null.
+     *
+     * @param properties properties
+     * @param value value
+     */
+    public void set(final Properties properties, final String value) {
+        if (value == null) {
+            properties.remove(name);
+        } else {
+            properties.setProperty(name, value);
+        }
+    }
+}
diff --git a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProvider.java b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProvider.java
index 05f74ab0213..3a1c83308fb 100644
--- a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProvider.java
+++ b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProvider.java
@@ -17,25 +17,123 @@
 
 package org.apache.shardingsphere.globalclock.type.tso.provider;
 
+import lombok.extern.slf4j.Slf4j;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+
 import java.util.Properties;
 
 /**
  * Redis timestamp oracle provider.
  */
+@Slf4j
 public final class RedisTSOProvider implements TSOProvider {
     
+    private static final String CSN_KEY = "csn";
+    
+    private static final long ERROR_CSN = 0;
+    
+    private static final long INIT_CSN = Integer.MAX_VALUE;
+    
+    private Properties redisTSOProperties;
+    
+    private JedisPool jedisPool;
+    
     @Override
     public void init(final Properties props) {
+        if (jedisPool != null) {
+            return;
+        }
+        if (props == null) {
+            redisTSOProperties = new Properties();
+            RedisTSOProperties.HOST.set(redisTSOProperties, RedisTSOProperties.HOST.getDefaultValue());
+            RedisTSOProperties.PORT.set(redisTSOProperties, RedisTSOProperties.PORT.getDefaultValue());
+            RedisTSOProperties.PASSWORD.set(redisTSOProperties, RedisTSOProperties.PASSWORD.getDefaultValue());
+            RedisTSOProperties.TIMEOUT_INTERVAL.set(redisTSOProperties, RedisTSOProperties.TIMEOUT_INTERVAL.getDefaultValue());
+            RedisTSOProperties.MAX_IDLE.set(redisTSOProperties, RedisTSOProperties.MAX_IDLE.getDefaultValue());
+            RedisTSOProperties.MAX_TOTAL.set(redisTSOProperties, RedisTSOProperties.MAX_TOTAL.getDefaultValue());
+        } else {
+            redisTSOProperties = new Properties(props);
+            RedisTSOProperties.HOST.set(redisTSOProperties, RedisTSOProperties.HOST.get(props));
+            RedisTSOProperties.PORT.set(redisTSOProperties, RedisTSOProperties.PORT.get(props));
+            RedisTSOProperties.PASSWORD.set(redisTSOProperties, RedisTSOProperties.PASSWORD.get(props));
+            RedisTSOProperties.TIMEOUT_INTERVAL.set(redisTSOProperties, RedisTSOProperties.TIMEOUT_INTERVAL.get(props));
+            RedisTSOProperties.MAX_IDLE.set(redisTSOProperties, RedisTSOProperties.MAX_IDLE.get(props));
+            RedisTSOProperties.MAX_TOTAL.set(redisTSOProperties, RedisTSOProperties.MAX_TOTAL.get(props));
+        }
+        JedisPoolConfig config = new JedisPoolConfig();
+        config.setMaxIdle(Integer.parseInt(RedisTSOProperties.MAX_IDLE.get(redisTSOProperties)));
+        config.setMaxTotal(Integer.parseInt(RedisTSOProperties.MAX_TOTAL.get(redisTSOProperties)));
+        if ("".equals(RedisTSOProperties.PASSWORD.get(redisTSOProperties))) {
+            jedisPool = new JedisPool(config, RedisTSOProperties.HOST.get(redisTSOProperties),
+                    Integer.parseInt(RedisTSOProperties.PORT.get(redisTSOProperties)),
+                    Integer.parseInt(RedisTSOProperties.TIMEOUT_INTERVAL.get(redisTSOProperties)));
+        } else {
+            jedisPool = new JedisPool(config, RedisTSOProperties.HOST.get(redisTSOProperties),
+                    Integer.parseInt(RedisTSOProperties.PORT.get(redisTSOProperties)),
+                    Integer.parseInt(RedisTSOProperties.TIMEOUT_INTERVAL.get(redisTSOProperties)),
+                    RedisTSOProperties.PASSWORD.get(redisTSOProperties));
+        }
+        checkJedisPool();
+        initCSN();
     }
     
     @Override
-    public long getCurrentTimestamp() {
-        return 0;
+    public long getCurrentTimestamp() throws JedisConnectionException {
+        long result;
+        try (Jedis jedis = jedisPool.getResource()) {
+            result = Long.parseLong(jedis.get(CSN_KEY));
+        }
+        return result;
     }
     
     @Override
-    public long getNextTimestamp() {
-        return 0;
+    public long getNextTimestamp() throws JedisConnectionException {
+        long result;
+        try (Jedis jedis = jedisPool.getResource()) {
+            result = jedis.incr(CSN_KEY);
+        }
+        return result;
+    }
+    
+    /**
+     * Set csn to INIT_CSN.
+     *
+     * @return csn
+     */
+    public synchronized long initCSN() {
+        String result = "";
+        String oldCsn;
+        Jedis jedis = jedisPool.getResource();
+        try {
+            oldCsn = jedis.get(CSN_KEY);
+            if (oldCsn == null || oldCsn.equals(String.valueOf(ERROR_CSN))) {
+                result = jedis.set(CSN_KEY, String.valueOf(INIT_CSN));
+            }
+        } finally {
+            jedis.close();
+        }
+        if ("OK".equals(result)) {
+            return INIT_CSN;
+        } else {
+            return ERROR_CSN;
+        }
+    }
+    
+    private void checkJedisPool() throws JedisConnectionException {
+        Jedis resource = jedisPool.getResource();
+        resource.ping();
+    }
+    
+    /**
+     * Get properties of redisTSOProvider.
+     *
+     * @return properties
+     */
+    public Properties getRedisTSOProperties() {
+        return redisTSOProperties;
     }
     
     @Override