You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by GitBox <gi...@apache.org> on 2018/04/22 08:14:29 UTC

[GitHub] wu-sheng closed pull request #1101: Fixed #1090

wu-sheng closed pull request #1101: Fixed #1090
URL: https://github.com/apache/incubator-skywalking/pull/1101
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/apm-collector/apm-collector-boot/src/main/java/org/apache/skywalking/apm/collector/boot/CollectorBootStartUp.java b/apm-collector/apm-collector-boot/src/main/java/org/apache/skywalking/apm/collector/boot/CollectorBootStartUp.java
index ff9dd5a24..83f3b6972 100644
--- a/apm-collector/apm-collector-boot/src/main/java/org/apache/skywalking/apm/collector/boot/CollectorBootStartUp.java
+++ b/apm-collector/apm-collector-boot/src/main/java/org/apache/skywalking/apm/collector/boot/CollectorBootStartUp.java
@@ -24,6 +24,7 @@
 import org.apache.skywalking.apm.collector.core.module.ModuleConfigException;
 import org.apache.skywalking.apm.collector.core.module.ModuleManager;
 import org.apache.skywalking.apm.collector.core.module.ModuleNotFoundException;
+import org.apache.skywalking.apm.collector.core.module.ModuleStartException;
 import org.apache.skywalking.apm.collector.core.module.ProviderNotFoundException;
 import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
 import org.slf4j.Logger;
@@ -42,7 +43,7 @@ public static void main(String[] args) {
         try {
             ApplicationConfiguration applicationConfiguration = configLoader.load();
             manager.init(applicationConfiguration);
-        } catch (ConfigFileNotFoundException | ModuleNotFoundException | ProviderNotFoundException | ServiceNotProvidedException | ModuleConfigException e) {
+        } catch (ConfigFileNotFoundException | ModuleNotFoundException | ProviderNotFoundException | ServiceNotProvidedException | ModuleConfigException | ModuleStartException e) {
             logger.error(e.getMessage(), e);
             System.exit(1);
         }
diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
index b41b7c8d2..91bc98117 100644
--- a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
+++ b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClient.java
@@ -25,10 +25,13 @@
 import java.util.function.Consumer;
 import org.apache.skywalking.apm.collector.client.Client;
 import org.apache.skywalking.apm.collector.client.ClientException;
+import org.apache.skywalking.apm.collector.core.data.CommonTable;
+import org.apache.skywalking.apm.collector.core.util.Const;
 import org.apache.skywalking.apm.collector.core.util.StringUtils;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
+import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.get.GetRequestBuilder;
 import org.elasticsearch.action.get.MultiGetRequestBuilder;
@@ -62,14 +65,22 @@
 
     private final String clusterNodes;
 
-    private boolean ready = false;
-    private String namespace;
+    private final String namespace;
 
     public ElasticSearchClient(String clusterName, boolean clusterTransportSniffer,
         String clusterNodes) {
         this.clusterName = clusterName;
         this.clusterTransportSniffer = clusterTransportSniffer;
         this.clusterNodes = clusterNodes;
+        this.namespace = Const.EMPTY_STRING;
+    }
+
+    public ElasticSearchClient(String clusterName, boolean clusterTransportSniffer,
+        String clusterNodes, String namespace) {
+        this.clusterName = clusterName;
+        this.clusterTransportSniffer = clusterTransportSniffer;
+        this.clusterNodes = clusterNodes;
+        this.namespace = namespace;
     }
 
     @Override
@@ -89,8 +100,6 @@ public void initialize() throws ClientException {
                 throw new ElasticSearchClientException(e.getMessage(), e);
             }
         }
-
-        this.ready = true;
     }
 
     @Override
@@ -111,14 +120,6 @@ public void shutdown() {
         return pairsList;
     }
 
