You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2023/04/07 02:15:43 UTC
[shardingsphere] branch master updated: add RedisTSOProvider of GlobalClock (#24921)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 76ed17c8f9a add RedisTSOProvider of GlobalClock (#24921)
76ed17c8f9a is described below
commit 76ed17c8f9a8cd1f9dd4ebb8de9552f1a25d11a5
Author: congzhou2603 <zh...@huawei.com>
AuthorDate: Fri Apr 7 10:15:32 2023 +0800
add RedisTSOProvider of GlobalClock (#24921)
---
.../core/executor/GlobalClockTransactionHook.java | 2 +-
.../global-clock/type/tso/provider/redis/pom.xml | 6 ++
.../type/tso/provider/RedisTSOProperties.java | 90 +++++++++++++++++
.../type/tso/provider/RedisTSOProvider.java | 106 ++++++++++++++++++++-
4 files changed, 199 insertions(+), 5 deletions(-)
diff --git a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/GlobalClockTransactionHook.java b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/GlobalClockTransactionHook.java
index ee570818595..a27648feca8 100644
--- a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/GlobalClockTransactionHook.java
+++ b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/GlobalClockTransactionHook.java
@@ -75,7 +75,7 @@ public final class GlobalClockTransactionHook extends TransactionHookAdapter {
if (!enabled) {
return;
}
- if (TransactionIsolationLevel.READ_COMMITTED.equals(isolationLevel)) {
+ if (isolationLevel == null || TransactionIsolationLevel.READ_COMMITTED.equals(isolationLevel)) {
globalClockTransactionExecutor.sendSnapshotTimestamp(connections, globalClockProvider.getCurrentTimestamp());
}
}
diff --git a/kernel/global-clock/type/tso/provider/redis/pom.xml b/kernel/global-clock/type/tso/provider/redis/pom.xml
index 76784a188c8..7a652853083 100644
--- a/kernel/global-clock/type/tso/provider/redis/pom.xml
+++ b/kernel/global-clock/type/tso/provider/redis/pom.xml
@@ -44,5 +44,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ <version>4.3.1</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProperties.java b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProperties.java
new file mode 100644
index 00000000000..420d4463626
--- /dev/null
+++ b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProperties.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.globalclock.type.tso.provider;
+
+import java.util.Properties;
+
+/**
+ * Properties of RedisTSOProvider.
+ *
+ * <p>Each constant pairs a property key with the default value that is used
+ * when the key is absent from a {@link Properties} object.</p>
+ */
+public enum RedisTSOProperties {
+
+ /**
+ * Redis host; key {@code host}, default {@code 127.0.0.1}.
+ */
+ HOST("host", "127.0.0.1"),
+
+ /**
+ * Redis port; key {@code port}, default {@code 6379}.
+ */
+ PORT("port", "6379"),
+
+ /**
+ * Redis password; key {@code password}, default is the empty string.
+ */
+ PASSWORD("password", ""),
+
+ /**
+ * Timeout handed to the connection pool; key {@code timeoutInterval}, default {@code 40000}.
+ * NOTE(review): the unit is presumably milliseconds, confirm against the JedisPool constructor.
+ */
+ TIMEOUT_INTERVAL("timeoutInterval", "40000"),
+
+ /**
+ * Max idle setting of the connection pool; key {@code maxIdle}, default {@code 8}.
+ */
+ MAX_IDLE("maxIdle", "8"),
+
+ /**
+ * Max total setting of the connection pool; key {@code maxTotal}, default {@code 18}.
+ */
+ MAX_TOTAL("maxTotal", "18");
+
+ // Key under which this property is stored in a Properties object.
+ private final String name;
+
+ // Value returned by get(Properties) when the key is absent.
+ private final String defaultValue;
+
+ RedisTSOProperties(final String name, final String defaultValue) {
+ this.name = name;
+ this.defaultValue = defaultValue;
+ }
+
+ /**
+ * Get name of properties.
+ *
+ * @return property key used for lookups in a {@link Properties} object
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Get default value of properties.
+ *
+ * @return default value used when the key is absent
+ */
+ public String getDefaultValue() {
+ return defaultValue;
+ }
+
+ /**
+ * Get value of properties.
+ *
+ * @param properties properties to read from
+ * @return value stored under this property's key, or the default value when the key is absent
+ */
+ public String get(final Properties properties) {
+ return properties.getProperty(name, defaultValue);
+ }
+
+ /**
+ * Set value of properties if value != null,
+ * remove key of properties if value == null.
+ *
+ * @param properties properties to modify in place
+ * @param value value to store under this property's key, or null to remove the key
+ */
+ public void set(final Properties properties, final String value) {
+ if (value == null) {
+ properties.remove(name);
+ } else {
+ properties.setProperty(name, value);
+ }
+ }
+}
diff --git a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProvider.java b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProvider.java
index 05f74ab0213..3a1c83308fb 100644
--- a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProvider.java
+++ b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProvider.java
@@ -17,25 +17,123 @@
package org.apache.shardingsphere.globalclock.type.tso.provider;
+import lombok.extern.slf4j.Slf4j;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.exceptions.JedisConnectionException;
+
import java.util.Properties;
/**
* Redis timestamp oracle provider.
*/
+@Slf4j
public final class RedisTSOProvider implements TSOProvider {
+ // Redis key whose value is read, incremented and initialized by this provider.
+ private static final String CSN_KEY = "csn";
+
+ // Returned by initCSN() when nothing was written; a stored value equal to it is overwritten by initCSN().
+ private static final long ERROR_CSN = 0;
+
+ // Value written by initCSN(). NOTE(review): the reason for starting at Integer.MAX_VALUE is not visible here, confirm.
+ private static final long INIT_CSN = Integer.MAX_VALUE;
+
+ // Resolved configuration; assigned in init(Properties).
+ private Properties redisTSOProperties;
+
+ // Connection pool; assigned in init(Properties), which returns early once it is non-null.
+ private JedisPool jedisPool;
+
@Override
public void init(final Properties props) {
+ // A pool already exists, so this provider has been initialized before.
+ if (null != jedisPool) {
+ return;
+ }
+ // Resolve every known property: defaults only when props is null,
+ // otherwise the caller's value with the default as fallback.
+ boolean useDefaults = null == props;
+ redisTSOProperties = useDefaults ? new Properties() : new Properties(props);
+ for (RedisTSOProperties each : RedisTSOProperties.values()) {
+ each.set(redisTSOProperties, useDefaults ? each.getDefaultValue() : each.get(props));
+ }
+ JedisPoolConfig poolConfig = new JedisPoolConfig();
+ poolConfig.setMaxIdle(Integer.parseInt(RedisTSOProperties.MAX_IDLE.get(redisTSOProperties)));
+ poolConfig.setMaxTotal(Integer.parseInt(RedisTSOProperties.MAX_TOTAL.get(redisTSOProperties)));
+ String password = RedisTSOProperties.PASSWORD.get(redisTSOProperties);
+ String host = RedisTSOProperties.HOST.get(redisTSOProperties);
+ int port = Integer.parseInt(RedisTSOProperties.PORT.get(redisTSOProperties));
+ int timeout = Integer.parseInt(RedisTSOProperties.TIMEOUT_INTERVAL.get(redisTSOProperties));
+ // An empty password selects the pool constructor that takes no password.
+ jedisPool = password.isEmpty()
+ ? new JedisPool(poolConfig, host, port, timeout)
+ : new JedisPool(poolConfig, host, port, timeout, password);
+ // Verify the pool with a ping, then seed the CSN key if needed.
+ checkJedisPool();
+ initCSN();
}
@Override
- public long getCurrentTimestamp() {
- return 0;
+ public long getCurrentTimestamp() throws JedisConnectionException {
+ // Borrow a pooled connection, read the value stored under the CSN key and parse it as a long.
+ try (Jedis connection = jedisPool.getResource()) {
+ String currentCsn = connection.get(CSN_KEY);
+ return Long.parseLong(currentCsn);
+ }
}
@Override
- public long getNextTimestamp() {
- return 0;
+ public long getNextTimestamp() throws JedisConnectionException {
+ // Borrow a pooled connection, increment the value stored under the CSN key and return the result.
+ try (Jedis connection = jedisPool.getResource()) {
+ return connection.incr(CSN_KEY);
+ }
+ }
+
+ /**
+ * Set csn to INIT_CSN.
+ *
+ * <p>The value under the CSN key is written only when the key is missing
+ * or currently holds ERROR_CSN; any other stored value is left untouched.</p>
+ *
+ * @return INIT_CSN when the value was written and Redis replied OK, ERROR_CSN otherwise
+ */
+ public synchronized long initCSN() {
+ String reply = "";
+ // try-with-resources returns the connection to the pool and, unlike a close() in finally,
+ // keeps a failure of the commands below as the primary exception.
+ try (Jedis connection = jedisPool.getResource()) {
+ String currentCsn = connection.get(CSN_KEY);
+ if (null == currentCsn || String.valueOf(ERROR_CSN).equals(currentCsn)) {
+ reply = connection.set(CSN_KEY, String.valueOf(INIT_CSN));
+ }
+ }
+ return "OK".equals(reply) ? INIT_CSN : ERROR_CSN;
+ }
+
+ private void checkJedisPool() throws JedisConnectionException {
+ // Borrow a connection only for the ping and close it afterwards so it goes back to the pool;
+ // leaving it open would permanently occupy one of the pool's connections.
+ try (Jedis resource = jedisPool.getResource()) {
+ resource.ping();
+ }
+ }
+
+ /**
+ * Get properties of redisTSOProvider.
+ *
+ * <p>Returns the provider's own {@link Properties} instance, not a copy.
+ * The field is assigned in {@code init}, so the result is null until then.</p>
+ *
+ * @return properties resolved by {@code init}, or null if they have not been assigned yet
+ */
+ public Properties getRedisTSOProperties() {
+ return redisTSOProperties;
}
@Override