You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by zh...@apache.org on 2022/01/30 02:08:17 UTC

[shardingsphere] branch master updated: Refactor initRuleAlteredJobWorker from proxy-bootstrap module to pipeline-core module (#15184)

This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ebeb464  Refactor initRuleAlteredJobWorker from proxy-bootstrap module to pipeline-core module (#15184)
ebeb464 is described below

commit ebeb46402f67c7c8cc7c45c2848c7c88ae3c2b7f
Author: Hongsheng Zhong <sa...@126.com>
AuthorDate: Sun Jan 30 10:07:24 2022 +0800

    Refactor initRuleAlteredJobWorker from proxy-bootstrap module to pipeline-core module (#15184)
    
    * Add ContextManagerLifecycleListener SPI
    
    * Refactor initRuleAlteredJobWorker from proxy-bootstrap module to pipeline-core module
---
 .../spi/ContextManagerLifecycleListenerImpl.java   | 53 ++++++++++++++++++++++
 ...anager.listener.ContextManagerLifecycleListener | 18 ++++++++
 .../listener/ContextManagerLifecycleListener.java  | 40 ++++++++++++++++
 .../proxy/initializer/BootstrapInitializer.java    | 31 +++++++------
 4 files changed, 127 insertions(+), 15 deletions(-)

diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/ContextManagerLifecycleListenerImpl.java b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/ContextManagerLifecycleListenerImpl.java
new file mode 100644
index 0000000..f07bbd6
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/rulealtered/spi/ContextManagerLifecycleListenerImpl.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.scenario.rulealtered.spi;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
+import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
+
+/**
+ * {@linkplain ContextManagerLifecycleListener} implementation that sets up the pipeline context and the rule altered job worker in Cluster mode.
+ */
+@Slf4j
+public final class ContextManagerLifecycleListenerImpl implements ContextManagerLifecycleListener {
+    
+    @Override
+    public void onInitialized(final ModeConfiguration modeConfig, final ContextManager contextManager) {
+        if (null == modeConfig) {
+            return;
+        }
+        // TODO decouple "Cluster" to pluggable
+        if (!"Cluster".equals(modeConfig.getType())) {
+            log.info("mode type is not Cluster, mode type='{}', ignore", modeConfig.getType());
+            return;
+        }
+        PipelineContext.initModeConfig(modeConfig);
+        PipelineContext.initContextManager(contextManager);
+        // TODO init worker only if necessary, e.g. 1) rule altered action configured, 2) enabled job exists, 3) stopped job restarted
+        RuleAlteredJobWorker.initWorkerIfNecessary();
+    }
+    
+    @Override
+    public String getType() {
+        return "RULE_ALTERED_JOB_WORKER";
+    }
+}
diff --git a/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
new file mode 100644
index 0000000..273bb87
--- /dev/null
+++ b/shardingsphere-kernel/shardingsphere-data-pipeline/shardingsphere-data-pipeline-core/src/main/resources/META-INF/services/org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.data.pipeline.scenario.rulealtered.spi.ContextManagerLifecycleListenerImpl
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/listener/ContextManagerLifecycleListener.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/listener/ContextManagerLifecycleListener.java
new file mode 100644
index 0000000..2f2d7d1
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/listener/ContextManagerLifecycleListener.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.listener;
+
+import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.mode.manager.ContextManager;
+import org.apache.shardingsphere.spi.singleton.SingletonSPI;
+import org.apache.shardingsphere.spi.typed.TypedSPI;
+
+/**
+ * Context manager lifecycle listener.
+ * <p>
+ *     It only supports <code>proxy</code> mode for now; <code>JDBC</code> mode is not supported.
+ * </p>
+ */
+public interface ContextManagerLifecycleListener extends TypedSPI, SingletonSPI {
+    
+    /**
+     * Callback invoked once the context manager has been initialized.
+     *
+     * @param modeConfig mode configuration, may be null when no mode is configured
+     * @param contextManager context manager
+     */
+    void onInitialized(ModeConfiguration modeConfig, ContextManager contextManager);
+}
diff --git a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
index 77def44..f611245 100644
--- a/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
+++ b/shardingsphere-proxy/shardingsphere-proxy-bootstrap/src/main/java/org/apache/shardingsphere/proxy/initializer/BootstrapInitializer.java
@@ -20,8 +20,6 @@ package org.apache.shardingsphere.proxy.initializer;
 import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.core.context.PipelineContext;
-import org.apache.shardingsphere.data.pipeline.scenario.rulealtered.RuleAlteredJobWorker;
 import org.apache.shardingsphere.db.protocol.CommonConstants;
 import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLServerInfo;
 import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLServerInfo;
@@ -34,15 +32,19 @@ import org.apache.shardingsphere.infra.yaml.config.swapper.mode.ModeConfiguratio
 import org.apache.shardingsphere.mode.manager.ContextManager;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderFactory;
 import org.apache.shardingsphere.mode.manager.ContextManagerBuilderParameter;
+import org.apache.shardingsphere.mode.manager.listener.ContextManagerLifecycleListener;
 import org.apache.shardingsphere.mode.metadata.MetaDataContexts;
 import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
 import org.apache.shardingsphere.proxy.config.ProxyConfiguration;
 import org.apache.shardingsphere.proxy.config.YamlProxyConfiguration;
 import org.apache.shardingsphere.proxy.config.yaml.swapper.YamlProxyConfigurationSwapper;
 import org.apache.shardingsphere.proxy.database.DatabaseServerInfo;
+import org.apache.shardingsphere.spi.singleton.SingletonSPIRegistry;
 
 import javax.sql.DataSource;
 import java.sql.SQLException;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 
 /**
@@ -63,7 +65,7 @@ public final class BootstrapInitializer {
         ModeConfiguration modeConfig = null == yamlConfig.getServerConfiguration().getMode() ? null : new ModeConfigurationYamlSwapper().swapToObject(yamlConfig.getServerConfiguration().getMode());
         ContextManager contextManager = createContextManager(yamlConfig, modeConfig, port);
         ProxyContext.getInstance().init(contextManager);
-        initRuleAlteredJobWorker(modeConfig, contextManager);
+        contextManagerInitializedCallback(modeConfig, contextManager);
         setDatabaseServerInfo();
     }
     
@@ -79,19 +81,18 @@ public final class BootstrapInitializer {
         return ContextManagerBuilderFactory.newInstance(modeConfig).build(parameter);
     }
     
-    private void initRuleAlteredJobWorker(final ModeConfiguration modeConfig, final ContextManager contextManager) {
-        if (null == modeConfig) {
-            return;
-        }
-        // TODO decouple "Cluster" to pluggable
-        if (!"Cluster".equals(modeConfig.getType())) {
-            log.info("mode type is not Cluster, ignore initRuleAlteredJobWorker");
-            return;
+    private void contextManagerInitializedCallback(final ModeConfiguration modeConfig, final ContextManager contextManager) {
+        Map<String, ContextManagerLifecycleListener> listenerMap = SingletonSPIRegistry.getTypedSingletonInstancesMap(ContextManagerLifecycleListener.class);
+        log.info("listenerMap.keySet={}", listenerMap.keySet());
+        for (Entry<String, ContextManagerLifecycleListener> entry : listenerMap.entrySet()) {
+            try {
+                entry.getValue().onInitialized(modeConfig, contextManager);
+                // CHECKSTYLE:OFF
+            } catch (final Exception ex) {
+                // CHECKSTYLE:ON
+                log.error("contextManager onInitialized callback for '{}' failed", entry.getKey(), ex);
+            }
         }
-        PipelineContext.initModeConfig(modeConfig);
-        PipelineContext.initContextManager(contextManager);
-        // TODO init worker only if necessary, e.g. 1) rule altered action configured, 2) enabled job exists, 3) stopped job restarted
-        RuleAlteredJobWorker.initWorkerIfNecessary();
     }
     
     private void setDatabaseServerInfo() {