You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ad...@apache.org on 2020/03/27 05:50:38 UTC
[skywalking] branch master updated: fix elasticsearch-5.x-plugin
when use es6.x TransportClient error (#4517)
This is an automated email from the ASF dual-hosted git repository.
aderm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 1bc4082 fix elasticsearch-5.x-plugin when use es6.x TransportClient error (#4517)
1bc4082 is described below
commit 1bc408282176363e6d98b3969be12810dfcded72
Author: yi.liang <in...@126.com>
AuthorDate: Fri Mar 27 00:50:27 2020 -0500
fix elasticsearch-5.x-plugin when use es6.x TransportClient error (#4517)
* up ui
* up ui
* fix elasticsearch-5.x-plugin on es6.x TransportClient error, Found class org.elasticsearch.common.transport.TransportAddress, but interface was expected
up elasticsearch-6.x-plugin support TransportClient
* fix elasticsearch-5.x-plugin on es6.x TransportClient error, Found class org.elasticsearch.common.transport.TransportAddress, but interface was expected
up elasticsearch-6.x-plugin support TransportClient
* fix elasticsearch-5.x-plugin on es6.x TransportClient error, Found class org.elasticsearch.common.transport.TransportAddress, but interface was expected
up elasticsearch-6.x-plugin support TransportClient
* const string witnesses
add some unit test
recommended to change ; to ,
* const string witnesses
add some unit test
recommended to change ; to ,
* up ES6.X test-plugin
* up es6.x test-plugin
* up es6.x test-plugin
* up es6.x test-plugin
* add DeleteIndexRequest
* up es6.x test-plugin
* remove TransportCaseController.java
* fix TransportActionNodeProxyExecuteMethodsInterceptorTest
Co-authored-by: yi.liang <yi...@zhangmen.cn>
Co-authored-by: 吴晟 Wu Sheng <wu...@foxmail.com>
Co-authored-by: 梁懿 <li...@xforceplus.com>
Co-authored-by: aderm <39...@qq.com>
---
.../skywalking/apm/agent/core/conf/Config.java | 2 +
.../apm/plugin/elasticsearch/v5/Constants.java | 4 +-
.../v5/define/GenericActionInstrumentation.java | 7 +
...PlainListenableActionFutureInstrumentation.java | 7 +
.../TransportActionNodeProxyInstrumentation.java | 7 +
...TransportClientNodesServiceInstrumentation.java | 7 +
.../TransportProxyClientInstrumentation.java | 7 +
.../elasticsearch/v6/TransportAddressCache.java | 61 +++++
.../v6/TransportClientEnhanceInfo.java | 48 ++++
.../AdapterActionFutureInstrumentation.java} | 16 +-
.../TransportActionNodeProxyInstrumentation.java | 20 +-
...TransportClientNodesServiceInstrumentation.java | 72 ++++--
.../define/TransportServiceInstrumentation.java} | 14 +-
...terActionFutureActionGetMethodsInterceptor.java | 167 ++++++++++++++
.../elasticsearch/v6/interceptor/Constants.java | 16 ++
...rtActionNodeProxyExecuteMethodsInterceptor.java | 160 +++++++++++++
.../TransportClientNodesServiceInterceptor.java | 117 ++++++++++
.../TransportServiceConInterceptor.java | 40 ++++
.../src/main/resources/skywalking-plugin.def | 4 +
...ctionFutureActionGetMethodsInterceptorTest.java | 142 ++++++++++++
...tionNodeProxyExecuteMethodsInterceptorTest.java | 248 +++++++++++++++++++++
.../v6/interceptor/TransportAddressCacheTest.java | 54 +++++
.../TransportServiceConInterceptorTest.java | 82 +++++++
.../service-agent/java-agent/Supported-list.md | 3 +-
.../config/expectedData.yaml | 136 ++++++++++-
.../scenarios/elasticsearch-6.x-scenario/pom.xml | 8 +-
...ontroller.java => RestHighLevelClientCase.java} | 31 ++-
.../elasticsearch/TransportClientCase.java | 101 +++++++++
.../config/TransportClientConfig.java | 72 ++++++
.../elasticsearch/controller/CaseController.java | 176 +--------------
30 files changed, 1600 insertions(+), 229 deletions(-)
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index e518175..726aa4a 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -256,6 +256,8 @@ public class Config {
* If true, trace all the DSL(Domain Specific Language) in ElasticSearch access, default is false.
*/
public static boolean TRACE_DSL = false;
+
+ public static int ELASTICSEARCH_DSL_LENGTH_THRESHOLD = 1024;
}
public static class Customize {
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/Constants.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/Constants.java
index dad1fb8..eed12f3 100644
--- a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/Constants.java
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/Constants.java
@@ -21,7 +21,9 @@ package org.apache.skywalking.apm.plugin.elasticsearch.v5;
import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag;
import org.apache.skywalking.apm.agent.core.context.tag.Tags;
-class Constants {
+public class Constants {
+
+ public static final String INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS = "org.elasticsearch.common.transport.InetSocketTransportAddress";
static final String DB_TYPE = "Elasticsearch";
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/GenericActionInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/GenericActionInstrumentation.java
index ec4a957..cc1adf3 100644
--- a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/GenericActionInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/GenericActionInstrumentation.java
@@ -24,6 +24,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.plugin.elasticsearch.v5.Constants;
import static net.bytebuddy.matcher.ElementMatchers.any;
import static org.apache.skywalking.apm.agent.core.plugin.match.HierarchyMatch.byHierarchyMatch;
@@ -60,4 +61,10 @@ public class GenericActionInstrumentation extends ClassEnhancePluginDefine {
protected ClassMatch enhanceClass() {
return byHierarchyMatch(new String[] {"org.elasticsearch.action.GenericAction"});
}
+
+ @Override
+ protected String[] witnessClasses() {
+ return new String[]{Constants.INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS};
+ }
+
}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/PlainListenableActionFutureInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/PlainListenableActionFutureInstrumentation.java
index a653f81..a97befa 100644
--- a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/PlainListenableActionFutureInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/PlainListenableActionFutureInstrumentation.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.plugin.elasticsearch.v5.Constants;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
@@ -67,4 +68,10 @@ public class PlainListenableActionFutureInstrumentation extends ClassEnhancePlug
protected ClassMatch enhanceClass() {
return byName("org.elasticsearch.action.support.PlainListenableActionFuture");
}
+
+ @Override
+ protected String[] witnessClasses() {
+ return new String[]{Constants.INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS};
+ }
+
}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportActionNodeProxyInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportActionNodeProxyInstrumentation.java
index d98b457..55526f6 100644
--- a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportActionNodeProxyInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportActionNodeProxyInstrumentation.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.plugin.elasticsearch.v5.Constants;
import static net.bytebuddy.matcher.ElementMatchers.any;
import static net.bytebuddy.matcher.ElementMatchers.named;
@@ -83,4 +84,10 @@ public class TransportActionNodeProxyInstrumentation extends ClassEnhancePluginD
protected ClassMatch enhanceClass() {
return byName(ENHANC_CLASS);
}
+
+ @Override
+ protected String[] witnessClasses() {
+ return new String[]{Constants.INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS};
+ }
+
}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportClientNodesServiceInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportClientNodesServiceInstrumentation.java
index df10273..6866e69 100644
--- a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportClientNodesServiceInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportClientNodesServiceInstrumentation.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterc
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.plugin.elasticsearch.v5.Constants;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
@@ -80,4 +81,10 @@ public class TransportClientNodesServiceInstrumentation extends ClassInstanceMet
protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
+
+ @Override
+ protected String[] witnessClasses() {
+ return new String[]{Constants.INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS};
+ }
+
}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportProxyClientInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportProxyClientInstrumentation.java
index 55e6e91..d4c0c3f 100644
--- a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportProxyClientInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportProxyClientInstrumentation.java
@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.plugin.elasticsearch.v5.Constants;
import static net.bytebuddy.matcher.ElementMatchers.any;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
@@ -64,4 +65,10 @@ public class TransportProxyClientInstrumentation extends ClassInstanceMethodsEnh
protected ClassMatch enhanceClass() {
return byName("org.elasticsearch.client.transport.TransportProxyClient");
}
+
+ @Override
+ protected String[] witnessClasses() {
+ return new String[]{Constants.INET_SOCKET_TRANSPORT_ADDRESS_WITNESS_CLASS};
+ }
+
}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/TransportAddressCache.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/TransportAddressCache.java
new file mode 100644
index 0000000..e61287c
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/TransportAddressCache.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.skywalking.apm.plugin.elasticsearch.v6;
+
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Keeps track of the transport addresses that were added to / removed from a client and
+ * caches them rendered as a comma-separated list of {@code address:port} entries.
+ *
+ * <p>Mutations are serialized by the instance lock. The rendered string is published through
+ * a {@code volatile} field so that {@link #transportAddress()} can be called without locking
+ * and still observe the most recent update.
+ */
+public class TransportAddressCache {
+
+    private final List<TransportAddress> transportAddresses = new ArrayList<>();
+
+    // volatile: written while holding the instance lock, read lock-free in transportAddress().
+    private volatile String transportAddressesStr = "";
+
+    /**
+     * Appends the given addresses to the cache and refreshes the rendered string.
+     *
+     * @param transportAddress the addresses to append, in the order given
+     */
+    public synchronized void addDiscoveryNode(TransportAddress... transportAddress) {
+        transportAddresses.addAll(Arrays.asList(transportAddress));
+        transportAddressesStr = format();
+    }
+
+    /**
+     * Removes every cached entry whose {@code getAddress()} and {@code getPort()} both match
+     * the given address, then refreshes the rendered string.
+     *
+     * <p>The port is part of the comparison so that removing one node does not also drop
+     * other nodes that share the same host but listen on a different port.
+     *
+     * @param transportAddress the address to remove
+     */
+    public synchronized void removeDiscoveryNode(TransportAddress transportAddress) {
+        transportAddresses.removeIf(node ->
+            node.getAddress().equals(transportAddress.getAddress())
+                && node.getPort() == transportAddress.getPort());
+        transportAddressesStr = format();
+    }
+
+    /**
+     * Renders the cached addresses as {@code address:port} entries joined by {@code ","}.
+     * Callers must hold the instance lock.
+     */
+    private String format() {
+        return transportAddresses.stream()
+            .map(x -> String.format("%s:%s", x.getAddress(), x.getPort()))
+            .collect(Collectors.joining(","));
+    }
+
+    /**
+     * @return the rendered address list as of the last add/remove; an empty string when no
+     *         address is cached
+     */
+    public String transportAddress() {
+        return transportAddressesStr;
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/TransportClientEnhanceInfo.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/TransportClientEnhanceInfo.java
new file mode 100644
index 0000000..fde111d
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/TransportClientEnhanceInfo.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.elasticsearch.v6;
+
+/**
+ * Mutable holder pairing an elasticsearch cluster name with the
+ * {@link TransportAddressCache} that belongs to the same client.
+ */
+public class TransportClientEnhanceInfo {
+
+    /**
+     * elasticsearch cluster name
+     */
+    private String clusterName;
+
+    /**
+     * Source of the rendered address list returned by {@link #transportAddresses()}.
+     */
+    private TransportAddressCache transportAddressCache;
+
+    /**
+     * @return the cluster name, or {@code null} if none has been set
+     */
+    public String getClusterName() {
+        return this.clusterName;
+    }
+
+    public void setClusterName(String clusterName) {
+        this.clusterName = clusterName;
+    }
+
+    /**
+     * @return the address cache, or {@code null} if none has been set
+     */
+    public TransportAddressCache getTransportAddressCache() {
+        return this.transportAddressCache;
+    }
+
+    public void setTransportAddressCache(TransportAddressCache transportAddressCache) {
+        this.transportAddressCache = transportAddressCache;
+    }
+
+    /**
+     * Delegates to {@link TransportAddressCache#transportAddress()}.
+     *
+     * @return the address string held by the cache
+     * @throws NullPointerException if no cache has been set yet
+     */
+    public String transportAddresses() {
+        final TransportAddressCache cache = this.transportAddressCache;
+        return cache.transportAddress();
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/PlainListenableActionFutureInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/AdapterActionFutureInstrumentation.java
similarity index 81%
copy from apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/PlainListenableActionFutureInstrumentation.java
copy to apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/AdapterActionFutureInstrumentation.java
index a653f81..af6c92b 100644
--- a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/PlainListenableActionFutureInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/AdapterActionFutureInstrumentation.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.apm.plugin.elasticsearch.v5.define;
+package org.apache.skywalking.apm.plugin.elasticsearch.v6.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
@@ -25,11 +25,12 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.Constants;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
-public class PlainListenableActionFutureInstrumentation extends ClassEnhancePluginDefine {
+public class AdapterActionFutureInstrumentation extends ClassEnhancePluginDefine {
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
@@ -38,7 +39,7 @@ public class PlainListenableActionFutureInstrumentation extends ClassEnhancePlug
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
- return new InstanceMethodsInterceptPoint[] {
+ return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
@@ -47,7 +48,7 @@ public class PlainListenableActionFutureInstrumentation extends ClassEnhancePlug
@Override
public String getMethodsInterceptor() {
- return "org.apache.skywalking.apm.plugin.elasticsearch.v5.PlainListenableActionFutureInterceptor";
+ return "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.AdapterActionFutureActionGetMethodsInterceptor";
}
@Override
@@ -65,6 +66,11 @@ public class PlainListenableActionFutureInstrumentation extends ClassEnhancePlug
@Override
protected ClassMatch enhanceClass() {
- return byName("org.elasticsearch.action.support.PlainListenableActionFuture");
+ return byName("org.elasticsearch.action.support.AdapterActionFuture");
+ }
+
+ @Override
+ protected String[] witnessClasses() {
+ return new String[]{Constants.TASK_TRANSPORT_CHANNEL_WITNESS_CLASSES};
}
}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportActionNodeProxyInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportActionNodeProxyInstrumentation.java
similarity index 81%
copy from apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportActionNodeProxyInstrumentation.java
copy to apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportActionNodeProxyInstrumentation.java
index d98b457..0caf481 100644
--- a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportActionNodeProxyInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportActionNodeProxyInstrumentation.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.apm.plugin.elasticsearch.v5.define;
+package org.apache.skywalking.apm.plugin.elasticsearch.v6.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
@@ -25,6 +25,7 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.Constants;
import static net.bytebuddy.matcher.ElementMatchers.any;
import static net.bytebuddy.matcher.ElementMatchers.named;
@@ -32,20 +33,18 @@ import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName
public class TransportActionNodeProxyInstrumentation extends ClassEnhancePluginDefine {
- public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.elasticsearch.v5.TransportActionNodeProxyInterceptor";
+ public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.TransportActionNodeProxyExecuteMethodsInterceptor";
public static final String ENHANC_CLASS = "org.elasticsearch.action.TransportActionNodeProxy";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
- return new ConstructorInterceptPoint[] {
+ return new ConstructorInterceptPoint[]{
new ConstructorInterceptPoint() {
- @Override
- public ElementMatcher<MethodDescription> getConstructorMatcher() {
+ @Override public ElementMatcher<MethodDescription> getConstructorMatcher() {
return any();
}
- @Override
- public String getConstructorInterceptor() {
+ @Override public String getConstructorInterceptor() {
return INTERCEPTOR_CLASS;
}
}
@@ -54,7 +53,7 @@ public class TransportActionNodeProxyInstrumentation extends ClassEnhancePluginD
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
- return new InstanceMethodsInterceptPoint[] {
+ return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
@@ -83,4 +82,9 @@ public class TransportActionNodeProxyInstrumentation extends ClassEnhancePluginD
protected ClassMatch enhanceClass() {
return byName(ENHANC_CLASS);
}
+
+ @Override
+ protected String[] witnessClasses() {
+ return new String[]{Constants.TASK_TRANSPORT_CHANNEL_WITNESS_CLASSES};
+ }
}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportClientNodesServiceInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportClientNodesServiceInstrumentation.java
similarity index 50%
copy from apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportClientNodesServiceInstrumentation.java
copy to apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportClientNodesServiceInstrumentation.java
index df10273..35040ea 100644
--- a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportClientNodesServiceInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportClientNodesServiceInstrumentation.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.skywalking.apm.plugin.elasticsearch.v5.define;
+package org.apache.skywalking.apm.plugin.elasticsearch.v6.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
@@ -23,61 +23,87 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.ConstructorInterc
import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.Constants;
import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.skywalking.apm.agent.core.plugin.bytebuddy.ArgumentTypeNameMatch.takesArgumentWithType;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
public class TransportClientNodesServiceInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
- public static final String ADD_TRANSPORT_ADDRESSES_INTERCEPTOR = "org.apache.skywalking.apm.plugin.elasticsearch.v5.AddTransportAddressesInterceptor";
- public static final String REMOVE_TRANSPORT_ADDRESS_INTERCEPTOR = "org.apache.skywalking.apm.plugin.elasticsearch.v5.RemoveTransportAddressInterceptor";
+ public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.TransportClientNodesServiceInterceptor";
+ public static final String ADD_TRANSPORT_ADDRESSES_INTERCEPTOR = INTERCEPTOR_CLASS + "$AddTransportAddressesInterceptor";
+ public static final String REMOVE_TRANSPORT_ADDRESS_INTERCEPTOR = INTERCEPTOR_CLASS + "$RemoveTransportAddressInterceptor";
+ public static final String EXECUTE_INTERCEPTOR = INTERCEPTOR_CLASS + "$ExecuteInterceptor";
+
public static final String ENHANCE_CLASS = "org.elasticsearch.client.transport.TransportClientNodesService";
- @Override
- public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
- return new ConstructorInterceptPoint[0];
+ @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
+ return new ConstructorInterceptPoint[] {
+ new ConstructorInterceptPoint() {
+ @Override
+ public ElementMatcher<MethodDescription> getConstructorMatcher() {
+ return takesArgumentWithType(1, "org.elasticsearch.transport.TransportService");
+ }
+
+ @Override
+ public String getConstructorInterceptor() {
+ return INTERCEPTOR_CLASS;
+ }
+ }
+ };
}
- @Override
- public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
- return new InstanceMethodsInterceptPoint[] {
+ @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
+ return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
- @Override
- public ElementMatcher<MethodDescription> getMethodsMatcher() {
+ @Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("addTransportAddresses");
}
- @Override
- public String getMethodsInterceptor() {
+ @Override public String getMethodsInterceptor() {
return ADD_TRANSPORT_ADDRESSES_INTERCEPTOR;
}
- @Override
- public boolean isOverrideArgs() {
+ @Override public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
- @Override
- public ElementMatcher<MethodDescription> getMethodsMatcher() {
+ @Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("removeTransportAddress");
}
- @Override
- public String getMethodsInterceptor() {
+ @Override public String getMethodsInterceptor() {
return REMOVE_TRANSPORT_ADDRESS_INTERCEPTOR;
}
- @Override
- public boolean isOverrideArgs() {
+ @Override public boolean isOverrideArgs() {
+ return false;
+ }
+ },
+ new InstanceMethodsInterceptPoint() {
+ @Override public ElementMatcher<MethodDescription> getMethodsMatcher() {
+ return named("execute");
+ }
+
+ @Override public String getMethodsInterceptor() {
+ return EXECUTE_INTERCEPTOR;
+ }
+
+ @Override public boolean isOverrideArgs() {
return false;
}
}
};
}
- @Override
- protected ClassMatch enhanceClass() {
+ @Override protected ClassMatch enhanceClass() {
return byName(ENHANCE_CLASS);
}
+
+ @Override
+ protected String[] witnessClasses() {
+ return new String[]{Constants.TASK_TRANSPORT_CHANNEL_WITNESS_CLASSES};
+ }
}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportProxyClientInstrumentation.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportServiceInstrumentation.java
similarity index 82%
copy from apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportProxyClientInstrumentation.java
copy to apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportServiceInstrumentation.java
index 55e6e91..bd7c0ec 100644
--- a/apm-sniffer/apm-sdk-plugin/elasticsearch-5.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v5/define/TransportProxyClientInstrumentation.java
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/define/TransportServiceInstrumentation.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.skywalking.apm.plugin.elasticsearch.v5.define;
+package org.apache.skywalking.apm.plugin.elasticsearch.v6.define;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
@@ -25,13 +25,14 @@ import org.apache.skywalking.apm.agent.core.plugin.interceptor.InstanceMethodsIn
import org.apache.skywalking.apm.agent.core.plugin.interceptor.StaticMethodsInterceptPoint;
import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import org.apache.skywalking.apm.agent.core.plugin.match.ClassMatch;
+import org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.Constants;
import static net.bytebuddy.matcher.ElementMatchers.any;
import static org.apache.skywalking.apm.agent.core.plugin.match.NameMatch.byName;
-public class TransportProxyClientInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
+public class TransportServiceInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
- private static final String ENHANCE_CLASS = "org.apache.skywalking.apm.plugin.elasticsearch.v5.TransportProxyClientInterceptor";
+ private static final String ENHANCE_CLASS = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.TransportServiceConInterceptor";
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
@@ -62,6 +63,11 @@ public class TransportProxyClientInstrumentation extends ClassInstanceMethodsEnh
@Override
protected ClassMatch enhanceClass() {
- return byName("org.elasticsearch.client.transport.TransportProxyClient");
+ return byName("org.elasticsearch.transport.TransportService");
+ }
+
+ @Override
+ protected String[] witnessClasses() {
+ return new String[]{Constants.TASK_TRANSPORT_CHANNEL_WITNESS_CLASSES};
}
}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/AdapterActionFutureActionGetMethodsInterceptor.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/AdapterActionFutureActionGetMethodsInterceptor.java
new file mode 100644
index 0000000..7a15809
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/AdapterActionFutureActionGetMethodsInterceptor.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
+
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.apache.skywalking.apm.util.StringUtil;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateResponse;
+
+import java.lang.reflect.Method;
+
+import static org.apache.skywalking.apm.agent.core.conf.Config.Plugin.Elasticsearch.ELASTICSEARCH_DSL_LENGTH_THRESHOLD;
+import static org.apache.skywalking.apm.agent.core.conf.Config.Plugin.Elasticsearch.TRACE_DSL;
+
+/**
+ * Wraps the intercepted future-get call in a local span named
+ * {@code Constants.DB_TYPE + "/" + Constants.BASE_FUTURE_METHOD} and tags that span with details
+ * read from the returned Elasticsearch response.
+ * <p>
+ * Tracing is opt-in per instance: a span is only opened when the SkyWalking dynamic field of the
+ * enhanced instance holds {@code Boolean.TRUE}. Every callback applies the same check, so an
+ * untraced instance never touches the active span.
+ */
+public class AdapterActionFutureActionGetMethodsInterceptor implements InstanceMethodsAroundInterceptor {
+
+    /**
+     * Opens the local span when tracing is enabled for {@code objInst}.
+     */
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
+            Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+
+        if (!isTrace(objInst)) {
+            return;
+        }
+
+        AbstractSpan span = ContextManager.createLocalSpan(Constants.DB_TYPE + "/" + Constants.BASE_FUTURE_METHOD);
+        span.setComponent(ComponentsDefine.TRANSPORT_CLIENT);
+        Tags.DB_TYPE.set(span, Constants.DB_TYPE);
+    }
+
+    /**
+     * Tags the span opened in {@link #beforeMethod} with response details and stops it.
+     *
+     * @return {@code ret}, unchanged
+     */
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
+            Class<?>[] argumentsTypes, Object ret) throws Throwable {
+
+        if (!isTrace(objInst)) {
+            return ret;
+        }
+
+        try {
+            // ret may be null or of another type; only a real response carries anything to tag.
+            if (ret instanceof ActionResponse) {
+                parseResponseInfo((ActionResponse) ret, ContextManager.activeSpan());
+            }
+        } finally {
+            // Always close the span opened in beforeMethod, even if tagging failed.
+            ContextManager.stopSpan();
+        }
+        return ret;
+    }
+
+    /**
+     * Marks the span as failed and logs {@code t}; does nothing for untraced instances, because
+     * no span was opened for them in {@link #beforeMethod}.
+     */
+    @Override
+    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+            Class<?>[] argumentsTypes, Throwable t) {
+        if (!isTrace(objInst)) {
+            return;
+        }
+        ContextManager.activeSpan().errorOccurred().log(t);
+    }
+
+    /**
+     * @return {@code true} only when the dynamic field of {@code objInst} is {@code Boolean.TRUE};
+     *         a missing or non-Boolean value counts as "not traced"
+     */
+    private boolean isTrace(EnhancedInstance objInst) {
+        return Boolean.TRUE.equals(objInst.getSkyWalkingDynamicField());
+    }
+
+    /**
+     * Dispatches on the concrete response type; unknown response types add no tags.
+     */
+    private void parseResponseInfo(ActionResponse response, AbstractSpan span) {
+        if (response instanceof SearchResponse) {
+            parseSearchResponse((SearchResponse) response, span);
+        } else if (response instanceof BulkResponse) {
+            parseBulkResponse((BulkResponse) response, span);
+        } else if (response instanceof GetResponse
+            || response instanceof IndexResponse
+            || response instanceof UpdateResponse
+            || response instanceof DeleteResponse) {
+            // These responses contribute nothing but the optional statement tag.
+            tagStatement(response, span);
+        }
+    }
+
+    /** Tags took-millis and total hits, then the optional statement tag. */
+    private void parseSearchResponse(SearchResponse searchResponse, AbstractSpan span) {
+        span.tag(Constants.ES_TOOK_MILLIS, Long.toString(searchResponse.getTook().getMillis()));
+        span.tag(Constants.ES_TOTAL_HITS, Long.toString(searchResponse.getHits().getTotalHits()));
+        tagStatement(searchResponse, span);
+    }
+
+    /** Tags took-millis and ingest-took-millis, then the optional statement tag. */
+    private void parseBulkResponse(BulkResponse bulkResponse, AbstractSpan span) {
+        span.tag(Constants.ES_TOOK_MILLIS, Long.toString(bulkResponse.getTook().getMillis()));
+        span.tag(Constants.ES_INGEST_TOOK_MILLIS, Long.toString(bulkResponse.getIngestTookInMillis()));
+        tagStatement(bulkResponse, span);
+    }
+
+    /**
+     * Sets {@code Tags.DB_STATEMENT} to the response's {@code toString()} when {@code TRACE_DSL}
+     * is on, cut to {@code ELASTICSEARCH_DSL_LENGTH_THRESHOLD} when that threshold is positive.
+     */
+    private void tagStatement(ActionResponse response, AbstractSpan span) {
+        if (!TRACE_DSL) {
+            return;
+        }
+        String tagValue = response.toString();
+        if (ELASTICSEARCH_DSL_LENGTH_THRESHOLD > 0) {
+            tagValue = StringUtil.cut(tagValue, ELASTICSEARCH_DSL_LENGTH_THRESHOLD);
+        }
+        Tags.DB_STATEMENT.set(span, tagValue);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/Constants.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/Constants.java
index 0c3c3f6..d8dee69 100644
--- a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/Constants.java
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/Constants.java
@@ -18,6 +18,9 @@
package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
+import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+
public class Constants {
//interceptor class
public static final String REST_HIGH_LEVEL_CLIENT_CON_INTERCEPTOR = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.RestHighLevelClientConInterceptor";
@@ -33,6 +36,9 @@ public class Constants {
public static final String CLUSTER_CLIENT_GET_SETTINGS_METHODS_INTERCEPTOR = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.ClusterClientGetSettingsMethodsInterceptor";
public static final String CLUSTER_CLIENT_PUT_SETTINGS_METHODS_INTERCEPTOR = "org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor.ClusterClientPutSettingsMethodsInterceptor";
+ //witnessClasses
+ public static final String TASK_TRANSPORT_CHANNEL_WITNESS_CLASSES = "org.elasticsearch.transport.TaskTransportChannel";
+
//es operator name
public static final String CREATE_OPERATOR_NAME = "Elasticsearch/CreateRequest";
public static final String DELETE_OPERATOR_NAME = "Elasticsearch/DeleteRequest";
@@ -45,4 +51,14 @@ public class Constants {
public static final String CLUSTER_PUT_SETTINGS_NAME = "Elasticsearch/PutSettings";
public static final String DB_TYPE = "Elasticsearch";
+
+ public static final String BASE_FUTURE_METHOD = "actionGet";
+
+ //tags
+ public static final AbstractTag<String> ES_NODE = Tags.ofKey("node.address");
+ public static final AbstractTag<String> ES_INDEX = Tags.ofKey("es.indices");
+ public static final AbstractTag<String> ES_TYPE = Tags.ofKey("es.types");
+ public static final AbstractTag<String> ES_TOOK_MILLIS = Tags.ofKey("es.took_millis");
+ public static final AbstractTag<String> ES_TOTAL_HITS = Tags.ofKey("es.total_hits");
+ public static final AbstractTag<String> ES_INGEST_TOOK_MILLIS = Tags.ofKey("es.ingest_took_millis");
}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportActionNodeProxyExecuteMethodsInterceptor.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportActionNodeProxyExecuteMethodsInterceptor.java
new file mode 100644
index 0000000..af9d1e6
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportActionNodeProxyExecuteMethodsInterceptor.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
+
+import org.apache.skywalking.apm.agent.core.context.ContextManager;
+import org.apache.skywalking.apm.agent.core.context.tag.Tags;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.network.trace.component.ComponentsDefine;
+import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportClientEnhanceInfo;
+import org.apache.skywalking.apm.util.StringUtil;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+
+import java.lang.reflect.Method;
+
+import static org.apache.skywalking.apm.agent.core.conf.Config.Plugin.Elasticsearch.TRACE_DSL;
+
+/**
+ * Opens an Elasticsearch exit span around the intercepted method and tags it with the target
+ * node plus request details (indices, types and, when {@code TRACE_DSL} is on, the request text).
+ * <p>
+ * The {@link TransportClientEnhanceInfo} read in {@link #beforeMethod} is the dynamic field copied
+ * from the third constructor argument in {@link #onConstruct}.
+ */
+public class TransportActionNodeProxyExecuteMethodsInterceptor implements InstanceConstructorInterceptor, InstanceMethodsAroundInterceptor {
+
+    /**
+     * Copies the dynamic field of the third constructor argument onto the new instance.
+     */
+    @Override
+    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+        EnhancedInstance carrier = (EnhancedInstance) allArguments[2];
+        objInst.setSkyWalkingDynamicField(carrier.getSkyWalkingDynamicField());
+    }
+
+    /**
+     * Creates the exit span; argument 0 is read as the {@link DiscoveryNode}, argument 1 as the
+     * {@link ActionRequest}.
+     */
+    @Override
+    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
+            Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
+
+        TransportClientEnhanceInfo info = (TransportClientEnhanceInfo) objInst.getSkyWalkingDynamicField();
+        ActionRequest actionRequest = (ActionRequest) allArguments[1];
+        // Operation name is "<db type>/<simple class name of the request>".
+        String operationName = Constants.DB_TYPE + "/" + actionRequest.getClass().getSimpleName();
+
+        AbstractSpan exitSpan = ContextManager.createExitSpan(operationName, info.transportAddresses());
+        exitSpan.setComponent(ComponentsDefine.TRANSPORT_CLIENT);
+        Tags.DB_TYPE.set(exitSpan, Constants.DB_TYPE);
+        Tags.DB_INSTANCE.set(exitSpan, info.getClusterName());
+
+        DiscoveryNode targetNode = (DiscoveryNode) allArguments[0];
+        exitSpan.tag(Constants.ES_NODE, targetNode.getAddress().toString());
+        parseRequestInfo(actionRequest, exitSpan);
+
+        SpanLayer.asDB(exitSpan);
+    }
+
+    /**
+     * Stops the span opened in {@link #beforeMethod}.
+     *
+     * @return {@code ret}, unchanged
+     */
+    @Override
+    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
+            Class<?>[] argumentsTypes, Object ret) throws Throwable {
+        ContextManager.stopSpan();
+        return ret;
+    }
+
+    /**
+     * Flags the active span as failed and attaches the throwable to it.
+     */
+    @Override
+    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+            Class<?>[] argumentsTypes, Throwable t) {
+        ContextManager.activeSpan().errorOccurred().log(t);
+    }
+
+    /**
+     * Routes the request to the matching tagger; request types not listed here add no tags.
+     */
+    private void parseRequestInfo(ActionRequest request, AbstractSpan span) {
+        if (request instanceof SearchRequest) {
+            parseSearchRequest((SearchRequest) request, span);
+        } else if (request instanceof GetRequest) {
+            parseGetRequest((GetRequest) request, span);
+        } else if (request instanceof IndexRequest) {
+            parseIndexRequest((IndexRequest) request, span);
+        } else if (request instanceof UpdateRequest) {
+            parseUpdateRequest((UpdateRequest) request, span);
+        } else if (request instanceof DeleteRequest) {
+            parseDeleteRequest((DeleteRequest) request, span);
+        } else if (request instanceof DeleteIndexRequest) {
+            parseDeleteIndexRequest((DeleteIndexRequest) request, span);
+        }
+    }
+
+    /** Search: comma-joined indices and types, plus the optional statement. */
+    private void parseSearchRequest(SearchRequest searchRequest, AbstractSpan span) {
+        String indices = StringUtil.join(',', searchRequest.indices());
+        span.tag(Constants.ES_INDEX, indices);
+        String types = StringUtil.join(',', searchRequest.types());
+        span.tag(Constants.ES_TYPE, types);
+        if (TRACE_DSL) {
+            Tags.DB_STATEMENT.set(span, searchRequest.toString());
+        }
+    }
+
+    /** Get: index and type, plus the optional statement. */
+    private void parseGetRequest(GetRequest getRequest, AbstractSpan span) {
+        span.tag(Constants.ES_INDEX, getRequest.index());
+        span.tag(Constants.ES_TYPE, getRequest.type());
+        if (!TRACE_DSL) {
+            return;
+        }
+        Tags.DB_STATEMENT.set(span, getRequest.toString());
+    }
+
+    /** Index: index and type, plus the optional statement. */
+    private void parseIndexRequest(IndexRequest indexRequest, AbstractSpan span) {
+        span.tag(Constants.ES_INDEX, indexRequest.index());
+        span.tag(Constants.ES_TYPE, indexRequest.type());
+        if (!TRACE_DSL) {
+            return;
+        }
+        Tags.DB_STATEMENT.set(span, indexRequest.toString());
+    }
+
+    /** Update: index and type, plus the optional statement. */
+    private void parseUpdateRequest(UpdateRequest updateRequest, AbstractSpan span) {
+        span.tag(Constants.ES_INDEX, updateRequest.index());
+        span.tag(Constants.ES_TYPE, updateRequest.type());
+        if (!TRACE_DSL) {
+            return;
+        }
+        Tags.DB_STATEMENT.set(span, updateRequest.toString());
+    }
+
+    /** Delete: index and type, plus the optional statement. */
+    private void parseDeleteRequest(DeleteRequest deleteRequest, AbstractSpan span) {
+        span.tag(Constants.ES_INDEX, deleteRequest.index());
+        span.tag(Constants.ES_TYPE, deleteRequest.type());
+        if (!TRACE_DSL) {
+            return;
+        }
+        Tags.DB_STATEMENT.set(span, deleteRequest.toString());
+    }
+
+    /** Delete-index: only the comma-joined indices are tagged. */
+    private void parseDeleteIndexRequest(DeleteIndexRequest deleteIndexRequest, AbstractSpan span) {
+        String indices = String.join(",", deleteIndexRequest.indices());
+        span.tag(Constants.ES_INDEX, indices);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportClientNodesServiceInterceptor.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportClientNodesServiceInterceptor.java
new file mode 100644
index 0000000..5c71fc5
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportClientNodesServiceInterceptor.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
+
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
+import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportClientEnhanceInfo;
+import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportAddressCache;
+import org.elasticsearch.action.support.AdapterActionFuture;
+import org.elasticsearch.common.transport.TransportAddress;
+
+import java.lang.reflect.Method;
+
+/**
+ * Constructor interceptor that copies the dynamic field of the second constructor argument onto
+ * the new instance, plus three nested method interceptors that keep the
+ * {@link TransportAddressCache} in step with added/removed transport addresses and flag
+ * {@link AdapterActionFuture} listeners for tracing.
+ */
+public class TransportClientNodesServiceInterceptor implements InstanceConstructorInterceptor {
+
+    /**
+     * Copies the dynamic field of the second constructor argument onto {@code objInst}.
+     */
+    @Override
+    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+        EnhancedInstance source = (EnhancedInstance) allArguments[1];
+        objInst.setSkyWalkingDynamicField(source.getSkyWalkingDynamicField());
+    }
+
+    /**
+     * Returns the address cache stored in the instance's {@link TransportClientEnhanceInfo},
+     * creating and storing an empty one first when none is set.
+     */
+    private static TransportAddressCache addressCacheOf(EnhancedInstance objInst) {
+        TransportClientEnhanceInfo enhanceInfo = (TransportClientEnhanceInfo) objInst.getSkyWalkingDynamicField();
+        TransportAddressCache cache = enhanceInfo.getTransportAddressCache();
+        if (cache == null) {
+            cache = new TransportAddressCache();
+            enhanceInfo.setTransportAddressCache(cache);
+        }
+        return cache;
+    }
+
+    /**
+     * After the intercepted method returns, records the {@code TransportAddress[]} passed as
+     * argument 0 in the address cache.
+     */
+    public static class AddTransportAddressesInterceptor implements InstanceMethodsAroundInterceptor {
+        @Override
+        public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+                MethodInterceptResult result) throws Throwable {
+            // Nothing to do before the call.
+        }
+
+        @Override
+        public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+                Object ret) throws Throwable {
+            addressCacheOf(objInst).addDiscoveryNode((TransportAddress[]) allArguments[0]);
+            return ret;
+        }
+
+        @Override
+        public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+                Class<?>[] argumentsTypes, Throwable t) {
+            // No span is opened by this interceptor, so there is nothing to mark as failed.
+        }
+    }
+
+    /**
+     * After the intercepted method returns, drops the {@code TransportAddress} passed as
+     * argument 0 from the address cache.
+     */
+    public static class RemoveTransportAddressInterceptor implements InstanceMethodsAroundInterceptor {
+        @Override
+        public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+                MethodInterceptResult result) throws Throwable {
+            // Nothing to do before the call.
+        }
+
+        @Override
+        public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+                Object ret) throws Throwable {
+            addressCacheOf(objInst).removeDiscoveryNode((TransportAddress) allArguments[0]);
+            return ret;
+        }
+
+        @Override
+        public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+                Class<?>[] argumentsTypes, Throwable t) {
+            // No span is opened by this interceptor, so there is nothing to mark as failed.
+        }
+    }
+
+    /**
+     * Flags an {@link AdapterActionFuture} passed as the second argument by setting its dynamic
+     * field to {@code true}, so that its {@code actionGet} call can be tracked.
+     */
+    public static class ExecuteInterceptor implements InstanceMethodsAroundInterceptor {
+
+        @Override
+        public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+                MethodInterceptResult result) throws Throwable {
+
+            // tracking AdapterActionFuture.actionGet
+            if (allArguments.length < 2) {
+                return;
+            }
+            Object listener = allArguments[1];
+            // Only a future that is also an EnhancedInstance has a dynamic field to set.
+            if (listener instanceof AdapterActionFuture && listener instanceof EnhancedInstance) {
+                ((EnhancedInstance) listener).setSkyWalkingDynamicField(true);
+            }
+        }
+
+        /**
+         * @return {@code ret}, unchanged, so the intercepted method's result is passed through
+         */
+        @Override
+        public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
+                Object ret) throws Throwable {
+            return ret;
+        }
+
+        @Override
+        public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
+                Class<?>[] argumentsTypes, Throwable t) {
+            // No span is opened by this interceptor, so there is nothing to mark as failed.
+        }
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportServiceConInterceptor.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportServiceConInterceptor.java
new file mode 100644
index 0000000..e4d0da5
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportServiceConInterceptor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
+
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
+import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportClientEnhanceInfo;
+import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportAddressCache;
+import org.elasticsearch.common.settings.Settings;
+
+/**
+ * Constructor interceptor that attaches a fresh {@link TransportClientEnhanceInfo} to the new
+ * instance, holding the {@code cluster.name} setting and an empty {@link TransportAddressCache}.
+ */
+public class TransportServiceConInterceptor implements InstanceConstructorInterceptor {
+
+    /**
+     * Reads {@code cluster.name} from the {@link Settings} passed as the first constructor
+     * argument and stores the resulting enhance info as the instance's dynamic field.
+     */
+    @Override
+    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
+        TransportClientEnhanceInfo enhanceInfo = new TransportClientEnhanceInfo();
+        enhanceInfo.setClusterName(((Settings) allArguments[0]).get("cluster.name"));
+        enhanceInfo.setTransportAddressCache(new TransportAddressCache());
+        objInst.setSkyWalkingDynamicField(enhanceInfo);
+    }
+}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/resources/skywalking-plugin.def b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/resources/skywalking-plugin.def
index 9752c22..0a40517 100644
--- a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/resources/skywalking-plugin.def
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/main/resources/skywalking-plugin.def
@@ -17,3 +17,7 @@
elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.RestHighLevelClientInstrumentation
elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.IndicesClientInstrumentation
elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.ClusterClientInstrumentation
+elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.AdapterActionFutureInstrumentation
+elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.TransportServiceInstrumentation
+elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.TransportActionNodeProxyInstrumentation
+elasticsearch-6.x=org.apache.skywalking.apm.plugin.elasticsearch.v6.define.TransportClientNodesServiceInstrumentation
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/AdapterActionFutureActionGetMethodsInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/AdapterActionFutureActionGetMethodsInterceptorTest.java
new file mode 100644
index 0000000..3754c41
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/AdapterActionFutureActionGetMethodsInterceptorTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
+
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.LocalSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.core.context.util.TagValuePair;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.helper.SpanHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.search.SearchHits;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import java.util.List;
+
+import static org.apache.skywalking.apm.agent.core.conf.Config.Plugin.Elasticsearch.TRACE_DSL;
+import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.TRANSPORT_CLIENT;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class AdapterActionFutureActionGetMethodsInterceptorTest {
+
+ @SegmentStoragePoint
+ private SegmentStorage segmentStorage;
+
+ @Rule
+ public AgentServiceRule serviceRule = new AgentServiceRule();
+
+ @Mock
+ private EnhancedInstance enhancedInstance;
+
+ @Mock
+ private SearchResponse searchResponse;
+
+ @Mock
+ private BulkResponse bulkItemResponses;
+
+ private SearchHits searchHits;
+
+ @Mock
+ private AdapterActionFutureActionGetMethodsInterceptor interceptor;
+
+ @Before
+ public void setUp() {
+
+ searchHits = new SearchHits(null, 309L, 0);
+
+ when(searchResponse.getTook()).thenReturn(TimeValue.timeValueMillis(2020));
+ when(searchResponse.getHits()).thenReturn(searchHits);
+
+ when(bulkItemResponses.getTook()).thenReturn(TimeValue.timeValueMillis(2020));
+ when(bulkItemResponses.getIngestTookInMillis()).thenReturn(1416L);
+
+ when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(true);
+
+ interceptor = new AdapterActionFutureActionGetMethodsInterceptor();
+ }
+
+ @Test
+ public void testMethodsAround() throws Throwable {
+ TRACE_DSL = true;
+ interceptor.beforeMethod(enhancedInstance, null, null, null, null);
+ interceptor.afterMethod(enhancedInstance, null, null, null, searchResponse);
+
+ List<TraceSegment> traceSegmentList = segmentStorage.getTraceSegments();
+ Assert.assertThat(traceSegmentList.size(), is(1));
+ TraceSegment traceSegment = traceSegmentList.get(0);
+
+ AbstractTracingSpan getSpan = SegmentHelper.getSpans(traceSegment).get(0);
+ assertGetSpan(getSpan, searchResponse);
+ }
+
+ @Test
+ public void testMethodsAround2() throws Throwable {
+ TRACE_DSL = true;
+ interceptor.beforeMethod(enhancedInstance, null, null, null, null);
+ interceptor.afterMethod(enhancedInstance, null, null, null, bulkItemResponses);
+
+ List<TraceSegment> traceSegmentList = segmentStorage.getTraceSegments();
+ Assert.assertThat(traceSegmentList.size(), is(1));
+ TraceSegment traceSegment = traceSegmentList.get(0);
+
+ AbstractTracingSpan getSpan = SegmentHelper.getSpans(traceSegment).get(0);
+ assertGetSpan(getSpan, bulkItemResponses);
+ }
+
+ private void assertGetSpan(AbstractTracingSpan getSpan, Object ret) {
+ assertThat(getSpan instanceof LocalSpan, is(true));
+
+ LocalSpan span = (LocalSpan) getSpan;
+ assertThat(span.getOperationName(), is("Elasticsearch/actionGet"));
+ assertThat(SpanHelper.getComponentId(span), is(TRANSPORT_CLIENT.getId()));
+
+ List<TagValuePair> tags = SpanHelper.getTags(span);
+ assertThat(tags.size(), is(4));
+ if (ret instanceof SearchResponse) {
+ assertThat(tags.get(0).getValue(), is("Elasticsearch"));
+ assertThat(tags.get(1).getValue(), is("2020"));
+ assertThat(tags.get(2).getValue(), is("309"));
+ } else if (ret instanceof BulkResponse) {
+ assertThat(tags.get(0).getValue(), is("Elasticsearch"));
+ assertThat(tags.get(1).getValue(), is("2020"));
+ assertThat(tags.get(2).getValue(), is("1416"));
+ }
+
+ }
+
+}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportActionNodeProxyExecuteMethodsInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportActionNodeProxyExecuteMethodsInterceptorTest.java
new file mode 100644
index 0000000..68c1f28
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportActionNodeProxyExecuteMethodsInterceptorTest.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
+
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractTracingSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.ExitSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
+import org.apache.skywalking.apm.agent.core.context.util.TagValuePair;
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.helper.SegmentHelper;
+import org.apache.skywalking.apm.agent.test.helper.SpanHelper;
+import org.apache.skywalking.apm.agent.test.tools.AgentServiceRule;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStorage;
+import org.apache.skywalking.apm.agent.test.tools.SegmentStoragePoint;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportClientEnhanceInfo;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+import static org.apache.skywalking.apm.agent.core.conf.Config.Plugin.Elasticsearch.TRACE_DSL;
+import static org.apache.skywalking.apm.network.trace.component.ComponentsDefine.TRANSPORT_CLIENT;
+import static org.junit.Assert.assertThat;
+import static org.hamcrest.CoreMatchers.is;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockRunnerDelegate(TracingSegmentRunner.class)
+public class TransportActionNodeProxyExecuteMethodsInterceptorTest {
+
+ @SegmentStoragePoint
+ private SegmentStorage segmentStorage;
+
+ @Rule
+ public AgentServiceRule serviceRule = new AgentServiceRule();
+
+ @Mock
+ private EnhancedInstance enhancedInstance;
+
+ @Mock
+ private DiscoveryNode discoveryNode;
+
+ @Mock
+ private GetRequest getRequest;
+
+ @Mock
+ private IndexRequest indexRequest;
+
+ @Mock
+ private UpdateRequest updateRequest;
+
+ @Mock
+ private DeleteRequest deleteRequest;
+
+ @Mock
+ private DeleteIndexRequest deleteIndexRequest;
+
+ @Mock
+ private TransportClientEnhanceInfo enhanceInfo;
+
+ private TransportActionNodeProxyExecuteMethodsInterceptor interceptor;
+
+ /**
+  * Stubs the mocked node, enhance info and requests, and creates a fresh interceptor.
+  */
+ @Before
+ public void setUp() {
+     // The discovery node reports a fixed transport address.
+     TransportAddress nodeAddress = new TransportAddress(new InetSocketAddress("122.122.122.122", 9300));
+     when(discoveryNode.getAddress()).thenReturn(nodeAddress);
+
+     // Client-level information handed out through the enhanced instance's dynamic field.
+     when(enhanceInfo.transportAddresses()).thenReturn("122.122.122.122:9300");
+     when(enhanceInfo.getClusterName()).thenReturn("skywalking-es");
+     when(enhancedInstance.getSkyWalkingDynamicField()).thenReturn(enhanceInfo);
+
+     // Each document-level request targets the same index but reports its own type.
+     when(getRequest.index()).thenReturn("endpoint");
+     when(getRequest.type()).thenReturn("getType");
+     when(indexRequest.index()).thenReturn("endpoint");
+     when(indexRequest.type()).thenReturn("indexType");
+     when(updateRequest.index()).thenReturn("endpoint");
+     when(updateRequest.type()).thenReturn("updateType");
+     when(deleteRequest.index()).thenReturn("endpoint");
+     when(deleteRequest.type()).thenReturn("deleteType");
+
+     // The index-level delete is stubbed with its indices only.
+     when(deleteIndexRequest.indices()).thenReturn(new String[]{"endpoint"});
+
+     interceptor = new TransportActionNodeProxyExecuteMethodsInterceptor();
+ }
+
+ /**
+  * After onConstruct, the constructed instance must expose a dynamic field equal to
+  * the one held by the enhanced instance passed as third constructor argument.
+  */
+ @Test
+ public void testConstruct() {
+     // Minimal EnhancedInstance that simply stores the dynamic field it is given.
+     class FieldHolder implements EnhancedInstance {
+         private Object held = null;
+
+         @Override
+         public Object getSkyWalkingDynamicField() {
+             return held;
+         }
+
+         @Override
+         public void setSkyWalkingDynamicField(Object value) {
+             this.held = value;
+         }
+     }
+
+     final EnhancedInstance source = new FieldHolder();
+     final EnhancedInstance constructed = new FieldHolder();
+     source.setSkyWalkingDynamicField(123);
+
+     interceptor.onConstruct(constructed, new Object[]{null, null, source});
+
+     assertThat(source.getSkyWalkingDynamicField(), is(constructed.getSkyWalkingDynamicField()));
+ }
+
+ /** Intercepts a mocked GetRequest and checks the span recorded for it. */
+ @Test
+ public void testGetRequest() throws Throwable {
+     AbstractTracingSpan recorded = getSpan(getRequest);
+
+     assertGetSpan(recorded, getRequest);
+ }
+
+ /** Intercepts a mocked IndexRequest and checks the span recorded for it. */
+ @Test
+ public void testIndexRequest() throws Throwable {
+     AbstractTracingSpan recorded = getSpan(indexRequest);
+
+     assertGetSpan(recorded, indexRequest);
+ }
+
+ /** Intercepts a mocked UpdateRequest and checks the span recorded for it. */
+ @Test
+ public void testUpdateRequest() throws Throwable {
+     AbstractTracingSpan recorded = getSpan(updateRequest);
+
+     assertGetSpan(recorded, updateRequest);
+ }
+
+ /** Intercepts a mocked DeleteRequest and checks the span recorded for it. */
+ @Test
+ public void testDeleteRequest() throws Throwable {
+     AbstractTracingSpan recorded = getSpan(deleteRequest);
+
+     assertGetSpan(recorded, deleteRequest);
+ }
+
+ /** Intercepts a mocked DeleteIndexRequest and checks the span recorded for it. */
+ @Test
+ public void testDeleteIndexRequest() throws Throwable {
+     AbstractTracingSpan recorded = getSpan(deleteIndexRequest);
+
+     assertGetSpan(recorded, deleteIndexRequest);
+ }
+
+ private AbstractTracingSpan getSpan(ActionRequest actionRequest) throws Throwable {
+ TRACE_DSL = true;
+ Object[] allArguments = new Object[]{discoveryNode, actionRequest};
+
+ interceptor.beforeMethod(enhancedInstance, null, allArguments, null, null);
+ interceptor.afterMethod(enhancedInstance, null, allArguments, null, null);
+
+ List<TraceSegment> traceSegmentList = segmentStorage.getTraceSegments();
+ Assert.assertThat(traceSegmentList.size(), is(1));
+ TraceSegment traceSegment = traceSegmentList.get(0);
+ return SegmentHelper.getSpans(traceSegment).get(0);
+
+ }
+
+ private void assertGetSpan(AbstractTracingSpan getSpan, Object ret) {
+ assertThat(getSpan instanceof ExitSpan, is(true));
+
+ ExitSpan span = (ExitSpan) getSpan;
+ assertThat(SpanHelper.getComponentId(span), is(TRANSPORT_CLIENT.getId()));
+
+ List<TagValuePair> tags = SpanHelper.getTags(span);
+ assertThat(tags.get(0).getValue(), is("Elasticsearch"));
+ assertThat(tags.get(1).getValue(), is("skywalking-es"));
+ assertThat(tags.get(2).getValue(), is("122.122.122.122:9300"));
+ if (ret instanceof SearchRequest) {
+ assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/SearchRequest"));
+ assertThat(tags.get(3).getValue(), is("endpoint"));
+ assertThat(tags.get(4).getValue(), is("searchType"));
+ } else if (ret instanceof GetRequest) {
+ assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/GetRequest"));
+ assertThat(tags.get(3).getValue(), is("endpoint"));
+ assertThat(tags.get(4).getValue(), is("getType"));
+ } else if (ret instanceof IndexRequest) {
+ assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/IndexRequest"));
+ assertThat(tags.get(3).getValue(), is("endpoint"));
+ assertThat(tags.get(4).getValue(), is("indexType"));
+ } else if (ret instanceof UpdateRequest) {
+ assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/UpdateRequest"));
+ assertThat(tags.get(3).getValue(), is("endpoint"));
+ assertThat(tags.get(4).getValue(), is("updateType"));
+ } else if (ret instanceof DeleteRequest) {
+ assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/DeleteRequest"));
+ assertThat(tags.get(3).getValue(), is("endpoint"));
+ assertThat(tags.get(4).getValue(), is("deleteType"));
+ } else if (ret instanceof DeleteIndexRequest) {
+ assertThat(span.getOperationName().split("[$$]")[0], is("Elasticsearch/DeleteIndexRequest"));
+ assertThat(tags.get(3).getValue(), is("endpoint"));
+ }
+
+ }
+
+}
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportAddressCacheTest.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportAddressCacheTest.java
new file mode 100644
index 0000000..ed61c3f
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportAddressCacheTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
+
+import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportAddressCache;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+ /**
+  * Checks that {@link TransportAddressCache} renders the addresses it was given as a
+  * comma-separated list of {@code host:port} entries in the order they were added.
+  */
+ public class TransportAddressCacheTest {
+
+     private TransportAddressCache transportAddressCache;
+
+     @Before
+     public void setUp() {
+         transportAddressCache = new TransportAddressCache();
+     }
+
+     @Test
+     public void transportAddressTest() throws UnknownHostException {
+         // Three nodes with distinct hosts and ports.
+         final TransportAddress firstNode = new TransportAddress(InetAddress.getByName("172.1.1.1"), 9300);
+         final TransportAddress secondNode = new TransportAddress(InetAddress.getByName("172.1.1.2"), 9200);
+         final TransportAddress thirdNode = new TransportAddress(InetAddress.getByName("172.1.1.3"), 9100);
+
+         transportAddressCache.addDiscoveryNode(firstNode, secondNode, thirdNode);
+
+         assertThat(transportAddressCache.transportAddress(), is("172.1.1.1:9300,172.1.1.2:9200,172.1.1.3:9100"));
+     }
+
+ }
diff --git a/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportServiceConInterceptorTest.java b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportServiceConInterceptorTest.java
new file mode 100644
index 0000000..9634682
--- /dev/null
+++ b/apm-sniffer/apm-sdk-plugin/elasticsearch-6.x-plugin/src/test/java/org/apache/skywalking/apm/plugin/elasticsearch/v6/interceptor/TransportServiceConInterceptorTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.elasticsearch.v6.interceptor;
+
+import org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.EnhancedInstance;
+import org.apache.skywalking.apm.agent.test.tools.TracingSegmentRunner;
+import org.apache.skywalking.apm.plugin.elasticsearch.v6.TransportClientEnhanceInfo;
+import org.elasticsearch.common.settings.Settings;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.modules.junit4.PowerMockRunnerDelegate;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+ /**
+  * Checks that {@link TransportServiceConInterceptor#onConstruct} stores a
+  * {@link TransportClientEnhanceInfo} carrying the configured cluster name
+  * in the constructed instance's dynamic field.
+  */
+ @RunWith(PowerMockRunner.class)
+ @PowerMockRunnerDelegate(TracingSegmentRunner.class)
+ @PrepareForTest(value = {
+     Settings.class
+ })
+ public class TransportServiceConInterceptorTest {
+
+     @Mock
+     private Settings settings;
+
+     private Object[] allArguments;
+
+     private TransportServiceConInterceptor transportServiceConInterceptor;
+
+     @Before
+     public void setUp() {
+         // The mocked settings are the only constructor argument handed to the interceptor.
+         when(settings.get("cluster.name")).thenReturn("my.es.cluster");
+         allArguments = new Object[]{settings};
+     }
+
+     @Test
+     public void testConstruct() {
+         // Plain holder standing in for the enhanced instance.
+         final EnhancedInstance constructed = new EnhancedInstance() {
+             private Object dynamicField = null;
+
+             @Override
+             public Object getSkyWalkingDynamicField() {
+                 return dynamicField;
+             }
+
+             @Override
+             public void setSkyWalkingDynamicField(Object value) {
+                 this.dynamicField = value;
+             }
+         };
+
+         transportServiceConInterceptor = new TransportServiceConInterceptor();
+         transportServiceConInterceptor.onConstruct(constructed, allArguments);
+
+         final Object stored = constructed.getSkyWalkingDynamicField();
+         assertThat(stored instanceof TransportClientEnhanceInfo, is(true));
+         assertThat(((TransportClientEnhanceInfo) stored).getClusterName(), is("my.es.cluster"));
+     }
+
+ }
diff --git a/docs/en/setup/service-agent/java-agent/Supported-list.md b/docs/en/setup/service-agent/java-agent/Supported-list.md
index 1468fe3..735f185 100644
--- a/docs/en/setup/service-agent/java-agent/Supported-list.md
+++ b/docs/en/setup/service-agent/java-agent/Supported-list.md
@@ -59,7 +59,8 @@
* [Spymemcached](https://github.com/couchbase/spymemcached) 2.x
* [Xmemcached](https://github.com/killme2008/xmemcached) 2.x
* [Elasticsearch](https://github.com/elastic/elasticsearch)
- * [transport-client](https://github.com/elastic/elasticsearch/tree/master/client/transport) 5.2.x-5.6.x
+ * [transport-client](https://github.com/elastic/elasticsearch/tree/v5.2.0/client/transport) 5.2.x-5.6.x
+ * [transport-client](https://github.com/elastic/elasticsearch/tree/v6.7.1/client/transport) 6.7.1-6.8.4
* [rest-high-level-client](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.7/index.html) 6.7.1-6.8.4
* [rest-high-level-client](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/7.0/java-rest-high.html) 7.0.0-7.5.2
* [Solr](https://github.com/apache/lucene-solr/)
diff --git a/test/plugin/scenarios/elasticsearch-6.x-scenario/config/expectedData.yaml b/test/plugin/scenarios/elasticsearch-6.x-scenario/config/expectedData.yaml
index 9a2c16b..745377f 100644
--- a/test/plugin/scenarios/elasticsearch-6.x-scenario/config/expectedData.yaml
+++ b/test/plugin/scenarios/elasticsearch-6.x-scenario/config/expectedData.yaml
@@ -128,6 +128,140 @@ segmentItems:
tags:
- {key: db.type, value: Elasticsearch}
- {key: db.instance, value: not null}
+ - operationName: Elasticsearch/IndexRequest
+ operationId: 0
+ parentSpanId: 0
+ spanId: 7
+ spanLayer: Database
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 48
+ componentName: ''
+ isError: false
+ spanType: Exit
+ peer: not null
+ peerId: 0
+ tags:
+ - {key: db.type, value: Elasticsearch}
+ - {key: db.instance, value: not null}
+ - {key: node.address, value: not null}
+ - {key: es.indices, value: not null}
+ - {key: es.types, value: not null}
+ - {key: db.statement, value: not null}
+ - operationName: Elasticsearch/actionGet
+ operationId: 0
+ parentSpanId: 0
+ spanId: 8
+ spanLayer: null
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 48
+ componentName: ''
+ isError: false
+ spanType: Local
+ peer: null
+ peerId: 0
+ tags:
+ - {key: db.type, value: Elasticsearch}
+ - {key: db.statement, value: not null}
+ - operationName: Elasticsearch/GetRequest
+ operationId: 0
+ parentSpanId: 0
+ spanId: 9
+ spanLayer: Database
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 48
+ componentName: ''
+ isError: false
+ spanType: Exit
+ peer: not null
+ peerId: 0
+ tags:
+ - {key: db.type, value: Elasticsearch}
+ - {key: db.instance, value: not null}
+ - {key: node.address, value: not null}
+ - {key: es.indices, value: not null}
+ - {key: es.types, value: not null}
+ - {key: db.statement, value: not null}
+ - operationName: Elasticsearch/SearchRequest
+ operationId: 0
+ parentSpanId: 0
+ spanId: 10
+ spanLayer: Database
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 48
+ componentName: ''
+ isError: false
+ spanType: Exit
+ peer: not null
+ peerId: 0
+ tags:
+ - {key: db.type, value: Elasticsearch}
+ - {key: db.instance, value: not null}
+ - {key: node.address, value: not null}
+ - {key: es.indices, value: not null}
+ - {key: es.types, value: not null}
+ - {key: db.statement, value: not null}
+ - operationName: Elasticsearch/UpdateRequest
+ operationId: 0
+ parentSpanId: 0
+ spanId: 11
+ spanLayer: Database
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 48
+ componentName: ''
+ isError: false
+ spanType: Exit
+ peer: not null
+ peerId: 0
+ tags:
+ - {key: db.type, value: Elasticsearch}
+ - {key: db.instance, value: not null}
+ - {key: node.address, value: not null}
+ - {key: es.indices, value: not null}
+ - {key: es.types, value: not null}
+ - {key: db.statement, value: not null}
+ - operationName: Elasticsearch/DeleteRequest
+ operationId: 0
+ parentSpanId: 0
+ spanId: 12
+ spanLayer: Database
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 48
+ componentName: ''
+ isError: false
+ spanType: Exit
+ peer: not null
+ peerId: 0
+ tags:
+ - {key: db.type, value: Elasticsearch}
+ - {key: db.instance, value: not null}
+ - {key: node.address, value: not null}
+ - {key: es.indices, value: not null}
+ - {key: es.types, value: not null}
+ - {key: db.statement, value: not null}
+ - operationName: Elasticsearch/DeleteIndexRequest
+ operationId: 0
+ parentSpanId: 0
+ spanId: 13
+ spanLayer: Database
+ startTime: nq 0
+ endTime: nq 0
+ componentId: 48
+ componentName: ''
+ isError: false
+ spanType: Exit
+ peer: not null
+ peerId: 0
+ tags:
+ - {key: db.type, value: Elasticsearch}
+ - {key: db.instance, value: not null}
+ - {key: node.address, value: not null}
+ - {key: es.indices, value: not null}
- operationName: /elasticsearch-case/case/elasticsearch
operationId: 0
parentSpanId: -1
@@ -143,4 +277,4 @@ segmentItems:
peerId: 0
tags:
- {key: url, value: 'http://localhost:8080/elasticsearch-case/case/elasticsearch'}
- - {key: http.method, value: GET}
+ - {key: http.method, value: GET}
\ No newline at end of file
diff --git a/test/plugin/scenarios/elasticsearch-6.x-scenario/pom.xml b/test/plugin/scenarios/elasticsearch-6.x-scenario/pom.xml
index f1fb337..eb08d29 100644
--- a/test/plugin/scenarios/elasticsearch-6.x-scenario/pom.xml
+++ b/test/plugin/scenarios/elasticsearch-6.x-scenario/pom.xml
@@ -31,7 +31,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<compiler.version>1.8</compiler.version>
- <test.framework.version>6.7.1</test.framework.version>
+ <test.framework.version>6.8.6</test.framework.version>
<docker.image.version>${test.framework.version}</docker.image.version>
<spring-boot.version>2.1.4.RELEASE</spring-boot.version>
@@ -56,6 +56,12 @@
<artifactId>spring-boot-starter-web</artifactId>
<version>${spring-boot.version}</version>
</dependency>
+ <!-- elasticsearch transport-client -->
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>transport</artifactId>
+ <version>${test.framework.version}</version>
+ </dependency>
<!-- elasticsearch rest-high-level-client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
diff --git a/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/controller/CaseController.java b/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/RestHighLevelClientCase.java
similarity index 92%
copy from test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/controller/CaseController.java
copy to test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/RestHighLevelClientCase.java
index d015a07..008261b 100644
--- a/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/controller/CaseController.java
+++ b/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/RestHighLevelClientCase.java
@@ -16,11 +16,8 @@
*
*/
-package org.apache.skywalking.apm.testcase.elasticsearch.controller;
+package org.apache.skywalking.apm.testcase.elasticsearch;
-import java.io.IOException;
-import java.util.Map;
-import java.util.UUID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
@@ -50,23 +47,23 @@ import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
import static java.util.Collections.singletonMap;
-@RestController
-@RequestMapping("/elasticsearch-case/case")
-public class CaseController {
+@Component
+public class RestHighLevelClientCase {
- private static Logger logger = LogManager.getLogger(CaseController.class);
+ private static Logger logger = LogManager.getLogger(RestHighLevelClientCase.class);
@Autowired
private RestHighLevelClient client;
- @GetMapping("/healthcheck")
- public String healthcheck() throws Exception {
+ public boolean healthcheck() throws Exception {
ClusterHealthRequest request = new ClusterHealthRequest();
request.timeout(TimeValue.timeValueSeconds(10));
request.waitForStatus(ClusterHealthStatus.GREEN);
@@ -77,11 +74,10 @@ public class CaseController {
logger.error(message);
throw new RuntimeException(message);
}
- return "Success";
+ return true;
}
- @GetMapping("/elasticsearch")
- public String elasticsearch() throws Exception {
+ public boolean elasticsearch() throws Exception {
String indexName = UUID.randomUUID().toString();
try {
//create
@@ -104,7 +100,7 @@ public class CaseController {
client.close();
}
}
- return "Success";
+ return true;
}
private void createIndex(RestHighLevelClient client, String indexName) throws IOException {
@@ -212,4 +208,3 @@ public class CaseController {
}
}
}
-
diff --git a/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/TransportClientCase.java b/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/TransportClientCase.java
new file mode 100644
index 0000000..9e2e6b5
--- /dev/null
+++ b/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/TransportClientCase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.testcase.elasticsearch;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.UUID;
+
+ /**
+  * Test-scenario component that issues one request of each kind (index, get, search,
+  * update, delete, delete-index) through the injected {@link TransportClient}.
+  */
+ @Component
+ public class TransportClientCase {
+
+     private static Logger logger = LogManager.getLogger(TransportClientCase.class);
+
+     @Autowired
+     private TransportClient client;
+
+     /**
+      * Runs the whole request sequence against a randomly named index and then closes
+      * the client.
+      *
+      * <p>Only the index request is issued with {@code get()}; all other requests use
+      * {@code execute()} and their results are discarded.
+      * NOTE(review): the client is closed right after those calls are issued - confirm
+      * that the scenario does not depend on them having completed.
+      *
+      * @return always {@code true} when no exception was thrown
+      * @throws Exception if building or sending a request fails
+      */
+     public boolean elasticsearch() throws Exception {
+         final String indexName = UUID.randomUUID().toString();
+         try {
+             index(client, indexName);
+             get(client, indexName);
+             search(client, indexName);
+             update(client, indexName);
+             delete(client, indexName);
+             // Finally drop the index itself.
+             client.admin().indices().prepareDelete(indexName).execute();
+         } finally {
+             if (client != null) {
+                 client.close();
+             }
+         }
+         return true;
+     }
+
+     /** Indexes document "1" of type "test" and uses get() on the request. */
+     private void index(Client client, String indexName) throws IOException {
+         try {
+             client.prepareIndex(indexName, "test", "1")
+                   .setSource(
+                       XContentFactory.jsonBuilder()
+                                      .startObject()
+                                      .field("name", "mysql innodb")
+                                      .field("price", "0")
+                                      .field("language", "chinese")
+                                      .endObject()
+                   )
+                   .get();
+         } catch (IOException e) {
+             logger.error("index document error.", e);
+             throw e;
+         }
+     }
+
+     /** Issues a get for document "1" in the given index. */
+     private void get(Client client, String indexName) {
+         client.prepareGet()
+               .setIndex(indexName)
+               .setId("1")
+               .execute();
+     }
+
+     /** Issues a search on the given index, restricted to type "test" and 10 hits. */
+     private void search(Client client, String indexName) {
+         client.prepareSearch(indexName)
+               .setTypes("test")
+               .setSize(10)
+               .execute();
+     }
+
+     /** Issues a partial update that sets the price of document "1". */
+     private void update(Client client, String indexName) throws IOException {
+         try {
+             client.prepareUpdate(indexName, "test", "1")
+                   .setDoc(
+                       XContentFactory.jsonBuilder()
+                                      .startObject()
+                                      .field("price", "9.9")
+                                      .endObject()
+                   )
+                   .execute();
+         } catch (IOException e) {
+             logger.error("update document error.", e);
+             throw e;
+         }
+     }
+
+     /** Issues a delete for document "1" of type "test". */
+     private void delete(Client client, String indexName) {
+         client.prepareDelete(indexName, "test", "1").execute();
+     }
+ }
diff --git a/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/config/TransportClientConfig.java b/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/config/TransportClientConfig.java
new file mode 100644
index 0000000..727a3ed
--- /dev/null
+++ b/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/config/TransportClientConfig.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.testcase.elasticsearch.config;
+
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.net.InetAddress;
+
+ /**
+  * Spring configuration that builds the {@link TransportClient} used by the test scenario.
+  */
+ @Configuration
+ public class TransportClientConfig {
+
+     /** Comma-separated list of server entries; only the host part of each entry is used. */
+     @Value("${elasticsearch.server}")
+     private String elasticsearchHost;
+
+     /**
+      * Transport port used for every configured host.
+      * NOTE(review): any port given in {@code elasticsearch.server} is ignored in favour
+      * of this constant - presumably because that property carries the HTTP port; confirm.
+      */
+     public final static Integer PORT = 9300;
+
+     /**
+      * Creates a transport client connected to every configured host.
+      *
+      * @return the configured client
+      * @throws IllegalStateException if no host is configured
+      * @throws Exception             if a host entry is invalid or cannot be resolved
+      */
+     @Bean
+     public TransportClient getESClientConnection()
+         throws Exception {
+
+         // Resolve the addresses before the client exists, so that a bad or unresolvable
+         // entry cannot leave an unclosed client behind.
+         TransportAddress[] addresses = parseEsHost();
+         if (addresses.length == 0) {
+             throw new IllegalStateException("Property 'elasticsearch.server' does not contain any host");
+         }
+
+         Settings settings = Settings.builder()
+                                     .put("cluster.name", "docker-node")
+                                     .put("client.transport.sniff", false)
+                                     .build();
+
+         TransportClient client = new PreBuiltTransportClient(settings);
+         for (TransportAddress address : addresses) {
+             client.addTransportAddress(address);
+         }
+         return client;
+     }
+
+     /**
+      * Splits the configured server list into transport addresses.
+      *
+      * @return one address per configured entry; an empty array (never {@code null})
+      *         when nothing is configured
+      * @throws IllegalArgumentException if an entry has an empty host part
+      * @throws Exception                if a host cannot be resolved
+      */
+     private TransportAddress[] parseEsHost()
+         throws Exception {
+         if (elasticsearchHost == null || elasticsearchHost.trim().isEmpty()) {
+             return new TransportAddress[0];
+         }
+
+         String[] entries = elasticsearchHost.split(",");
+         TransportAddress[] transportAddresses = new TransportAddress[entries.length];
+         for (int i = 0; i < entries.length; ++i) {
+             // Entries may be "host" or "host:port"; the port part is not used.
+             String host = entries[i].split(":")[0].trim();
+             if (host.isEmpty()) {
+                 throw new IllegalArgumentException("Invalid entry in 'elasticsearch.server': '" + entries[i] + "'");
+             }
+             transportAddresses[i] = new TransportAddress(InetAddress.getByName(host), PORT);
+         }
+         return transportAddresses;
+     }
+ }
diff --git a/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/controller/CaseController.java b/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/controller/CaseController.java
index d015a07..e3928c7 100644
--- a/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/controller/CaseController.java
+++ b/test/plugin/scenarios/elasticsearch-6.x-scenario/src/main/java/org/apache/skywalking/apm/testcase/elasticsearch/controller/CaseController.java
@@ -18,44 +18,15 @@
package org.apache.skywalking.apm.testcase.elasticsearch.controller;
-import java.io.IOException;
-import java.util.Map;
-import java.util.UUID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.get.GetRequest;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.action.support.master.AcknowledgedResponse;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.action.update.UpdateResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.client.indices.CreateIndexResponse;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.script.Script;
-import org.elasticsearch.script.ScriptType;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.apache.skywalking.apm.testcase.elasticsearch.RestHighLevelClientCase;
+import org.apache.skywalking.apm.testcase.elasticsearch.TransportClientCase;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
-import static java.util.Collections.singletonMap;
-
@RestController
@RequestMapping("/elasticsearch-case/case")
public class CaseController {
@@ -63,153 +34,24 @@ public class CaseController {
private static Logger logger = LogManager.getLogger(CaseController.class);
@Autowired
- private RestHighLevelClient client;
+ private RestHighLevelClientCase restHighLevelClientCase;
+
+ @Autowired
+ private TransportClientCase transportClientCase;
@GetMapping("/healthcheck")
public String healthcheck() throws Exception {
- ClusterHealthRequest request = new ClusterHealthRequest();
- request.timeout(TimeValue.timeValueSeconds(10));
- request.waitForStatus(ClusterHealthStatus.GREEN);
-
- ClusterHealthResponse response = client.cluster().health(request, RequestOptions.DEFAULT);
- if (response.isTimedOut()) {
- String message = "elastic search node start fail!";
- logger.error(message);
- throw new RuntimeException(message);
- }
+ restHighLevelClientCase.healthcheck();
return "Success";
}
@GetMapping("/elasticsearch")
public String elasticsearch() throws Exception {
- String indexName = UUID.randomUUID().toString();
- try {
- //create
- createIndex(client, indexName);
- // index
- index(client, indexName);
-
- client.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
+ restHighLevelClientCase.elasticsearch();
+ transportClientCase.elasticsearch();
- //get
- get(client, indexName);
- // search
- search(client, indexName);
- // update
- update(client, indexName);
- // delete
- delete(client, indexName);
- } finally {
- if (null != client) {
- client.close();
- }
- }
return "Success";
}
- private void createIndex(RestHighLevelClient client, String indexName) throws IOException {
- CreateIndexRequest request = new CreateIndexRequest(indexName);
-
- XContentBuilder builder = XContentFactory.jsonBuilder();
- builder.startObject();
- {
- builder.startObject("properties");
- {
- builder.startObject("author");
- {
- builder.field("type", "keyword");
- }
- builder.endObject();
- builder.startObject("title");
- {
- builder.field("type", "keyword");
- }
- builder.endObject();
- }
- builder.endObject();
- }
- builder.endObject();
- request.mapping(builder);
-
- request.settings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0));
-
- CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
- if (createIndexResponse.isAcknowledged() == false) {
- String message = "elasticsearch create index fail.";
- logger.error(message);
- throw new RuntimeException(message);
- }
- }
-
- private void index(RestHighLevelClient client, String indexName) throws IOException {
- XContentBuilder builder = XContentFactory.jsonBuilder();
- builder.startObject();
- {
- builder.field("author", "Marker");
- builder.field("title", "Java programing.");
- }
- builder.endObject();
- IndexRequest indexRequest = new IndexRequest(indexName, "_doc", "1").source(builder);
-
- IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
- if (indexResponse.status().getStatus() >= 400) {
- String message = "elasticsearch index data fail.";
- logger.error(message);
- throw new RuntimeException(message);
- }
- }
-
- private void get(RestHighLevelClient client, String indexName) throws IOException {
- GetRequest getRequest = new GetRequest(indexName, "_doc", "1");
- GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
-
- if (!getResponse.isExists()) {
- String message = "elasticsearch get data fail.";
- logger.error(message);
- throw new RuntimeException(message);
- }
- }
-
- private void update(RestHighLevelClient client, String indexName) throws IOException {
- UpdateRequest request = new UpdateRequest(indexName, "_doc", "1");
- Map<String, Object> parameters = singletonMap("title", "c++ programing.");
- Script inline = new Script(ScriptType.INLINE, "painless", "ctx._source.title = params.title", parameters);
- request.script(inline);
-
- UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
- if (updateResponse.getVersion() != 2) {
- String message = "elasticsearch update data fail.";
- logger.error(message);
- throw new RuntimeException(message);
- }
- }
-
- private void delete(RestHighLevelClient client, String indexName) throws IOException {
- DeleteIndexRequest request = new DeleteIndexRequest(indexName);
- AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);
- if (!deleteIndexResponse.isAcknowledged()) {
- String message = "elasticsearch delete index fail.";
- logger.error(message);
- throw new RuntimeException(message);
- }
- }
-
- private void search(RestHighLevelClient client, String indexName) throws IOException {
-
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
- sourceBuilder.query(QueryBuilders.termQuery("author", "Marker"));
- sourceBuilder.from(0);
- sourceBuilder.size(10);
-
- SearchRequest searchRequest = new SearchRequest();
- searchRequest.indices(indexName);
- searchRequest.source(sourceBuilder);
- SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
- if (!(searchResponse.getHits().totalHits > 0)) {
- String message = "elasticsearch search data fail.";
- logger.error(message);
- throw new RuntimeException(message);
- }
- }
}