You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by wu...@apache.org on 2018/03/26 02:05:33 UTC

[incubator-skywalking] branch master updated: [Collector] fix the collector cannot started (#987)

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 5624417  [Collector] fix the collector cannot started (#987)
5624417 is described below

commit 5624417c076bc58b4c3f6374dd91f0f15f640f0a
Author: Xin,Zhang <zh...@apache.org>
AuthorDate: Mon Mar 26 10:05:29 2018 +0800

    [Collector] fix the collector cannot started (#987)
---
 .../client/elasticsearch/ElasticSearchClient.java  | 68 ++++++++++++++++++----
 .../ElasticSearchClientNotReadyException.java      | 29 +++++++++
 .../storage/es/StorageModuleEsProvider.java        |  6 +-
 3 files changed, 89 insertions(+), 14 deletions(-)

diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
index d58b955..d3a3a97 100644
--- a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
+++ b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
@@ -16,9 +16,13 @@
  *
  */
 
-
 package org.apache.skywalking.apm.collector.client.elasticsearch;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Consumer;
 import org.apache.skywalking.apm.collector.client.Client;
 import org.apache.skywalking.apm.collector.client.ClientException;
 import org.apache.skywalking.apm.collector.core.util.StringUtils;
@@ -43,12 +47,6 @@ import org.elasticsearch.transport.client.PreBuiltTransportClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.function.Consumer;
-
 /**
  * @author peng-yongsheng
  */
@@ -58,16 +56,17 @@ public class ElasticSearchClient implements Client {
 
     private org.elasticsearch.client.Client client;
 
-    private final String namespace;
-
     private final String clusterName;
 
     private final Boolean clusterTransportSniffer;
 
     private final String clusterNodes;
 
-    public ElasticSearchClient(String namespace, String clusterName, Boolean clusterTransportSniffer, String clusterNodes) {
-        this.namespace = namespace;
+    private boolean ready = false;
+    private String namespace;
+
+    public ElasticSearchClient(String clusterName, Boolean clusterTransportSniffer,
+        String clusterNodes) {
         this.clusterName = clusterName;
         this.clusterTransportSniffer = clusterTransportSniffer;
         this.clusterNodes = clusterNodes;
@@ -90,6 +89,8 @@ public class ElasticSearchClient implements Client {
                 throw new ElasticSearchClientException(e.getMessage(), e);
             }
         }
+
+        this.ready = true;
     }
 
     @Override
@@ -110,6 +111,14 @@ public class ElasticSearchClient implements Client {
         return pairsList;
     }
 
+    public void setNamespace(String namespace) throws ElasticSearchClientException {
+        if (ready) {
+            throw new ElasticSearchClientException("The namespace cannot be set after ElasticSearchClient is ready.");
+        }
+        // Not ready yet, so the namespace may still be recorded.
+        this.namespace = namespace;
+    }
+
     class AddressPairs {
         private String host;
         private Integer port;
@@ -121,6 +130,10 @@ public class ElasticSearchClient implements Client {
     }
 
     public boolean createIndex(String indexName, String indexType, Settings settings, XContentBuilder mappingBuilder) {
+        if (!ready) {
+            throw new ElasticSearchClientNotReadyException();
+        }
+
         IndicesAdminClient adminClient = client.admin().indices();
         indexName = formatIndexName(indexName);
         CreateIndexResponse response = adminClient.prepareCreate(indexName).setSettings(settings).addMapping(indexType, mappingBuilder).get();
@@ -129,6 +142,10 @@ public class ElasticSearchClient implements Client {
     }
 
     public boolean deleteIndex(String indexName) {
+        if (!ready) {
+            throw new ElasticSearchClientNotReadyException();
+        }
+
         indexName = formatIndexName(indexName);
         IndicesAdminClient adminClient = client.admin().indices();
         DeleteIndexResponse response = adminClient.prepareDelete(indexName).get();
@@ -137,6 +154,10 @@ public class ElasticSearchClient implements Client {
     }
 
     public boolean isExistsIndex(String indexName) {
+        if (!ready) {
+            throw new ElasticSearchClientNotReadyException();
+        }
+
         indexName = formatIndexName(indexName);
         IndicesAdminClient adminClient = client.admin().indices();
         IndicesExistsResponse response = adminClient.prepareExists(indexName).get();
@@ -144,31 +165,55 @@ public class ElasticSearchClient implements Client {
     }
 
     public SearchRequestBuilder prepareSearch(String indexName) {
+        if (!ready) {
+            throw new ElasticSearchClientNotReadyException();
+        }
+
         indexName = formatIndexName(indexName);
         return client.prepareSearch(indexName);
     }
 
     public IndexRequestBuilder prepareIndex(String indexName, String id) {
+        if (!ready) {
+            throw new ElasticSearchClientNotReadyException();
+        }
+
         indexName = formatIndexName(indexName);
         return client.prepareIndex(indexName, "type", id);
     }
 
     public UpdateRequestBuilder prepareUpdate(String indexName, String id) {
+        if (!ready) {
+            throw new ElasticSearchClientNotReadyException();
+        }
+
         indexName = formatIndexName(indexName);
         return client.prepareUpdate(indexName, "type", id);
     }
 
     public GetRequestBuilder prepareGet(String indexName, String id) {
+        if (!ready) {
+            throw new ElasticSearchClientNotReadyException();
+        }
+
         indexName = formatIndexName(indexName);
         return client.prepareGet(indexName, "type", id);
     }
 
     public DeleteByQueryRequestBuilder prepareDelete(QueryBuilder queryBuilder, String indexName) {
+        if (!ready) {
+            throw new ElasticSearchClientNotReadyException();
+        }
+
         indexName = formatIndexName(indexName);
         return DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(queryBuilder).source(indexName);
     }
 
     public MultiGetRequestBuilder prepareMultiGet(List<?> rows, MultiGetRowHandler rowHandler) {
+        if (!ready) {
+            throw new ElasticSearchClientNotReadyException();
+        }
+
         MultiGetRequestBuilder prepareMultiGet = client.prepareMultiGet();
         rowHandler.setPrepareMultiGet(prepareMultiGet);
         rowHandler.setNamespace(namespace);
@@ -202,7 +247,6 @@ public class ElasticSearchClient implements Client {
         }
     }
 
-
     public BulkRequestBuilder prepareBulk() {
         return client.prepareBulk();
     }
diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java
new file mode 100644
index 0000000..7be7c53
--- /dev/null
+++ b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.client.elasticsearch;
+
+/**
+ * @author zhang xin
+ */
+public class ElasticSearchClientNotReadyException extends RuntimeException {
+    public ElasticSearchClientNotReadyException() {
+        super("ElasticSearchClient has not completed initialization. Please call the initialize() method before operating on ElasticSearchClient.");
+    }
+
+}
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
index ff52285..fb371ae 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
@@ -180,8 +180,7 @@ public class StorageModuleEsProvider extends ModuleProvider {
         String clusterName = config.getProperty(CLUSTER_NAME);
         Boolean clusterTransportSniffer = (Boolean) config.get(CLUSTER_TRANSPORT_SNIFFER);
         String clusterNodes = config.getProperty(CLUSTER_NODES);
-        String namespace = getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace();
-        elasticSearchClient = new ElasticSearchClient(namespace, clusterName, clusterTransportSniffer, clusterNodes);
+        elasticSearchClient = new ElasticSearchClient(clusterName, clusterTransportSniffer, clusterNodes);
 
         this.registerServiceImplementation(IBatchDAO.class, new BatchEsDAO(elasticSearchClient));
         registerCacheDAO();
@@ -196,6 +195,9 @@ public class StorageModuleEsProvider extends ModuleProvider {
         Integer indexShardsNumber = (Integer) config.get(INDEX_SHARDS_NUMBER);
         Integer indexReplicasNumber = (Integer) config.get(INDEX_REPLICAS_NUMBER);
         try {
+            String namespace = getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace();
+            elasticSearchClient.setNamespace(namespace);
+
             elasticSearchClient.initialize();
 
             ElasticSearchStorageInstaller installer = new ElasticSearchStorageInstaller(indexShardsNumber, indexReplicasNumber);

-- 
To stop receiving notification emails like this one, please contact
wusheng@apache.org.