You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@shardingsphere.apache.org by du...@apache.org on 2022/04/28 06:35:20 UTC
[shardingsphere] branch master updated: Sync all compute node when alter schema and fix NPE (#17165)
This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 0b8836b8d1e Sync all compute node when alter schema and fix NPE (#17165)
0b8836b8d1e is described below
commit 0b8836b8d1e3163e1728f1095ee561813fa8968f
Author: zhaojinchao <zh...@apache.org>
AuthorDate: Thu Apr 28 14:35:12 2022 +0800
Sync all compute node when alter schema and fix NPE (#17165)
* Frist commit
* Update code
* Fix NPE
* Fix checkstyle
* remove reduce code
* Add final
---
.../mode/manager/ContextManager.java | 38 ++++++++++++++++++-
.../persist/node/DatabaseMetaDataNode.java | 20 ++++++++--
.../persist/node/DatabaseMetaDataNodeTest.java | 10 ++---
.../ClusterContextManagerCoordinator.java | 22 +++++++++++
.../registry/metadata/event/SchemaAddedEvent.java | 34 +++++++++++++++++
.../metadata/event/SchemaDeletedEvent.java | 34 +++++++++++++++++
.../SchemaMetaDataRegistrySubscriber.java | 2 +-
.../metadata/watcher/MetaDataChangedWatcher.java | 43 ++++++++++++++++------
8 files changed, 180 insertions(+), 23 deletions(-)
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
index a6c0fbe70fc..97079501315 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/ContextManager.java
@@ -127,7 +127,7 @@ public final class ContextManager implements AutoCloseable {
}
/**
- * Add schema.
+ * Add database.
*
* @param databaseName database name
* @throws SQLException SQL exception
@@ -145,6 +145,22 @@ public final class ContextManager implements AutoCloseable {
renewAllTransactionContext();
}
+ /**
+ * Add schema.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ */
+ public void addSchema(final String databaseName, final String schemaName) {
+ if (null != metaDataContexts.getMetaData(databaseName).getSchemaByName(schemaName)) {
+ return;
+ }
+ FederationDatabaseMetaData databaseMetaData = metaDataContexts.getOptimizerContext().getFederationMetaData().getDatabases().get(databaseName);
+ databaseMetaData.put(schemaName, new TableMetaData());
+ metaDataContexts.getOptimizerContext().getPlannerContexts().put(databaseName, OptimizerPlannerContextFactory.create(databaseMetaData));
+ metaDataContexts.getMetaDataMap().get(databaseName).getSchemas().put(schemaName, new ShardingSphereSchema());
+ }
+
/**
* Alter database.
*
@@ -172,6 +188,9 @@ public final class ContextManager implements AutoCloseable {
*/
public void alterDatabase(final String databaseName, final String schemaName, final TableMetaData changedTableMetaData, final String deletedTable) {
Optional.ofNullable(changedTableMetaData).ifPresent(optional -> alterTableSchema(databaseName, schemaName, optional));
+ if (null == metaDataContexts.getMetaData(databaseName).getSchemaByName(schemaName)) {
+ return;
+ }
Optional.ofNullable(deletedTable).ifPresent(optional -> deleteTable(databaseName, schemaName, optional));
}
@@ -210,7 +229,7 @@ public final class ContextManager implements AutoCloseable {
}
/**
- * Delete data base.
+ * Delete database.
*
* @param databaseName database name
*/
@@ -226,6 +245,21 @@ public final class ContextManager implements AutoCloseable {
}
}
+ /**
+ * Drop schema.
+ *
+ * @param databaseName database name
+ * @param schemaName schema name
+ */
+ public void dropSchema(final String databaseName, final String schemaName) {
+ if (null == metaDataContexts.getMetaData(databaseName).getSchemaByName(schemaName)) {
+ return;
+ }
+ FederationDatabaseMetaData databaseMetaData = metaDataContexts.getOptimizerContext().getFederationMetaData().getDatabases().get(databaseName);
+ databaseMetaData.remove(schemaName);
+ metaDataContexts.getMetaDataMap().get(databaseName).getSchemas().remove(schemaName);
+ }
+
/**
* Add resource.
*
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/DatabaseMetaDataNode.java b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/DatabaseMetaDataNode.java
index 500f8c9a20a..4707af868cc 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/DatabaseMetaDataNode.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/main/java/org/apache/shardingsphere/mode/metadata/persist/node/DatabaseMetaDataNode.java
@@ -141,7 +141,7 @@ public final class DatabaseMetaDataNode {
* @return database name
*/
public static Optional<String> getDatabaseName(final String configNodeFullPath) {
- Pattern pattern = Pattern.compile(getMetaDataNodePath() + "/([\\w\\-]+)/([\\w\\-]+)" + "(/datasources|/rules|/tables)?", Pattern.CASE_INSENSITIVE);
+ Pattern pattern = Pattern.compile(getMetaDataNodePath() + "/([\\w\\-]+)$", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(configNodeFullPath);
return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
}
@@ -153,9 +153,9 @@ public final class DatabaseMetaDataNode {
* @return schema name
*/
public static Optional<String> getSchemaName(final String configNodeFullPath) {
- Pattern pattern = Pattern.compile(getMetaDataNodePath() + "/([\\w\\-]+)/([\\w\\-]+)/([\\w\\-]+)(/tables)?", Pattern.CASE_INSENSITIVE);
+ Pattern pattern = Pattern.compile(getMetaDataNodePath() + "/([\\w\\-]+)/schemas/([\\w\\-]+)$", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(configNodeFullPath);
- return matcher.find() ? Optional.of(matcher.group(3)) : Optional.empty();
+ return matcher.find() ? Optional.of(matcher.group(2)) : Optional.empty();
}
/**
@@ -165,11 +165,23 @@ public final class DatabaseMetaDataNode {
* @return database name
*/
public static Optional<String> getDatabaseNameByDatabasePath(final String databasePath) {
- Pattern pattern = Pattern.compile(getMetaDataNodePath() + "/([\\w\\-]+)$", Pattern.CASE_INSENSITIVE);
+ Pattern pattern = Pattern.compile(getMetaDataNodePath() + "/([\\w\\-]+)?", Pattern.CASE_INSENSITIVE);
Matcher matcher = pattern.matcher(databasePath);
return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
}
+ /**
+ * Get schema name.
+ *
+ * @param schemaPath schema path
+ * @return schema name
+ */
+ public static Optional<String> getSchemaNameBySchemaPath(final String schemaPath) {
+ Pattern pattern = Pattern.compile(getMetaDataNodePath() + "/([\\w\\-]+)/schemas/([\\w\\-]+)?", Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(schemaPath);
+ return matcher.find() ? Optional.of(matcher.group(2)) : Optional.empty();
+ }
+
/**
* Get table meta data path.
*
diff --git a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/DatabaseMetaDataNodeTest.java b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/DatabaseMetaDataNodeTest.java
index 946da130553..d965134955a 100644
--- a/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/DatabaseMetaDataNodeTest.java
+++ b/shardingsphere-mode/shardingsphere-mode-core/src/test/java/org/apache/shardingsphere/mode/metadata/persist/node/DatabaseMetaDataNodeTest.java
@@ -35,14 +35,14 @@ public class DatabaseMetaDataNodeTest {
@Test
public void assertGetDatabaseName() {
- Optional<String> actualSchemaName = DatabaseMetaDataNode.getDatabaseName("/metadata/logic_db/logic_schema/rules");
+ Optional<String> actualSchemaName = DatabaseMetaDataNode.getDatabaseName("/metadata/logic_db");
assertTrue(actualSchemaName.isPresent());
assertThat(actualSchemaName.get(), is("logic_db"));
}
@Test
public void assertGetDatabaseNameWithLine() {
- Optional<String> actualSchemaName = DatabaseMetaDataNode.getDatabaseName("/metadata/logic-db-test/logic-db-schema/rules");
+ Optional<String> actualSchemaName = DatabaseMetaDataNode.getDatabaseNameByDatabasePath("/metadata/logic-db-test/schemas/logic-db-schema");
assertTrue(actualSchemaName.isPresent());
assertThat(actualSchemaName.get(), is("logic-db-test"));
}
@@ -59,7 +59,7 @@ public class DatabaseMetaDataNodeTest {
@Test
public void assertGetDatabaseNameByDatabasePath() {
- Optional<String> actualSchemaName = DatabaseMetaDataNode.getDatabaseNameByDatabasePath("/metadata/logic_db");
+ Optional<String> actualSchemaName = DatabaseMetaDataNode.getDatabaseNameByDatabasePath("/metadata/logic_db/schemas/logic_schema");
assertTrue(actualSchemaName.isPresent());
assertThat(actualSchemaName.get(), is("logic_db"));
}
@@ -73,13 +73,13 @@ public class DatabaseMetaDataNodeTest {
@Test
public void assertGetSchemaName() {
- Optional<String> actualSchemaName = DatabaseMetaDataNode.getSchemaName("/metadata/logic_db/schemas/logic_schema/tables/t_order");
+ Optional<String> actualSchemaName = DatabaseMetaDataNode.getSchemaName("/metadata/logic_db/schemas/logic_schema");
assertTrue(actualSchemaName.isPresent());
assertThat(actualSchemaName.get(), is("logic_schema"));
}
@Test
- public void assertGetVersionBySchemaPath() {
+ public void assertGetVersionByDatabasePath() {
Optional<String> actualVersion = DatabaseMetaDataNode.getVersionByDataSourcesPath("/metadata/logic_db/versions/0/dataSources");
assertTrue(actualVersion.isPresent());
assertThat(actualVersion.get(), is("0"));
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
index dcf2f0b5f9f..c62a1b22d4b 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/ClusterContextManagerCoordinator.java
@@ -42,6 +42,8 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.confi
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.SchemaVersionChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.ShowProcessListManager;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.lock.ShowProcessListSimpleLock;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.process.node.ProcessNode;
@@ -106,6 +108,26 @@ public final class ClusterContextManagerCoordinator {
contextManager.deleteDatabase(event.getDatabaseName());
}
+ /**
+ * Renew to added schema.
+ *
+ * @param event schema added event
+ */
+ @Subscribe
+ public synchronized void renew(final SchemaAddedEvent event) {
+ contextManager.addSchema(event.getDatabaseName(), event.getSchemaName());
+ }
+
+ /**
+ * Renew to delete schema.
+ *
+ * @param event schema delete event
+ */
+ @Subscribe
+ public synchronized void renew(final SchemaDeletedEvent event) {
+ contextManager.dropSchema(event.getDatabaseName(), event.getSchemaName());
+ }
+
/**
* Renew properties.
*
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaAddedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaAddedEvent.java
new file mode 100644
index 00000000000..77b880cb5ef
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaAddedEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+
+/**
+ * Schema added event.
+ */
+@RequiredArgsConstructor
+@Getter
+public final class SchemaAddedEvent implements GovernanceEvent {
+
+ private final String databaseName;
+
+ private final String schemaName;
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
new file mode 100644
index 00000000000..70ae4d421e7
--- /dev/null
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/event/SchemaDeletedEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.GovernanceEvent;
+
+/**
+ * Schema deleted event.
+ */
+@RequiredArgsConstructor
+@Getter
+public class SchemaDeletedEvent implements GovernanceEvent {
+
+ private final String databaseName;
+
+ private final String schemaName;
+}
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/SchemaMetaDataRegistrySubscriber.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/SchemaMetaDataRegistrySu [...]
index 1ae74ad8deb..976d52b3e5c 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/SchemaMetaDataRegistrySubscriber.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/subscriber/SchemaMetaDataRegistrySubscriber.java
@@ -66,8 +66,8 @@ public final class SchemaMetaDataRegistrySubscriber {
*/
@Subscribe
public void alterSchema(final AlterSchemaEvent event) {
- persistService.deleteSchema(event.getDatabaseName(), event.getSchemaName());
persistService.persistTables(event.getDatabaseName(), event.getRenameSchemaName(), event.getSchema());
+ persistService.deleteSchema(event.getDatabaseName(), event.getSchemaName());
}
/**
diff --git a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
index bdccfd57d86..922f25f8ad6 100644
--- a/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
+++ b/shardingsphere-mode/shardingsphere-mode-type/shardingsphere-cluster-mode/shardingsphere-cluster-mode-core/src/main/java/org/apache/shardingsphere/mode/manager/cluster/coordinator/registry/metadata/watcher/MetaDataChangedWatcher.java
@@ -35,6 +35,8 @@ import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.confi
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.config.event.version.SchemaVersionChangedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseAddedEvent;
import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.DatabaseDeletedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaAddedEvent;
+import org.apache.shardingsphere.mode.manager.cluster.coordinator.registry.metadata.event.SchemaDeletedEvent;
import org.apache.shardingsphere.mode.metadata.persist.node.DatabaseMetaDataNode;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEvent.Type;
@@ -67,41 +69,60 @@ public final class MetaDataChangedWatcher implements GovernanceWatcher<Governanc
@Override
public Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
+ // TODO Maybe can reduce once regular
if (isLogicDatabaseChanged(event)) {
return buildLogicDatabaseChangedEvent(event);
+ } else if (isLogicSchemaChanged(event)) {
+ return buildLogicSchemaChangedEvent(event);
} else if (isTableMetaDataChanged(event)) {
return buildTableMetaDataChangedEvent(event);
- } else if (DataChangedEvent.Type.UPDATED == event.getType()) {
+ } else if (Type.UPDATED == event.getType()) {
return buildGovernanceEvent(event);
}
return Optional.empty();
}
private boolean isLogicDatabaseChanged(final DataChangedEvent event) {
- return DatabaseMetaDataNode.getDatabaseNameByDatabasePath(event.getKey()).isPresent();
+ return DatabaseMetaDataNode.getDatabaseName(event.getKey()).isPresent();
+ }
+
+ private boolean isLogicSchemaChanged(final DataChangedEvent event) {
+ return DatabaseMetaDataNode.getSchemaName(event.getKey()).isPresent();
}
private boolean isTableMetaDataChanged(final DataChangedEvent event) {
- Optional<String> databaseName = DatabaseMetaDataNode.getDatabaseName(event.getKey());
- Optional<String> schemaName = DatabaseMetaDataNode.getSchemaName(event.getKey());
+ Optional<String> databaseName = DatabaseMetaDataNode.getDatabaseNameByDatabasePath(event.getKey());
+ Optional<String> schemaName = DatabaseMetaDataNode.getSchemaNameBySchemaPath(event.getKey());
Optional<String> tableName = DatabaseMetaDataNode.getTableName(event.getKey());
return databaseName.isPresent() && tableName.isPresent() && schemaName.isPresent()
&& !SystemSchemaBuilderRule.isSystemTable(databaseName.get(), tableName.get()) && !Strings.isNullOrEmpty(event.getValue());
}
private Optional<GovernanceEvent> buildLogicDatabaseChangedEvent(final DataChangedEvent event) {
- String databaseName = DatabaseMetaDataNode.getDatabaseNameByDatabasePath(event.getKey()).get();
- if (DataChangedEvent.Type.ADDED == event.getType() || DataChangedEvent.Type.UPDATED == event.getType()) {
+ String databaseName = DatabaseMetaDataNode.getDatabaseName(event.getKey()).get();
+ if (Type.ADDED == event.getType() || Type.UPDATED == event.getType()) {
return Optional.of(new DatabaseAddedEvent(databaseName));
}
- if (DataChangedEvent.Type.DELETED == event.getType()) {
+ if (Type.DELETED == event.getType()) {
return Optional.of(new DatabaseDeletedEvent(databaseName));
}
return Optional.empty();
}
+ private Optional<GovernanceEvent> buildLogicSchemaChangedEvent(final DataChangedEvent event) {
+ String databaseName = DatabaseMetaDataNode.getDatabaseNameByDatabasePath(event.getKey()).get();
+ String schemaName = DatabaseMetaDataNode.getSchemaName(event.getKey()).get();
+ if (Type.ADDED == event.getType() || Type.UPDATED == event.getType()) {
+ return Optional.of(new SchemaAddedEvent(databaseName, schemaName));
+ }
+ if (Type.DELETED == event.getType()) {
+ return Optional.of(new SchemaDeletedEvent(databaseName, schemaName));
+ }
+ return Optional.empty();
+ }
+
private Optional<GovernanceEvent> buildGovernanceEvent(final DataChangedEvent event) {
- Optional<String> databaseName = DatabaseMetaDataNode.getDatabaseName(event.getKey());
+ Optional<String> databaseName = DatabaseMetaDataNode.getDatabaseNameByDatabasePath(event.getKey());
if (!databaseName.isPresent() || Strings.isNullOrEmpty(event.getValue())) {
return Optional.empty();
}
@@ -142,10 +163,10 @@ public final class MetaDataChangedWatcher implements GovernanceWatcher<Governanc
}
private Optional<GovernanceEvent> buildTableMetaDataChangedEvent(final DataChangedEvent event) {
- String databaseName = DatabaseMetaDataNode.getDatabaseName(event.getKey()).get();
- String schemaName = DatabaseMetaDataNode.getSchemaName(event.getKey()).get();
+ String databaseName = DatabaseMetaDataNode.getDatabaseNameByDatabasePath(event.getKey()).get();
+ String schemaName = DatabaseMetaDataNode.getSchemaNameBySchemaPath(event.getKey()).get();
String tableName = DatabaseMetaDataNode.getTableName(event.getKey()).get();
- if (DataChangedEvent.Type.DELETED == event.getType()) {
+ if (Type.DELETED == event.getType()) {
return Optional.of(new SchemaChangedEvent(databaseName, schemaName, null, tableName));
}
return Optional.of(new SchemaChangedEvent(databaseName, schemaName, new TableMetaDataYamlSwapper().swapToObject(YamlEngine.unmarshal(event.getValue(), YamlTableMetaData.class)), null));