You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/09/21 03:47:46 UTC
[incubator-skywalking] branch master updated: Implementation of
metadata query. (#1686)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 0576466 Implementation of metadata query. (#1686)
0576466 is described below
commit 057646603197de66f3eabf952b24b13184f6e915
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Fri Sep 21 11:47:40 2018 +0800
Implementation of metadata query. (#1686)
---
.../skywalking/oap/server/core/CoreModule.java | 1 +
.../oap/server/core/CoreModuleProvider.java | 1 +
.../oap/server/core/query/DurationUtils.java | 16 ++
.../server/core/query/MetadataQueryService.java | 22 ++-
.../server/core/register/EndpointInventory.java | 2 +-
.../core/register/NetworkAddressInventory.java | 12 ++
.../oap/server/core/register/RegisterSource.java | 4 +-
.../oap/server/core/register/ServiceInventory.java | 4 +-
.../service/INetworkAddressInventoryRegister.java | 2 +
.../service/NetworkAddressInventoryRegister.java | 22 ++-
.../oap/server/core/storage/StorageModule.java | 2 +-
.../core/storage/query/IMetadataQueryDAO.java | 11 ++
.../oap/query/graphql/resolver/MetadataQuery.java | 48 +++---
.../parser/standardization/SpanIdExchanger.java | 6 +-
.../StorageModuleElasticsearchProvider.java | 1 +
.../elasticsearch/query/MetadataQueryEsDAO.java | 168 +++++++++++++++++++++
16 files changed, 282 insertions(+), 40 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
index f921fc0..bf1f666 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModule.java
@@ -60,6 +60,7 @@ public class CoreModule extends ModuleDefine {
classes.add(TopologyQueryService.class);
classes.add(MetricQueryService.class);
classes.add(TraceQueryService.class);
+ classes.add(MetadataQueryService.class);
}
private void addServerInterface(List<Class> classes) {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index e567b9c..4dc4573 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -113,6 +113,7 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation(TopologyQueryService.class, new TopologyQueryService(getManager()));
this.registerServiceImplementation(MetricQueryService.class, new MetricQueryService(getManager()));
this.registerServiceImplementation(TraceQueryService.class, new TraceQueryService(getManager()));
+ this.registerServiceImplementation(MetadataQueryService.class, new MetadataQueryService(getManager()));
annotationScan.registerListener(storageAnnotationListener);
annotationScan.registerListener(streamAnnotationListener);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/DurationUtils.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/DurationUtils.java
index b79d82d..999bbc1 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/DurationUtils.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/DurationUtils.java
@@ -224,4 +224,20 @@ public enum DurationUtils {
return durations;
}
+
+ public long toTimestamp(Step step, String dateStr) throws ParseException {
+ switch (step) {
+ case MONTH:
+ return new SimpleDateFormat("yyyy-MM").parse(dateStr).getTime();
+ case DAY:
+ return new SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime();
+ case HOUR:
+ return new SimpleDateFormat("yyyy-MM-dd HH").parse(dateStr).getTime();
+ case MINUTE:
+ return new SimpleDateFormat("yyyy-MM-dd HHmm").parse(dateStr).getTime();
+ case SECOND:
+ return new SimpleDateFormat("yyyy-MM-dd HHmmss").parse(dateStr).getTime();
+ }
+ throw new UnexpectedException("Unsupported step " + step.name());
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetadataQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetadataQueryService.java
index 7520b0b..8f5f8a5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetadataQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/MetadataQueryService.java
@@ -18,7 +18,9 @@
package org.apache.skywalking.oap.server.core.query;
+import java.io.IOException;
import java.util.*;
+import org.apache.skywalking.apm.network.language.agent.SpanLayer;
import org.apache.skywalking.oap.server.core.query.entity.*;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
@@ -43,19 +45,25 @@ public class MetadataQueryService implements org.apache.skywalking.oap.server.li
return metadataQueryDAO;
}
- public ClusterBrief getGlobalBrief(final Step step, final long startTB, final long endTB) {
- return new ClusterBrief();
+ public ClusterBrief getGlobalBrief(final long startTimestamp, final long endTimestamp) throws IOException {
+ ClusterBrief clusterBrief = new ClusterBrief();
+ clusterBrief.setNumOfService(getMetadataQueryDAO().numOfService(startTimestamp, endTimestamp));
+ clusterBrief.setNumOfEndpoint(getMetadataQueryDAO().numOfEndpoint(startTimestamp, endTimestamp));
+ clusterBrief.setNumOfDatabase(getMetadataQueryDAO().numOfConjectural(startTimestamp, endTimestamp, SpanLayer.Database_VALUE));
+ clusterBrief.setNumOfCache(getMetadataQueryDAO().numOfConjectural(startTimestamp, endTimestamp, SpanLayer.Cache_VALUE));
+ clusterBrief.setNumOfMQ(getMetadataQueryDAO().numOfConjectural(startTimestamp, endTimestamp, SpanLayer.MQ_VALUE));
+ return clusterBrief;
}
- public List<Service> getAllServices(final Step step, final long startTB, final long endTB) {
- return Collections.emptyList();
+ public List<Service> getAllServices(final long startTimestamp, final long endTimestamp) throws IOException {
+ return getMetadataQueryDAO().getAllServices(startTimestamp, endTimestamp);
}
- public List<Service> searchServices(final Step step, final long startTB, final long endTB, final String keyword) {
+ public List<Service> searchServices(final long startTimestamp, final long endTimestamp, final String keyword) {
return Collections.emptyList();
}
- public List<ServiceInstance> getServiceInstances(final Step step, final long startTB, final long endTB,
+ public List<ServiceInstance> getServiceInstances(final long startTimestamp, final long endTimestamp,
final String id) {
return Collections.emptyList();
}
@@ -64,7 +72,7 @@ public class MetadataQueryService implements org.apache.skywalking.oap.server.li
return Collections.emptyList();
}
- public Service searchService(final Step step, final long startTB, final long endTB, final String serviceCode) {
+ public Service searchService(final long startTimestamp, final long endTimestamp, final String serviceCode) {
return new Service();
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java
index 5ba802f..e56435f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/EndpointInventory.java
@@ -40,7 +40,7 @@ public class EndpointInventory extends RegisterSource {
private static final String SERVICE_ID = "service_id";
private static final String NAME = "name";
- private static final String DETECT_POINT = "detect_point";
+ public static final String DETECT_POINT = "detect_point";
@Setter @Getter @Column(columnName = SERVICE_ID) private int serviceId;
@Setter @Getter @Column(columnName = NAME, matchQuery = true) private String name = Const.EMPTY_STRING;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java
index b765a13..801355e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/NetworkAddressInventory.java
@@ -39,8 +39,12 @@ public class NetworkAddressInventory extends RegisterSource {
public static final String MODEL_NAME = "network_address_inventory";
private static final String NAME = "name";
+ public static final String SRC_LAYER = "src_layer";
+ private static final String SERVER_TYPE = "server_type";
@Setter @Getter @Column(columnName = NAME, matchQuery = true) private String name = Const.EMPTY_STRING;
+ @Setter @Getter @Column(columnName = SRC_LAYER) private int srcLayer;
+ @Setter @Getter @Column(columnName = SERVER_TYPE) private int serverType;
public static String buildId(String networkAddress) {
return networkAddress;
@@ -74,6 +78,8 @@ public class NetworkAddressInventory extends RegisterSource {
@Override public RemoteData.Builder serialize() {
RemoteData.Builder remoteBuilder = RemoteData.newBuilder();
remoteBuilder.setDataIntegers(0, getSequence());
+ remoteBuilder.setDataIntegers(1, getSrcLayer());
+ remoteBuilder.setDataIntegers(2, getServerType());
remoteBuilder.setDataLongs(0, getRegisterTime());
remoteBuilder.setDataLongs(1, getHeartbeatTime());
@@ -84,6 +90,8 @@ public class NetworkAddressInventory extends RegisterSource {
@Override public void deserialize(RemoteData remoteData) {
setSequence(remoteData.getDataIntegers(0));
+ setSrcLayer(remoteData.getDataIntegers(1));
+ setServerType(remoteData.getDataIntegers(2));
setRegisterTime(remoteData.getDataLongs(0));
setHeartbeatTime(remoteData.getDataLongs(1));
@@ -101,6 +109,8 @@ public class NetworkAddressInventory extends RegisterSource {
NetworkAddressInventory inventory = new NetworkAddressInventory();
inventory.setSequence((Integer)dbMap.get(SEQUENCE));
inventory.setName((String)dbMap.get(NAME));
+ inventory.setSrcLayer((Integer)dbMap.get(SRC_LAYER));
+ inventory.setServerType((Integer)dbMap.get(SERVER_TYPE));
inventory.setRegisterTime((Long)dbMap.get(REGISTER_TIME));
inventory.setHeartbeatTime((Long)dbMap.get(HEARTBEAT_TIME));
return inventory;
@@ -110,6 +120,8 @@ public class NetworkAddressInventory extends RegisterSource {
Map<String, Object> map = new HashMap<>();
map.put(SEQUENCE, storageData.getSequence());
map.put(NAME, storageData.getName());
+ map.put(SRC_LAYER, storageData.getSrcLayer());
+ map.put(SERVER_TYPE, storageData.getServerType());
map.put(REGISTER_TIME, storageData.getRegisterTime());
map.put(HEARTBEAT_TIME, storageData.getHeartbeatTime());
return map;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
index abb6c9e..c7b2166 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/RegisterSource.java
@@ -29,8 +29,8 @@ import org.apache.skywalking.oap.server.core.storage.annotation.Column;
public abstract class RegisterSource extends StreamData implements StorageData {
public static final String SEQUENCE = "sequence";
- protected static final String REGISTER_TIME = "register_time";
- protected static final String HEARTBEAT_TIME = "heartbeat_time";
+ public static final String REGISTER_TIME = "register_time";
+ public static final String HEARTBEAT_TIME = "heartbeat_time";
@Getter @Setter @Column(columnName = SEQUENCE) private int sequence;
@Getter @Setter @Column(columnName = REGISTER_TIME) private long registerTime;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java
index e61adb3..978fb8b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/ServiceInventory.java
@@ -39,8 +39,8 @@ public class ServiceInventory extends RegisterSource {
public static final String MODEL_NAME = "service_inventory";
- private static final String NAME = "name";
- private static final String IS_ADDRESS = "is_address";
+ public static final String NAME = "name";
+ public static final String IS_ADDRESS = "is_address";
private static final String ADDRESS_ID = "address_id";
@Setter @Getter @Column(columnName = NAME, matchQuery = true) private String name = Const.EMPTY_STRING;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/INetworkAddressInventoryRegister.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/INetworkAddressInventoryRegister.java
index 635d55f..eb3b375 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/INetworkAddressInventoryRegister.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/INetworkAddressInventoryRegister.java
@@ -27,4 +27,6 @@ public interface INetworkAddressInventoryRegister extends Service {
int getOrCreate(String networkAddress);
int get(String networkAddress);
+
+ void update(int addressId, int srcLayer, int serverType);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
index ba7f107..d4ea429 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/register/service/NetworkAddressInventoryRegister.java
@@ -24,7 +24,7 @@ import org.apache.skywalking.oap.server.core.register.NetworkAddressInventory;
import org.apache.skywalking.oap.server.core.register.worker.InventoryProcess;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import static java.util.Objects.isNull;
+import static java.util.Objects.*;
/**
* @author peng-yongsheng
@@ -91,4 +91,24 @@ public class NetworkAddressInventoryRegister implements INetworkAddressInventory
@Override public int get(String networkAddress) {
return getNetworkAddressInventoryCache().getAddressId(networkAddress);
}
+
+ @Override public void update(int addressId, int srcLayer, int serverType) {
+ if (!this.compare(addressId, srcLayer, serverType)) {
+ NetworkAddressInventory newNetworkAddress = getNetworkAddressInventoryCache().get(addressId);
+ newNetworkAddress.setSrcLayer(srcLayer);
+ newNetworkAddress.setServerType(serverType);
+ newNetworkAddress.setHeartbeatTime(System.currentTimeMillis());
+
+ InventoryProcess.INSTANCE.in(newNetworkAddress);
+ }
+ }
+
+ private boolean compare(int addressId, int srcLayer, int serverType) {
+ NetworkAddressInventory networkAddress = getNetworkAddressInventoryCache().get(addressId);
+
+ if (nonNull(networkAddress)) {
+ return srcLayer == networkAddress.getSrcLayer() && serverType == networkAddress.getServerType();
+ }
+ return true;
+ }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
index cd00d47..ccaa198 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
@@ -38,6 +38,6 @@ public class StorageModule extends ModuleDefine {
IBatchDAO.class, StorageDAO.class, IRegisterLockDAO.class,
IServiceInventoryCacheDAO.class, IServiceInstanceInventoryCacheDAO.class,
IEndpointInventoryCacheDAO.class, INetworkAddressInventoryCacheDAO.class,
- ITopologyQueryDAO.class, IMetricQueryDAO.class, ITraceQueryDAO.class};
+ ITopologyQueryDAO.class, IMetricQueryDAO.class, ITraceQueryDAO.class, IMetadataQueryDAO.class};
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
index 2101c90..74cb303 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
@@ -18,10 +18,21 @@
package org.apache.skywalking.oap.server.core.storage.query;
+import java.io.IOException;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.query.entity.Service;
import org.apache.skywalking.oap.server.core.storage.DAO;
/**
* @author peng-yongsheng
*/
public interface IMetadataQueryDAO extends DAO {
+
+ int numOfService(final long startTimestamp, final long endTimestamp) throws IOException;
+
+ int numOfEndpoint(final long startTimestamp, final long endTimestamp) throws IOException;
+
+ int numOfConjectural(final long startTimestamp, final long endTimestamp, final int srcLayer) throws IOException;
+
+ List<Service> getAllServices(final long startTimestamp, final long endTimestamp) throws IOException;
}
diff --git a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetadataQuery.java b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetadataQuery.java
index 6f5e2f7..0c1d7e8 100644
--- a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetadataQuery.java
+++ b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/MetadataQuery.java
@@ -19,6 +19,8 @@
package org.apache.skywalking.oap.query.graphql.resolver;
import com.coxautodev.graphql.tools.GraphQLQueryResolver;
+import java.io.IOException;
+import java.text.ParseException;
import java.util.List;
import org.apache.skywalking.oap.query.graphql.type.Duration;
import org.apache.skywalking.oap.server.core.CoreModule;
@@ -42,42 +44,42 @@ public class MetadataQuery implements GraphQLQueryResolver {
return metadataQueryService;
}
- public ClusterBrief getGlobalBrief(final Duration duration) {
- long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
- long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
+ public ClusterBrief getGlobalBrief(final Duration duration) throws IOException, ParseException {
+ long startTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getStart());
+ long endTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getEnd());
- return getMetadataQueryService().getGlobalBrief(duration.getStep(), startTimeBucket, endTimeBucket);
+ return getMetadataQueryService().getGlobalBrief(startTimestamp, endTimestamp);
}
- public List<Service> getAllServices(final Duration duration) {
- long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
- long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
+ public List<Service> getAllServices(final Duration duration) throws IOException, ParseException {
+ long startTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getStart());
+ long endTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getEnd());
- return getMetadataQueryService().getAllServices(duration.getStep(), startTimeBucket, endTimeBucket);
+ return getMetadataQueryService().getAllServices(startTimestamp, endTimestamp);
}
- public List<Service> searchServices(final Duration duration, final String keyword) {
- long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
- long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
+ public List<Service> searchServices(final Duration duration, final String keyword) throws ParseException {
+ long startTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getStart());
+ long endTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getEnd());
- return getMetadataQueryService().searchServices(duration.getStep(), startTimeBucket, endTimeBucket, keyword);
+ return getMetadataQueryService().searchServices(startTimestamp, endTimestamp, keyword);
}
- public List<ServiceInstance> getServiceInstances(final Duration duration, final String id) {
- long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
- long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
+ public Service searchService(final Duration duration, final String serviceCode) throws ParseException {
+ long startTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getStart());
+ long endTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getEnd());
- return getMetadataQueryService().getServiceInstances(duration.getStep(), startTimeBucket, endTimeBucket, id);
+ return getMetadataQueryService().searchService(startTimestamp, endTimestamp, serviceCode);
}
- public List<Endpoint> searchEndpoint(final String keyword, final String serviceId, final int limit) {
- return getMetadataQueryService().searchEndpoint(keyword, serviceId, limit);
- }
+ public List<ServiceInstance> getServiceInstances(final Duration duration, final String id) throws ParseException {
+ long startTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getStart());
+ long endTimestamp = DurationUtils.INSTANCE.toTimestamp(duration.getStep(), duration.getEnd());
- public Service searchService(final Duration duration, final String serviceCode) {
- long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
- long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
+ return getMetadataQueryService().getServiceInstances(startTimestamp, endTimestamp, id);
+ }
- return getMetadataQueryService().searchService(duration.getStep(), startTimeBucket, endTimeBucket, serviceCode);
+ public List<Endpoint> searchEndpoint(final String keyword, final String serviceId, final int limit) {
+ return getMetadataQueryService().searchEndpoint(keyword, serviceId, limit);
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java
index f2dcb1d..db1988a 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/standardization/SpanIdExchanger.java
@@ -80,9 +80,9 @@ public class SpanIdExchanger implements IdExchanger<SpanDecorator> {
standardBuilder.setPeerId(peerId);
standardBuilder.setPeer(Const.EMPTY_STRING);
-// int spanLayer = standardBuilder.getSpanLayerValue();
-// int serverType = componentLibraryCatalogService.getServerIdBasedOnComponent(standardBuilder.getComponentId());
-// networkAddressInventoryRegister.update(peerId, spanLayer, serverType);
+ int spanLayer = standardBuilder.getSpanLayerValue();
+ int serverType = componentLibraryCatalogService.getServerIdBasedOnComponent(standardBuilder.getComponentId());
+ networkAddressInventoryRegister.update(peerId, spanLayer, serverType);
}
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index 711b26a..39128a1 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -78,6 +78,7 @@ public class StorageModuleElasticsearchProvider extends ModuleProvider {
this.registerServiceImplementation(ITopologyQueryDAO.class, new TopologyQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(IMetricQueryDAO.class, new MetricQueryEsDAO(elasticSearchClient));
this.registerServiceImplementation(ITraceQueryDAO.class, new TraceQueryEsDAO(elasticSearchClient));
+ this.registerServiceImplementation(IMetadataQueryDAO.class, new MetadataQueryEsDAO(elasticSearchClient));
}
@Override
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
new file mode 100644
index 0000000..78fc646
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.skywalking.oap.server.core.query.entity.Service;
+import org.apache.skywalking.oap.server.core.register.*;
+import org.apache.skywalking.oap.server.core.source.DetectPoint;
+import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.apache.skywalking.oap.server.library.util.BooleanUtils;
+import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.*;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+/**
+ * @author peng-yongsheng
+ */
+public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO {
+
+ public MetadataQueryEsDAO(ElasticSearchClient client) {
+ super(client);
+ }
+
+ @Override public int numOfService(long startTimestamp, long endTimestamp) throws IOException {
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+
+ BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+
+ BoolQueryBuilder boolQuery1 = QueryBuilders.boolQuery();
+ boolQuery1.must().add(QueryBuilders.rangeQuery(ServiceInventory.HEARTBEAT_TIME).gte(endTimestamp));
+ boolQuery1.must().add(QueryBuilders.rangeQuery(ServiceInventory.REGISTER_TIME).lte(endTimestamp));
+
+ BoolQueryBuilder boolQuery2 = QueryBuilders.boolQuery();
+ boolQuery2.must().add(QueryBuilders.rangeQuery(ServiceInventory.REGISTER_TIME).lte(endTimestamp));
+ boolQuery2.must().add(QueryBuilders.rangeQuery(ServiceInventory.HEARTBEAT_TIME).gte(startTimestamp));
+
+ BoolQueryBuilder timeBoolQuery = QueryBuilders.boolQuery();
+ timeBoolQuery.should().add(boolQuery1);
+ timeBoolQuery.should().add(boolQuery2);
+
+ boolQueryBuilder.must().add(timeBoolQuery);
+
+ boolQueryBuilder.must().add(QueryBuilders.termQuery(ServiceInventory.IS_ADDRESS, BooleanUtils.FALSE));
+
+ sourceBuilder.query(boolQueryBuilder);
+ sourceBuilder.size(0);
+
+ SearchResponse response = getClient().search(ServiceInventory.MODEL_NAME, sourceBuilder);
+ return (int)response.getHits().getTotalHits();
+ }
+
+ @Override public int numOfEndpoint(long startTimestamp, long endTimestamp) throws IOException {
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+
+ BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+
+ BoolQueryBuilder boolQuery1 = QueryBuilders.boolQuery();
+ boolQuery1.must().add(QueryBuilders.rangeQuery(EndpointInventory.HEARTBEAT_TIME).gte(endTimestamp));
+ boolQuery1.must().add(QueryBuilders.rangeQuery(EndpointInventory.REGISTER_TIME).lte(endTimestamp));
+
+ BoolQueryBuilder boolQuery2 = QueryBuilders.boolQuery();
+ boolQuery2.must().add(QueryBuilders.rangeQuery(EndpointInventory.REGISTER_TIME).lte(endTimestamp));
+ boolQuery2.must().add(QueryBuilders.rangeQuery(EndpointInventory.HEARTBEAT_TIME).gte(startTimestamp));
+
+ BoolQueryBuilder timeBoolQuery = QueryBuilders.boolQuery();
+ timeBoolQuery.should().add(boolQuery1);
+ timeBoolQuery.should().add(boolQuery2);
+
+ boolQueryBuilder.must().add(timeBoolQuery);
+
+ boolQueryBuilder.must().add(QueryBuilders.termQuery(EndpointInventory.DETECT_POINT, DetectPoint.SERVER.ordinal()));
+
+ sourceBuilder.query(boolQueryBuilder);
+ sourceBuilder.size(0);
+
+ SearchResponse response = getClient().search(EndpointInventory.MODEL_NAME, sourceBuilder);
+ return (int)response.getHits().getTotalHits();
+ }
+
+ @Override public int numOfConjectural(long startTimestamp, long endTimestamp, int srcLayer) throws IOException {
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+
+ BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+
+ BoolQueryBuilder boolQuery1 = QueryBuilders.boolQuery();
+ boolQuery1.must().add(QueryBuilders.rangeQuery(NetworkAddressInventory.HEARTBEAT_TIME).gte(endTimestamp));
+ boolQuery1.must().add(QueryBuilders.rangeQuery(NetworkAddressInventory.REGISTER_TIME).lte(endTimestamp));
+
+ BoolQueryBuilder boolQuery2 = QueryBuilders.boolQuery();
+ boolQuery2.must().add(QueryBuilders.rangeQuery(NetworkAddressInventory.REGISTER_TIME).lte(endTimestamp));
+ boolQuery2.must().add(QueryBuilders.rangeQuery(NetworkAddressInventory.HEARTBEAT_TIME).gte(startTimestamp));
+
+ BoolQueryBuilder timeBoolQuery = QueryBuilders.boolQuery();
+ timeBoolQuery.should().add(boolQuery1);
+ timeBoolQuery.should().add(boolQuery2);
+
+ boolQueryBuilder.must().add(timeBoolQuery);
+
+ boolQueryBuilder.must().add(QueryBuilders.termQuery(NetworkAddressInventory.SRC_LAYER, srcLayer));
+
+ sourceBuilder.query(boolQueryBuilder);
+ sourceBuilder.size(0);
+
+ SearchResponse response = getClient().search(NetworkAddressInventory.MODEL_NAME, sourceBuilder);
+
+ return (int)response.getHits().getTotalHits();
+ }
+
+ @Override
+ public List<Service> getAllServices(long startTimestamp, long endTimestamp) throws IOException {
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+
+ BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
+
+ BoolQueryBuilder boolQuery1 = QueryBuilders.boolQuery();
+ boolQuery1.must().add(QueryBuilders.rangeQuery(ServiceInventory.HEARTBEAT_TIME).gte(endTimestamp));
+ boolQuery1.must().add(QueryBuilders.rangeQuery(ServiceInventory.REGISTER_TIME).lte(endTimestamp));
+
+ BoolQueryBuilder boolQuery2 = QueryBuilders.boolQuery();
+ boolQuery2.must().add(QueryBuilders.rangeQuery(ServiceInventory.REGISTER_TIME).lte(endTimestamp));
+ boolQuery2.must().add(QueryBuilders.rangeQuery(ServiceInventory.HEARTBEAT_TIME).gte(startTimestamp));
+
+ BoolQueryBuilder timeBoolQuery = QueryBuilders.boolQuery();
+ timeBoolQuery.should().add(boolQuery1);
+ timeBoolQuery.should().add(boolQuery2);
+
+ boolQueryBuilder.must().add(timeBoolQuery);
+
+ boolQueryBuilder.must().add(QueryBuilders.termQuery(ServiceInventory.IS_ADDRESS, BooleanUtils.FALSE));
+
+ sourceBuilder.query(boolQueryBuilder);
+ sourceBuilder.size(100);
+
+ SearchResponse response = getClient().search(ServiceInventory.MODEL_NAME, sourceBuilder);
+
+ List<Service> services = new ArrayList<>();
+ for (SearchHit searchHit : response.getHits()) {
+ Map<String, Object> sourceAsMap = searchHit.getSourceAsMap();
+
+ Service service = new Service();
+ service.setId(String.valueOf(sourceAsMap.get(ServiceInventory.SEQUENCE)));
+ service.setName((String)sourceAsMap.get(ServiceInventory.NAME));
+ services.add(service);
+ }
+
+ return services;
+ }
+}