You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/09/19 07:22:04 UTC
[incubator-skywalking] branch master updated: Add record
persistence stream analysis. (#1683)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 13fa303 Add record persistence stream analysis. (#1683)
13fa303 is described below
commit 13fa30307a08b2a3cab29e797eaad19157bcc72c
Author: 彭勇升 pengys <80...@qq.com>
AuthorDate: Wed Sep 19 15:21:56 2018 +0800
Add record persistence stream analysis. (#1683)
---
.../oap/server/core/CoreModuleProvider.java | 2 +
.../server/core/analysis/DispatcherManager.java | 3 +
.../server/core/analysis/data/MergeDataCache.java | 6 +-
.../core/analysis/data/MergeDataCollection.java | 27 +++---
.../core/analysis/data/NonMergeDataCache.java | 12 +--
.../core/analysis/data/NonMergeDataCollection.java | 26 +++--
.../data/{Collection.java => SWCollection.java} | 12 ++-
.../oap/server/core/analysis/data/Window.java | 18 ++--
.../analysis/manual/segment/SegmentDispatcher.java | 45 +++++++++
.../analysis/manual/segment/SegmentRecord.java | 103 ++++++++++++++++++++
.../{data/Collection.java => record/Record.java} | 26 ++---
.../analysis/record/annotation/RecordType.java} | 15 ++-
.../record/annotation/RecordTypeListener.java} | 37 +++----
.../analysis/worker/IndicatorAggregateWorker.java | 2 +-
.../analysis/worker/IndicatorPersistentWorker.java | 56 ++---------
...ersistentWorker.java => PersistenceWorker.java} | 80 +++-------------
.../analysis/worker/RecordPersistentWorker.java | 68 +++++++++++++
.../server/core/analysis/worker/RecordProcess.java | 62 ++++++++++++
.../skywalking/oap/server/core/source/Scope.java | 3 +-
.../core/source/{Scope.java => Segment.java} | 27 +++++-
.../Collection.java => storage/IRecordDAO.java} | 25 ++---
.../oap/server/core/storage/PersistenceTimer.java | 8 +-
.../oap/server/core/storage/StorageDAO.java | 3 +
.../oap/server/library/util/CollectionUtils.java | 8 ++
.../trace/provider/TraceModuleProvider.java | 2 +
.../trace/provider/parser/SegmentParse.java | 12 +--
.../provider/parser/decorator/SegmentCoreInfo.java | 70 +-------------
.../provider/parser/listener/SpanListener.java | 2 +-
.../listener/segment/SegmentSpanListener.java | 106 +++++++++++++++++++++
.../server-starter/src/main/resources/log4j2.xml | 4 +-
.../elasticsearch/base/ColumnTypeEsMapping.java | 2 +
.../plugin/elasticsearch/base/RecordEsDAO.java | 60 ++++++++++++
.../plugin/elasticsearch/base/StorageEsDAO.java | 5 +
.../elasticsearch/base/StorageEsInstaller.java | 7 +-
34 files changed, 631 insertions(+), 313 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index d22648e..91d0b33 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -20,6 +20,7 @@ package org.apache.skywalking.oap.server.core;
import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
+import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
@@ -115,6 +116,7 @@ public class CoreModuleProvider extends ModuleProvider {
annotationScan.registerListener(streamAnnotationListener);
annotationScan.registerListener(new IndicatorTypeListener(getManager()));
annotationScan.registerListener(new InventoryTypeListener(getManager()));
+ annotationScan.registerListener(new RecordTypeListener(getManager()));
this.remoteClientManager = new RemoteClientManager(getManager());
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
index a676500..b216ff8 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java
@@ -31,6 +31,7 @@ import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancej
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancerelation.ServiceInstanceRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.servicerelation.ServiceRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointCallRelationDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentDispatcher;
import org.apache.skywalking.oap.server.core.analysis.manual.service.*;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceCallRelationDispatcher;
import org.apache.skywalking.oap.server.core.source.*;
@@ -50,6 +51,8 @@ public class DispatcherManager {
this.dispatcherMap.put(Scope.All, new SourceDispatcher[] {new AllDispatcher()});
+ this.dispatcherMap.put(Scope.Segment, new SourceDispatcher[] {new SegmentDispatcher()});
+
this.dispatcherMap.put(Scope.Service, new SourceDispatcher[] {new ServiceDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstance, new SourceDispatcher[] {new ServiceInstanceDispatcher()});
this.dispatcherMap.put(Scope.Endpoint, new SourceDispatcher[] {new EndpointDispatcher()});
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java
index b5de443..778de6b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCache.java
@@ -23,11 +23,11 @@ import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
/**
* @author peng-yongsheng
*/
-public class MergeDataCache<INDICATOR extends Indicator> extends Window<MergeDataCollection<INDICATOR>> implements DataCache {
+public class MergeDataCache<INDICATOR extends Indicator> extends Window<INDICATOR> implements DataCache {
- private MergeDataCollection<INDICATOR> lockedMergeDataCollection;
+ private SWCollection<INDICATOR> lockedMergeDataCollection;
- @Override public MergeDataCollection<INDICATOR> collectionInstance() {
+ @Override public SWCollection<INDICATOR> collectionInstance() {
return new MergeDataCollection<>();
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCollection.java
index bd53c39..5a703e5 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCollection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/MergeDataCollection.java
@@ -24,13 +24,14 @@ import org.apache.skywalking.oap.server.core.remote.data.StreamData;
/**
* @author peng-yongsheng
*/
-public class MergeDataCollection<STREAM_DATA extends StreamData> implements Collection<Map<STREAM_DATA, STREAM_DATA>> {
- private Map<STREAM_DATA, STREAM_DATA> data;
+public class MergeDataCollection<STREAM_DATA extends StreamData> implements SWCollection<STREAM_DATA> {
+
+ private Map<STREAM_DATA, STREAM_DATA> collection;
private volatile boolean writing;
private volatile boolean reading;
MergeDataCollection() {
- this.data = new HashMap<>();
+ this.collection = new HashMap<>();
this.writing = false;
this.reading = false;
}
@@ -59,27 +60,27 @@ public class MergeDataCollection<STREAM_DATA extends StreamData> implements Coll
return reading;
}
- boolean containsKey(STREAM_DATA key) {
- return data.containsKey(key);
+ @Override public boolean containsKey(STREAM_DATA key) {
+ return collection.containsKey(key);
}
- void put(STREAM_DATA value) {
- data.put(value, value);
+ @Override public void put(STREAM_DATA value) {
+ collection.put(value, value);
}
- public STREAM_DATA get(STREAM_DATA key) {
- return data.get(key);
+ @Override public STREAM_DATA get(STREAM_DATA key) {
+ return collection.get(key);
}
@Override public int size() {
- return data.size();
+ return collection.size();
}
@Override public void clear() {
- data.clear();
+ collection.clear();
}
- public Map<STREAM_DATA, STREAM_DATA> collection() {
- return data;
+ @Override public Collection<STREAM_DATA> collection() {
+ return collection.values();
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCache.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCache.java
index ab40f7d..238b127 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCache.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCache.java
@@ -18,21 +18,21 @@
package org.apache.skywalking.oap.server.core.analysis.data;
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
/**
* @author peng-yongsheng
*/
-public class NonMergeDataCache<STREAM_DATA extends StreamData> extends Window<NonMergeDataCollection<STREAM_DATA>> implements DataCache {
+public class NonMergeDataCache<STORAGE_DATA extends StorageData> extends Window<STORAGE_DATA> implements DataCache {
- private NonMergeDataCollection<STREAM_DATA> lockedMergeDataCollection;
+ private SWCollection<STORAGE_DATA> lockedMergeDataCollection;
- @Override public NonMergeDataCollection<STREAM_DATA> collectionInstance() {
+ @Override public SWCollection<STORAGE_DATA> collectionInstance() {
return new NonMergeDataCollection<>();
}
- public void add(STREAM_DATA data) {
- lockedMergeDataCollection.add(data);
+ public void add(STORAGE_DATA data) {
+ lockedMergeDataCollection.put(data);
}
@Override public void writing() {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
index b0498c3..2a4ac3a 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/NonMergeDataCollection.java
@@ -19,19 +19,19 @@
package org.apache.skywalking.oap.server.core.analysis.data;
import java.util.*;
-import org.apache.skywalking.oap.server.core.remote.data.StreamData;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
/**
* @author peng-yongsheng
*/
-public class NonMergeDataCollection<STREAM_DATA extends StreamData> implements Collection<List<STREAM_DATA>> {
+public class NonMergeDataCollection<STORAGE_DATA extends StorageData> implements SWCollection<STORAGE_DATA> {
- private final List<STREAM_DATA> data;
+ private final List<STORAGE_DATA> data;
private volatile boolean writing;
private volatile boolean reading;
NonMergeDataCollection() {
- this.data = new LinkedList<>();
+ this.data = new ArrayList<>();
this.writing = false;
this.reading = false;
}
@@ -60,10 +60,6 @@ public class NonMergeDataCollection<STREAM_DATA extends StreamData> implements C
return reading;
}
- void add(STREAM_DATA value) {
- data.add(value);
- }
-
@Override public int size() {
return data.size();
}
@@ -72,7 +68,19 @@ public class NonMergeDataCollection<STREAM_DATA extends StreamData> implements C
data.clear();
}
- public List<STREAM_DATA> collection() {
+ @Override public boolean containsKey(STORAGE_DATA key) {
+ throw new UnsupportedOperationException("None merge data collection not support containsKey operation.");
+ }
+
+ @Override public STORAGE_DATA get(STORAGE_DATA key) {
+ throw new UnsupportedOperationException("None merge data collection not support get operation.");
+ }
+
+ @Override public void put(STORAGE_DATA value) {
+ data.add(value);
+ }
+
+ @Override public Collection<STORAGE_DATA> collection() {
return data;
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/SWCollection.java
similarity index 85%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/SWCollection.java
index f1d52a8..032563d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/SWCollection.java
@@ -18,10 +18,12 @@
package org.apache.skywalking.oap.server.core.analysis.data;
+import java.util.Collection;
+
/**
* @author peng-yongsheng
*/
-public interface Collection<Data> {
+public interface SWCollection<DATA> {
void reading();
@@ -39,5 +41,11 @@ public interface Collection<Data> {
void finishWriting();
- Data collection();
+ Collection<DATA> collection();
+
+ boolean containsKey(DATA key);
+
+ DATA get(DATA key);
+
+ void put(DATA value);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
index 38570b3..6f9486d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Window.java
@@ -23,22 +23,22 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* @author peng-yongsheng
*/
-public abstract class Window<WINDOW_COLLECTION extends Collection> {
+public abstract class Window<DATA> {
private AtomicInteger windowSwitch = new AtomicInteger(0);
- private WINDOW_COLLECTION pointer;
+ private SWCollection<DATA> pointer;
- private WINDOW_COLLECTION windowDataA;
- private WINDOW_COLLECTION windowDataB;
+ private SWCollection<DATA> windowDataA;
+ private SWCollection<DATA> windowDataB;
- protected Window() {
+ Window() {
this.windowDataA = collectionInstance();
this.windowDataB = collectionInstance();
this.pointer = windowDataA;
}
- public abstract WINDOW_COLLECTION collectionInstance();
+ public abstract SWCollection<DATA> collectionInstance();
public boolean trySwitchPointer() {
return windowSwitch.incrementAndGet() == 1 && !getLast().isReading();
@@ -57,7 +57,7 @@ public abstract class Window<WINDOW_COLLECTION extends Collection> {
getLast().reading();
}
- protected WINDOW_COLLECTION getCurrentAndWriting() {
+ SWCollection<DATA> getCurrentAndWriting() {
if (pointer == windowDataA) {
windowDataA.writing();
return windowDataA;
@@ -67,7 +67,7 @@ public abstract class Window<WINDOW_COLLECTION extends Collection> {
}
}
- private WINDOW_COLLECTION getCurrent() {
+ private SWCollection<DATA> getCurrent() {
return pointer;
}
@@ -75,7 +75,7 @@ public abstract class Window<WINDOW_COLLECTION extends Collection> {
return getCurrent().size();
}
- public WINDOW_COLLECTION getLast() {
+ public SWCollection<DATA> getLast() {
if (pointer == windowDataA) {
return windowDataB;
} else {
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java
new file mode 100644
index 0000000..9a4c51f
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentDispatcher.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.segment;
+
+import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
+import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
+import org.apache.skywalking.oap.server.core.source.Segment;
+
+/**
+ * @author peng-yongsheng
+ */
+public class SegmentDispatcher implements SourceDispatcher<Segment> {
+
+ @Override public void dispatch(Segment source) {
+ SegmentRecord segment = new SegmentRecord();
+ segment.setSegmentId(source.getSegmentId());
+ segment.setTraceId(source.getTraceId());
+ segment.setServiceId(source.getServiceId());
+ segment.setEndpointName(source.getEndpointName());
+ segment.setStartTime(source.getStartTime());
+ segment.setEndTime(source.getEndTime());
+ segment.setLatency(source.getLatency());
+ segment.setIsError(source.getIsError());
+ segment.setDataBinary(source.getDataBinary());
+ segment.setTimeBucket(source.getTimeBucket());
+
+ RecordProcess.INSTANCE.in(segment);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
new file mode 100644
index 0000000..ba5fc88
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/segment/SegmentRecord.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.segment;
+
+import java.util.*;
+import lombok.*;
+import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordType;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.*;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+
+/**
+ * @author peng-yongsheng
+ */
+@RecordType
+@StorageEntity(name = SegmentRecord.INDEX_NAME, builder = SegmentRecord.Builder.class)
+public class SegmentRecord extends Record {
+
+ public static final String INDEX_NAME = "segment";
+ public static final String SEGMENT_ID = "segment_id";
+ public static final String TRACE_ID = "trace_id";
+ public static final String SERVICE_ID = "service_id";
+ public static final String ENDPOINT_NAME = "endpoint_name";
+ public static final String START_TIME = "start_time";
+ public static final String END_TIME = "end_time";
+ public static final String LATENCY = "latency";
+ public static final String IS_ERROR = "is_error";
+ public static final String DATA_BINARY = "data_binary";
+
+ @Setter @Getter @Column(columnName = SEGMENT_ID) @IDColumn private String segmentId;
+ @Setter @Getter @Column(columnName = TRACE_ID) @IDColumn private String traceId;
+ @Setter @Getter @Column(columnName = SERVICE_ID) @IDColumn private int serviceId;
+ @Setter @Getter @Column(columnName = ENDPOINT_NAME) @IDColumn private String endpointName;
+ @Setter @Getter @Column(columnName = START_TIME) @IDColumn private long startTime;
+ @Setter @Getter @Column(columnName = END_TIME) @IDColumn private long endTime;
+ @Setter @Getter @Column(columnName = LATENCY) @IDColumn private int latency;
+ @Setter @Getter @Column(columnName = IS_ERROR) @IDColumn private int isError;
+ @Setter @Getter @Column(columnName = DATA_BINARY) @IDColumn private byte[] dataBinary;
+
+ @Override public String id() {
+ return segmentId;
+ }
+
+ public static class Builder implements StorageBuilder<SegmentRecord> {
+
+ @Override public Map<String, Object> data2Map(SegmentRecord storageData) {
+ Map<String, Object> map = new HashMap<>();
+ map.put(SEGMENT_ID, storageData.getSegmentId());
+ map.put(TRACE_ID, storageData.getTraceId());
+ map.put(SERVICE_ID, storageData.getServiceId());
+ map.put(ENDPOINT_NAME, storageData.getEndpointName());
+ map.put(START_TIME, storageData.getStartTime());
+ map.put(END_TIME, storageData.getEndTime());
+ map.put(LATENCY, storageData.getLatency());
+ map.put(IS_ERROR, storageData.getIsError());
+ map.put(TIME_BUCKET, storageData.getTimeBucket());
+ if (CollectionUtils.isEmpty(storageData.getDataBinary())) {
+ map.put(DATA_BINARY, Const.EMPTY_STRING);
+ } else {
+ map.put(DATA_BINARY, new String(Base64.getEncoder().encode(storageData.getDataBinary())));
+ }
+ return map;
+ }
+
+ @Override public SegmentRecord map2Data(Map<String, Object> dbMap) {
+ SegmentRecord record = new SegmentRecord();
+ record.setSegmentId((String)dbMap.get(SEGMENT_ID));
+ record.setTraceId((String)dbMap.get(TRACE_ID));
+ record.setServiceId(((Number)dbMap.get(SERVICE_ID)).intValue());
+ record.setEndpointName((String)dbMap.get(ENDPOINT_NAME));
+ record.setStartTime(((Number)dbMap.get(START_TIME)).longValue());
+ record.setEndTime(((Number)dbMap.get(END_TIME)).longValue());
+ record.setLatency(((Number)dbMap.get(LATENCY)).intValue());
+ record.setIsError(((Number)dbMap.get(IS_ERROR)).intValue());
+ record.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).longValue());
+ if (StringUtil.isEmpty((String)dbMap.get(DATA_BINARY))) {
+ record.setDataBinary(new byte[] {});
+ } else {
+ record.setDataBinary(Base64.getDecoder().decode((String)dbMap.get(DATA_BINARY)));
+ }
+ return record;
+ }
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Record.java
similarity index 67%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Record.java
index f1d52a8..0973a63 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/Record.java
@@ -16,28 +16,18 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.data;
+package org.apache.skywalking.oap.server.core.analysis.record;
+
+import lombok.*;
+import org.apache.skywalking.oap.server.core.storage.StorageData;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
/**
* @author peng-yongsheng
*/
-public interface Collection<Data> {
-
- void reading();
-
- boolean isReading();
-
- void writing();
-
- boolean isWriting();
-
- void clear();
-
- int size();
-
- void finishReading();
+public abstract class Record implements StorageData {
- void finishWriting();
+ public static final String TIME_BUCKET = "time_bucket";
- Data collection();
+ @Getter @Setter @Column(columnName = TIME_BUCKET) private long timeBucket;
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/annotation/RecordType.java
similarity index 77%
copy from oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/annotation/RecordType.java
index ce70081..880477c 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/annotation/RecordType.java
@@ -16,17 +16,14 @@
*
*/
-package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener;
+package org.apache.skywalking.oap.server.core.analysis.record.annotation;
+
+import java.lang.annotation.*;
/**
* @author peng-yongsheng
*/
-public interface SpanListener {
- void build();
-
- boolean containsPoint(Point point);
-
- enum Point {
- Entry, Exit, Local, First, GlobalTraceIds
- }
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+public @interface RecordType {
}
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/CollectionUtils.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/annotation/RecordTypeListener.java
similarity index 53%
copy from oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/CollectionUtils.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/annotation/RecordTypeListener.java
index 8faaffa..6eb0d07 100644
--- a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/CollectionUtils.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/record/annotation/RecordTypeListener.java
@@ -16,40 +16,29 @@
*
*/
-package org.apache.skywalking.oap.server.library.util;
+package org.apache.skywalking.oap.server.core.analysis.record.annotation;
-import java.util.*;
+import java.lang.annotation.Annotation;
+import org.apache.skywalking.oap.server.core.analysis.worker.RecordProcess;
+import org.apache.skywalking.oap.server.core.annotation.AnnotationListener;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author peng-yongsheng
*/
-public class CollectionUtils {
+public class RecordTypeListener implements AnnotationListener {
- public static boolean isEmpty(Map map) {
- return map == null || map.size() == 0;
- }
-
- public static boolean isEmpty(List list) {
- return list == null || list.size() == 0;
- }
-
- public static boolean isEmpty(Set set) {
- return set == null || set.size() == 0;
- }
-
- public static boolean isNotEmpty(List list) {
- return !isEmpty(list);
- }
+ private final ModuleManager moduleManager;
- public static boolean isNotEmpty(Set set) {
- return !isEmpty(set);
+ public RecordTypeListener(ModuleManager moduleManager) {
+ this.moduleManager = moduleManager;
}
- public static boolean isNotEmpty(Map map) {
- return !isEmpty(map);
+ @Override public Class<? extends Annotation> annotation() {
+ return RecordType.class;
}
- public static <T> boolean isNotEmpty(T[] array) {
- return array != null && array.length > 0;
+ @Override public void notify(Class aClass) {
+ RecordProcess.INSTANCE.create(moduleManager, aClass);
}
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
index 8d0bf88..15ff40b 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorAggregateWorker.java
@@ -71,7 +71,7 @@ public class IndicatorAggregateWorker extends AbstractWorker<Indicator> {
}
}
- mergeDataCache.getLast().collection().forEach((Indicator key, Indicator data) -> {
+ mergeDataCache.getLast().collection().forEach(data -> {
if (logger.isDebugEnabled()) {
logger.debug(data.toString());
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
index bca9752..6a614a0 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
@@ -19,9 +19,9 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
-import org.apache.skywalking.oap.server.core.analysis.data.*;
+import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.IIndicatorDAO;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
@@ -31,48 +31,28 @@ import static java.util.Objects.nonNull;
/**
* @author peng-yongsheng
*/
-public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
+public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, MergeDataCache<Indicator>> {
private static final Logger logger = LoggerFactory.getLogger(IndicatorPersistentWorker.class);
private final String modelName;
private final MergeDataCache<Indicator> mergeDataCache;
- private final IBatchDAO batchDAO;
private final IIndicatorDAO indicatorDAO;
- private final int blockBatchPersistenceSize;
private final AbstractWorker<Indicator> nextWorker;
IndicatorPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager,
IIndicatorDAO indicatorDAO, AbstractWorker<Indicator> nextWorker) {
- super(workerId);
+ super(moduleManager, workerId, batchSize);
this.modelName = modelName;
- this.blockBatchPersistenceSize = batchSize;
this.mergeDataCache = new MergeDataCache<>();
- this.batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
this.indicatorDAO = indicatorDAO;
this.nextWorker = nextWorker;
}
- public final Window<MergeDataCollection<Indicator>> getCache() {
+ @Override public MergeDataCache<Indicator> getCache() {
return mergeDataCache;
}
- @Override public final void in(Indicator input) {
- if (getCache().currentCollectionSize() >= blockBatchPersistenceSize) {
- try {
- if (getCache().trySwitchPointer()) {
- getCache().switchPointer();
-
- List<?> collection = buildBatchCollection();
- batchDAO.batchPersistence(collection);
- }
- } finally {
- getCache().trySwitchPointerFinally();
- }
- }
- cacheData(input);
- }
-
public boolean flushAndSwitch() {
boolean isSwitch;
try {
@@ -85,29 +65,9 @@ public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
return isSwitch;
}
- public final List<?> buildBatchCollection() {
- List<?> batchCollection = new LinkedList<>();
- try {
- while (getCache().getLast().isWriting()) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- logger.warn("thread wake up");
- }
- }
-
- if (getCache().getLast().collection() != null) {
- batchCollection = prepareBatch(getCache().getLast());
- }
- } finally {
- getCache().finishReadingLast();
- }
- return batchCollection;
- }
-
- private List<Object> prepareBatch(MergeDataCollection<Indicator> collection) {
+ @Override public List<Object> prepareBatch(MergeDataCache<Indicator> cache) {
List<Object> batchCollection = new LinkedList<>();
- collection.collection().forEach((id, data) -> {
+ cache.getLast().collection().forEach(data -> {
Indicator dbData = null;
try {
dbData = indicatorDAO.get(modelName, data);
@@ -131,7 +91,7 @@ public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
return batchCollection;
}
- private void cacheData(Indicator input) {
+ @Override public void cacheData(Indicator input) {
mergeDataCache.writing();
if (mergeDataCache.containsKey(input)) {
Indicator indicator = mergeDataCache.get(input);
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
similarity index 50%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
index bca9752..d731c73 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/IndicatorPersistentWorker.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/PersistenceWorker.java
@@ -19,46 +19,30 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.*;
-import org.apache.skywalking.oap.server.core.analysis.data.*;
-import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.data.Window;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
-import static java.util.Objects.nonNull;
-
/**
* @author peng-yongsheng
*/
-public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
+public abstract class PersistenceWorker<INPUT extends StorageData, CACHE extends Window<INPUT>> extends AbstractWorker<INPUT> {
- private static final Logger logger = LoggerFactory.getLogger(IndicatorPersistentWorker.class);
+ private static final Logger logger = LoggerFactory.getLogger(PersistenceWorker.class);
- private final String modelName;
- private final MergeDataCache<Indicator> mergeDataCache;
+ private final int batchSize;
private final IBatchDAO batchDAO;
- private final IIndicatorDAO indicatorDAO;
- private final int blockBatchPersistenceSize;
- private final AbstractWorker<Indicator> nextWorker;
- IndicatorPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager,
- IIndicatorDAO indicatorDAO, AbstractWorker<Indicator> nextWorker) {
+ PersistenceWorker(ModuleManager moduleManager, int workerId, int batchSize) {
super(workerId);
- this.modelName = modelName;
- this.blockBatchPersistenceSize = batchSize;
- this.mergeDataCache = new MergeDataCache<>();
+ this.batchSize = batchSize;
this.batchDAO = moduleManager.find(StorageModule.NAME).getService(IBatchDAO.class);
- this.indicatorDAO = indicatorDAO;
- this.nextWorker = nextWorker;
- }
-
- public final Window<MergeDataCollection<Indicator>> getCache() {
- return mergeDataCache;
}
- @Override public final void in(Indicator input) {
- if (getCache().currentCollectionSize() >= blockBatchPersistenceSize) {
+ @Override public final void in(INPUT input) {
+ if (getCache().currentCollectionSize() >= batchSize) {
try {
if (getCache().trySwitchPointer()) {
getCache().switchPointer();
@@ -73,6 +57,10 @@ public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
cacheData(input);
}
+ public abstract void cacheData(INPUT input);
+
+ public abstract CACHE getCache();
+
public boolean flushAndSwitch() {
boolean isSwitch;
try {
@@ -85,6 +73,8 @@ public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
return isSwitch;
}
+ public abstract List<Object> prepareBatch(CACHE cache);
+
public final List<?> buildBatchCollection() {
List<?> batchCollection = new LinkedList<>();
try {
@@ -97,51 +87,11 @@ public class IndicatorPersistentWorker extends AbstractWorker<Indicator> {
}
if (getCache().getLast().collection() != null) {
- batchCollection = prepareBatch(getCache().getLast());
+ batchCollection = prepareBatch(getCache());
}
} finally {
getCache().finishReadingLast();
}
return batchCollection;
}
-
- private List<Object> prepareBatch(MergeDataCollection<Indicator> collection) {
- List<Object> batchCollection = new LinkedList<>();
- collection.collection().forEach((id, data) -> {
- Indicator dbData = null;
- try {
- dbData = indicatorDAO.get(modelName, data);
- } catch (Throwable t) {
- logger.error(t.getMessage(), t);
- }
- try {
- if (nonNull(dbData)) {
- data.combine(dbData);
- batchCollection.add(indicatorDAO.prepareBatchUpdate(modelName, data));
- } else {
- batchCollection.add(indicatorDAO.prepareBatchInsert(modelName, data));
- }
-
- nextWorker.in(data);
- } catch (Throwable t) {
- logger.error(t.getMessage(), t);
- }
- });
-
- return batchCollection;
- }
-
- private void cacheData(Indicator input) {
- mergeDataCache.writing();
- if (mergeDataCache.containsKey(input)) {
- Indicator indicator = mergeDataCache.get(input);
- indicator.combine(input);
- indicator.calculate();
- } else {
- input.calculate();
- mergeDataCache.put(input);
- }
-
- mergeDataCache.finishWriting();
- }
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
new file mode 100644
index 0000000..ea8a47d
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordPersistentWorker.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import java.util.*;
+import org.apache.skywalking.oap.server.core.analysis.data.NonMergeDataCache;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.IRecordDAO;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RecordPersistentWorker extends PersistenceWorker<Record, NonMergeDataCache<Record>> {
+
+ private static final Logger logger = LoggerFactory.getLogger(RecordPersistentWorker.class);
+
+ private final String modelName;
+ private final NonMergeDataCache<Record> nonMergeDataCache;
+ private final IRecordDAO recordDAO;
+
+ RecordPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager,
+ IRecordDAO recordDAO) {
+ super(moduleManager, workerId, batchSize);
+ this.modelName = modelName;
+ this.nonMergeDataCache = new NonMergeDataCache<>();
+ this.recordDAO = recordDAO;
+ }
+
+ @Override public NonMergeDataCache<Record> getCache() {
+ return nonMergeDataCache;
+ }
+
+ @Override public List<Object> prepareBatch(NonMergeDataCache<Record> cache) {
+ List<Object> batchCollection = new LinkedList<>();
+ cache.getLast().collection().forEach(record -> {
+ try {
+ batchCollection.add(recordDAO.prepareBatchInsert(modelName, record));
+ } catch (Throwable t) {
+ logger.error(t.getMessage(), t);
+ }
+ });
+ return batchCollection;
+ }
+
+ @Override public void cacheData(Record input) {
+ nonMergeDataCache.writing();
+ nonMergeDataCache.add(input);
+ nonMergeDataCache.finishWriting();
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordProcess.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordProcess.java
new file mode 100644
index 0000000..838136d
--- /dev/null
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/worker/RecordProcess.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.worker;
+
+import java.util.*;
+import lombok.Getter;
+import org.apache.skywalking.oap.server.core.UnexpectedException;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.annotation.StorageEntityAnnotationUtils;
+import org.apache.skywalking.oap.server.core.worker.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+
+/**
+ * @author peng-yongsheng
+ */
+public enum RecordProcess {
+ INSTANCE;
+
+ private Map<Class<? extends Record>, RecordPersistentWorker> workers = new HashMap<>();
+
+ public void in(Record record) {
+ workers.get(record.getClass()).in(record);
+ }
+
+ @Getter private List<RecordPersistentWorker> persistentWorkers = new ArrayList<>();
+
+ public void create(ModuleManager moduleManager, Class<? extends Record> recordClass) {
+ String modelName = StorageEntityAnnotationUtils.getModelName(recordClass);
+ Class<? extends StorageBuilder> builderClass = StorageEntityAnnotationUtils.getBuilder(recordClass);
+
+ StorageDAO storageDAO = moduleManager.find(StorageModule.NAME).getService(StorageDAO.class);
+ IRecordDAO recordDAO;
+ try {
+ recordDAO = storageDAO.newRecordDao(builderClass.newInstance());
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new UnexpectedException("");
+ }
+
+ RecordPersistentWorker persistentWorker = new RecordPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName,
+ 1000, moduleManager, recordDAO);
+ WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
+ persistentWorkers.add(persistentWorker);
+ workers.put(recordClass, persistentWorker);
+ }
+}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
index 396ecab..e77d036 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
@@ -24,5 +24,6 @@ package org.apache.skywalking.oap.server.core.source;
public enum Scope {
All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation, EndpointRelation, NetworkAddress,
ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC,
- ServiceComponent, ServiceMapping
+ ServiceComponent, ServiceMapping,
+ Segment
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java
similarity index 56%
copy from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
copy to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java
index 396ecab..b669171 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Scope.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/Segment.java
@@ -18,11 +18,30 @@
package org.apache.skywalking.oap.server.core.source;
+import lombok.*;
+import org.apache.skywalking.oap.server.core.source.annotation.SourceType;
+
/**
* @author peng-yongsheng
*/
-public enum Scope {
- All, Service, ServiceInstance, Endpoint, ServiceRelation, ServiceInstanceRelation, EndpointRelation, NetworkAddress,
- ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC,
- ServiceComponent, ServiceMapping
+@SourceType
+public class Segment extends Source {
+
+ @Override public Scope scope() {
+ return Scope.Segment;
+ }
+
+ @Override public String getEntityId() {
+ return segmentId;
+ }
+
+ @Setter @Getter private String segmentId;
+ @Setter @Getter private String traceId;
+ @Setter @Getter private int serviceId;
+ @Setter @Getter private String endpointName;
+ @Setter @Getter private long startTime;
+ @Setter @Getter private long endTime;
+ @Setter @Getter private int latency;
+ @Setter @Getter private int isError;
+ @Setter @Getter private byte[] dataBinary;
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
similarity index 70%
rename from oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java
rename to oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
index f1d52a8..d5e156f 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/data/Collection.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/IRecordDAO.java
@@ -16,28 +16,17 @@
*
*/
-package org.apache.skywalking.oap.server.core.analysis.data;
+package org.apache.skywalking.oap.server.core.storage;
+
+import java.io.IOException;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
/**
* @author peng-yongsheng
*/
-public interface Collection<Data> {
-
- void reading();
-
- boolean isReading();
-
- void writing();
-
- boolean isWriting();
-
- void clear();
-
- int size();
-
- void finishReading();
+public interface IRecordDAO<INSERT> extends DAO {
- void finishWriting();
+ INSERT prepareBatchInsert(String modelName, Record record) throws IOException;
- Data collection();
+ void deleteHistory(String modelName, Long timeBucketBefore);
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
index 7271f04..4d8ebd7 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/PersistenceTimer.java
@@ -21,7 +21,7 @@ package org.apache.skywalking.oap.server.core.storage;
import java.util.*;
import java.util.concurrent.*;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
-import org.apache.skywalking.oap.server.core.analysis.worker.IndicatorProcess;
+import org.apache.skywalking.oap.server.core.analysis.worker.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.*;
@@ -65,7 +65,11 @@ public enum PersistenceTimer {
long startTime = System.currentTimeMillis();
try {
List batchAllCollection = new LinkedList();
- IndicatorProcess.INSTANCE.getPersistentWorkers().forEach(worker -> {
+ List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
+ persistenceWorkers.addAll(IndicatorProcess.INSTANCE.getPersistentWorkers());
+ persistenceWorkers.addAll(RecordProcess.INSTANCE.getPersistentWorkers());
+
+ persistenceWorkers.forEach(worker -> {
if (logger.isDebugEnabled()) {
logger.debug("extract {} worker data and save", worker.getClass().getName());
}
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
index 187cd78..ddb348e 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageDAO.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.storage;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.library.module.Service;
@@ -30,4 +31,6 @@ public interface StorageDAO extends Service {
IIndicatorDAO newIndicatorDao(StorageBuilder<Indicator> storageBuilder);
IRegisterDAO newRegisterDao(StorageBuilder<RegisterSource> storageBuilder);
+
+ IRecordDAO newRecordDao(StorageBuilder<Record> storageBuilder);
}
diff --git a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/CollectionUtils.java b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/CollectionUtils.java
index 8faaffa..c3b657c 100644
--- a/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/CollectionUtils.java
+++ b/oap-server/server-library/library-util/src/main/java/org/apache/skywalking/oap/server/library/util/CollectionUtils.java
@@ -52,4 +52,12 @@ public class CollectionUtils {
public static <T> boolean isNotEmpty(T[] array) {
return array != null && array.length > 0;
}
+
+ public static boolean isEmpty(byte[] array) {
+ return array == null || array.length == 0;
+ }
+
+ public static boolean isNotEmpty(byte[] array) {
+ return !isEmpty(array);
+ }
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index 266e85d..83065ea 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -26,6 +26,7 @@ import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
import org.apache.skywalking.oap.server.receiver.trace.provider.handler.TraceSegmentServiceHandler;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.endpoint.MultiScopesSpanListener;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment.SegmentSpanListener;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.service.*;
import org.apache.skywalking.oap.server.receiver.trace.provider.parser.standardization.SegmentStandardizationWorker;
@@ -60,6 +61,7 @@ public class TraceModuleProvider extends ModuleProvider {
listenerManager.add(new MultiScopesSpanListener.Factory());
listenerManager.add(new ServiceComponentSpanListener.Factory());
listenerManager.add(new ServiceMappingSpanListener.Factory());
+ listenerManager.add(new SegmentSpanListener.Factory());
GRPCHandlerRegister grpcHandlerRegister = getManager().find(CoreModule.NAME).getService(GRPCHandlerRegister.class);
try {
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
index 78c6a78..d07c4a5 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/SegmentParse.java
@@ -78,9 +78,7 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
if (logger.isDebugEnabled()) {
logger.debug("This segment id exchange success, id: {}", segmentCoreInfo.getSegmentId());
}
-
notifyListenerToBuild();
- buildSegment(segmentCoreInfo.getSegmentId(), segmentDecorator.toByteArray());
return true;
}
} catch (Throwable e) {
@@ -111,6 +109,7 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
segmentCoreInfo.setSegmentId(segmentIdBuilder.toString());
segmentCoreInfo.setApplicationId(segmentDecorator.getApplicationId());
segmentCoreInfo.setApplicationInstanceId(segmentDecorator.getApplicationInstanceId());
+ segmentCoreInfo.setDataBinary(segmentDecorator.toByteArray());
for (int i = 0; i < segmentDecorator.getSpansCount(); i++) {
SpanDecorator spanDecorator = segmentDecorator.getSpans(i);
@@ -159,13 +158,6 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
return true;
}
- private void buildSegment(String id, byte[] dataBinary) {
-// Segment segment = new Segment();
-// segment.setId(id);
-// segment.setDataBinary(dataBinary);
-// segment.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket());
- }
-
private void writeToBufferFile(String id, UpstreamSegment upstreamSegment) {
if (logger.isDebugEnabled()) {
logger.debug("push to segment buffer write worker, id: {}", id);
@@ -215,7 +207,7 @@ public class SegmentParse implements DataStreamReader.CallBack<UpstreamSegment>
private void notifyGlobalsListener(UniqueId uniqueId) {
spanListeners.forEach(listener -> {
- if (listener.containsPoint(SpanListener.Point.GlobalTraceIds)) {
+ if (listener.containsPoint(SpanListener.Point.TraceIds)) {
((GlobalTraceIdsListener)listener).parseGlobalTraceId(uniqueId, segmentCoreInfo);
}
});
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentCoreInfo.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentCoreInfo.java
index 78817ea..a0c0b7f 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentCoreInfo.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/decorator/SegmentCoreInfo.java
@@ -18,80 +18,20 @@
package org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator;
+import lombok.*;
+
/**
* @author peng-yongsheng
*/
+@Getter
+@Setter
public class SegmentCoreInfo {
private String segmentId;
- private String traceId;
private int applicationId;
private int applicationInstanceId;
private long startTime;
private long endTime;
private boolean isError;
private long minuteTimeBucket;
-
- public String getSegmentId() {
- return segmentId;
- }
-
- public void setSegmentId(String segmentId) {
- this.segmentId = segmentId;
- }
-
- public int getApplicationId() {
- return applicationId;
- }
-
- public void setApplicationId(int applicationId) {
- this.applicationId = applicationId;
- }
-
- public int getApplicationInstanceId() {
- return applicationInstanceId;
- }
-
- public void setApplicationInstanceId(int applicationInstanceId) {
- this.applicationInstanceId = applicationInstanceId;
- }
-
- public long getStartTime() {
- return startTime;
- }
-
- public void setStartTime(long startTime) {
- this.startTime = startTime;
- }
-
- public long getEndTime() {
- return endTime;
- }
-
- public void setEndTime(long endTime) {
- this.endTime = endTime;
- }
-
- public boolean isError() {
- return isError;
- }
-
- public void setError(boolean error) {
- isError = error;
- }
-
- public long getMinuteTimeBucket() {
- return minuteTimeBucket;
- }
-
- public void setMinuteTimeBucket(long minuteTimeBucket) {
- this.minuteTimeBucket = minuteTimeBucket;
- }
-
- public String getTraceId() {
- return traceId;
- }
-
- public void setTraceId(String traceId) {
- this.traceId = traceId;
- }
+ private byte[] dataBinary;
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java
index ce70081..08e3b80 100644
--- a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/SpanListener.java
@@ -27,6 +27,6 @@ public interface SpanListener {
boolean containsPoint(Point point);
enum Point {
- Entry, Exit, Local, First, GlobalTraceIds
+ Entry, Exit, Local, First, TraceIds
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
new file mode 100644
index 0000000..d3dceb8
--- /dev/null
+++ b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/parser/listener/segment/SegmentSpanListener.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.segment;
+
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
+import org.apache.skywalking.oap.server.core.CoreModule;
+import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
+import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.decorator.*;
+import org.apache.skywalking.oap.server.receiver.trace.provider.parser.listener.*;
+import org.slf4j.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class SegmentSpanListener implements FirstSpanListener, EntrySpanListener, GlobalTraceIdsListener {
+
+ private static final Logger logger = LoggerFactory.getLogger(SegmentSpanListener.class);
+
+ private final SourceReceiver sourceReceiver;
+ private final Segment segment = new Segment();
+ private final EndpointInventoryCache serviceNameCacheService;
+ private int entryEndpointId = 0;
+ private int firstEndpointId = 0;
+
+ private SegmentSpanListener(ModuleManager moduleManager) {
+ this.sourceReceiver = moduleManager.find(CoreModule.NAME).getService(SourceReceiver.class);
+ this.serviceNameCacheService = moduleManager.find(CoreModule.NAME).getService(EndpointInventoryCache.class);
+ }
+
+ @Override public boolean containsPoint(Point point) {
+ return Point.First.equals(point) || Point.Entry.equals(point) || Point.TraceIds.equals(point);
+ }
+
+ @Override
+ public void parseFirst(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
+ long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(segmentCoreInfo.getStartTime());
+
+ segment.setSegmentId(segmentCoreInfo.getSegmentId());
+ segment.setSegmentId(segmentCoreInfo.getSegmentId());
+ segment.setServiceId(segmentCoreInfo.getApplicationId());
+ segment.setLatency((int)(segmentCoreInfo.getEndTime() - segmentCoreInfo.getStartTime()));
+ segment.setStartTime(segmentCoreInfo.getStartTime());
+ segment.setEndTime(segmentCoreInfo.getEndTime());
+ segment.setIsError(BooleanUtils.booleanToValue(segmentCoreInfo.isError()));
+ segment.setTimeBucket(timeBucket);
+ segment.setDataBinary(segmentCoreInfo.getDataBinary());
+
+ firstEndpointId = spanDecorator.getOperationNameId();
+ }
+
+ @Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
+ entryEndpointId = spanDecorator.getOperationNameId();
+ }
+
+ @Override public void parseGlobalTraceId(UniqueId uniqueId, SegmentCoreInfo segmentCoreInfo) {
+ StringBuilder traceIdBuilder = new StringBuilder();
+ for (int i = 0; i < uniqueId.getIdPartsList().size(); i++) {
+ if (i == 0) {
+ traceIdBuilder.append(uniqueId.getIdPartsList().get(i));
+ } else {
+ traceIdBuilder.append(".").append(uniqueId.getIdPartsList().get(i));
+ }
+ }
+ segment.setTraceId(traceIdBuilder.toString());
+ }
+
+ @Override public void build() {
+ if (logger.isDebugEnabled()) {
+ logger.debug("segment duration listener build");
+ }
+
+ if (entryEndpointId == 0) {
+ segment.setEndpointName(serviceNameCacheService.get(firstEndpointId).getName());
+ } else {
+ segment.setEndpointName(serviceNameCacheService.get(entryEndpointId).getName());
+ }
+
+ sourceReceiver.receive(segment);
+ }
+
+ public static class Factory implements SpanListenerFactory {
+
+ @Override public SpanListener create(ModuleManager moduleManager) {
+ return new SegmentSpanListener(moduleManager);
+ }
+ }
+}
diff --git a/oap-server/server-starter/src/main/resources/log4j2.xml b/oap-server/server-starter/src/main/resources/log4j2.xml
index 2da2d13..a6ce5a2 100644
--- a/oap-server/server-starter/src/main/resources/log4j2.xml
+++ b/oap-server/server-starter/src/main/resources/log4j2.xml
@@ -32,8 +32,8 @@
<logger name="io.netty" level="INFO"/>
<logger name="org.apache.http" level="INFO"/>
<logger name="org.apache.skywalking.oap.server.receiver.istio.telemetry" level="DEBUG"/>
- <logger name="org.apache.skywalking.oap.server.core.remote" level="DEBUG"/>
- <Root level="DEBUG">
+ <logger name="org.apache.skywalking.oap.server.core.remote" level="INFO"/>
+ <Root level="INFO">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
index 37c3db8..9df2148 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/ColumnTypeEsMapping.java
@@ -37,6 +37,8 @@ public class ColumnTypeEsMapping implements DataTypeMapping {
return "keyword";
} else if (IntKeyLongValueArray.class.equals(type)) {
return "keyword";
+ } else if (byte[].class.equals(type)) {
+ return "binary";
} else {
throw new IllegalArgumentException("Unsupported data type: " + type.getName());
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
new file mode 100644
index 0000000..bec4cab
--- /dev/null
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/RecordEsDAO.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.type.StorageDataType;
+import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.xcontent.*;
+
+/**
+ * @author peng-yongsheng
+ */
+public class RecordEsDAO extends EsDAO implements IRecordDAO<IndexRequest> {
+
+ private final StorageBuilder<Record> storageBuilder;
+
+ public RecordEsDAO(ElasticSearchClient client, StorageBuilder<Record> storageBuilder) {
+ super(client);
+ this.storageBuilder = storageBuilder;
+ }
+
+ @Override public IndexRequest prepareBatchInsert(String modelName, Record record) throws IOException {
+ Map<String, Object> objectMap = storageBuilder.data2Map(record);
+
+ XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+ for (String key : objectMap.keySet()) {
+ Object value = objectMap.get(key);
+ if (value instanceof StorageDataType) {
+ builder.field(key, ((StorageDataType)value).toStorageData());
+ } else {
+ builder.field(key, value);
+ }
+ }
+ builder.endObject();
+ return getClient().prepareInsert(modelName, record.id(), builder);
+ }
+
+ @Override public void deleteHistory(String modelName, Long timeBucketBefore) {
+ }
+}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
index 44c9df0..3774d65 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsDAO.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
import org.apache.skywalking.oap.server.core.register.RegisterSource;
import org.apache.skywalking.oap.server.core.storage.*;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
@@ -39,4 +40,8 @@ public class StorageEsDAO extends EsDAO implements StorageDAO {
@Override public IRegisterDAO newRegisterDao(StorageBuilder<RegisterSource> storageBuilder) {
return new RegisterEsDAO(getClient(), storageBuilder);
}
+
+ @Override public IRecordDAO newRecordDao(StorageBuilder<Record> storageBuilder) {
+ return new RecordEsDAO(getClient(), storageBuilder);
+ }
}
diff --git a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
index 95fb094..19e616a 100644
--- a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
+++ b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/base/StorageEsInstaller.java
@@ -19,11 +19,12 @@
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base;
import java.io.IOException;
-import org.apache.skywalking.oap.server.core.storage.*;
+import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.core.storage.model.*;
import org.apache.skywalking.oap.server.library.client.Client;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.*;
import org.slf4j.*;
@@ -80,9 +81,9 @@ public class StorageEsInstaller extends ModelInstaller {
Settings settings = createSettingBuilder();
try {
mappingBuilder = createMappingBuilder(tableDefine);
- logger.info("mapping builder str: {}", mappingBuilder.prettyPrint());
+ logger.info("index {}'s mapping builder str: {}", tableDefine.getName(), Strings.toString(mappingBuilder.prettyPrint()));
} catch (Exception e) {
- logger.error("create {} index mapping builder error", tableDefine.getName());
+ logger.error("create {} index mapping builder error, error message: {}", tableDefine.getName(), e.getMessage());
}
boolean isAcknowledged;