You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2023/04/07 08:33:44 UTC
[shardingsphere] branch master updated: Refactor global clock code (#25045)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7e88b927f86 Refactor global clock code (#25045)
7e88b927f86 is described below
commit 7e88b927f866f9b8d81e3cb2867d3ab71f36a51e
Author: ZhangCheng <fl...@outlook.com>
AuthorDate: Fri Apr 7 16:33:31 2023 +0800
Refactor global clock code (#25045)
* Refactor global clock code
* fix
---
.../core/executor/GlobalClockTransactionHook.java | 2 +-
.../OpenGaussGlobalClockTransactionExecutor.java | 4 +-
.../globalclock/core/rule/GlobalClockRule.java | 33 +------
.../type/tso/provider/RedisTSOProperties.java | 90 -----------------
.../type/tso/provider/RedisTSOPropertyKey.java | 48 +++++++++
.../type/tso/provider/RedisTSOProvider.java | 107 +++++++--------------
6 files changed, 91 insertions(+), 193 deletions(-)
diff --git a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/GlobalClockTransactionHook.java b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/GlobalClockTransactionHook.java
index a27648feca8..e2c0e3e9d99 100644
--- a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/GlobalClockTransactionHook.java
+++ b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/GlobalClockTransactionHook.java
@@ -75,7 +75,7 @@ public final class GlobalClockTransactionHook extends TransactionHookAdapter {
if (!enabled) {
return;
}
- if (isolationLevel == null || TransactionIsolationLevel.READ_COMMITTED.equals(isolationLevel)) {
+ if (null == isolationLevel || TransactionIsolationLevel.READ_COMMITTED.equals(isolationLevel)) {
globalClockTransactionExecutor.sendSnapshotTimestamp(connections, globalClockProvider.getCurrentTimestamp());
}
}
diff --git a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/OpenGaussGlobalClockTransactionExecutor.java b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/OpenGaussGlobalClockTransactionExecutor.java
index 680126b0675..c5420cdb6ac 100644
--- a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/OpenGaussGlobalClockTransactionExecutor.java
+++ b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/executor/OpenGaussGlobalClockTransactionExecutor.java
@@ -29,7 +29,7 @@ public final class OpenGaussGlobalClockTransactionExecutor implements GlobalCloc
@Override
public void sendSnapshotTimestamp(final Collection<Connection> connections, final long globalTimestamp) throws SQLException {
- String setSnapshotTimestampSQL = String.format("SELECT %d AS SETSNAPSHOTCSN;", globalTimestamp);
+ String setSnapshotTimestampSQL = String.format("SELECT %d AS SETSNAPSHOTCSN", globalTimestamp);
for (Connection each : connections) {
try (Statement statement = each.createStatement()) {
statement.execute(setSnapshotTimestampSQL);
@@ -39,7 +39,7 @@ public final class OpenGaussGlobalClockTransactionExecutor implements GlobalCloc
@Override
public void sendCommitTimestamp(final Collection<Connection> connections, final long globalTimestamp) throws SQLException {
- String setCommitTimestampSQL = String.format("SELECT %d AS SETCOMMITCSN;", globalTimestamp);
+ String setCommitTimestampSQL = String.format("SELECT %d AS SETCOMMITCSN", globalTimestamp);
for (Connection each : connections) {
try (Statement statement = each.createStatement()) {
statement.execute(setCommitTimestampSQL);
diff --git a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/rule/GlobalClockRule.java b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/rule/GlobalClockRule.java
index a1ac757d981..de7d29a6e8a 100644
--- a/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/rule/GlobalClockRule.java
+++ b/kernel/global-clock/core/src/main/java/org/apache/shardingsphere/globalclock/core/rule/GlobalClockRule.java
@@ -19,12 +19,10 @@ package org.apache.shardingsphere.globalclock.core.rule;
import lombok.Getter;
import org.apache.shardingsphere.globalclock.api.config.GlobalClockRuleConfiguration;
-import org.apache.shardingsphere.globalclock.core.exception.GlobalClockNotEnabledException;
import org.apache.shardingsphere.globalclock.core.provider.GlobalClockProvider;
import org.apache.shardingsphere.infra.database.type.DatabaseTypeEngine;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.rule.identifier.scope.GlobalRule;
-import org.apache.shardingsphere.infra.util.exception.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.util.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.transaction.spi.TransactionHook;
@@ -42,15 +40,12 @@ public final class GlobalClockRule implements GlobalRule {
@Getter
private final GlobalClockRuleConfiguration configuration;
- private final GlobalClockProvider globalClockProvider;
-
- private final boolean enabled;
-
public GlobalClockRule(final GlobalClockRuleConfiguration ruleConfig, final Map<String, ShardingSphereDatabase> databases) {
configuration = ruleConfig;
- enabled = ruleConfig.isEnabled();
- globalClockProvider = enabled ? TypedSPILoader.getService(GlobalClockProvider.class, String.join(".", ruleConfig.getType(), ruleConfig.getProvider()),
- null == ruleConfig.getProps() ? new Properties() : ruleConfig.getProps()) : null;
+ if (ruleConfig.isEnabled()) {
+ TypedSPILoader.getService(GlobalClockProvider.class, String.join(".", ruleConfig.getType(), ruleConfig.getProvider()),
+ null == ruleConfig.getProps() ? new Properties() : ruleConfig.getProps());
+ }
TypedSPILoader.getService(TransactionHook.class, "GLOBAL_CLOCK", getProps(ruleConfig, databases));
}
@@ -68,26 +63,6 @@ public final class GlobalClockRule implements GlobalRule {
.flatMap(each -> each.getResourceMetaData().getDataSources().values().stream()).collect(Collectors.toList());
}
- /**
- * Get current timestamp.
- *
- * @return current timestamp
- */
- public long getCurrentTimestamp() {
- ShardingSpherePreconditions.checkState(enabled, GlobalClockNotEnabledException::new);
- return globalClockProvider.getCurrentTimestamp();
- }
-
- /**
- * Get next timestamp.
- *
- * @return next timestamp
- */
- public long getNextTimestamp() {
- ShardingSpherePreconditions.checkState(enabled, GlobalClockNotEnabledException::new);
- return globalClockProvider.getNextTimestamp();
- }
-
@Override
public String getType() {
return GlobalClockRule.class.getSimpleName();
diff --git a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProperties.java b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProperties.java
deleted file mode 100644
index 420d4463626..00000000000
--- a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProperties.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.shardingsphere.globalclock.type.tso.provider;
-
-import java.util.Properties;
-
-/**
- * Properties of RedisTSOProvider.
- */
-public enum RedisTSOProperties {
-
- HOST("host", "127.0.0.1"),
-
- PORT("port", "6379"),
-
- PASSWORD("password", ""),
-
- TIMEOUT_INTERVAL("timeoutInterval", "40000"),
-
- MAX_IDLE("maxIdle", "8"),
-
- MAX_TOTAL("maxTotal", "18");
-
- private final String name;
-
- private final String defaultValue;
-
- RedisTSOProperties(final String name, final String defaultValue) {
- this.name = name;
- this.defaultValue = defaultValue;
- }
-
- /**
- * Get name of properties.
- *
- * @return name
- */
- public String getName() {
- return name;
- }
-
- /**
- * Get default value of properties.
- *
- * @return default value
- */
- public String getDefaultValue() {
- return defaultValue;
- }
-
- /**
- * Get value of properties.
- *
- * @param properties properties
- * @return value
- */
- public String get(final Properties properties) {
- return properties.getProperty(name, defaultValue);
- }
-
- /**
- * Set value of properties if value != null,
- * remove key of properties if value == null.
- *
- * @param properties properties
- * @param value value
- */
- public void set(final Properties properties, final String value) {
- if (value == null) {
- properties.remove(name);
- } else {
- properties.setProperty(name, value);
- }
- }
-}
diff --git a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOPropertyKey.java b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOPropertyKey.java
new file mode 100644
index 00000000000..30fea9dfa18
--- /dev/null
+++ b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOPropertyKey.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.globalclock.type.tso.provider;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.util.props.TypedPropertyKey;
+
+/**
+ * Property key of redis timestamp oracle provider.
+ */
+@RequiredArgsConstructor
+@Getter
+public enum RedisTSOPropertyKey implements TypedPropertyKey {
+
+ HOST("host", "127.0.0.1", String.class),
+
+ PORT("port", "6379", int.class),
+
+ PASSWORD("password", "", String.class),
+
+ TIMEOUT_INTERVAL("timeoutInterval", "40000", int.class),
+
+ MAX_IDLE("maxIdle", "8", int.class),
+
+ MAX_TOTAL("maxTotal", "18", int.class);
+
+ private final String key;
+
+ private final String defaultValue;
+
+ private final Class<?> type;
+}
diff --git a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProvider.java b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProvider.java
index 3a1c83308fb..0efda5cd594 100644
--- a/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProvider.java
+++ b/kernel/global-clock/type/tso/provider/redis/src/main/java/org/apache/shardingsphere/globalclock/type/tso/provider/RedisTSOProvider.java
@@ -17,6 +17,7 @@
package org.apache.shardingsphere.globalclock.type.tso.provider;
+import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
@@ -24,6 +25,7 @@ import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisConnectionException;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Redis timestamp oracle provider.
@@ -37,47 +39,48 @@ public final class RedisTSOProvider implements TSOProvider {
private static final long INIT_CSN = Integer.MAX_VALUE;
- private Properties redisTSOProperties;
+ private final AtomicBoolean initialized = new AtomicBoolean(false);
private JedisPool jedisPool;
+ private Properties props;
+
@Override
public void init(final Properties props) {
- if (jedisPool != null) {
- return;
- }
- if (props == null) {
- redisTSOProperties = new Properties();
- RedisTSOProperties.HOST.set(redisTSOProperties, RedisTSOProperties.HOST.getDefaultValue());
- RedisTSOProperties.PORT.set(redisTSOProperties, RedisTSOProperties.PORT.getDefaultValue());
- RedisTSOProperties.PASSWORD.set(redisTSOProperties, RedisTSOProperties.PASSWORD.getDefaultValue());
- RedisTSOProperties.TIMEOUT_INTERVAL.set(redisTSOProperties, RedisTSOProperties.TIMEOUT_INTERVAL.getDefaultValue());
- RedisTSOProperties.MAX_IDLE.set(redisTSOProperties, RedisTSOProperties.MAX_IDLE.getDefaultValue());
- RedisTSOProperties.MAX_TOTAL.set(redisTSOProperties, RedisTSOProperties.MAX_TOTAL.getDefaultValue());
- } else {
- redisTSOProperties = new Properties(props);
- RedisTSOProperties.HOST.set(redisTSOProperties, RedisTSOProperties.HOST.get(props));
- RedisTSOProperties.PORT.set(redisTSOProperties, RedisTSOProperties.PORT.get(props));
- RedisTSOProperties.PASSWORD.set(redisTSOProperties, RedisTSOProperties.PASSWORD.get(props));
- RedisTSOProperties.TIMEOUT_INTERVAL.set(redisTSOProperties, RedisTSOProperties.TIMEOUT_INTERVAL.get(props));
- RedisTSOProperties.MAX_IDLE.set(redisTSOProperties, RedisTSOProperties.MAX_IDLE.get(props));
- RedisTSOProperties.MAX_TOTAL.set(redisTSOProperties, RedisTSOProperties.MAX_TOTAL.get(props));
+ this.props = props;
+ if (initialized.compareAndSet(false, true)) {
+ createJedisPool();
+ checkJedisPool();
+ initCSN();
}
+ }
+
+ private void createJedisPool() {
JedisPoolConfig config = new JedisPoolConfig();
- config.setMaxIdle(Integer.parseInt(RedisTSOProperties.MAX_IDLE.get(redisTSOProperties)));
- config.setMaxTotal(Integer.parseInt(RedisTSOProperties.MAX_TOTAL.get(redisTSOProperties)));
- if ("".equals(RedisTSOProperties.PASSWORD.get(redisTSOProperties))) {
- jedisPool = new JedisPool(config, RedisTSOProperties.HOST.get(redisTSOProperties),
- Integer.parseInt(RedisTSOProperties.PORT.get(redisTSOProperties)),
- Integer.parseInt(RedisTSOProperties.TIMEOUT_INTERVAL.get(redisTSOProperties)));
- } else {
- jedisPool = new JedisPool(config, RedisTSOProperties.HOST.get(redisTSOProperties),
- Integer.parseInt(RedisTSOProperties.PORT.get(redisTSOProperties)),
- Integer.parseInt(RedisTSOProperties.TIMEOUT_INTERVAL.get(redisTSOProperties)),
- RedisTSOProperties.PASSWORD.get(redisTSOProperties));
+ config.setMaxIdle(Integer.parseInt(getValue(props, RedisTSOPropertyKey.MAX_IDLE)));
+ config.setMaxTotal(Integer.parseInt(getValue(props, RedisTSOPropertyKey.MAX_TOTAL)));
+ jedisPool = new JedisPool(config, getValue(props, RedisTSOPropertyKey.HOST),
+ Integer.parseInt(getValue(props, RedisTSOPropertyKey.PORT)), Integer.parseInt(getValue(props, RedisTSOPropertyKey.TIMEOUT_INTERVAL)),
+ getValue(props, RedisTSOPropertyKey.PASSWORD));
+ }
+
+ private void checkJedisPool() throws JedisConnectionException {
+ try (Jedis jedis = jedisPool.getResource()) {
+ jedis.ping();
+ }
+ }
+
+ private void initCSN() {
+ try (Jedis jedis = jedisPool.getResource()) {
+ String originalCSN = jedis.get(CSN_KEY);
+ if (Strings.isNullOrEmpty(originalCSN) || String.valueOf(ERROR_CSN).equals(originalCSN)) {
+ jedis.set(CSN_KEY, String.valueOf(INIT_CSN));
+ }
}
- checkJedisPool();
- initCSN();
+ }
+
+ private String getValue(final Properties props, final RedisTSOPropertyKey propertyKey) {
+ return props.containsKey(propertyKey.getKey()) ? props.getProperty(propertyKey.getKey()) : propertyKey.getDefaultValue();
}
@Override
@@ -98,44 +101,6 @@ public final class RedisTSOProvider implements TSOProvider {
return result;
}
- /**
- * Set csn to INIT_CSN.
- *
- * @return csn
- */
- public synchronized long initCSN() {
- String result = "";
- String oldCsn;
- Jedis jedis = jedisPool.getResource();
- try {
- oldCsn = jedis.get(CSN_KEY);
- if (oldCsn == null || oldCsn.equals(String.valueOf(ERROR_CSN))) {
- result = jedis.set(CSN_KEY, String.valueOf(INIT_CSN));
- }
- } finally {
- jedis.close();
- }
- if ("OK".equals(result)) {
- return INIT_CSN;
- } else {
- return ERROR_CSN;
- }
- }
-
- private void checkJedisPool() throws JedisConnectionException {
- Jedis resource = jedisPool.getResource();
- resource.ping();
- }
-
- /**
- * Get properties of redisTSOProvider.
- *
- * @return properties
- */
- public Properties getRedisTSOProperties() {
- return redisTSOProperties;
- }
-
@Override
public String getType() {
return "TSO.redis";