You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/03/26 02:05:33 UTC
[incubator-skywalking] branch master updated: [Collector] fix the
collector cannot started (#987)
This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 5624417 [Collector] fix the collector cannot started (#987)
5624417 is described below
commit 5624417c076bc58b4c3f6374dd91f0f15f640f0a
Author: Xin,Zhang <zh...@apache.org>
AuthorDate: Mon Mar 26 10:05:29 2018 +0800
[Collector] fix the collector cannot started (#987)
---
.../client/elasticsearch/ElasticSearchClient.java | 68 ++++++++++++++++++----
.../ElasticSearchClientNotReadyException.java | 29 +++++++++
.../storage/es/StorageModuleEsProvider.java | 6 +-
3 files changed, 89 insertions(+), 14 deletions(-)
diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
index d58b955..d3a3a97 100644
--- a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
+++ b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
@@ -16,9 +16,13 @@
*
*/
-
package org.apache.skywalking.apm.collector.client.elasticsearch;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
import org.apache.skywalking.apm.collector.client.Client;
import org.apache.skywalking.apm.collector.client.ClientException;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
@@ -43,12 +47,6 @@ import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.function.Consumer;
-
/**
* @author peng-yongsheng
*/
@@ -58,16 +56,17 @@ public class ElasticSearchClient implements Client {
private org.elasticsearch.client.Client client;
- private final String namespace;
-
private final String clusterName;
private final Boolean clusterTransportSniffer;
private final String clusterNodes;
- public ElasticSearchClient(String namespace, String clusterName, Boolean clusterTransportSniffer, String clusterNodes) {
- this.namespace = namespace;
+ private boolean ready = false;
+ private String namespace;
+
+ public ElasticSearchClient(String clusterName, Boolean clusterTransportSniffer,
+ String clusterNodes) {
this.clusterName = clusterName;
this.clusterTransportSniffer = clusterTransportSniffer;
this.clusterNodes = clusterNodes;
@@ -90,6 +89,8 @@ public class ElasticSearchClient implements Client {
throw new ElasticSearchClientException(e.getMessage(), e);
}
}
+
+ this.ready = true;
}
@Override
@@ -110,6 +111,14 @@ public class ElasticSearchClient implements Client {
return pairsList;
}
+ public void setNamespace(String namespace) throws ElasticSearchClientException {
+ if (!ready) {
+ this.namespace = namespace;
+ } else {
+ throw new ElasticSearchClientException("The namespace cannot be set after ElasticSearchClient is ready.");
+ }
+ }
+
class AddressPairs {
private String host;
private Integer port;
@@ -121,6 +130,10 @@ public class ElasticSearchClient implements Client {
}
public boolean createIndex(String indexName, String indexType, Settings settings, XContentBuilder mappingBuilder) {
+ if (!ready) {
+ throw new ElasticSearchClientNotReadyException();
+ }
+
IndicesAdminClient adminClient = client.admin().indices();
indexName = formatIndexName(indexName);
CreateIndexResponse response = adminClient.prepareCreate(indexName).setSettings(settings).addMapping(indexType, mappingBuilder).get();
@@ -129,6 +142,10 @@ public class ElasticSearchClient implements Client {
}
public boolean deleteIndex(String indexName) {
+ if (!ready) {
+ throw new ElasticSearchClientNotReadyException();
+ }
+
indexName = formatIndexName(indexName);
IndicesAdminClient adminClient = client.admin().indices();
DeleteIndexResponse response = adminClient.prepareDelete(indexName).get();
@@ -137,6 +154,10 @@ public class ElasticSearchClient implements Client {
}
public boolean isExistsIndex(String indexName) {
+ if (!ready) {
+ throw new ElasticSearchClientNotReadyException();
+ }
+
indexName = formatIndexName(indexName);
IndicesAdminClient adminClient = client.admin().indices();
IndicesExistsResponse response = adminClient.prepareExists(indexName).get();
@@ -144,31 +165,55 @@ public class ElasticSearchClient implements Client {
}
public SearchRequestBuilder prepareSearch(String indexName) {
+ if (!ready) {
+ throw new ElasticSearchClientNotReadyException();
+ }
+
indexName = formatIndexName(indexName);
return client.prepareSearch(indexName);
}
public IndexRequestBuilder prepareIndex(String indexName, String id) {
+ if (!ready) {
+ throw new ElasticSearchClientNotReadyException();
+ }
+
indexName = formatIndexName(indexName);
return client.prepareIndex(indexName, "type", id);
}
public UpdateRequestBuilder prepareUpdate(String indexName, String id) {
+ if (!ready) {
+ throw new ElasticSearchClientNotReadyException();
+ }
+
indexName = formatIndexName(indexName);
return client.prepareUpdate(indexName, "type", id);
}
public GetRequestBuilder prepareGet(String indexName, String id) {
+ if (!ready) {
+ throw new ElasticSearchClientNotReadyException();
+ }
+
indexName = formatIndexName(indexName);
return client.prepareGet(indexName, "type", id);
}
public DeleteByQueryRequestBuilder prepareDelete(QueryBuilder queryBuilder, String indexName) {
+ if (!ready) {
+ throw new ElasticSearchClientNotReadyException();
+ }
+
indexName = formatIndexName(indexName);
return DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(queryBuilder).source(indexName);
}
public MultiGetRequestBuilder prepareMultiGet(List<?> rows, MultiGetRowHandler rowHandler) {
+ if (!ready) {
+ throw new ElasticSearchClientNotReadyException();
+ }
+
MultiGetRequestBuilder prepareMultiGet = client.prepareMultiGet();
rowHandler.setPrepareMultiGet(prepareMultiGet);
rowHandler.setNamespace(namespace);
@@ -202,7 +247,6 @@ public class ElasticSearchClient implements Client {
}
}
-
public BulkRequestBuilder prepareBulk() {
return client.prepareBulk();
}
diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java
new file mode 100644
index 0000000..7be7c53
--- /dev/null
+++ b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.client.elasticsearch;
+
+/**
+ * @author zhang xin
+ */
+public class ElasticSearchClientNotReadyException extends RuntimeException {
+ public ElasticSearchClientNotReadyException() {
+ super("ElasticSearchClient not complete the initialization, Please call initializeFinished method before operation ElasticSearchClient.");
+ }
+
+}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
index ff52285..fb371ae 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
@@ -180,8 +180,7 @@ public class StorageModuleEsProvider extends ModuleProvider {
String clusterName = config.getProperty(CLUSTER_NAME);
Boolean clusterTransportSniffer = (Boolean) config.get(CLUSTER_TRANSPORT_SNIFFER);
String clusterNodes = config.getProperty(CLUSTER_NODES);
- String namespace = getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace();
- elasticSearchClient = new ElasticSearchClient(namespace, clusterName, clusterTransportSniffer, clusterNodes);
+ elasticSearchClient = new ElasticSearchClient(clusterName, clusterTransportSniffer, clusterNodes);
this.registerServiceImplementation(IBatchDAO.class, new BatchEsDAO(elasticSearchClient));
registerCacheDAO();
@@ -196,6 +195,9 @@ public class StorageModuleEsProvider extends ModuleProvider {
Integer indexShardsNumber = (Integer) config.get(INDEX_SHARDS_NUMBER);
Integer indexReplicasNumber = (Integer) config.get(INDEX_REPLICAS_NUMBER);
try {
+ String namespace = getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace();
+ elasticSearchClient.setNamespace(namespace);
+
elasticSearchClient.initialize();
ElasticSearchStorageInstaller installer = new ElasticSearchStorageInstaller(indexShardsNumber, indexReplicasNumber);
--
To stop receiving notification emails like this one, please contact
wusheng@apache.org.