-    public void setNamespace(String namespace) throws ElasticSearchClientException {
-        if (!ready) {
-            this.namespace = namespace;
-        } else {
-            throw new ElasticSearchClientException("The namespace cannot be set after ElasticSearchClient is ready.");
-        }
-    }
-
     class AddressPairs {
         private String host;
         private Integer port;
@@ -130,10 +131,6 @@ public void setNamespace(String namespace) throws ElasticSearchClientException {
     }
 
     public boolean createIndex(String indexName, String indexType, Settings settings, XContentBuilder mappingBuilder) {
-        if (!ready) {
-            throw new ElasticSearchClientNotReadyException();
-        }
-
         IndicesAdminClient adminClient = client.admin().indices();
         indexName = formatIndexName(indexName);
         CreateIndexResponse response = adminClient.prepareCreate(indexName).setSettings(settings).addMapping(indexType, mappingBuilder).get();
@@ -142,10 +139,6 @@ public boolean createIndex(String indexName, String indexType, Settings settings
     }
 
     public boolean deleteIndex(String indexName) {
-        if (!ready) {
-            throw new ElasticSearchClientNotReadyException();
-        }
-
         indexName = formatIndexName(indexName);
         IndicesAdminClient adminClient = client.admin().indices();
         DeleteIndexResponse response = adminClient.prepareDelete(indexName).get();
@@ -154,10 +147,6 @@ public boolean deleteIndex(String indexName) {
     }
 
     public boolean isExistsIndex(String indexName) {
-        if (!ready) {
-            throw new ElasticSearchClientNotReadyException();
-        }
-
         indexName = formatIndexName(indexName);
         IndicesAdminClient adminClient = client.admin().indices();
         IndicesExistsResponse response = adminClient.prepareExists(indexName).get();
@@ -165,60 +154,42 @@ public boolean isExistsIndex(String indexName) {
     }
 
     public SearchRequestBuilder prepareSearch(String indexName) {
-        if (!ready) {
-            throw new ElasticSearchClientNotReadyException();
-        }
-
         indexName = formatIndexName(indexName);
         return client.prepareSearch(indexName);
     }
 
     public IndexRequestBuilder prepareIndex(String indexName, String id) {
-        if (!ready) {
-            throw new ElasticSearchClientNotReadyException();
-        }
+        indexName = formatIndexName(indexName);
+        return client.prepareIndex(indexName, CommonTable.TABLE_TYPE, id);
+    }
 
+    public GetFieldMappingsResponse.FieldMappingMetaData prepareGetMappings(String indexName, String fieldName) {
         indexName = formatIndexName(indexName);
-        return client.prepareIndex(indexName, "type", id);
+        GetFieldMappingsResponse response = client.admin().indices().prepareGetFieldMappings(indexName).setFields(fieldName).get();
+        return response.fieldMappings(indexName, CommonTable.TABLE_TYPE, fieldName);
     }
 
     public UpdateRequestBuilder prepareUpdate(String indexName, String id) {
-        if (!ready) {
-            throw new ElasticSearchClientNotReadyException();
-        }
-
         indexName = formatIndexName(indexName);
-        return client.prepareUpdate(indexName, "type", id);
+        return client.prepareUpdate(indexName, CommonTable.TABLE_TYPE, id);
     }
 
     public GetRequestBuilder prepareGet(String indexName, String id) {
-        if (!ready) {
-            throw new ElasticSearchClientNotReadyException();
-        }
-
         indexName = formatIndexName(indexName);
-        return client.prepareGet(indexName, "type", id);
+        return client.prepareGet(indexName, CommonTable.TABLE_TYPE, id);
     }
 
     public DeleteByQueryRequestBuilder prepareDelete(QueryBuilder queryBuilder, String indexName) {
-        if (!ready) {
-            throw new ElasticSearchClientNotReadyException();
-        }
-
         indexName = formatIndexName(indexName);
         return DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(queryBuilder).source(indexName);
     }
 
     public MultiGetRequestBuilder prepareMultiGet(List<?> rows, MultiGetRowHandler rowHandler) {
-        if (!ready) {
-            throw new ElasticSearchClientNotReadyException();
-        }
-
         MultiGetRequestBuilder prepareMultiGet = client.prepareMultiGet();
         rowHandler.setPrepareMultiGet(prepareMultiGet);
         rowHandler.setNamespace(namespace);
 
-        rows.forEach(row -> rowHandler.accept(row));
+        rows.forEach(rowHandler::accept);
 
         return rowHandler.getPrepareMultiGet();
     }
@@ -255,7 +226,7 @@ private String formatIndexName(String indexName) {
 
     private static String formatIndexName(String namespace, String indexName) {
         if (StringUtils.isNotEmpty(namespace)) {
-            return namespace + "_" + indexName;
+            return namespace + Const.ID_SPLIT + indexName;
         }
         return indexName;
     }
diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientException.java b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientException.java
index 9691839c4..5568d733c 100644
--- a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientException.java
+++ b/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientException.java
@@ -16,7 +16,6 @@
  *
  */
 
-
 package org.apache.skywalking.apm.collector.client.elasticsearch;
 
 import org.apache.skywalking.apm.collector.client.ClientException;
@@ -24,12 +23,8 @@
 /**
  * @author peng-yongsheng
  */
-public class ElasticSearchClientException extends ClientException {
-    public ElasticSearchClientException(String message) {
-        super(message);
-    }
-
-    public ElasticSearchClientException(String message, Throwable cause) {
+class ElasticSearchClientException extends ClientException {
+    ElasticSearchClientException(String message, Throwable cause) {
         super(message, cause);
     }
 }
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/BootstrapFlow.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/BootstrapFlow.java
index 8a3b91d90..6dc69de39 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/BootstrapFlow.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/BootstrapFlow.java
@@ -43,7 +43,8 @@
     }
 
     @SuppressWarnings("unchecked")
-    void start(ModuleManager moduleManager) throws ModuleNotFoundException, ServiceNotProvidedException {
+    void start(
+        ModuleManager moduleManager) throws ModuleNotFoundException, ServiceNotProvidedException, ModuleStartException {
         for (ModuleProvider provider : startupSequence) {
             String[] requiredModules = provider.requiredModules();
             if (requiredModules != null) {
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleManager.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleManager.java
index b09b42a0d..21df5a836 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleManager.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleManager.java
@@ -37,7 +37,7 @@
      * Init the given modules
      */
     public void init(
-        ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException {
+        ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException, ModuleStartException {
         String[] moduleNames = applicationConfiguration.moduleList();
         ServiceLoader<Module> moduleServiceLoader = ServiceLoader.load(Module.class);
         LinkedList<String> moduleList = new LinkedList<>(Arrays.asList(moduleNames));
diff --git a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleProvider.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleProvider.java
index c16ad3530..5e2b8853d 100644
--- a/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleProvider.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleProvider.java
@@ -71,7 +71,7 @@ protected final ModuleManager getManager() {
     /**
      * In start stage, the module has been ready for interop.
      */
-    public abstract void start() throws ServiceNotProvidedException;
+    public abstract void start() throws ServiceNotProvidedException, ModuleStartException;
 
     /**
      * This callback executes after all modules start up successfully.
diff --git a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleStartException.java
similarity index 69%
rename from apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java
rename to apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleStartException.java
index 7be7c53c6..725327286 100644
--- a/apm-collector/apm-collector-component/client-component/src/main/java/org/apache/skywalking/apm/collector/client/elasticsearch/ElasticSearchClientNotReadyException.java
+++ b/apm-collector/apm-collector-core/src/main/java/org/apache/skywalking/apm/collector/core/module/ModuleStartException.java
@@ -16,14 +16,14 @@
  *
  */
 
-package org.apache.skywalking.apm.collector.client.elasticsearch;
+package org.apache.skywalking.apm.collector.core.module;
 
 /**
- * @author zhang xin
+ * @author peng-yongsheng
  */
-public class ElasticSearchClientNotReadyException extends RuntimeException {
-    public ElasticSearchClientNotReadyException() {
-        super("ElasticSearchClient not complete the initialization, Please call initializeFinished method before operation ElasticSearchClient.");
-    }
+public class ModuleStartException extends Exception {
 
+    public ModuleStartException(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git a/apm-collector/apm-collector-core/src/test/java/org/apache/skywalking/apm/collector/core/module/ModuleManagerTest.java b/apm-collector/apm-collector-core/src/test/java/org/apache/skywalking/apm/collector/core/module/ModuleManagerTest.java
index 7b2f32b6c..a00feb7bf 100644
--- a/apm-collector/apm-collector-core/src/test/java/org/apache/skywalking/apm/collector/core/module/ModuleManagerTest.java
+++ b/apm-collector/apm-collector-core/src/test/java/org/apache/skywalking/apm/collector/core/module/ModuleManagerTest.java
@@ -16,7 +16,6 @@
  *
  */
 
-
 package org.apache.skywalking.apm.collector.core.module;
 
 import java.util.Properties;
@@ -28,7 +27,7 @@
  */
 public class ModuleManagerTest {
     @Test
-    public void testInit() throws ServiceNotProvidedException, ModuleNotFoundException, ProviderNotFoundException, DuplicateProviderException, ModuleConfigException {
+    public void testInit() throws ServiceNotProvidedException, ModuleNotFoundException, ProviderNotFoundException, DuplicateProviderException, ModuleConfigException, ModuleStartException {
         ApplicationConfiguration configuration = new ApplicationConfiguration();
         configuration.addModule("Test").addProviderConfiguration("TestModule-Provider", new Properties());
         configuration.addModule("BaseA").addProviderConfiguration("P-A", new Properties());
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageInstallException.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageInstallException.java
index 1671769a6..4f99bfed7 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageInstallException.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageInstallException.java
@@ -16,7 +16,6 @@
  *
  */
 
-
 package org.apache.skywalking.apm.collector.storage;
 
 /**
diff --git a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageInstaller.java b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageInstaller.java
index 675eb7ec4..3ad986d12 100644
--- a/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageInstaller.java
+++ b/apm-collector/apm-collector-storage/collector-storage-define/src/main/java/org/apache/skywalking/apm/collector/storage/StorageInstaller.java
@@ -16,14 +16,13 @@
  *
  */
 
-
 package org.apache.skywalking.apm.collector.storage;
 
 import java.util.List;
-import org.apache.skywalking.apm.collector.core.data.StorageDefineLoader;
 import org.apache.skywalking.apm.collector.client.Client;
-import org.apache.skywalking.apm.collector.core.define.DefineException;
+import org.apache.skywalking.apm.collector.core.data.StorageDefineLoader;
 import org.apache.skywalking.apm.collector.core.data.TableDefine;
+import org.apache.skywalking.apm.collector.core.define.DefineException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,6 +33,12 @@
 
     private final Logger logger = LoggerFactory.getLogger(StorageInstaller.class);
 
+    private final boolean isHighPerformanceMode;
+
+    public StorageInstaller(boolean isHighPerformanceMode) {
+        this.isHighPerformanceMode = isHighPerformanceMode;
+    }
+
     public final void install(Client client) throws StorageException {
         StorageDefineLoader defineLoader = new StorageDefineLoader();
         try {
@@ -43,6 +48,7 @@ public final void install(Client client) throws StorageException {
 
             for (TableDefine tableDefine : tableDefines) {
                 tableDefine.initialize();
+                settingHighPerformance(tableDefine);
                 if (!isExists(client, tableDefine)) {
                     logger.info("table: {} not exists", tableDefine.getName());
                     createTable(client, tableDefine);
@@ -51,17 +57,28 @@ public final void install(Client client) throws StorageException {
                     deleteTable(client, tableDefine);
                     createTable(client, tableDefine);
                 }
+                columnCheck(client, tableDefine);
             }
         } catch (DefineException e) {
             throw new StorageInstallException(e.getMessage(), e);
         }
     }
 
+    private void settingHighPerformance(TableDefine tableDefine) {
+        tableDefine.getColumnDefines().forEach(column -> {
+            if (isHighPerformanceMode) {
+                column.getColumnName().useShortName();
+            }
+        });
+    }
+
     protected abstract void defineFilter(List<TableDefine> tableDefines);
 
     protected abstract boolean isExists(Client client, TableDefine tableDefine) throws StorageException;
 
-    protected abstract boolean deleteTable(Client client, TableDefine tableDefine) throws StorageException;
+    protected abstract void columnCheck(Client client, TableDefine tableDefine) throws StorageException;
+
+    protected abstract void deleteTable(Client client, TableDefine tableDefine) throws StorageException;
 
-    protected abstract boolean createTable(Client client, TableDefine tableDefine) throws StorageException;
+    protected abstract void createTable(Client client, TableDefine tableDefine) throws StorageException;
 }
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
index 12d610af6..caf11ebb0 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/StorageModuleEsProvider.java
@@ -29,6 +29,7 @@
 import org.apache.skywalking.apm.collector.core.module.Module;
 import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
 import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
+import org.apache.skywalking.apm.collector.core.module.ModuleStartException;
 import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
 import org.apache.skywalking.apm.collector.remote.RemoteModule;
 import org.apache.skywalking.apm.collector.storage.StorageException;
@@ -242,16 +243,12 @@
 import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceMetricEsUIDAO;
 import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceNameServiceEsUIDAO;
 import org.apache.skywalking.apm.collector.storage.es.dao.ui.ServiceReferenceEsMetricUIDAO;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
  */
 public class StorageModuleEsProvider extends ModuleProvider {
 
-    private static final Logger logger = LoggerFactory.getLogger(StorageModuleEsProvider.class);
-
     static final String NAME = "elasticsearch";
     private final StorageModuleEsConfig config;
     private ElasticSearchClient elasticSearchClient;
@@ -275,8 +272,6 @@ public StorageModuleEsProvider() {
     }
 
     @Override public void prepare() throws ServiceNotProvidedException {
-        elasticSearchClient = new ElasticSearchClient(config.getClusterName(), config.getClusterTransportSniffer(), config.getClusterNodes());
-
         this.registerServiceImplementation(IBatchDAO.class, new BatchEsDAO(elasticSearchClient));
         registerCacheDAO();
         registerRegisterDAO();
@@ -286,17 +281,16 @@ public StorageModuleEsProvider() {
     }
 
     @Override
-    public void start() {
+    public void start() throws ModuleStartException {
         try {
             String namespace = getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace();
-            elasticSearchClient.setNamespace(namespace);
-
+            elasticSearchClient = new ElasticSearchClient(config.getClusterName(), config.getClusterTransportSniffer(), config.getClusterNodes(), namespace);
             elasticSearchClient.initialize();
 
             ElasticSearchStorageInstaller installer = new ElasticSearchStorageInstaller(config.getIndexShardsNumber(), config.getIndexReplicasNumber(), config.isHighPerformanceMode());
             installer.install(elasticSearchClient);
         } catch (ClientException | StorageException e) {
-            logger.error(e.getMessage(), e);
+            throw new ModuleStartException(e.getMessage(), e);
         }
 
         String uuId = UUID.randomUUID().toString();
diff --git a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/define/ElasticSearchStorageInstaller.java b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/define/ElasticSearchStorageInstaller.java
index 66510bf99..b7a21c1ee 100644
--- a/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/define/ElasticSearchStorageInstaller.java
+++ b/apm-collector/apm-collector-storage/collector-storage-es-provider/src/main/java/org/apache/skywalking/apm/collector/storage/es/base/define/ElasticSearchStorageInstaller.java
@@ -20,15 +20,19 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
 import org.apache.skywalking.apm.collector.client.Client;
 import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
 import org.apache.skywalking.apm.collector.core.data.ColumnDefine;
 import org.apache.skywalking.apm.collector.core.data.TableDefine;
+import org.apache.skywalking.apm.collector.storage.StorageException;
+import org.apache.skywalking.apm.collector.storage.StorageInstallException;
 import org.apache.skywalking.apm.collector.storage.StorageInstaller;
+import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentFactory;
-import org.elasticsearch.index.IndexNotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,13 +45,12 @@
 
     private final int indexShardsNumber;
     private final int indexReplicasNumber;
-    private final boolean isHighPerformanceMode;
 
     public ElasticSearchStorageInstaller(int indexShardsNumber, int indexReplicasNumber,
         boolean isHighPerformanceMode) {
+        super(isHighPerformanceMode);
         this.indexShardsNumber = indexShardsNumber;
         this.indexReplicasNumber = indexReplicasNumber;
-        this.isHighPerformanceMode = isHighPerformanceMode;
     }
 
     @Override protected void defineFilter(List<TableDefine> tableDefines) {
@@ -59,7 +62,26 @@ public ElasticSearchStorageInstaller(int indexShardsNumber, int indexReplicasNum
         }
     }
 
-    @Override protected boolean createTable(Client client, TableDefine tableDefine) {
+    @Override protected void columnCheck(Client client, TableDefine tableDefine) throws StorageException {
+        ElasticSearchClient esClient = (ElasticSearchClient)client;
+        ElasticSearchTableDefine esTableDefine = (ElasticSearchTableDefine)tableDefine;
+
+        for (ColumnDefine columnDefine : tableDefine.getColumnDefines()) {
+            GetFieldMappingsResponse.FieldMappingMetaData metaData = esClient.prepareGetMappings(esTableDefine.getName(), columnDefine.getColumnName().getName());
+
+            if (Objects.nonNull(metaData)) {
+                Map field = (Map)metaData.sourceAsMap().get(columnDefine.getColumnName().getName());
+                if (!columnDefine.getType().toLowerCase().equals(field.get("type"))) {
+                    throw new StorageInstallException("Field named " + columnDefine.getColumnName().getName() + "'s type not match the definition. Expect: "
+                        + columnDefine.getType().toLowerCase() + ", actual: " + field.get("type"));
+                }
+            } else {
+                throw new StorageInstallException("Field named " + columnDefine.getColumnName().getName() + " in " + tableDefine.getName() + " index not found.");
+            }
+        }
+    }
+
+    @Override protected void createTable(Client client, TableDefine tableDefine) throws StorageException {
         ElasticSearchClient esClient = (ElasticSearchClient)client;
         ElasticSearchTableDefine esTableDefine = (ElasticSearchTableDefine)tableDefine;
 
@@ -76,7 +98,10 @@ public ElasticSearchStorageInstaller(int indexShardsNumber, int indexReplicasNum
 
         boolean isAcknowledged = esClient.createIndex(esTableDefine.getName(), esTableDefine.type(), settings, mappingBuilder);
         logger.info("create {} index with type of {} finished, isAcknowledged: {}", esTableDefine.getName(), esTableDefine.type(), isAcknowledged);
-        return isAcknowledged;
+
+        if (!isAcknowledged) {
+            throw new StorageInstallException("create " + esTableDefine.getName() + " index failure, ");
+        }
     }
 
     private Settings createSettingBuilder(ElasticSearchTableDefine tableDefine) {
@@ -96,9 +121,6 @@ private XContentBuilder createMappingBuilder(ElasticSearchTableDefine tableDefin
 
         for (ColumnDefine columnDefine : tableDefine.getColumnDefines()) {
             ElasticSearchColumnDefine elasticSearchColumnDefine = (ElasticSearchColumnDefine)columnDefine;
-            if (isHighPerformanceMode) {
-                elasticSearchColumnDefine.getColumnName().useShortName();
-            }
 
             if (ElasticSearchColumnDefine.Type.Text.name().toLowerCase().equals(elasticSearchColumnDefine.getType().toLowerCase())) {
                 mappingBuilder
@@ -121,14 +143,12 @@ private XContentBuilder createMappingBuilder(ElasticSearchTableDefine tableDefin
         return mappingBuilder;
     }
 
-    @Override protected boolean deleteTable(Client client, TableDefine tableDefine) {
+    @Override protected void deleteTable(Client client, TableDefine tableDefine) throws StorageException {
         ElasticSearchClient esClient = (ElasticSearchClient)client;
-        try {
-            return esClient.deleteIndex(tableDefine.getName());
-        } catch (IndexNotFoundException e) {
-            logger.info("{} index not found", tableDefine.getName());
+
+        if (!esClient.deleteIndex(tableDefine.getName())) {
+            throw new StorageInstallException(tableDefine.getName() + " index delete failure.");
         }
-        return false;
     }
 
     @Override protected boolean isExists(Client client, TableDefine tableDefine) {
diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java
index fa5b91279..63d7e0901 100644
--- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java
+++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/StorageModuleH2Provider.java
@@ -280,7 +280,7 @@ public StorageModuleH2Provider() {
         try {
             h2Client.initialize();
 
-            H2StorageInstaller installer = new H2StorageInstaller();
+            H2StorageInstaller installer = new H2StorageInstaller(false);
             installer.install(h2Client);
         } catch (H2ClientException | StorageException e) {
             logger.error(e.getMessage(), e);
diff --git a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/base/define/H2StorageInstaller.java b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/base/define/H2StorageInstaller.java
index 21adca86a..1e8fcd402 100644
--- a/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/base/define/H2StorageInstaller.java
+++ b/apm-collector/apm-collector-storage/collector-storage-h2-provider/src/main/java/org/apache/skywalking/apm/collector/storage/h2/base/define/H2StorageInstaller.java
@@ -38,6 +38,10 @@
 
     private final Logger logger = LoggerFactory.getLogger(H2StorageInstaller.class);
 
+    public H2StorageInstaller(boolean isHighPerformanceMode) {
+        super(isHighPerformanceMode);
+    }
+
     @Override protected void defineFilter(List<TableDefine> tableDefines) {
         int size = tableDefines.size();
         for (int i = size - 1; i >= 0; i--) {
@@ -70,17 +74,20 @@
         return false;
     }
 
-    @Override protected boolean deleteTable(Client client, TableDefine tableDefine) throws StorageException {
+    @Override protected void columnCheck(Client client, TableDefine tableDefine) throws StorageException {
+
+    }
+
+    @Override protected void deleteTable(Client client, TableDefine tableDefine) throws StorageException {
         H2Client h2Client = (H2Client)client;
         try {
             h2Client.execute("drop table if exists " + tableDefine.getName());
-            return true;
         } catch (H2ClientException e) {
             throw new StorageInstallException(e.getMessage(), e);
         }
     }
 
-    @Override protected boolean createTable(Client client, TableDefine tableDefine) throws StorageException {
+    @Override protected void createTable(Client client, TableDefine tableDefine) throws StorageException {
         H2Client h2Client = (H2Client)client;
         H2TableDefine h2TableDefine = (H2TableDefine)tableDefine;
 
@@ -104,6 +111,5 @@
         } catch (H2ClientException e) {
             throw new StorageInstallException(e.getMessage(), e);
         }
-        return true;
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services