You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/11/25 02:06:13 UTC
[incubator-skywalking] branch mysql-style updated: Add all miss DAO
implement in H2 storage and make them compatible in MySQL implementor.
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch mysql-style
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/mysql-style by this push:
new df3e165 Add all miss DAO implement in H2 storage and make them compatible in MySQL implementor.
df3e165 is described below
commit df3e1651c524b5fb994e5990b7922fa83a142236
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Sun Nov 25 10:06:02 2018 +0800
Add all miss DAO implement in H2 storage and make them compatible in MySQL implementor.
---
.../oap/server/core/alarm/AlarmRecord.java | 8 +-
.../client/jdbc/hikaricp/JDBCHikariCPClient.java | 37 ++++++++
.../src/main/resources/application.yml | 19 ++--
.../elasticsearch/base/HistoryDeleteEsDAO.java | 6 +-
.../storage-jdbc-hikaricp-plugin/pom.xml | 10 +--
.../storage/plugin/jdbc/h2/H2StorageProvider.java | 4 +-
.../plugin/jdbc/h2/dao/H2AggregationQueryDAO.java | 19 ++--
.../plugin/jdbc/h2/dao/H2AlarmQueryDAO.java | 67 +++++++++++++-
.../plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java | 17 ++++
.../plugin/jdbc/h2/dao/H2MetadataQueryDAO.java | 100 +++++++++++----------
.../plugin/jdbc/h2/dao/H2TopologyQueryDAO.java | 14 +--
.../plugin/jdbc/h2/dao/H2TraceQueryDAO.java | 8 +-
.../MySQLAlarmQueryDAO.java} | 18 ++--
.../plugin/jdbc/mysql/MySQLStorageProvider.java | 40 ++++-----
.../MySQLTraceQueryDAO.java} | 15 ++--
15 files changed, 257 insertions(+), 125 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
index 90dc8dd..9257829 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/alarm/AlarmRecord.java
@@ -38,10 +38,10 @@ public class AlarmRecord extends Record {
public static final String INDEX_NAME = "alarm_record";
public static final String SCOPE = "scope";
- private static final String NAME = "name";
- private static final String ID0 = "id0";
- private static final String ID1 = "id1";
- private static final String START_TIME = "start_time";
+ public static final String NAME = "name";
+ public static final String ID0 = "id0";
+ public static final String ID1 = "id1";
+ public static final String START_TIME = "start_time";
public static final String ALARM_MESSAGE = "alarm_message";
@Override public String id() {
diff --git a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
index 8f6967b..fcdeb11 100644
--- a/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
+++ b/oap-server/server-library/library-client/src/main/java/org/apache/skywalking/oap/server/library/client/jdbc/hikaricp/JDBCHikariCPClient.java
@@ -80,6 +80,43 @@ public class JDBCHikariCPClient implements Client {
}
}
+    /**
+     * Executes an arbitrary SQL statement (typically DML such as {@code delete}) with positional
+     * parameters and releases the statement before returning.
+     *
+     * Only the boolean outcome is handed back, so the caller can never reach (or close) a result
+     * set produced here. The statement is therefore closed eagerly instead of relying on
+     * {@code closeOnCompletion()}, which does nothing when no result set is produced.
+     *
+     * @param connection open connection to run the statement on; it is NOT closed by this method.
+     * @param sql statement text using {@code ?} placeholders.
+     * @param params values bound in order; String, Integer, Double and Long are supported.
+     * @return the value of {@link PreparedStatement#execute()}: true if the first result is a
+     * result set, false if it is an update count or there is no result.
+     * @throws JDBCClientException if a parameter is null or of an unsupported type, or if the
+     * driver reports an {@link SQLException} (kept as the cause).
+     */
+    public boolean execute(Connection connection, String sql, Object... params) throws JDBCClientException {
+        logger.debug("execute: {}", sql);
+        // try-with-resources closes the statement on every path, including the unsupported-type failure.
+        try (PreparedStatement statement = connection.prepareStatement(sql)) {
+            if (params != null) {
+                for (int i = 0; i < params.length; i++) {
+                    Object param = params[i];
+                    if (param instanceof String) {
+                        statement.setString(i + 1, (String)param);
+                    } else if (param instanceof Integer) {
+                        statement.setInt(i + 1, (int)param);
+                    } else if (param instanceof Double) {
+                        statement.setDouble(i + 1, (double)param);
+                    } else if (param instanceof Long) {
+                        statement.setLong(i + 1, (long)param);
+                    } else {
+                        // A null value used to surface as a bare NullPointerException; report it explicitly.
+                        String type = param == null ? "null" : param.getClass().getName();
+                        throw new JDBCClientException("Unsupported data type, type=" + type);
+                    }
+                }
+            }
+            return statement.execute();
+        } catch (SQLException e) {
+            throw new JDBCClientException(e.getMessage(), e);
+        }
+    }
+
public ResultSet executeQuery(Connection connection, String sql, Object... params) throws JDBCClientException {
logger.debug("execute query with result: {}", sql);
ResultSet rs;
diff --git a/oap-server/server-starter/src/main/resources/application.yml b/oap-server/server-starter/src/main/resources/application.yml
index d94a82a..f0d23e2 100644
--- a/oap-server/server-starter/src/main/resources/application.yml
+++ b/oap-server/server-starter/src/main/resources/application.yml
@@ -46,19 +46,20 @@ core:
dayMetricsDataTTL: ${SW_CORE_DAY_METRIC_DATA_TTL:45} # Unit is day
monthMetricsDataTTL: ${SW_CORE_MONTH_METRIC_DATA_TTL:18} # Unit is month
storage:
- elasticsearch:
- clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
- indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
- indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
- # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
- bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
- bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
- flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
- concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
+# elasticsearch:
+# clusterNodes: ${SW_STORAGE_ES_CLUSTER_NODES:localhost:9200}
+# indexShardsNumber: ${SW_STORAGE_ES_INDEX_SHARDS_NUMBER:2}
+# indexReplicasNumber: ${SW_STORAGE_ES_INDEX_REPLICAS_NUMBER:0}
+# # Batch process setting, refer to https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.5/java-docs-bulk-processor.html
+# bulkActions: ${SW_STORAGE_ES_BULK_ACTIONS:2000} # Execute the bulk every 2000 requests
+# bulkSize: ${SW_STORAGE_ES_BULK_SIZE:20} # flush the bulk every 20mb
+# flushInterval: ${SW_STORAGE_ES_FLUSH_INTERVAL:10} # flush the bulk every 10 seconds whatever the number of requests
+# concurrentRequests: ${SW_STORAGE_ES_CONCURRENT_REQUESTS:2} # the number of concurrent requests
# h2:
# driver: ${SW_STORAGE_H2_DRIVER:org.h2.jdbcx.JdbcDataSource}
# url: ${SW_STORAGE_H2_URL:jdbc:h2:mem:skywalking-oap-db}
# user: ${SW_STORAGE_H2_USER:sa}
+ mysql:
receiver-register:
default:
receiver-trace:
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
index d105fdd..1ad4371 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/HistoryDeleteEsDAO.java
@@ -19,10 +19,10 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
-import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
-import org.slf4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
@@ -37,7 +37,7 @@ public class HistoryDeleteEsDAO extends EsDAO implements IHistoryDeleteDAO {
@Override
public void deleteHistory(String modelName, String timeBucketColumnName, Long timeBucketBefore) throws IOException {
- int statusCode = getClient().delete(modelName, Indicator.TIME_BUCKET, timeBucketBefore);
+ int statusCode = getClient().delete(modelName, timeBucketColumnName, timeBucketBefore);
if (logger.isDebugEnabled()) {
logger.debug("Delete history from {} index, status code {}", modelName, statusCode);
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/pom.xml b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/pom.xml
index 65f7eb0..1fd36e0 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/pom.xml
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/pom.xml
@@ -43,11 +43,11 @@
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
</dependency>
- <dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
- <version>8.0.13</version>
- </dependency>
+ <!--<dependency>-->
+ <!--<groupId>mysql</groupId>-->
+ <!--<artifactId>mysql-connector-java</artifactId>-->
+ <!--<version>8.0.13</version>-->
+ <!--</dependency>-->
</dependencies>
</project>
\ No newline at end of file
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
index d52c679..169b81f 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/H2StorageProvider.java
@@ -112,8 +112,8 @@ public class H2StorageProvider extends ModuleProvider {
this.registerServiceImplementation(ITraceQueryDAO.class, new H2TraceQueryDAO(h2Client));
this.registerServiceImplementation(IMetadataQueryDAO.class, new H2MetadataQueryDAO(h2Client));
this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(h2Client));
- this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO());
- this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO());
+ this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO(h2Client));
+ this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(h2Client));
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AggregationQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AggregationQueryDAO.java
index 6fbdee8..65a44cb 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AggregationQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AggregationQueryDAO.java
@@ -94,17 +94,18 @@ public class H2AggregationQueryDAO implements IAggregationQueryDAO {
List<TopNEntity> topNEntities = new ArrayList<>();
try (Connection connection = h2Client.getConnection()) {
- ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), conditions.toArray(new Object[0]));
+ try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), conditions.toArray(new Object[0]))) {
- try {
- while (resultSet.next()) {
- TopNEntity topNEntity = new TopNEntity();
- topNEntity.setId(resultSet.getString(Indicator.ENTITY_ID));
- topNEntity.setValue(resultSet.getLong("value"));
- topNEntities.add(topNEntity);
+ try {
+ while (resultSet.next()) {
+ TopNEntity topNEntity = new TopNEntity();
+ topNEntity.setId(resultSet.getString(Indicator.ENTITY_ID));
+ topNEntity.setValue(resultSet.getLong("value"));
+ topNEntities.add(topNEntity);
+ }
+ } catch (SQLException e) {
+ throw new IOException(e);
}
- } catch (SQLException e) {
- throw new IOException(e);
}
} catch (SQLException e) {
throw new IOException(e);
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AlarmQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AlarmQueryDAO.java
index 4982d43..6662100 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AlarmQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2AlarmQueryDAO.java
@@ -19,19 +19,80 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.skywalking.oap.server.core.alarm.AlarmRecord;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import org.apache.skywalking.oap.server.core.query.entity.AlarmMessage;
import org.apache.skywalking.oap.server.core.query.entity.Alarms;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.storage.query.IAlarmQueryDAO;
+import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+import org.apache.skywalking.oap.server.library.util.StringUtils;
/**
- * As a demo show env, not necessary to support alarm.
- *
* @author wusheng
*/
public class H2AlarmQueryDAO implements IAlarmQueryDAO {
+    private JDBCHikariCPClient client;
+
+    /**
+     * @param client JDBC client used to obtain connections and to run the alarm queries.
+     */
+    public H2AlarmQueryDAO(JDBCHikariCPClient client) {
+        this.client = client;
+    }
+
+    /**
+     * Reads one page of alarm records, newest first, plus the total number of matching records.
+     *
+     * @param scope currently not applied as a filter. NOTE(review): confirm whether the query
+     * should be restricted to this scope.
+     * @param keyword optional text that must be contained in the alarm message; ignored when empty.
+     * @param limit maximum number of records in the returned page.
+     * @param from offset of the first record of the page.
+     * @param startTB lower time bucket bound (inclusive); only applied when both bounds are non-zero.
+     * @param endTB upper time bucket bound (inclusive); only applied when both bounds are non-zero.
+     * @return the page of alarm messages and the total count of matches ignoring paging.
+     * @throws IOException if the underlying JDBC access fails.
+     */
    @Override
    public Alarms getAlarm(Scope scope, String keyword, int limit, int from, long startTB,
        long endTB) throws IOException {
- return new Alarms();
+
+        // Shared "from ... where ..." clause used by both the count query and the page query.
+        StringBuilder sql = new StringBuilder();
+        List<Object> parameters = new ArrayList<>(10);
+        sql.append("from ").append(AlarmRecord.INDEX_NAME).append(" where ");
+        sql.append(" 1=1 ");
+        if (startTB != 0 && endTB != 0) {
+            // NOTE(review): the time bucket column name is still taken from SegmentRecord because
+            // AlarmRecord's own constant is not visible here; confirm both name the same column.
+            sql.append(" and ").append(SegmentRecord.TIME_BUCKET).append(" >= ?");
+            parameters.add(startTB);
+            sql.append(" and ").append(SegmentRecord.TIME_BUCKET).append(" <= ?");
+            parameters.add(endTB);
+        }
+
+        if (StringUtils.isNotEmpty(keyword)) {
+            // Bind the keyword instead of concatenating it, so user input cannot alter the SQL.
+            sql.append(" and ").append(AlarmRecord.ALARM_MESSAGE).append(" like concat('%',?,'%') ");
+            parameters.add(keyword);
+        }
+
+        Alarms alarms = new Alarms();
+        try (Connection connection = client.getConnection()) {
+
+            // Count before ordering/paging is appended; otherwise the total is capped by the page size.
+            try (ResultSet resultSet = client.executeQuery(connection, "select count(1) total " + sql.toString(), parameters.toArray(new Object[0]))) {
+                while (resultSet.next()) {
+                    alarms.setTotal(resultSet.getInt("total"));
+                }
+            }
+
+            sql.append(" order by ").append(AlarmRecord.START_TIME).append(" desc ");
+            this.buildLimit(sql, from, limit);
+
+            try (ResultSet resultSet = client.executeQuery(connection, "select * " + sql.toString(), parameters.toArray(new Object[0]))) {
+                while (resultSet.next()) {
+                    AlarmMessage message = new AlarmMessage();
+                    message.setId(resultSet.getString(AlarmRecord.ID0));
+                    message.setMessage(resultSet.getString(AlarmRecord.ALARM_MESSAGE));
+                    message.setStartTime(resultSet.getLong(AlarmRecord.START_TIME));
+                    message.setScope(Scope.valueOf(resultSet.getInt(AlarmRecord.SCOPE)));
+
+                    alarms.getMsgs().add(message);
+                }
+            }
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
+
+        return alarms;
+    }
+
+    /**
+     * Appends the paging clause in H2 syntax. Subclasses override this for other SQL dialects.
+     *
+     * @param sql statement being built; the clause is appended in place.
+     * @param from offset of the first row.
+     * @param limit maximum number of rows.
+     */
+    protected void buildLimit(StringBuilder sql, int from, int limit) {
+        sql.append(" LIMIT ").append(limit);
+        sql.append(" OFFSET ").append(from);
    }
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
index 60d5eed..5c0d48e 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
@@ -19,14 +19,31 @@
package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
+import org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
+import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.SQLBuilder;
/**
* @author wusheng
*/
public class H2HistoryDeleteDAO implements IHistoryDeleteDAO {
+    private JDBCHikariCPClient client;
+
+    /**
+     * @param client JDBC client that supplies connections and executes the delete statement.
+     */
+    public H2HistoryDeleteDAO(JDBCHikariCPClient client) {
+        this.client = client;
+    }
+
+    /**
+     * Deletes every row of {@code modelName} whose {@code timeBucketColumnName} value is less
+     * than or equal to {@code timeBucketBefore}. JDBC failures are rethrown as IOException.
+     */
    @Override
    public void deleteHistory(String modelName, String timeBucketColumnName, Long timeBucketBefore) throws IOException {
+        // Table and column names are spliced into the text; only the bound value is a placeholder.
+        String deleteStatement = new SQLBuilder("delete from " + modelName + " where ")
+            .append(timeBucketColumnName)
+            .append("<= ?")
+            .toString();
+        try (Connection conn = client.getConnection()) {
+            client.execute(conn, deleteStatement, timeBucketBefore);
+        } catch (JDBCClientException | SQLException failure) {
+            throw new IOException(failure.getMessage(), failure);
+        }
    }
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetadataQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetadataQueryDAO.java
index 769738d..ce8a54d 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetadataQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2MetadataQueryDAO.java
@@ -58,9 +58,10 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
sql.append(" and ").append(ServiceInventory.IS_ADDRESS).append("=0");
try (Connection connection = h2Client.getConnection()) {
- ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
- while (resultSet.next()) {
- return resultSet.getInt("num");
+ try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) {
+ while (resultSet.next()) {
+ return resultSet.getInt("num");
+ }
}
} catch (SQLException e) {
throw new IOException(e);
@@ -76,10 +77,11 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
sql.append(" and ").append(EndpointInventory.DETECT_POINT).append("=").append(DetectPoint.SERVER.ordinal());
try (Connection connection = h2Client.getConnection()) {
- ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
+ try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) {
- while (resultSet.next()) {
- return resultSet.getInt("num");
+ while (resultSet.next()) {
+ return resultSet.getInt("num");
+ }
}
} catch (SQLException e) {
throw new IOException(e);
@@ -97,9 +99,10 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
condition.add(srcLayer);
try (Connection connection = h2Client.getConnection()) {
- ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
- while (resultSet.next()) {
- return resultSet.getInt("num");
+ try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) {
+ while (resultSet.next()) {
+ return resultSet.getInt("num");
+ }
}
} catch (SQLException e) {
throw new IOException(e);
@@ -117,8 +120,9 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
condition.add(BooleanUtils.FALSE);
try (Connection connection = h2Client.getConnection()) {
- ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
- return buildServices(resultSet);
+ try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) {
+ return buildServices(resultSet);
+ }
} catch (SQLException e) {
throw new IOException(e);
}
@@ -138,8 +142,9 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
sql.append(" limit 100");
try (Connection connection = h2Client.getConnection()) {
- ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
- return buildServices(resultSet);
+ try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) {
+ return buildServices(resultSet);
+ }
} catch (SQLException e) {
throw new IOException(e);
}
@@ -155,13 +160,14 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
condition.add(serviceCode);
try (Connection connection = h2Client.getConnection()) {
- ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
+ try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) {
- while (resultSet.next()) {
- Service service = new Service();
- service.setId(resultSet.getInt(ServiceInventory.SEQUENCE));
- service.setName(resultSet.getString(ServiceInventory.NAME));
- return service;
+ while (resultSet.next()) {
+ Service service = new Service();
+ service.setId(resultSet.getInt(ServiceInventory.SEQUENCE));
+ service.setName(resultSet.getString(ServiceInventory.NAME));
+ return service;
+ }
}
} catch (SQLException e) {
throw new IOException(e);
@@ -186,13 +192,14 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
List<Endpoint> endpoints = new ArrayList<>();
try (Connection connection = h2Client.getConnection()) {
- ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
+ try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) {
- while (resultSet.next()) {
- Endpoint endpoint = new Endpoint();
- endpoint.setId(resultSet.getInt(EndpointInventory.SEQUENCE));
- endpoint.setName(resultSet.getString(EndpointInventory.NAME));
- endpoints.add(endpoint);
+ while (resultSet.next()) {
+ Endpoint endpoint = new Endpoint();
+ endpoint.setId(resultSet.getInt(EndpointInventory.SEQUENCE));
+ endpoint.setName(resultSet.getString(EndpointInventory.NAME));
+ endpoints.add(endpoint);
+ }
}
} catch (SQLException e) {
throw new IOException(e);
@@ -211,31 +218,32 @@ public class H2MetadataQueryDAO implements IMetadataQueryDAO {
List<ServiceInstance> serviceInstances = new ArrayList<>();
try (Connection connection = h2Client.getConnection()) {
- ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]));
+ try (ResultSet resultSet = h2Client.executeQuery(connection, sql.toString(), condition.toArray(new Object[0]))) {
- while (resultSet.next()) {
- ServiceInstance serviceInstance = new ServiceInstance();
- serviceInstance.setId(resultSet.getString(ServiceInstanceInventory.SEQUENCE));
- serviceInstance.setName(resultSet.getString(ServiceInstanceInventory.NAME));
- int languageId = resultSet.getInt(ServiceInstanceInventory.LANGUAGE);
- serviceInstance.setLanguage(LanguageTrans.INSTANCE.value(languageId));
+ while (resultSet.next()) {
+ ServiceInstance serviceInstance = new ServiceInstance();
+ serviceInstance.setId(resultSet.getString(ServiceInstanceInventory.SEQUENCE));
+ serviceInstance.setName(resultSet.getString(ServiceInstanceInventory.NAME));
+ int languageId = resultSet.getInt(ServiceInstanceInventory.LANGUAGE);
+ serviceInstance.setLanguage(LanguageTrans.INSTANCE.value(languageId));
- String osName = resultSet.getString(ServiceInstanceInventory.OS_NAME);
- if (StringUtils.isNotEmpty(osName)) {
- serviceInstance.getAttributes().add(new Attribute(ServiceInstanceInventory.OS_NAME, osName));
- }
- String hostName = resultSet.getString(ServiceInstanceInventory.HOST_NAME);
- if (StringUtils.isNotEmpty(hostName)) {
- serviceInstance.getAttributes().add(new Attribute(ServiceInstanceInventory.HOST_NAME, hostName));
- }
- serviceInstance.getAttributes().add(new Attribute(ServiceInstanceInventory.PROCESS_NO, resultSet.getString(ServiceInstanceInventory.PROCESS_NO)));
+ String osName = resultSet.getString(ServiceInstanceInventory.OS_NAME);
+ if (StringUtils.isNotEmpty(osName)) {
+ serviceInstance.getAttributes().add(new Attribute(ServiceInstanceInventory.OS_NAME, osName));
+ }
+ String hostName = resultSet.getString(ServiceInstanceInventory.HOST_NAME);
+ if (StringUtils.isNotEmpty(hostName)) {
+ serviceInstance.getAttributes().add(new Attribute(ServiceInstanceInventory.HOST_NAME, hostName));
+ }
+ serviceInstance.getAttributes().add(new Attribute(ServiceInstanceInventory.PROCESS_NO, resultSet.getString(ServiceInstanceInventory.PROCESS_NO)));
- List<String> ipv4s = ServiceInstanceInventory.AgentOsInfo.ipv4sDeserialize(resultSet.getString(ServiceInstanceInventory.IPV4S));
- for (String ipv4 : ipv4s) {
- serviceInstance.getAttributes().add(new Attribute(ServiceInstanceInventory.IPV4S, ipv4));
- }
+ List<String> ipv4s = ServiceInstanceInventory.AgentOsInfo.ipv4sDeserialize(resultSet.getString(ServiceInstanceInventory.IPV4S));
+ for (String ipv4 : ipv4s) {
+ serviceInstance.getAttributes().add(new Attribute(ServiceInstanceInventory.IPV4S, ipv4));
+ }
- serviceInstances.add(serviceInstance);
+ serviceInstances.add(serviceInstance);
+ }
}
} catch (SQLException e) {
throw new IOException(e);
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopologyQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopologyQueryDAO.java
index c51af73..048da5b 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopologyQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TopologyQueryDAO.java
@@ -95,14 +95,15 @@ public class H2TopologyQueryDAO implements ITopologyQueryDAO {
}
List<Call> calls = new ArrayList<>();
try (Connection connection = h2Client.getConnection()) {
- ResultSet resultSet = h2Client.executeQuery(connection, "select "
+ try (ResultSet resultSet = h2Client.executeQuery(connection, "select "
+ Indicator.ENTITY_ID
+ " component_id from " + tableName + " where "
+ Indicator.TIME_BUCKET + ">= ? and " + Indicator.TIME_BUCKET + "<=? "
+ serviceIdMatchSql.toString()
+ " group by " + Indicator.ENTITY_ID,
- conditions);
- buildCalls(resultSet, calls, isClientSide);
+ conditions)) {
+ buildCalls(resultSet, calls, isClientSide);
+ }
} catch (SQLException e) {
throw new IOException(e);
}
@@ -117,14 +118,15 @@ public class H2TopologyQueryDAO implements ITopologyQueryDAO {
conditions[2] = id;
List<Call> calls = new ArrayList<>();
try (Connection connection = h2Client.getConnection()) {
- ResultSet resultSet = h2Client.executeQuery(connection, "select "
+ try (ResultSet resultSet = h2Client.executeQuery(connection, "select "
+ Indicator.ENTITY_ID
+ " from " + tableName + " where "
+ Indicator.TIME_BUCKET + ">= ? and " + Indicator.TIME_BUCKET + "<=? and "
+ (isSourceId ? sourceCName : destCName) + "=?"
+ " group by " + Indicator.ENTITY_ID,
- conditions);
- buildCalls(resultSet, calls, isSourceId);
+ conditions)) {
+ buildCalls(resultSet, calls, isSourceId);
+ }
} catch (SQLException e) {
throw new IOException(e);
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
index fe2c764..00f8510 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2TraceQueryDAO.java
@@ -102,8 +102,7 @@ public class H2TraceQueryDAO implements ITraceQueryDAO {
sql.append(" order by ").append(SegmentRecord.LATENCY).append(" ").append(SortOrder.DESC);
break;
}
- sql.append(" LIMIT ").append(limit);
- sql.append(" OFFSET ").append(from);
+ buildLimit(sql, from, limit);
TraceBrief traceBrief = new TraceBrief();
try (Connection connection = h2Client.getConnection()) {
@@ -135,6 +134,11 @@ public class H2TraceQueryDAO implements ITraceQueryDAO {
return traceBrief;
}
+    /**
+     * Appends the paging clause, {@code LIMIT <limit> OFFSET <from>}, to the statement being built.
+     * Kept as a separate protected method so that it can be overridden.
+     */
+    protected void buildLimit(StringBuilder sql, int from, int limit) {
+        sql.append(" LIMIT ").append(limit).append(" OFFSET ").append(from);
+    }
+
@Override public List<SegmentRecord> queryByTraceId(String traceId) throws IOException {
List<SegmentRecord> segmentRecords = new ArrayList<>();
try (Connection connection = h2Client.getConnection()) {
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLAlarmQueryDAO.java
similarity index 60%
copy from oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
copy to oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLAlarmQueryDAO.java
index 60d5eed..8e20e83 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLAlarmQueryDAO.java
@@ -16,17 +16,17 @@
*
*/
-package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
+package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
-import java.io.IOException;
-import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
+import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AlarmQueryDAO;
-/**
- * @author wusheng
- */
-public class H2HistoryDeleteDAO implements IHistoryDeleteDAO {
- @Override
- public void deleteHistory(String modelName, String timeBucketColumnName, Long timeBucketBefore) throws IOException {
+/**
+ * Alarm query DAO registered by the MySQL storage provider. It reuses
+ * {@link H2AlarmQueryDAO} unchanged except for the paging clause.
+ */
+public class MySQLAlarmQueryDAO extends H2AlarmQueryDAO {
+ /**
+ * @param client JDBC client handed straight to the H2 base implementation
+ */
+ public MySQLAlarmQueryDAO(JDBCHikariCPClient client) {
+ super(client);
+ }
+ /**
+ * Appends the paging clause in the form {@code LIMIT from, limit}.
+ *
+ * @param sql query text being assembled; the clause is appended in place
+ * @param from first value written after the LIMIT keyword
+ * @param limit second value written after the LIMIT keyword
+ */
+ @Override protected void buildLimit(StringBuilder sql, int from, int limit) {
+ sql.append(" LIMIT ");
+ sql.append(from);
+ sql.append(", ");
+ sql.append(limit);
}
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
index 41afdf5..bd75613 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLStorageProvider.java
@@ -46,7 +46,6 @@ import org.apache.skywalking.oap.server.library.util.ResourceUtils;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageConfig;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.H2StorageProvider;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AggregationQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2AlarmQueryDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2BatchDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2EndpointInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2HistoryDeleteDAO;
@@ -57,7 +56,6 @@ import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInst
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2ServiceInventoryCacheDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2StorageDAO;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TopologyQueryDAO;
-import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TraceQueryDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +73,7 @@ public class MySQLStorageProvider extends ModuleProvider {
-private static final Logger logger = LoggerFactory.getLogger(H2StorageProvider.class);
+ // Bind the logger to this class so its log lines are attributed to the MySQL provider, not the H2 one.
+ private static final Logger logger = LoggerFactory.getLogger(MySQLStorageProvider.class);
private H2StorageConfig config;
- private JDBCHikariCPClient h2Client;
+ private JDBCHikariCPClient mysqlClient;
public MySQLStorageProvider() {
config = new H2StorageConfig();
@@ -101,34 +99,34 @@ public class MySQLStorageProvider extends ModuleProvider {
throw new ModuleStartException("load datasource setting file failure.", e);
}
- h2Client = new JDBCHikariCPClient(settings);
+ mysqlClient = new JDBCHikariCPClient(settings);
- this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(h2Client));
- this.registerServiceImplementation(StorageDAO.class, new H2StorageDAO(h2Client));
- this.registerServiceImplementation(IRegisterLockDAO.class, new MySQLRegisterTableLockDAO(h2Client));
+ this.registerServiceImplementation(IBatchDAO.class, new H2BatchDAO(mysqlClient));
+ this.registerServiceImplementation(StorageDAO.class, new H2StorageDAO(mysqlClient));
+ this.registerServiceImplementation(IRegisterLockDAO.class, new MySQLRegisterTableLockDAO(mysqlClient));
- this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new H2ServiceInventoryCacheDAO(h2Client));
- this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new H2ServiceInstanceInventoryCacheDAO(h2Client));
- this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new H2EndpointInventoryCacheDAO(h2Client));
- this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new H2NetworkAddressInventoryCacheDAO(h2Client));
+ this.registerServiceImplementation(IServiceInventoryCacheDAO.class, new H2ServiceInventoryCacheDAO(mysqlClient));
+ this.registerServiceImplementation(IServiceInstanceInventoryCacheDAO.class, new H2ServiceInstanceInventoryCacheDAO(mysqlClient));
+ this.registerServiceImplementation(IEndpointInventoryCacheDAO.class, new H2EndpointInventoryCacheDAO(mysqlClient));
+ this.registerServiceImplementation(INetworkAddressInventoryCacheDAO.class, new H2NetworkAddressInventoryCacheDAO(mysqlClient));
- this.registerServiceImplementation(ITopologyQueryDAO.class, new H2TopologyQueryDAO(h2Client));
- this.registerServiceImplementation(IMetricQueryDAO.class, new H2MetricQueryDAO(h2Client));
- this.registerServiceImplementation(ITraceQueryDAO.class, new H2TraceQueryDAO(h2Client));
- this.registerServiceImplementation(IMetadataQueryDAO.class, new H2MetadataQueryDAO(h2Client));
- this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(h2Client));
- this.registerServiceImplementation(IAlarmQueryDAO.class, new H2AlarmQueryDAO());
- this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO());
+ this.registerServiceImplementation(ITopologyQueryDAO.class, new H2TopologyQueryDAO(mysqlClient));
+ this.registerServiceImplementation(IMetricQueryDAO.class, new H2MetricQueryDAO(mysqlClient));
+ this.registerServiceImplementation(ITraceQueryDAO.class, new MySQLTraceQueryDAO(mysqlClient));
+ this.registerServiceImplementation(IMetadataQueryDAO.class, new H2MetadataQueryDAO(mysqlClient));
+ this.registerServiceImplementation(IAggregationQueryDAO.class, new H2AggregationQueryDAO(mysqlClient));
+ this.registerServiceImplementation(IAlarmQueryDAO.class, new MySQLAlarmQueryDAO(mysqlClient));
+ this.registerServiceImplementation(IHistoryDeleteDAO.class, new H2HistoryDeleteDAO(mysqlClient));
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
try {
- h2Client.connect();
+ mysqlClient.connect();
MySQLTableInstaller installer = new MySQLTableInstaller(getManager());
- installer.install(h2Client);
+ installer.install(mysqlClient);
- new MySQLRegisterLockInstaller().install(h2Client);
+ new MySQLRegisterLockInstaller().install(mysqlClient);
} catch (StorageException e) {
throw new ModuleStartException(e.getMessage(), e);
}
diff --git a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTraceQueryDAO.java
similarity index 60%
copy from oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
copy to oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTraceQueryDAO.java
index 60d5eed..d089c21 100644
--- a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2HistoryDeleteDAO.java
+++ b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/mysql/MySQLTraceQueryDAO.java
@@ -16,17 +16,20 @@
*
*/
-package org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao;
+package org.apache.skywalking.oap.server.storage.plugin.jdbc.mysql;
-import java.io.IOException;
-import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
+import org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+import org.apache.skywalking.oap.server.storage.plugin.jdbc.h2.dao.H2TraceQueryDAO;
/**
* @author wusheng
*/
-public class H2HistoryDeleteDAO implements IHistoryDeleteDAO {
- @Override
- public void deleteHistory(String modelName, String timeBucketColumnName, Long timeBucketBefore) throws IOException {
+public class MySQLTraceQueryDAO extends H2TraceQueryDAO {
+ /**
+ * @param mysqlClient JDBC client handed straight to the H2 base implementation
+ */
+ public MySQLTraceQueryDAO(JDBCHikariCPClient mysqlClient) {
+ super(mysqlClient);
+ }
+ /**
+ * Appends the paging clause in the form {@code LIMIT from, limit}, replacing
+ * the clause built by {@link H2TraceQueryDAO}.
+ *
+ * @param sql query text being assembled; the clause is appended in place
+ * @param from first value written after the LIMIT keyword
+ * @param limit second value written after the LIMIT keyword
+ */
+ @Override protected void buildLimit(StringBuilder sql, int from, int limit) {
+ sql.append(" LIMIT ");
+ sql.append(from);
+ sql.append(", ");
+ sql.append(limit);
}
}