You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2019/01/20 08:41:11 UTC
[incubator-skywalking] branch master updated: Fixed the inventory
register lock invalid bug. (#2184)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 33f5cc1 Fixed the inventory register lock invalid bug. (#2184)
33f5cc1 is described below
commit 33f5cc19b54d0988304a3ed9e4477cdc3314cd6d
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Sun Jan 20 16:41:06 2019 +0800
Fixed the inventory register lock invalid bug. (#2184)
* #2183
Merge commit into master.
* Make the h2 register lock same as mysql.
---
.../core/register/NetworkAddressInventory.java | 12 ++-
.../oap/server/core/register/RegisterSource.java | 5 +-
.../oap/server/core/register/ServiceInventory.java | 5 +-
.../register/worker/RegisterDistinctWorker.java | 9 +--
.../register/worker/RegisterPersistentWorker.java | 44 +++++------
.../oap/server/core/storage/IRegisterDAO.java | 11 +--
.../oap/server/core/storage/IRegisterLockDAO.java | 3 +-
.../standardization/ReferenceIdExchanger.java | 19 +++--
.../StorageModuleElasticsearchProvider.java | 2 +-
.../plugin/elasticsearch/base/RegisterEsDAO.java | 42 ++---------
.../elasticsearch/lock/RegisterLockDAOImpl.java | 27 ++++---
.../elasticsearch/lock/RegisterLockIndex.java | 1 +
.../elasticsearch/lock/RegisterLockInstaller.java | 10 ++-
.../storage/plugin/jdbc/h2/H2StorageProvider.java | 9 ++-
.../storage/plugin/jdbc/h2/dao/H2RegisterDAO.java | 44 ++---------
.../plugin/jdbc/h2/dao/H2RegisterLockDAO.java | 60 +++++++++++++--
.../dao/H2RegisterLockInstaller.java} | 35 ++++-----
.../jdbc/mysql/MySQLRegisterTableLockDAO.java | 85 ----------------------
.../plugin/jdbc/mysql/MySQLStorageProvider.java | 51 +++----------
19 files changed, 184 insertions(+), 290 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java
index 3fc26cd..4572e3d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java
@@ -93,10 +93,16 @@ public class NetworkAddressInventory extends RegisterSource {
return inventory;
}
- @Override public void combine(RegisterSource registerSource) {
- super.combine(registerSource);
+ @Override public boolean combine(RegisterSource registerSource) {
+ boolean isCombine = super.combine(registerSource);
NetworkAddressInventory inventory = (NetworkAddressInventory)registerSource;
- setNodeType(inventory.nodeType);
+
+ if (nodeType != inventory.nodeType) {
+ setNodeType(inventory.nodeType);
+ return true;
+ } else {
+ return isCombine;
+ }
}
@Override public RemoteData.Builder serialize() {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
index 7a497c1..210c170 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
@@ -36,9 +36,12 @@ public abstract class RegisterSource extends StreamData implements StorageData {
@Getter @Setter @Column(columnName = REGISTER_TIME) private long registerTime;
@Getter @Setter @Column(columnName = HEARTBEAT_TIME) private long heartbeatTime;
- public void combine(RegisterSource registerSource) {
+ public boolean combine(RegisterSource registerSource) {
if (heartbeatTime < registerSource.getHeartbeatTime()) {
heartbeatTime = registerSource.getHeartbeatTime();
+ return true;
+ } else {
+ return false;
}
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java
index 9aaa37b..72fae39 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java
@@ -181,15 +181,18 @@ public class ServiceInventory extends RegisterSource {
return 0;
}
- @Override public void combine(RegisterSource registerSource) {
+ @Override public boolean combine(RegisterSource registerSource) {
super.combine(registerSource);
ServiceInventory serviceInventory = (ServiceInventory)registerSource;
+
nodeType = serviceInventory.nodeType;
setProp(serviceInventory.getProp());
if (Const.NONE != serviceInventory.getMappingServiceId() && serviceInventory.getMappingLastUpdateTime() >= this.getMappingLastUpdateTime()) {
this.mappingServiceId = serviceInventory.getMappingServiceId();
this.mappingLastUpdateTime = serviceInventory.getMappingLastUpdateTime();
}
+
+ return true;
}
public static class Builder implements StorageBuilder<ServiceInventory> {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
index c02f150..443eb73 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterDistinctWorker.java
@@ -22,7 +22,7 @@ import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
-import org.apache.skywalking.oap.server.core.register.*;
+import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.slf4j.*;
@@ -43,7 +43,7 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
this.nextWorker = nextWorker;
this.sources = new HashMap<>();
this.dataCarrier = new DataCarrier<>(1, 10000);
- this.dataCarrier.consume(new AggregatorConsumer(this), 1);
+ this.dataCarrier.consume(new AggregatorConsumer(this), 1, 200);
}
@Override public final void in(RegisterSource source) {
@@ -61,9 +61,8 @@ public class RegisterDistinctWorker extends AbstractWorker<RegisterSource> {
}
if (messageNum >= 1000 || source.getEndOfBatchContext().isEndOfBatch()) {
- sources.values().forEach(source1 -> {
- nextWorker.in(source1);
- });
+ sources.values().forEach(nextWorker::in);
+ sources.clear();
messageNum = 0;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
index 61b287c..613619b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/worker/RegisterPersistentWorker.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.register.worker;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
+import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.source.Scope;
@@ -52,7 +53,7 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
this.registerLockDAO = moduleManager.find(StorageModule.NAME).provider().getService(IRegisterLockDAO.class);
this.scope = scope;
this.dataCarrier = new DataCarrier<>("IndicatorPersistentWorker." + modelName, 1, 10000);
- this.dataCarrier.consume(new RegisterPersistentWorker.PersistentConsumer(this), 1);
+ this.dataCarrier.consume(new RegisterPersistentWorker.PersistentConsumer(this), 1, 200);
}
@Override public final void in(RegisterSource registerSource) {
@@ -67,31 +68,30 @@ public class RegisterPersistentWorker extends AbstractWorker<RegisterSource> {
sources.get(registerSource).combine(registerSource);
}
- if (registerSource.getEndOfBatchContext().isEndOfBatch()) {
-
- if (registerLockDAO.tryLock(scope)) {
- try {
- sources.values().forEach(source -> {
- try {
- RegisterSource dbSource = registerDAO.get(modelName, source.id());
- if (Objects.nonNull(dbSource)) {
- dbSource.combine(source);
+ if (sources.size() > 1000 || registerSource.getEndOfBatchContext().isEndOfBatch()) {
+ sources.values().forEach(source -> {
+ int sequence;
+ if ((sequence = registerLockDAO.tryLockAndIncrement(scope)) != Const.NONE) {
+ try {
+ RegisterSource dbSource = registerDAO.get(modelName, source.id());
+ if (Objects.nonNull(dbSource)) {
+ if (dbSource.combine(source)) {
registerDAO.forceUpdate(modelName, dbSource);
- } else {
- int sequence = registerDAO.registerId(modelName, source);
- source.setSequence(sequence);
- registerDAO.forceInsert(modelName, source);
}
- } catch (Throwable t) {
- logger.error(t.getMessage(), t);
+ } else {
+ source.setSequence(sequence);
+ registerDAO.forceInsert(modelName, source);
}
- });
- } finally {
- registerLockDAO.releaseLock(scope);
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
+ } finally {
+ registerLockDAO.releaseLock(scope);
+ }
+ } else {
+ logger.info("{} inventory register try lock and increment sequence failure.", scope.name());
}
- } else {
- logger.info("Inventory register try lock failure.");
- }
+ });
+ sources.clear();
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterDAO.java
index d42e920..e4e1380 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterDAO.java
@@ -25,16 +25,7 @@ import org.apache.skywalking.oap.server.core.register.RegisterSource;
* @author peng-yongsheng
*/
public interface IRegisterDAO extends DAO {
-
- /**
- * According modelName and register source, try to get the unique ID for this particular model.
- * @param modelName
- * @param registerSource
- * @return the unique id. This ID for each model should start with 2. 1 has been reserved.
- * @throws IOException
- */
- int registerId(String modelName, RegisterSource registerSource) throws IOException;
-
+
RegisterSource get(String modelName, String id) throws IOException;
void forceInsert(String modelName, RegisterSource source) throws IOException;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterLockDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterLockDAO.java
index e64b153..809dd86 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterLockDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRegisterLockDAO.java
@@ -24,7 +24,8 @@ import org.apache.skywalking.oap.server.core.source.Scope;
* @author peng-yongsheng
*/
public interface IRegisterLockDAO extends DAO {
- boolean tryLock(Scope scope);
+
+ int tryLockAndIncrement(Scope scope);
void releaseLock(Scope scope);
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
index 4ea33a8..decbdde 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/ReferenceIdExchanger.java
@@ -55,10 +55,10 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
@Override public boolean exchange(ReferenceDecorator standardBuilder, int serviceId) {
if (standardBuilder.getEntryEndpointId() == 0) {
String entryEndpointName = Strings.isNullOrEmpty(standardBuilder.getEntryEndpointName()) ? Const.DOMAIN_OPERATION_NAME : standardBuilder.getEntryEndpointName();
- int entryEndpointId = getEndpointId(standardBuilder, entryEndpointName);
+ int entryServiceId = serviceInstanceInventoryCache.get(standardBuilder.getEntryServiceInstanceId()).getServiceId();
+ int entryEndpointId = getEndpointId(entryServiceId, entryEndpointName);
if (entryEndpointId == 0) {
if (logger.isDebugEnabled()) {
- int entryServiceId = serviceInstanceInventoryCache.get(standardBuilder.getEntryServiceInstanceId()).getServiceId();
logger.debug("entry endpoint name: {} from service id: {} exchange failed", entryEndpointName, entryServiceId);
}
return false;
@@ -71,11 +71,11 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
if (standardBuilder.getParentEndpointId() == 0) {
String parentEndpointName = Strings.isNullOrEmpty(standardBuilder.getParentEndpointName()) ? Const.DOMAIN_OPERATION_NAME : standardBuilder.getParentEndpointName();
- int parentEndpointId = getEndpointId(standardBuilder, parentEndpointName);
-
+ int parentServiceId = serviceInstanceInventoryCache.get(standardBuilder.getParentServiceInstanceId()).getServiceId();
+ int parentEndpointId = getEndpointId(parentServiceId, parentEndpointName);
+
if (parentEndpointId == 0) {
if (logger.isDebugEnabled()) {
- int parentServiceId = serviceInstanceInventoryCache.get(standardBuilder.getParentServiceInstanceId()).getServiceId();
logger.debug("parent endpoint name: {} from service id: {} exchange failed", parentEndpointName, parentServiceId);
}
return false;
@@ -102,7 +102,7 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
}
return true;
}
-
+
/**
* Endpoint in ref could be local or exit span's operation name.
* Especially if it is local span operation name,
@@ -111,13 +111,12 @@ public class ReferenceIdExchanger implements IdExchanger<ReferenceDecorator> {
* Need to try to get the id by assuming the endpoint name is detected at server, local or client.
*
* If agent does the exchange, then always use endpoint id.
- *
- * @param standardBuilder
+ *
+ * @param serviceId
* @param endpointName
* @return
*/
- private int getEndpointId(ReferenceDecorator standardBuilder,String endpointName) {
- int serviceId = serviceInstanceInventoryCache.get(standardBuilder.getEntryServiceInstanceId()).getServiceId();
+ private int getEndpointId(int serviceId, String endpointName) {
int endpointId = endpointInventoryRegister.get(serviceId, endpointName, DetectPoint.SERVER.ordinal());
if (endpointId == Const.NONE) {
endpointId = endpointInventoryRegister.get(serviceId, endpointName, DetectPoint.CLIENT.ordinal());
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index afec137..ba133d1 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -66,7 +66,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(IBatchDAO.class, new BatchProcessEsDAO(elasticSearchClient, config.getBulkActions(), config.getBulkSize(), config.getFlushInterval(), config.getConcurrentRequests()));
this.registerServiceImplementation(StorageDAO.class, new StorageEsDAO(elasticSearchClient));
- this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient, 1000));
+ this.registerServiceImplementation(IRegisterLockDAO.class, new RegisterLockDAOImpl(elasticSearchClient, 10 * 60 * 1000));
this.registerServiceImplementation(IHistoryDeleteDAO.class, new HistoryDeleteEsDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new ServiceInventoryCacheEsDAO(elasticSearchClient));
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java
index 11d423a..58f3a8d 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RegisterEsDAO.java
@@ -24,11 +24,7 @@ import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.xcontent.*;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.metrics.max.Max;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.*;
/**
@@ -55,18 +51,16 @@ public class RegisterEsDAO extends EsDAO implements IRegisterDAO {
}
@Override public void forceInsert(String modelName, RegisterSource source) throws IOException {
- Map<String, Object> objectMap = storageBuilder.data2Map(source);
-
- XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
- for (String key : objectMap.keySet()) {
- builder.field(key, objectMap.get(key));
- }
- builder.endObject();
-
+ XContentBuilder builder = build(source);
getClient().forceInsert(modelName, source.id(), builder);
}
@Override public void forceUpdate(String modelName, RegisterSource source) throws IOException {
+ XContentBuilder builder = build(source);
+ getClient().forceUpdate(modelName, source.id(), builder);
+ }
+
+ private XContentBuilder build(RegisterSource source) throws IOException {
Map<String, Object> objectMap = storageBuilder.data2Map(source);
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
@@ -74,27 +68,7 @@ public class RegisterEsDAO extends EsDAO implements IRegisterDAO {
builder.field(key, objectMap.get(key));
}
builder.endObject();
-
- getClient().forceUpdate(modelName, source.id(), builder);
- }
-
- @Override public int registerId(String modelName,
- RegisterSource registerSource) throws IOException {
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- searchSourceBuilder.aggregation(AggregationBuilders.max(RegisterSource.SEQUENCE).field(RegisterSource.SEQUENCE));
- searchSourceBuilder.size(0);
- return getResponse(modelName, searchSourceBuilder);
- }
-
- private int getResponse(String modelName, SearchSourceBuilder searchSourceBuilder) throws IOException {
- SearchResponse searchResponse = getClient().search(modelName, searchSourceBuilder);
- Max agg = searchResponse.getAggregations().get(RegisterSource.SEQUENCE);
-
- int id = (int)agg.getValue();
- if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
- return 2;
- } else {
- return id + 1;
- }
+
+ return builder;
}
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockDAOImpl.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockDAOImpl.java
index d76c650..8b94563 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockDAOImpl.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockDAOImpl.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.lock;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
@@ -43,37 +44,41 @@ public class RegisterLockDAOImpl extends EsDAO implements IRegisterLockDAO {
this.timeout = timeout;
}
- @Override public boolean tryLock(Scope scope) {
+ @Override public int tryLockAndIncrement(Scope scope) {
String id = String.valueOf(scope.ordinal());
+
+ int sequence = Const.NONE;
try {
GetResponse response = getClient().get(RegisterLockIndex.NAME, id);
if (response.isExists()) {
Map<String, Object> source = response.getSource();
- long expire = (long)source.get(RegisterLockIndex.COLUMN_EXPIRE);
+ long expire = ((Number)source.get(RegisterLockIndex.COLUMN_EXPIRE)).longValue();
boolean lockable = (boolean)source.get(RegisterLockIndex.COLUMN_LOCKABLE);
+ sequence = ((Number)source.get(RegisterLockIndex.COLUMN_SEQUENCE)).intValue();
long version = response.getVersion();
- if (lockable) {
- lock(id, timeout, version);
- } else if (System.currentTimeMillis() > expire) {
- lock(id, timeout, version);
+ sequence++;
+
+ if (lockable || System.currentTimeMillis() > expire) {
+ lock(id, sequence, timeout, version);
} else {
TimeUnit.SECONDS.sleep(1);
- return false;
+ return Const.NONE;
}
}
} catch (Throwable t) {
logger.warn("Try to lock the row with the id {} failure, error message: {}", id, t.getMessage());
- return false;
+ return Const.NONE;
}
- return true;
+ return sequence;
}
- private void lock(String id, int timeout, long version) throws IOException {
+ private void lock(String id, int sequence, int timeout, long version) throws IOException {
XContentBuilder source = XContentFactory.jsonBuilder().startObject();
source.field(RegisterLockIndex.COLUMN_EXPIRE, System.currentTimeMillis() + timeout);
source.field(RegisterLockIndex.COLUMN_LOCKABLE, false);
+ source.field(RegisterLockIndex.COLUMN_SEQUENCE, sequence);
source.endObject();
getClient().forceUpdate(RegisterLockIndex.NAME, id, source, version);
@@ -89,7 +94,7 @@ public class RegisterLockDAOImpl extends EsDAO implements IRegisterLockDAO {
getClient().forceUpdate(RegisterLockIndex.NAME, id, source);
} catch (Throwable t) {
- logger.error("Release lock failure.", t);
+ logger.error("{} inventory release lock failure.", scope.name(), t);
}
}
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockIndex.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockIndex.java
index ee19618..dc2ffa9 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockIndex.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockIndex.java
@@ -26,4 +26,5 @@ public class RegisterLockIndex {
public static final String NAME = "register_lock";
public static final String COLUMN_EXPIRE = "expire";
public static final String COLUMN_LOCKABLE = "lockable";
+ public static final String COLUMN_SEQUENCE = "sequence";
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
index e8891f1..53e6557 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/lock/RegisterLockInstaller.java
@@ -26,10 +26,8 @@ import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnn
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.elasticsearch.common.xcontent.*;
+import org.slf4j.*;
/**
* @author peng-yongsheng
@@ -74,6 +72,9 @@ public class RegisterLockInstaller {
.startObject(RegisterLockIndex.COLUMN_LOCKABLE)
.field("type", "boolean")
.endObject()
+ .startObject(RegisterLockIndex.COLUMN_SEQUENCE)
+ .field("type", "integer")
+ .endObject()
.endObject()
.endObject();
@@ -86,6 +87,7 @@ public class RegisterLockInstaller {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
builder.field(RegisterLockIndex.COLUMN_EXPIRE, Long.MIN_VALUE);
builder.field(RegisterLockIndex.COLUMN_LOCKABLE, true);
+ builder.field(RegisterLockIndex.COLUMN_SEQUENCE, 1);
builder.endObject();
client.forceInsert(RegisterLockIndex.NAME, String.valueOf(scopeId), builder);
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
index 8d0c670..a76fb49 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
@@ -34,7 +34,7 @@ import org.slf4j.*;
*
* If someone wants to implement SQL-style database as storage, please just refer the logic.
*
- * @author wusheng
+ * @author wusheng, peng-yongsheng
*/
public class H2StorageProvider extends ModuleProvider {
@@ -42,6 +42,7 @@ public class H2StorageProvider extends ModuleProvider {
private H2StorageConfig config;
private JDBCHikariCPClient h2Client;
+ private H2RegisterLockDAO lockDAO;
public H2StorageProvider() {
config = new H2StorageConfig();
@@ -69,7 +70,9 @@ public class H2StorageProvider extends ModuleProvider {
this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(h2Client));
this.registerServiceImplementation(StorageDAO.class, new H2StorageDAO(h2Client));
- this.registerServiceImplementation(IRegisterLockDAO.class, new H2RegisterLockDAO());
+
+ lockDAO = new H2RegisterLockDAO(h2Client);
+ this.registerServiceImplementation(IRegisterLockDAO.class, lockDAO);
this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new H2ServiceInventoryCacheDAO(h2Client));
this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new H2ServiceInstanceInventoryCacheDAO(h2Client));
@@ -91,6 +94,8 @@ public class H2StorageProvider extends ModuleProvider {
H2TableInstaller installer = new H2TableInstaller(getManager());
installer.install(h2Client);
+
+ new H2RegisterLockInstaller().install(h2Client, lockDAO);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterDAO.java
index 3403fe6..9c24197 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterDAO.java
@@ -19,54 +19,28 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import org.apache.skywalking.oap.server.core.Const;
+import java.sql.*;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
-import org.apache.skywalking.oap.server.core.storage.IRegisterDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
* @author wusheng
*/
public class H2RegisterDAO extends H2SQLExecutor implements IRegisterDAO {
+
private static final Logger logger = LoggerFactory.getLogger(H2RegisterDAO.class);
private final JDBCHikariCPClient h2Client;
private final StorageBuilder<RegisterSource> storageBuilder;
- public H2RegisterDAO(JDBCHikariCPClient h2Client,
- StorageBuilder<RegisterSource> storageBuilder) {
+ public H2RegisterDAO(JDBCHikariCPClient h2Client, StorageBuilder<RegisterSource> storageBuilder) {
this.h2Client = h2Client;
this.storageBuilder = storageBuilder;
}
- @Override public int registerId(String modelName,
- RegisterSource registerSource) throws IOException {
- try (Connection connection = h2Client.getConnection()) {
- try (ResultSet rs = h2Client.executeQuery(connection, "SELECT max(sequence) max_id FROM " + modelName)) {
- while (rs.next()) {
- int maxId = rs.getInt("max_id");
- if (maxId == 0) {
- return 2;
- } else {
- return maxId + 1;
- }
- }
- }
- } catch (SQLException e) {
- throw new IOException(e.getMessage(), e);
- } catch (JDBCClientException e) {
- throw new IOException(e.getMessage(), e);
- }
- return Const.NONE;
- }
-
@Override public RegisterSource get(String modelName, String id) throws IOException {
return (RegisterSource)getByID(h2Client, modelName, id, storageBuilder);
}
@@ -74,9 +48,7 @@ public class H2RegisterDAO extends H2SQLExecutor implements IRegisterDAO {
@Override public void forceInsert(String modelName, RegisterSource source) throws IOException {
try (Connection connection = h2Client.getConnection()) {
getInsertExecutor(modelName, source, storageBuilder).invoke(connection);
- } catch (SQLException e) {
- throw new IOException(e.getMessage(), e);
- } catch (JDBCClientException e) {
+ } catch (SQLException | JDBCClientException e) {
throw new IOException(e.getMessage(), e);
}
}
@@ -84,9 +56,7 @@ public class H2RegisterDAO extends H2SQLExecutor implements IRegisterDAO {
@Override public void forceUpdate(String modelName, RegisterSource source) throws IOException {
try (Connection connection = h2Client.getConnection()) {
getUpdateExecutor(modelName, source, storageBuilder).invoke(connection);
- } catch (SQLException e) {
- throw new IOException(e.getMessage(), e);
- } catch (JDBCClientException e) {
+ } catch (SQLException | JDBCClientException e) {
throw new IOException(e.getMessage(), e);
}
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterLockDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterLockDAO.java
index d0d78ae..3d316d7 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterLockDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterLockDAO.java
@@ -18,20 +18,70 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
+import java.sql.*;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
+import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
+import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+import org.slf4j.*;
/**
- * No need to create any lock table. In SQL based database, could use `select... for update` to avoid lock table.
+ * Takes a row lock (select ... for update) on the register lock table; also used by the MySQL storage provider.
*
- * @author wusheng
+ * @author wusheng, peng-yongsheng
*/
public class H2RegisterLockDAO implements IRegisterLockDAO {
- @Override public boolean tryLock(Scope scope) {
- return true;
+
+ private static final Logger logger = LoggerFactory.getLogger(H2RegisterLockDAO.class);
+
+ private JDBCHikariCPClient h2Client;
+ private Map<Scope, Connection> onLockingConnection;
+
+ public H2RegisterLockDAO(JDBCHikariCPClient h2Client) {
+ this.h2Client = h2Client;
+ onLockingConnection = new HashMap<>();
}
- @Override public void releaseLock(Scope scope) {
+ void init(Scope scope) {
+ if (!onLockingConnection.containsKey(scope)) {
+ onLockingConnection.put(scope, null);
+ }
+ }
+
+ @Override public int tryLockAndIncrement(Scope scope) {
+ if (onLockingConnection.containsKey(scope)) {
+ try {
+ Connection connection = h2Client.getTransactionConnection();
+ onLockingConnection.put(scope, connection);
+ ResultSet resultSet = h2Client.executeQuery(connection, "select sequence from " + H2RegisterLockInstaller.LOCK_TABLE_NAME + " where id = " + scope.ordinal() + " for update");
+ while (resultSet.next()) {
+ int sequence = resultSet.getInt("sequence");
+ sequence++;
+ h2Client.execute(connection, "update " + H2RegisterLockInstaller.LOCK_TABLE_NAME + " set sequence = " + sequence + " where id = " + scope.ordinal());
+ return sequence;
+ }
+ } catch (JDBCClientException | SQLException e) {
+ logger.error("try inventory register lock for scope id={} name={} failure.", scope.ordinal(), scope.name());
+ logger.error("tryLock error", e);
+ return Const.NONE;
+ }
+ }
+ return Const.NONE;
+ }
+ @Override public void releaseLock(Scope scope) {
+ Connection connection = onLockingConnection.get(scope);
+ if (connection != null) {
+ try {
+ connection.commit();
+ connection.close();
+ } catch (SQLException e) {
+ logger.error("release lock failure.", e);
+ } finally {
+ onLockingConnection.put(scope, null);
+ }
+ }
}
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLRegisterLockInstaller.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterLockInstaller.java
similarity index 79%
rename from oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLRegisterLockInstaller.java
rename to oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterLockInstaller.java
index 8f96c94..2138630 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLRegisterLockInstaller.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2RegisterLockInstaller.java
@@ -16,12 +16,9 @@
*
*/
-package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
+package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
+import java.sql.*;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.StorageException;
@@ -30,27 +27,28 @@ import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
/**
- * @author wusheng
+ * @author wusheng, peng-yongsheng
*/
-public class MySQLRegisterLockInstaller {
- public static final String LOCK_TABLE_NAME = "register_lock";
+public class H2RegisterLockInstaller {
- private static final Logger logger = LoggerFactory.getLogger(MySQLRegisterLockInstaller.class);
+ private static final Logger logger = LoggerFactory.getLogger(H2RegisterLockInstaller.class);
+
+ static final String LOCK_TABLE_NAME = "register_lock";
/**
- * In MySQL lock storage, lock table created. The row lock is used in {@link MySQLRegisterTableLockDAO}
+ * Creates the register lock table if it does not exist. The row lock on it is taken in {@link H2RegisterLockDAO}
*
* @param client
* @throws StorageException
*/
- public void install(Client client, MySQLRegisterTableLockDAO dao) throws StorageException {
+ public void install(Client client, H2RegisterLockDAO dao) throws StorageException {
JDBCHikariCPClient h2Client = (JDBCHikariCPClient)client;
SQLBuilder tableCreateSQL = new SQLBuilder("CREATE TABLE IF NOT EXISTS " + LOCK_TABLE_NAME + " (");
- tableCreateSQL.appendLine("id int PRIMARY KEY, ");
+ tableCreateSQL.appendLine("id int PRIMARY KEY, ");
+ tableCreateSQL.appendLine("sequence int, ");
tableCreateSQL.appendLine("name VARCHAR(100)");
tableCreateSQL.appendLine(")");
@@ -66,9 +64,7 @@ public class MySQLRegisterLockInstaller {
dao.init(sourceScope);
putIfAbsent(h2Client, connection, sourceScope.ordinal(), sourceScope.name());
}
- } catch (JDBCClientException e) {
- throw new StorageException(e.getMessage(), e);
- } catch (SQLException e) {
+ } catch (JDBCClientException | SQLException e) {
throw new StorageException(e.getMessage(), e);
}
}
@@ -84,9 +80,10 @@ public class MySQLRegisterLockInstaller {
throw new StorageException(e.getMessage(), e);
}
if (!existed) {
- try (PreparedStatement statement = connection.prepareStatement("insert into " + LOCK_TABLE_NAME + "(id, name) values (?, ?)")) {
+ try (PreparedStatement statement = connection.prepareStatement("insert into " + LOCK_TABLE_NAME + "(id, sequence, name) values (?, ?, ?)")) {
statement.setInt(1, scopeId);
- statement.setString(2, scopeName);
+ statement.setInt(2, 1);
+ statement.setString(3, scopeName);
statement.execute();
} catch (SQLException e) {
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLRegisterTableLockDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLRegisterTableLockDAO.java
deleted file mode 100644
index 3382bc1..0000000
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLRegisterTableLockDAO.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.skywalking.oap.server.core.source.Scope;
-import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
-import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
-import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * In MySQL, use a row lock of LOCK table.
- *
- * @author wusheng
- */
-public class MySQLRegisterTableLockDAO implements IRegisterLockDAO {
- private static final Logger logger = LoggerFactory.getLogger(MySQLRegisterTableLockDAO.class);
-
- private JDBCHikariCPClient h2Client;
- private Map<Scope, Connection> onLockingConnection;
-
- public MySQLRegisterTableLockDAO(JDBCHikariCPClient h2Client) {
- this.h2Client = h2Client;
- onLockingConnection = new HashMap<>();
- }
-
- void init(Scope scope) {
- if (!onLockingConnection.containsKey(scope)) {
- onLockingConnection.put(scope, null);
- }
- }
-
- @Override public boolean tryLock(Scope scope) {
- if (onLockingConnection.containsKey(scope)) {
- try {
- Connection connection = h2Client.getTransactionConnection();
- onLockingConnection.put(scope, connection);
- connection.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
- h2Client.execute(connection, "select * from " + MySQLRegisterLockInstaller.LOCK_TABLE_NAME + " where id = " + scope.ordinal() + " for update");
- return true;
- } catch (JDBCClientException | SQLException e) {
- logger.error("try inventory register lock for scope id={} name={} failure.", scope.ordinal(), scope.name());
- logger.error("tryLock error", e);
- return false;
- }
- }
- return false;
- }
-
- @Override public void releaseLock(Scope scope) {
- Connection connection = onLockingConnection.get(scope);
- if (connection != null) {
- try {
- connection.commit();
- connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
- connection.close();
- } catch (SQLException e) {
- logger.error("release lock failure.", e);
- } finally {
- onLockingConnection.put(scope, null);
- }
- }
- }
-}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
index 0dc7f25..fda7d31 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
@@ -21,43 +21,15 @@ package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
import java.io.IOException;
import java.util.Properties;
import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
-import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
-import org.apache.skywalking.oap.server.core.storage.IRegisterLockDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageDAO;
-import org.apache.skywalking.oap.server.core.storage.StorageException;
-import org.apache.skywalking.oap.server.core.storage.StorageModule;
-import org.apache.skywalking.oap.server.core.storage.cache.IEndpointInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.INetworkAddressInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.IServiceInstanceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.cache.IServiceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IAggregationQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.IMetricQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
-import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.cache.*;
+import org.apache.skywalking.oap.server.core.storage.query.*;
import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
-import org.apache.skywalking.oap.server.library.module.ModuleConfig;
-import org.apache.skywalking.oap.server.library.module.ModuleDefine;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleStartException;
-import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
+import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageConfig;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageProvider;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2BatchDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2EndpointInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2HistoryDeleteDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetadataQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2MetricQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2NetworkAddressInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInstanceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInventoryCacheDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2StorageDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQueryDAO;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.*;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.*;
+import org.slf4j.*;
/**
* MySQL storage provider should be secondary choice for production usage as SkyWalking storage solution. It enhanced
@@ -67,14 +39,15 @@ import org.slf4j.LoggerFactory;
* this storage implementation, we could also use this in MySQL-compatible projects, such as, Apache ShardingSphere,
* TiDB
*
- * @author wusheng
+ * @author wusheng, peng-yongsheng
*/
public class MySQLStorageProvider extends ModuleProvider {
+
private static final Logger logger = LoggerFactory.getLogger(H2StorageProvider.class);
private H2StorageConfig config;
private JDBCHikariCPClient mysqlClient;
- private MySQLRegisterTableLockDAO lockDAO;
+ private H2RegisterLockDAO lockDAO;
public MySQLStorageProvider() {
config = new H2StorageConfig();
@@ -104,7 +77,7 @@ public class MySQLStorageProvider extends ModuleProvider {
this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient));
this.registerServiceImplementation(StorageDAO.class, new H2StorageDAO(mysqlClient));
- lockDAO = new MySQLRegisterTableLockDAO(mysqlClient);
+ lockDAO = new H2RegisterLockDAO(mysqlClient);
this.registerServiceImplementation(IRegisterLockDAO.class, lockDAO);
this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new H2ServiceInventoryCacheDAO(mysqlClient));
@@ -128,7 +101,7 @@ public class MySQLStorageProvider extends ModuleProvider {
MySQLTableInstaller installer = new MySQLTableInstaller(getManager());
installer.install(mysqlClient);
- new MySQLRegisterLockInstaller().install(mysqlClient, lockDAO);
+ new H2RegisterLockInstaller().install(mysqlClient, lockDAO);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}