You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2022/02/16 19:10:37 UTC

[atlas] 02/02: ATLAS-4554: updated monitoring to support elasticsearch index backend

This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 57880d33428ba516148f41addb9e9cd7a9ec36e5
Author: xingyu_zha <xi...@intsig.net>
AuthorDate: Mon Feb 14 17:56:13 2022 +0800

    ATLAS-4554: updated monitoring to support elasticsearch index backend
    
    Signed-off-by: Madhan Neethiraj <ma...@apache.org>
    (cherry picked from commit 0847d6c371a4482924ff642b525c633d7e0f5e6a)
---
 .../graphdb/janus/AtlasJanusGraphDatabase.java     |   24 +
 .../graphdb/janus/AtlasJanusGraphIndexClient.java  |   27 +-
 .../diskstorage/es/ElasticSearch7Index.java        | 1299 ++++++++++++++++++++
 .../org/apache/atlas/ApplicationProperties.java    |    7 +
 .../repository/graph/IndexRecoveryService.java     |    8 +-
 5 files changed, 1355 insertions(+), 10 deletions(-)

diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
index 3995cf9..cb3e8d9 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
@@ -37,6 +37,7 @@ import org.janusgraph.core.JanusGraphFactory;
 import org.janusgraph.core.schema.JanusGraphManagement;
 import org.janusgraph.diskstorage.StandardIndexProvider;
 import org.janusgraph.diskstorage.StandardStoreManager;
+import org.janusgraph.diskstorage.es.ElasticSearch7Index;
 import org.janusgraph.diskstorage.solr.Solr6Index;
 import org.janusgraph.graphdb.database.serialize.attribute.SerializableSerializer;
 import org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry;
@@ -122,6 +123,8 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
         addHBase2Support();
 
         addSolr6Index();
+
+        addElasticSearch7Index();
     }
 
     private static void addHBase2Support() {
@@ -164,6 +167,27 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
         }
     }
 
+    /** Maps the "elasticsearch" shorthand to ElasticSearch7Index by reflectively replacing StandardIndexProvider.ALL_MANAGER_CLASSES. */
+    private static void addElasticSearch7Index() {
+        try {
+            Field field = StandardIndexProvider.class.getDeclaredField("ALL_MANAGER_CLASSES");
+            field.setAccessible(true);
+
+            Field modifiersField = Field.class.getDeclaredField("modifiers"); // NOTE(review): this lookup is filtered on JDK 12+ - confirm target JDK
+            modifiersField.setAccessible(true);
+            modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); // clear the FINAL bit, presumably so field.set() below is accepted
+
+            Map<String, String> customMap = new HashMap<>(StandardIndexProvider.getAllProviderClasses());
+            customMap.put("elasticsearch", ElasticSearch7Index.class.getName());
+            ImmutableMap<String, String> immap = ImmutableMap.copyOf(customMap);
+            field.set(null, immap); // null receiver: the field is written as a static
+
+            LOG.debug("Injected es7 index - {}", ElasticSearch7Index.class.getName());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     public static JanusGraph getGraphInstance() {
         if (graphInstance == null) {
             synchronized (AtlasJanusGraphDatabase.class) {
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
index 9e9fdd8..f9f3772 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.repository.graphdb.janus;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.discovery.AtlasAggregationEntry;
 import org.apache.atlas.repository.Constants;
@@ -42,6 +43,8 @@ import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.TermsResponse;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.util.NamedList;
+import org.janusgraph.diskstorage.es.ElasticSearch7Index;
+import org.janusgraph.diskstorage.es.ElasticSearchClient;
 import org.janusgraph.diskstorage.solr.Solr6Index;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +73,7 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
     private static final String         TERMS_FIELD                  = "terms.fl";
     private static final int            SOLR_HEALTHY_STATUS          = 0;
     private static final long           SOLR_STATUS_LOG_FREQUENCY_MS = 60000;//Prints SOLR DOWN status for every 1 min
-    private static long                 prevSolrHealthCheckTime;
+    private static       long           prevIdxHealthCheckTime;
 
 
     private final Configuration configuration;
@@ -83,21 +86,26 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
     public boolean isHealthy() {
         boolean isHealthy   = false;
         long    currentTime = System.currentTimeMillis();
+        String  idxBackEnd  = configuration.getString(ApplicationProperties.INDEX_BACKEND_CONF);
 
         try {
-            if (isSolrHealthy()) {
-                isHealthy = true;
+            if (ApplicationProperties.INDEX_BACKEND_SOLR.equals(idxBackEnd)) {
+                isHealthy = isSolrHealthy();
+            } else if (ApplicationProperties.INDEX_BACKEND_ELASTICSEARCH.equals(idxBackEnd)) {
+                isHealthy = isElasticsearchHealthy();
             }
+
+            LOG.info("indexBackEnd={}; isHealthy={}", idxBackEnd, isHealthy);
         } catch (Exception exception) {
             if (LOG.isDebugEnabled()) {
                 LOG.error("Error: isHealthy", exception);
             }
         }
 
-        if (!isHealthy && (prevSolrHealthCheckTime == 0 || currentTime - prevSolrHealthCheckTime > SOLR_STATUS_LOG_FREQUENCY_MS)) {
-            LOG.info("Solr Health: Unhealthy!");
+        if (!isHealthy && (prevIdxHealthCheckTime == 0 || currentTime - prevIdxHealthCheckTime > SOLR_STATUS_LOG_FREQUENCY_MS)) {
+            LOG.info("Backend Health: Unhealthy!");
 
-            prevSolrHealthCheckTime = currentTime;
+            prevIdxHealthCheckTime = currentTime;
         }
 
         return isHealthy;
@@ -379,6 +387,24 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
         return client != null && client.ping(Constants.VERTEX_INDEX).getStatus() == SOLR_HEALTHY_STATUS;
     }
 
+    /**
+     * Checks Elasticsearch health by testing whether the vertex index exists.
+     *
+     * @return true when a client is available and the vertex index exists
+     * @throws IOException propagated from the client's index-exists call
+     */
+    private boolean isElasticsearchHealthy() throws IOException {
+        ElasticSearchClient client           = ElasticSearch7Index.getElasticSearchClient();
+        String              janusVertexIndex = ApplicationProperties.DEFAULT_INDEX_NAME + "_" + Constants.VERTEX_INDEX;
+
+        try {
+            return client != null && client.indexExists(janusVertexIndex);
+        } finally {
+            // hand the client back: a no-op for the shared client, a close() when one is created per request
+            ElasticSearch7Index.releaseElasticSearchClient(client);
+        }
+    }
+
     private void graphManagementCommit(AtlasGraphManagement management) {
         try {
             management.commit();
diff --git a/graphdb/janus/src/main/java/org/janusgraph/diskstorage/es/ElasticSearch7Index.java b/graphdb/janus/src/main/java/org/janusgraph/diskstorage/es/ElasticSearch7Index.java
new file mode 100644
index 0000000..51a87f5
--- /dev/null
+++ b/graphdb/janus/src/main/java/org/janusgraph/diskstorage/es/ElasticSearch7Index.java
@@ -0,0 +1,1299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.janusgraph.diskstorage.es;
+
+import static org.janusgraph.diskstorage.es.ElasticSearchIndex.*;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import org.janusgraph.core.Cardinality;
+import org.janusgraph.core.JanusGraphException;
+import org.janusgraph.core.attribute.Cmp;
+import org.janusgraph.core.attribute.Geo;
+import org.janusgraph.core.attribute.Geoshape;
+import org.janusgraph.core.attribute.Text;
+import org.janusgraph.core.schema.Mapping;
+import org.janusgraph.core.schema.Parameter;
+import org.janusgraph.diskstorage.BackendException;
+import org.janusgraph.diskstorage.BaseTransaction;
+import org.janusgraph.diskstorage.BaseTransactionConfig;
+import org.janusgraph.diskstorage.BaseTransactionConfigurable;
+import org.janusgraph.diskstorage.PermanentBackendException;
+import org.janusgraph.diskstorage.TemporaryBackendException;
+import org.janusgraph.diskstorage.configuration.ConfigOption;
+import org.janusgraph.diskstorage.configuration.Configuration;
+import org.janusgraph.diskstorage.es.compat.AbstractESCompat;
+import org.janusgraph.diskstorage.es.compat.ESCompatUtils;
+import org.janusgraph.diskstorage.es.mapping.IndexMapping;
+import org.janusgraph.diskstorage.es.script.ESScriptResponse;
+import org.janusgraph.diskstorage.indexing.IndexEntry;
+import org.janusgraph.diskstorage.indexing.IndexFeatures;
+import org.janusgraph.diskstorage.indexing.IndexMutation;
+import org.janusgraph.diskstorage.indexing.IndexProvider;
+import org.janusgraph.diskstorage.indexing.IndexQuery;
+import org.janusgraph.diskstorage.indexing.KeyInformation;
+import org.janusgraph.diskstorage.indexing.RawQuery;
+import org.janusgraph.diskstorage.util.DefaultTransaction;
+import org.janusgraph.graphdb.configuration.PreInitializeConfigOptions;
+import org.janusgraph.graphdb.database.serialize.AttributeUtils;
+import org.janusgraph.graphdb.query.JanusGraphPredicate;
+import org.janusgraph.graphdb.query.condition.And;
+import org.janusgraph.graphdb.query.condition.Condition;
+import org.janusgraph.graphdb.query.condition.Not;
+import org.janusgraph.graphdb.query.condition.Or;
+import org.janusgraph.graphdb.query.condition.PredicateCondition;
+import org.janusgraph.graphdb.types.ParameterType;
+import org.locationtech.spatial4j.shape.Rectangle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_DOC_KEY;
+import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_GEO_COORDS_KEY;
+import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_LANG_KEY;
+import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_SCRIPT_KEY;
+import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_TYPE_KEY;
+import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_MAX_RESULT_SET_SIZE;
+import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_NAME;
+
+/**
+ * Do not change.
+ * This is a copy of ElasticSearchIndex.java from org.janusgraph.diskstorage.es,
+ * with methods added to obtain and release an ElasticSearch client instance.
+ */
+
+@PreInitializeConfigOptions
+public class ElasticSearch7Index implements IndexProvider {
+
+    private static final Logger log = LoggerFactory.getLogger(ElasticSearch7Index.class); // name the logger after this class, not the upstream original
+
+    // add elasticsearch index instance(Singleton Pattern)
+    private static ElasticSearch7Index instance = null;
+
+    private static final String STRING_MAPPING_SUFFIX = "__STRING";
+    // add if need new instance
+    public static final ConfigOption<Boolean> CREATE_ELASTICSEARCH_CLIENT_PER_REQUEST = new ConfigOption(ELASTICSEARCH_NS, "create-client-per-request", "when false, allows the sharing of es client across other components.", org.janusgraph.diskstorage.configuration.ConfigOption.Type.LOCAL, false);
+
+    private static final String PARAMETERIZED_DELETION_SCRIPT = parameterizedScriptPrepare("",
+            "for (field in params.fields) {",
+            "    if (field.cardinality == 'SINGLE') {",
+            "        ctx._source.remove(field.name);",
+            "    } else if (ctx._source.containsKey(field.name)) {",
+            "        def fieldIndex = ctx._source[field.name].indexOf(field.value);",
+            "        if (fieldIndex >= 0 && fieldIndex < ctx._source[field.name].size()) {",
+            "            ctx._source[field.name].remove(fieldIndex);",
+            "        }",
+            "    }",
+            "}");
+
+    private static final String PARAMETERIZED_ADDITION_SCRIPT = parameterizedScriptPrepare("",
+            "for (field in params.fields) {",
+            "    if (ctx._source[field.name] == null) {",
+            "        ctx._source[field.name] = [];",
+            "    }",
+            "    if (field.cardinality != 'SET' || ctx._source[field.name].indexOf(field.value) == -1) {",
+            "        ctx._source[field.name].add(field.value);",
+            "    }",
+            "}");
+
+    static final String INDEX_NAME_SEPARATOR = "_";
+    private static final String SCRIPT_ID_SEPARATOR = "-";
+
+    private static final String MAX_OPEN_SCROLL_CONTEXT_PARAMETER = "search.max_open_scroll_context";
+    private static final Map<String, Object> MAX_RESULT_WINDOW = ImmutableMap.of("index.max_result_window", Integer.MAX_VALUE);
+
+    private static final Parameter[] NULL_PARAMETERS = null;
+
+    private static final String TRACK_TOTAL_HITS_PARAMETER = "track_total_hits";
+    private static final Parameter[] TRACK_TOTAL_HITS_DISABLED_PARAMETERS = new Parameter[]{new Parameter<>(TRACK_TOTAL_HITS_PARAMETER, false)};
+    private static final Map<String, Object> TRACK_TOTAL_HITS_DISABLED_REQUEST_BODY = ImmutableMap.of(TRACK_TOTAL_HITS_PARAMETER, false);
+
+    private final Function<String, String> generateIndexStoreNameFunction = this::generateIndexStoreName;
+    private final Map<String, String> indexStoreNamesCache = new ConcurrentHashMap<>();
+    private final boolean indexStoreNameCacheEnabled;
+
+    private final AbstractESCompat compat;
+    private final ElasticSearchClient client;
+    private final Configuration configuration;
+    private final String indexName;
+    private final int batchSize;
+    private final boolean useExternalMappings;
+    private final boolean allowMappingUpdate;
+    private final Map<String, Object> indexSetting;
+    private final long createSleep;
+    private final boolean useAllField;
+    private final Map<String, Object> ingestPipelines;
+    private final boolean useMappingForES7;
+    private final String parameterizedAdditionScriptId;
+    private final String parameterizedDeletionScriptId;
+
+    private static boolean createElasticSearchClientPerRequest;
+
+    public ElasticSearch7Index(Configuration config) throws BackendException {
+        // fetch configuration
+        this.configuration = config;
+        indexName = config.get(INDEX_NAME);
+        parameterizedAdditionScriptId = generateScriptId("add");
+        parameterizedDeletionScriptId = generateScriptId("del");
+        useAllField = config.get(USE_ALL_FIELD);
+        useExternalMappings = config.get(USE_EXTERNAL_MAPPINGS);
+        allowMappingUpdate = config.get(ALLOW_MAPPING_UPDATE);
+        createSleep = config.get(CREATE_SLEEP);
+        ingestPipelines = config.getSubset(ES_INGEST_PIPELINES);
+        useMappingForES7 = config.get(USE_MAPPING_FOR_ES7);
+        indexStoreNameCacheEnabled = config.get(ENABLE_INDEX_STORE_NAMES_CACHE);
+        batchSize = config.get(INDEX_MAX_RESULT_SET_SIZE);
+        log.debug("Configured ES query nb result by query to {}", batchSize);
+
+        client = createElasticSearchClient();
+        createElasticSearchClientPerRequest = config.get(CREATE_ELASTICSEARCH_CLIENT_PER_REQUEST);
+
+        checkClusterHealth(config.get(HEALTH_REQUEST_TIMEOUT));
+
+        compat = ESCompatUtils.acquireCompatForVersion(client.getMajorVersion());
+
+        indexSetting = ElasticSearchSetup.getSettingsFromJanusGraphConf(config);
+
+        setupMaxOpenScrollContextsIfNeeded(config);
+
+        setupStoredScripts();
+
+        //set instance
+        ElasticSearch7Index.instance = this;
+    }
+
+
+    // get client: null until an index instance exists; otherwise a fresh client when
+    // create-client-per-request is enabled, else the client owned by the index instance
+    public static ElasticSearchClient getElasticSearchClient() {
+        final ElasticSearch7Index current = ElasticSearch7Index.instance;
+
+        if (current == null) {
+            log.debug("No ElasticSearchIndex available. Will return null");
+
+            return null;
+        }
+
+        if (createElasticSearchClientPerRequest) {
+            log.debug("Creating a new ElasticSearch Client.");
+
+            return current.createElasticSearchClient();
+        }
+
+        log.debug("Returning the elasticSearch client owned by ElasticSearchIndex.");
+
+        return current.client;
+    }
+
+    // release client: closes it only when clients are created per request; the shared client stays open
+    public static void releaseElasticSearchClient(ElasticSearchClient elasticSearchClient) {
+        if (!createElasticSearchClientPerRequest) {
+            if (log.isDebugEnabled()) {
+                log.debug("Ignoring the closing of elasticSearch client as it is owned by ElasticSearchIndex.");
+            }
+            return;
+        }
+        if (elasticSearchClient == null) {
+            return;
+        }
+        try {
+            elasticSearchClient.close();
+            if (log.isDebugEnabled()) {
+                log.debug("Closed the elasticSearch client successfully.");
+            }
+        } catch (IOException closeFailure) {
+            log.warn("Failed to close elasticSearchClient.", closeFailure);
+        }
+    }
+
+    //create client
+    private ElasticSearchClient createElasticSearchClient() {
+        return interfaceConfiguration(configuration).getClient();
+    }
+
+
+
+    private void checkClusterHealth(String healthCheck) throws BackendException {
+        try {
+            client.clusterHealthRequest(healthCheck);
+        } catch (final IOException e) {
+            throw new PermanentBackendException(e.getMessage(), e);
+        }
+    }
+
+    private void setupStoredScripts() throws PermanentBackendException {
+        setupStoredScriptIfNeeded(parameterizedAdditionScriptId, PARAMETERIZED_ADDITION_SCRIPT);
+        setupStoredScriptIfNeeded(parameterizedDeletionScriptId, PARAMETERIZED_DELETION_SCRIPT);
+    }
+
+    private void setupStoredScriptIfNeeded(String storedScriptId, String source) throws PermanentBackendException {
+
+        ImmutableMap<String, Object> preparedScript = compat.prepareScript(source).build();
+
+        String lang = (String) ((ImmutableMap<String, Object>) preparedScript.get(ES_SCRIPT_KEY)).get(ES_LANG_KEY);
+
+        try {
+            ESScriptResponse esScriptResponse = client.getStoredScript(storedScriptId);
+
+            if(Boolean.FALSE.equals(esScriptResponse.getFound()) || !Objects.equals(lang, esScriptResponse.getScript().getLang()) ||
+                    !Objects.equals(source, esScriptResponse.getScript().getSource())){
+                client.createStoredScript(storedScriptId, preparedScript);
+            }
+
+        } catch (final IOException e) {
+            throw new PermanentBackendException(e.getMessage(), e);
+        }
+    }
+
+    private void setupMaxOpenScrollContextsIfNeeded(Configuration config) throws PermanentBackendException {
+
+        if(client.getMajorVersion().getValue() > 6){
+
+            boolean setupMaxOpenScrollContexts;
+
+            if(config.has(SETUP_MAX_OPEN_SCROLL_CONTEXTS)){
+                setupMaxOpenScrollContexts = config.get(SETUP_MAX_OPEN_SCROLL_CONTEXTS);
+            } else {
+                setupMaxOpenScrollContexts = SETUP_MAX_OPEN_SCROLL_CONTEXTS.getDefaultValue();
+            }
+
+            if(setupMaxOpenScrollContexts){
+
+                Map<String, Object> settings = ImmutableMap.of("persistent",
+                        ImmutableMap.of(MAX_OPEN_SCROLL_CONTEXT_PARAMETER, Integer.MAX_VALUE));
+
+                try {
+                    client.updateClusterSettings(settings);
+                } catch (final IOException e) {
+                    throw new PermanentBackendException(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Ensures {@code index} exists and registers {@code indexName} as an alias for it.
+     * Unless external mappings are in use, a missing index is created and the thread then sleeps {@code createSleep} ms.
+     * <p>
+     * The {@code client} field must point to a live, connected client.
+     *
+     * @param index index name
+     * @throws IOException if the index status could not be checked or index could not be created
+     * @throws JanusGraphException if interrupted while sleeping; the thread's interrupt flag is restored first
+     */
+    private void checkForOrCreateIndex(String index) throws IOException {
+        Objects.requireNonNull(client);
+        Objects.requireNonNull(index);
+
+        // Create the index only when external mappings are not used and it does not already exist
+        if (!useExternalMappings && !client.indexExists(index)) {
+            client.createIndex(index, indexSetting);
+            client.updateIndexSettings(index, MAX_RESULT_WINDOW);
+            try {
+                log.debug("Sleeping {} ms after {} index creation returned from actionGet()", createSleep, index);
+                Thread.sleep(createSleep);
+            } catch (final InterruptedException e) {
+                Thread.currentThread().interrupt(); // keep the interruption visible to callers further up the stack
+                throw new JanusGraphException("Interrupted while waiting for index to settle in", e);
+            }
+        }
+        Preconditions.checkState(client.indexExists(index), "Could not create index: %s",index);
+        client.addAlias(indexName, index);
+    }
+
+
+    /**
+     * Configure ElasticSearchIndex's ES client. See {@link org.janusgraph.diskstorage.es.ElasticSearchSetup} for more
+     * information.
+     *
+     * @param config a config passed to ElasticSearchIndex's constructor
+     * @return a client object open and ready for use
+     */
+    private ElasticSearchSetup.Connection interfaceConfiguration(Configuration config) {
+        final ElasticSearchSetup clientMode = ConfigOption.getEnumValue(config.get(INTERFACE), ElasticSearchSetup.class);
+
+        try {
+            return clientMode.connect(config);
+        } catch (final IOException e) {
+            throw new JanusGraphException(e);
+        }
+    }
+
+    private BackendException convert(Exception esException) {
+        if (esException instanceof InterruptedException) {
+            return new TemporaryBackendException("Interrupted while waiting for response", esException);
+        } else {
+            return new PermanentBackendException("Unknown exception while executing index operation", esException);
+        }
+    }
+
+    private static String getDualMappingName(String key) {
+        return key + STRING_MAPPING_SUFFIX;
+    }
+
+    private String generateScriptId(String uniqueScriptSuffix){
+        return indexName + SCRIPT_ID_SEPARATOR + uniqueScriptSuffix;
+    }
+
+    private String generateIndexStoreName(String store){
+        return indexName + INDEX_NAME_SEPARATOR + store.toLowerCase();
+    }
+
+    private String getIndexStoreName(String store) {
+
+        if(indexStoreNameCacheEnabled){
+            return indexStoreNamesCache.computeIfAbsent(store, generateIndexStoreNameFunction);
+        }
+
+        return generateIndexStoreName(store);
+    }
+
+    @Override
+    public void register(String store, String key, KeyInformation information,
+                         BaseTransaction tx) throws BackendException {
+        final Class<?> dataType = information.getDataType();
+        final Mapping map = Mapping.getMapping(information);
+        Preconditions.checkArgument(map==Mapping.DEFAULT || AttributeUtils.isString(dataType) ||
+                        (map==Mapping.PREFIX_TREE && AttributeUtils.isGeo(dataType)),
+                "Specified illegal mapping [%s] for data type [%s]",map,dataType);
+        final String indexStoreName = getIndexStoreName(store);
+        if (useExternalMappings) {
+            try {
+                //We check if the externalMapping have the property 'key'
+                final IndexMapping mappings = client.getMapping(indexStoreName, store);
+                if (mappings == null || (!mappings.isDynamic() && !mappings.getProperties().containsKey(key))) {
+                    //Error if it is not dynamic and have not the property 'key'
+                    throw new PermanentBackendException("The external mapping for index '"+ indexStoreName + "' and type '" + store + "' do not have property '" + key + "'");
+                } else if (allowMappingUpdate && mappings.isDynamic()) {
+                    //If it is dynamic, we push the unknown property 'key'
+                    this.pushMapping(store, key, information);
+                }
+            } catch (final IOException e) {
+                throw new PermanentBackendException(e);
+            }
+        } else {
+            try {
+                checkForOrCreateIndex(indexStoreName);
+            } catch (final IOException e) {
+                throw new PermanentBackendException(e);
+            }
+            this.pushMapping(store, key, information);
+        }
+    }
+
+    /**
+     * Push mapping to ElasticSearch
+     * @param store the type in the index
+     * @param key the name of the property in the index
+     * @param information information of the key
+     */
+    private void pushMapping(String store, String key,
+                             KeyInformation information) throws AssertionError, BackendException {
+        final Class<?> dataType = information.getDataType();
+        Mapping map = Mapping.getMapping(information);
+        final Map<String,Object> properties = new HashMap<>();
+        if (AttributeUtils.isString(dataType)) {
+            if (map==Mapping.DEFAULT) map=Mapping.TEXT;
+            log.debug("Registering string type for {} with mapping {}", key, map);
+            final String stringAnalyzer
+                    = ParameterType.STRING_ANALYZER.findParameter(information.getParameters(), null);
+            final String textAnalyzer = ParameterType.TEXT_ANALYZER.findParameter(information.getParameters(), null);
+            // use keyword type for string mappings unless custom string analyzer is provided
+            final Map<String,Object> stringMapping
+                    = stringAnalyzer == null ? compat.createKeywordMapping() : compat.createTextMapping(stringAnalyzer);
+            switch (map) {
+                case STRING:
+                    properties.put(key, stringMapping);
+                    break;
+                case TEXT:
+                    properties.put(key, compat.createTextMapping(textAnalyzer));
+                    break;
+                case TEXTSTRING:
+                    properties.put(key, compat.createTextMapping(textAnalyzer));
+                    properties.put(getDualMappingName(key), stringMapping);
+                    break;
+                default: throw new AssertionError("Unexpected mapping: "+map);
+            }
+        } else if (dataType == Float.class) {
+            log.debug("Registering float type for {}", key);
+            properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "float"));
+        } else if (dataType == Double.class) {
+            log.debug("Registering double type for {}", key);
+            properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "double"));
+        } else if (dataType == Byte.class) {
+            log.debug("Registering byte type for {}", key);
+            properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "byte"));
+        } else if (dataType == Short.class) {
+            log.debug("Registering short type for {}", key);
+            properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "short"));
+        } else if (dataType == Integer.class) {
+            log.debug("Registering integer type for {}", key);
+            properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "integer"));
+        } else if (dataType == Long.class) {
+            log.debug("Registering long type for {}", key);
+            properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "long"));
+        } else if (dataType == Boolean.class) {
+            log.debug("Registering boolean type for {}", key);
+            properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "boolean"));
+        } else if (dataType == Geoshape.class) {
+            switch (map) {
+                case PREFIX_TREE:
+                    final int maxLevels = ParameterType.INDEX_GEO_MAX_LEVELS.findParameter(information.getParameters(),
+                            DEFAULT_GEO_MAX_LEVELS);
+                    final double distErrorPct
+                            = ParameterType.INDEX_GEO_DIST_ERROR_PCT.findParameter(information.getParameters(),
+                            DEFAULT_GEO_DIST_ERROR_PCT);
+                    log.debug("Registering geo_shape type for {} with tree_levels={} and distance_error_pct={}", key,
+                            maxLevels, distErrorPct);
+                    properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "geo_shape",
+                            "tree", "quadtree",
+                            "tree_levels", maxLevels,
+                            "distance_error_pct", distErrorPct));
+                    break;
+                default:
+                    log.debug("Registering geo_point type for {}", key);
+                    properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "geo_point"));
+            }
+        } else if (dataType == Date.class || dataType == Instant.class) {
+            log.debug("Registering date type for {}", key);
+            properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "date"));
+        } else if (dataType == UUID.class) {
+            log.debug("Registering uuid type for {}", key);
+            properties.put(key, compat.createKeywordMapping());
+        }
+
+        if (useAllField) {
+            // add custom all field mapping if it doesn't exist
+            properties.put(ElasticSearchConstants.CUSTOM_ALL_FIELD, compat.createTextMapping(null));
+
+            // add copy_to for custom all field mapping
+            if (properties.containsKey(key) && dataType != Geoshape.class) {
+                final Map<String, Object> mapping = new HashMap<>(((Map<String, Object>) properties.get(key)));
+                mapping.put("copy_to", ElasticSearchConstants.CUSTOM_ALL_FIELD);
+                properties.put(key, mapping);
+            }
+        }
+
+        final List<Parameter> customParameters = ParameterType.getCustomParameters(information.getParameters());
+
+        if (properties.containsKey(key) && !customParameters.isEmpty()) {
+            final Map<String, Object> mapping = new HashMap<>(((Map<String, Object>) properties.get(key)));
+            customParameters.forEach(p -> mapping.put(p.key(), p.value()));
+            properties.put(key, mapping);
+        }
+
+        final Map<String,Object> mapping = ImmutableMap.of("properties", properties);
+
+        try {
+            client.createMapping(getIndexStoreName(store), store, mapping);
+        } catch (final Exception e) {
+            throw convert(e);
+        }
+    }
+
+    private static Mapping getStringMapping(KeyInformation information) {
+        assert AttributeUtils.isString(information.getDataType());
+        Mapping map = Mapping.getMapping(information);
+        if (map==Mapping.DEFAULT) map = Mapping.TEXT;
+        return map;
+    }
+
+    /**
+     * Tells whether a key is a string key mapped as {@link Mapping#TEXTSTRING}, i.e. one for
+     * which a second field (see {@code getDualMappingName}) is maintained next to the main one.
+     *
+     * @param information key information to inspect
+     * @return {@code true} only for string-typed keys whose effective mapping is TEXTSTRING
+     */
+    private static boolean hasDualStringMapping(KeyInformation information) {
+        if (!AttributeUtils.isString(information.getDataType())) {
+            return false;
+        }
+        return getStringMapping(information) == Mapping.TEXTSTRING;
+    }
+
+    /**
+     * Builds the source map of a complete document from a list of index entries.
+     *
+     * @param additions   entries to write; several entries may target the same field
+     * @param information retriever for the key information of each field in the store
+     * @return field name to converted value; SET/LIST fields hold an {@code Object[]}
+     * @throws BackendException declared for callers; not thrown directly by this method
+     */
+    public Map<String, Object> getNewDocument(final List<IndexEntry> additions,
+                                              KeyInformation.StoreRetriever information) throws BackendException {
+        // JSON would emit duplicate fields one after another, so entries are grouped by
+        // field here. The grouping is done at this stage rather than on the Mutation level
+        // because no other backend needs it.
+        final Multimap<String, IndexEntry> entriesByField = LinkedListMultimap.create();
+        additions.forEach(entry -> entriesByField.put(entry.field, entry));
+
+        final Map<String, Object> document = new HashMap<>();
+        for (final Map.Entry<String, Collection<IndexEntry>> grouped : entriesByField.asMap().entrySet()) {
+            final String field = grouped.getKey();
+            final Collection<IndexEntry> entries = grouped.getValue();
+            final KeyInformation keyInformation = information.get(field);
+            final Object value;
+            switch (keyInformation.getCardinality()) {
+                case SINGLE: {
+                    // for a single-valued field only the last entry in the list is kept
+                    final IndexEntry latest = Iterators.getLast(entries.iterator());
+                    value = convertToEsType(latest.value, Mapping.getMapping(keyInformation));
+                    break;
+                }
+                case SET:
+                case LIST: {
+                    final List<Object> converted = new ArrayList<>();
+                    for (final IndexEntry entry : entries) {
+                        final Object element = convertToEsType(entry.value, Mapping.getMapping(keyInformation));
+                        Preconditions.checkArgument(!(element instanceof byte[]),
+                                "Collections not supported for %s", field);
+                        converted.add(element);
+                    }
+                    value = converted.toArray();
+                    break;
+                }
+                default: {
+                    value = null;
+                    break;
+                }
+            }
+
+            document.put(field, value);
+            // TEXTSTRING keys additionally store the same value under the dual-mapping field
+            if (hasDualStringMapping(information.get(field)) && keyInformation.getDataType() == String.class) {
+                document.put(getDualMappingName(field), value);
+            }
+        }
+
+        return document;
+    }
+
+    /**
+     * Converts a property value into the object placed into an Elasticsearch document or
+     * script parameter.
+     * <ul>
+     *   <li>whole numbers become {@code long}, other numbers become {@code double}</li>
+     *   <li>strings, {@link Date} and {@link Boolean} values are passed through unchanged</li>
+     *   <li>{@link Instant} is converted to {@link Date}</li>
+     *   <li>{@link UUID} is converted to its string form</li>
+     *   <li>{@link Geoshape} is converted by {@code convertGeoshape} according to the mapping</li>
+     * </ul>
+     *
+     * @param value   the value to convert
+     * @param mapping the mapping of the key; only used for geoshapes
+     * @return the converted value
+     * @throws IllegalArgumentException if {@code value} is {@code null} or of an unsupported type
+     */
+    private static Object convertToEsType(Object value, Mapping mapping) {
+        if (value == null) {
+            // Without this guard a null value reaches the final branch, where value.getClass()
+            // raises a bare NullPointerException instead of the intended argument error.
+            throw new IllegalArgumentException("Unsupported type: null (value: null)");
+        }
+
+        if (value instanceof Number) {
+            if (AttributeUtils.isWholeNumber((Number) value)) {
+                return ((Number) value).longValue();
+            } else { //double or float
+                return ((Number) value).doubleValue();
+            }
+        } else if (AttributeUtils.isString(value)) {
+            return value;
+        } else if (value instanceof Geoshape) {
+            return convertGeoshape((Geoshape) value, mapping);
+        } else if (value instanceof Date) {
+            return value;
+        } else if (value instanceof Instant) {
+            return Date.from((Instant) value);
+        } else if (value instanceof Boolean) {
+            return value;
+        } else if (value instanceof UUID) {
+            return value.toString();
+        } else {
+            throw new IllegalArgumentException("Unsupported type: " + value.getClass() + " (value: " + value + ")");
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Object convertGeoshape(Geoshape geoshape, Mapping mapping) {
+        if (geoshape.getType() == Geoshape.Type.POINT && Mapping.PREFIX_TREE != mapping) {
+            final Geoshape.Point p = geoshape.getPoint();
+            return new double[]{p.getLongitude(), p.getLatitude()};
+        } else if (geoshape.getType() == Geoshape.Type.BOX) {
+            final Rectangle box = geoshape.getShape().getBoundingBox();
+            final Map<String,Object> map = new HashMap<>();
+            map.put("type", "envelope");
+            map.put("coordinates", new double[][] {{box.getMinX(),box.getMaxY()},{box.getMaxX(),box.getMinY()}});
+            return map;
+        } else if (geoshape.getType() == Geoshape.Type.CIRCLE) {
+            try {
+                final Map<String,Object> map = geoshape.toMap();
+                map.put("radius", map.get("radius") + ((Map<String, String>) map.remove("properties")).get("radius_units"));
+                return map;
+            } catch (final IOException e) {
+                throw new IllegalArgumentException("Invalid geoshape: " + geoshape, e);
+            }
+        } else {
+            try {
+                return geoshape.toMap();
+            } catch (final IOException e) {
+                throw new IllegalArgumentException("Invalid geoshape: " + geoshape, e);
+            }
+        }
+    }
+
+    /**
+     * Translates index mutations into Elasticsearch delete/index/update requests and submits
+     * them as bulk requests.
+     * <p>
+     * Requests are built per store. A store that has an entry in {@code ingestPipelines} is
+     * sent right away in its own bulk request using that pipeline; requests of all other
+     * stores are collected and sent together in one final bulk request without a pipeline.
+     *
+     * @param mutations   store name to (document id to mutation)
+     * @param information key information lookup, by store name
+     * @param tx          transaction handle; not used by this method
+     * @throws BackendException any failure, logged and rethrown through {@code convert(e)}
+     */
+    @Override
+    public void mutate(Map<String, Map<String, IndexMutation>> mutations, KeyInformation.IndexRetriever information,
+                       BaseTransaction tx) throws BackendException {
+        final List<ElasticSearchMutation> requests = new ArrayList<>();
+        try {
+            for (final Map.Entry<String, Map<String, IndexMutation>> stores : mutations.entrySet()) {
+                final List<ElasticSearchMutation> requestByStore = new ArrayList<>();
+                final String storeName = stores.getKey();
+                final String indexStoreName = getIndexStoreName(storeName);
+                for (final Map.Entry<String, IndexMutation> entry : stores.getValue().entrySet()) {
+                    final String documentId = entry.getKey();
+                    final IndexMutation mutation = entry.getValue();
+                    assert mutation.isConsolidated();
+                    // Sanity checks: a document cannot be both new and deleted, a new document
+                    // carries no deletions, and a deleted document carries no additions.
+                    Preconditions.checkArgument(!(mutation.isNew() && mutation.isDeleted()));
+                    Preconditions.checkArgument(!mutation.isNew() || !mutation.hasDeletions());
+                    Preconditions.checkArgument(!mutation.isDeleted() || !mutation.hasAdditions());
+                    //Deletions first
+                    if (mutation.hasDeletions()) {
+                        if (mutation.isDeleted()) {
+                            log.trace("Deleting entire document {}", documentId);
+                            requestByStore.add(ElasticSearchMutation.createDeleteRequest(indexStoreName, storeName,
+                                    documentId));
+                        } else {
+                            // Partial deletion: the removed entries are passed as parameters to
+                            // the stored deletion script, which is run as an update request.
+                            List<Map<String, Object>> params = getParameters(information.get(storeName),
+                                    mutation.getDeletions(), true);
+                            Map doc = compat.prepareStoredScript(parameterizedDeletionScriptId, params).build();
+                            log.trace("Deletion script {} with params {}", PARAMETERIZED_DELETION_SCRIPT, params);
+                            requestByStore.add(ElasticSearchMutation.createUpdateRequest(indexStoreName, storeName,
+                                    documentId, doc));
+                        }
+                    }
+                    if (mutation.hasAdditions()) {
+                        if (mutation.isNew()) { //Index
+                            log.trace("Adding entire document {}", documentId);
+                            final Map<String, Object> source = getNewDocument(mutation.getAdditions(),
+                                    information.get(storeName));
+                            requestByStore.add(ElasticSearchMutation.createIndexRequest(indexStoreName, storeName,
+                                    documentId, source));
+                        } else {
+                            // An upsert document is only supplied when this mutation has no
+                            // deletions; otherwise the update requests are sent with a null upsert.
+                            final Map upsert;
+                            if (!mutation.hasDeletions()) {
+                                upsert = getNewDocument(mutation.getAdditions(), information.get(storeName));
+                            } else {
+                                upsert = null;
+                            }
+
+                            // Additions to non-SINGLE keys go through the stored addition script;
+                            // SINGLE-cardinality entries are skipped here and handled by the
+                            // partial-document update built from getAdditionDoc below.
+                            List<Map<String, Object>> params = getParameters(information.get(storeName),
+                                    mutation.getAdditions(), false, Cardinality.SINGLE);
+                            if (!params.isEmpty()) {
+                                ImmutableMap.Builder builder = compat.prepareStoredScript(parameterizedAdditionScriptId, params);
+                                requestByStore.add(ElasticSearchMutation.createUpdateRequest(indexStoreName, storeName,
+                                        documentId, builder, upsert));
+                                log.trace("Adding script {} with params {}", PARAMETERIZED_ADDITION_SCRIPT, params);
+                            }
+
+                            final Map<String, Object> doc = getAdditionDoc(information, storeName, mutation);
+                            if (!doc.isEmpty()) {
+                                final ImmutableMap.Builder builder = ImmutableMap.builder().put(ES_DOC_KEY, doc);
+                                requestByStore.add(ElasticSearchMutation.createUpdateRequest(indexStoreName, storeName,
+                                        documentId, builder, upsert));
+                                log.trace("Adding update {}", doc);
+                            }
+                        }
+                    }
+                }
+                // Stores with an ingest pipeline are flushed on their own with that pipeline;
+                // all remaining requests are batched into the single bulk call after the loop.
+                if (!requestByStore.isEmpty() && ingestPipelines.containsKey(storeName)) {
+                    client.bulkRequest(requestByStore, String.valueOf(ingestPipelines.get(storeName)));
+                } else if (!requestByStore.isEmpty()) {
+                    requests.addAll(requestByStore);
+                }
+            }
+            if (!requests.isEmpty()) {
+                client.bulkRequest(requests, null);
+            }
+        } catch (final Exception e) {
+            log.error("Failed to execute bulk Elasticsearch mutation", e);
+            throw convert(e);
+        }
+    }
+
+    private List<Map<String, Object>> getParameters(KeyInformation.StoreRetriever storeRetriever,
+                                                    List<IndexEntry> entries,
+                                                    boolean deletion,
+                                                    Cardinality... cardinalitiesToSkip) {
+        Set<Cardinality> cardinalityToSkipSet = Sets.newHashSet(cardinalitiesToSkip);
+        List<Map<String, Object>> result = new ArrayList<>();
+        for (IndexEntry entry : entries) {
+            KeyInformation info = storeRetriever.get(entry.field);
+            if (cardinalityToSkipSet.contains(info.getCardinality())) {
+                continue;
+            }
+            Object jsValue = deletion && info.getCardinality() == Cardinality.SINGLE ?
+                    "" : convertToEsType(entry.value, Mapping.getMapping(info));
+            result.add(ImmutableMap.of("name", entry.field,
+                    "value", jsValue,
+                    "cardinality", info.getCardinality().name()));
+            if (hasDualStringMapping(info)) {
+                result.add(ImmutableMap.of("name", getDualMappingName(entry.field),
+                        "value", jsValue,
+                        "cardinality", info.getCardinality().name()));
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Collects the SINGLE-cardinality additions of a mutation into a partial document.
+     * Keys with a dual string mapping are also written under their dual-mapping field name.
+     *
+     * @param information key information lookup, by store name
+     * @param store       name of the store the mutation belongs to
+     * @param mutation    the mutation whose additions are inspected
+     * @return field name to converted value; empty if there is no SINGLE-cardinality addition
+     * @throws PermanentBackendException declared for callers; not thrown directly by this method
+     */
+    private Map<String,Object> getAdditionDoc(KeyInformation.IndexRetriever information,
+                                              String store, IndexMutation mutation) throws PermanentBackendException {
+        final Map<String,Object> partialDoc = new HashMap<>();
+
+        for (final IndexEntry addition : mutation.getAdditions()) {
+            final KeyInformation keyInformation = information.get(store).get(addition.field);
+            if (keyInformation.getCardinality() != Cardinality.SINGLE) {
+                // SET and LIST additions are not part of the partial document
+                continue;
+            }
+
+            partialDoc.put(addition.field,
+                    convertToEsType(addition.value, Mapping.getMapping(keyInformation)));
+            if (hasDualStringMapping(keyInformation)) {
+                partialDoc.put(getDualMappingName(addition.field),
+                        convertToEsType(addition.value, Mapping.getMapping(keyInformation)));
+            }
+        }
+
+        return partialDoc;
+    }
+
+    /**
+     * Restores documents: a document with a null or empty entry list is deleted, any other
+     * document is re-indexed from its entries.
+     * <p>
+     * Stores that have an entry in {@code ingestPipelines} are sent in their own bulk request
+     * with that pipeline; requests of all other stores go out in one final bulk request.
+     *
+     * @param documents   store name to (document id to entries)
+     * @param information key information lookup, by store name
+     * @param tx          transaction handle; not used by this method
+     * @throws BackendException any failure, rethrown through {@code convert(e)}
+     */
+    @Override
+    public void restore(Map<String,Map<String, List<IndexEntry>>> documents, KeyInformation.IndexRetriever information,
+                        BaseTransaction tx) throws BackendException {
+        final List<ElasticSearchMutation> pending = new ArrayList<>();
+        try {
+            for (final Map.Entry<String, Map<String, List<IndexEntry>>> storeEntry : documents.entrySet()) {
+                final String store = storeEntry.getKey();
+                final String indexStoreName = getIndexStoreName(store);
+                final List<ElasticSearchMutation> storeRequests = new ArrayList<>();
+
+                for (final Map.Entry<String, List<IndexEntry>> docEntry : storeEntry.getValue().entrySet()) {
+                    final String docID = docEntry.getKey();
+                    final List<IndexEntry> content = docEntry.getValue();
+
+                    if (content != null && !content.isEmpty()) {
+                        // Add
+                        if (log.isTraceEnabled()) {
+                            log.trace("Adding entire document {}", docID);
+                        }
+                        final Map<String, Object> source = getNewDocument(content, information.get(store));
+                        storeRequests.add(
+                                ElasticSearchMutation.createIndexRequest(indexStoreName, store, docID, source));
+                    } else {
+                        // delete
+                        if (log.isTraceEnabled()) {
+                            log.trace("Deleting entire document {}", docID);
+                        }
+                        storeRequests.add(ElasticSearchMutation.createDeleteRequest(indexStoreName, store, docID));
+                    }
+                }
+
+                if (storeRequests.isEmpty()) {
+                    continue;
+                }
+                if (ingestPipelines.containsKey(store)) {
+                    client.bulkRequest(storeRequests, String.valueOf(ingestPipelines.get(store)));
+                } else {
+                    pending.addAll(storeRequests);
+                }
+            }
+
+            if (!pending.isEmpty()) {
+                client.bulkRequest(pending, null);
+            }
+        } catch (final Exception e) {
+            throw convert(e);
+        }
+    }
+
+    private Map<String, Object> getRelationFromCmp(final Cmp cmp, String key, final Object value) {
+        switch (cmp) {
+            case EQUAL:
+                return compat.term(key, value);
+            case NOT_EQUAL:
+                return compat.boolMustNot(compat.term(key, value));
+            case LESS_THAN:
+                return compat.lt(key, value);
+            case LESS_THAN_EQUAL:
+                return compat.lte(key, value);
+            case GREATER_THAN:
+                return compat.gt(key, value);
+            case GREATER_THAN_EQUAL:
+                return compat.gte(key, value);
+            default:
+                throw new IllegalArgumentException("Unexpected relation: " + cmp);
+        }
+    }
+
+    /**
+     * Translates a query condition tree into an Elasticsearch query map.
+     * <p>
+     * {@link Not}, {@link And} and {@link Or} conditions are translated recursively into
+     * must_not / must / should clauses. A {@link PredicateCondition} is translated according
+     * to the runtime type of its value (number, string, geoshape, date/instant, boolean, UUID).
+     * A {@code null} value is only supported together with {@code Cmp.NOT_EQUAL}, which
+     * becomes an "exists" query.
+     *
+     * @param condition   the condition to translate
+     * @param information key information lookup for the store being queried
+     * @return the query map
+     * @throws IllegalArgumentException if the condition, the value type, or the combination
+     *                                  of predicate and value type is not supported
+     */
+    public Map<String,Object> getFilter(Condition<?> condition, KeyInformation.StoreRetriever information) {
+        if (condition instanceof PredicateCondition) {
+            final PredicateCondition<String, ?> atom = (PredicateCondition) condition;
+            Object value = atom.getValue();
+            final String key = atom.getKey();
+            final JanusGraphPredicate predicate = atom.getPredicate();
+            if (value == null && predicate == Cmp.NOT_EQUAL) {
+                return compat.exists(key);
+            } else if (value instanceof Number) {
+                Preconditions.checkArgument(predicate instanceof Cmp,
+                        "Relation not supported on numeric types: %s", predicate);
+                return getRelationFromCmp((Cmp) predicate, key, value);
+            } else if (value instanceof String) {
+
+                final Mapping mapping = getStringMapping(information.get(key));
+                final String fieldName;
+                if (mapping==Mapping.TEXT && !(Text.HAS_CONTAINS.contains(predicate) || predicate instanceof Cmp))
+                    throw new IllegalArgumentException("Text mapped string values only support CONTAINS and Compare queries and not: " + predicate);
+                if (mapping==Mapping.STRING && Text.HAS_CONTAINS.contains(predicate))
+                    throw new IllegalArgumentException("String mapped string values do not support CONTAINS queries: " + predicate);
+                // For TEXTSTRING keys, CONTAINS-style predicates and all comparisons except
+                // EQUAL query the main field; everything else queries the dual-mapping field.
+                if (mapping==Mapping.TEXTSTRING && !(Text.HAS_CONTAINS.contains(predicate) || (predicate instanceof Cmp && predicate != Cmp.EQUAL))) {
+                    fieldName = getDualMappingName(key);
+                } else {
+                    fieldName = key;
+                }
+
+                if (predicate == Text.CONTAINS || predicate == Cmp.EQUAL) {
+                    return compat.match(fieldName, value);
+                } else if (predicate == Text.NOT_CONTAINS) {
+                    return compat.boolMust(ImmutableList.of(compat.exists(fieldName),
+                            compat.boolMustNot(compat.match(fieldName, value))));
+                } else if (predicate == Text.CONTAINS_PHRASE) {
+                    return compat.matchPhrase(fieldName, value);
+                } else if (predicate == Text.NOT_CONTAINS_PHRASE) {
+                    return compat.boolMust(ImmutableList.of(compat.exists(fieldName),
+                            compat.boolMustNot(compat.matchPhrase(fieldName, value))));
+                } else if (predicate == Text.CONTAINS_PREFIX) {
+                    // The search term is lower-cased unless the key declares its own analyzer.
+                    // NOTE(review): toLowerCase() uses the JVM default locale - confirm this
+                    // matches the lower-casing applied at index time.
+                    if (!ParameterType.TEXT_ANALYZER.hasParameter(information.get(key).getParameters()))
+                        value = ((String) value).toLowerCase();
+                    return compat.prefix(fieldName, value);
+                } else if (predicate == Text.NOT_CONTAINS_PREFIX) {
+                    if (!ParameterType.TEXT_ANALYZER.hasParameter(information.get(key).getParameters()))
+                        value = ((String) value).toLowerCase();
+                    return compat.boolMust(ImmutableList.of(compat.exists(fieldName),
+                            compat.boolMustNot(compat.prefix(fieldName, value))));
+                } else if (predicate == Text.CONTAINS_REGEX) {
+                    if (!ParameterType.TEXT_ANALYZER.hasParameter(information.get(key).getParameters()))
+                        value = ((String) value).toLowerCase();
+                    return compat.regexp(fieldName, value);
+                } else if (predicate == Text.NOT_CONTAINS_REGEX) {
+                    if (!ParameterType.TEXT_ANALYZER.hasParameter(information.get(key).getParameters()))
+                        value = ((String) value).toLowerCase();
+                    return compat.boolMust(ImmutableList.of(compat.exists(fieldName),
+                            compat.boolMustNot(compat.regexp(fieldName, value))));
+                } else if (predicate == Text.PREFIX) {
+                    return compat.prefix(fieldName, value);
+                } else if (predicate == Text.NOT_PREFIX) {
+                    return compat.boolMust(ImmutableList.of(compat.exists(fieldName),
+                            compat.boolMustNot(compat.prefix(fieldName, value))));
+                } else if (predicate == Text.REGEX) {
+                    return compat.regexp(fieldName, value);
+                } else if (predicate == Text.NOT_REGEX) {
+                    return compat.boolMust(ImmutableList.of(compat.exists(fieldName),
+                            compat.boolMustNot(compat.regexp(fieldName, value))));
+                } else if (predicate == Cmp.NOT_EQUAL) {
+                    return compat.boolMustNot(compat.match(fieldName, value));
+                } else if (predicate == Text.FUZZY || predicate == Text.CONTAINS_FUZZY) {
+                    return compat.fuzzyMatch(fieldName, value);
+                } else if (predicate == Text.NOT_FUZZY || predicate == Text.NOT_CONTAINS_FUZZY) {
+                    return compat.boolMust(ImmutableList.of(compat.exists(fieldName),
+                            compat.boolMustNot(compat.fuzzyMatch(fieldName, value))));
+                } else if (predicate == Cmp.LESS_THAN) {
+                    return compat.lt(fieldName, value);
+                } else if (predicate == Cmp.LESS_THAN_EQUAL) {
+                    return compat.lte(fieldName, value);
+                } else if (predicate == Cmp.GREATER_THAN) {
+                    return compat.gt(fieldName, value);
+                } else if (predicate == Cmp.GREATER_THAN_EQUAL) {
+                    return compat.gte(fieldName, value);
+                } else
+                    throw new IllegalArgumentException("Predicate is not supported for string value: " + predicate);
+            } else if (value instanceof Geoshape && Mapping.getMapping(information.get(key)) == Mapping.DEFAULT) {
+                // geopoint
+                final Geoshape shape = (Geoshape) value;
+                Preconditions.checkArgument(predicate instanceof Geo && predicate != Geo.CONTAINS,
+                        "Relation not supported on geopoint types: %s", predicate);
+
+                final Map<String,Object> query;
+                switch (shape.getType()) {
+                    case CIRCLE:
+                        final Geoshape.Point center = shape.getPoint();
+                        query = compat.geoDistance(key, center.getLatitude(), center.getLongitude(), shape.getRadius());
+                        break;
+                    case BOX:
+                        final Geoshape.Point southwest = shape.getPoint(0);
+                        final Geoshape.Point northeast = shape.getPoint(1);
+                        query = compat.geoBoundingBox(key, southwest.getLatitude(), southwest.getLongitude(),
+                                northeast.getLatitude(), northeast.getLongitude());
+                        break;
+                    case POLYGON:
+                        final List<List<Double>> points = IntStream.range(0, shape.size())
+                                .mapToObj(i -> ImmutableList.of(shape.getPoint(i).getLongitude(),
+                                        shape.getPoint(i).getLatitude()))
+                                .collect(Collectors.toList());
+                        query = compat.geoPolygon(key, points);
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Unsupported or invalid search shape type for geopoint: "
+                                + shape.getType());
+                }
+
+                return predicate == Geo.DISJOINT ?  compat.boolMustNot(query) : query;
+            } else if (value instanceof Geoshape) {
+                // geoshape: build the shape description and let compat apply the relation
+                Preconditions.checkArgument(predicate instanceof Geo,
+                        "Relation not supported on geoshape types: %s", predicate);
+                final Geoshape shape = (Geoshape) value;
+                final Map<String,Object> geo;
+                switch (shape.getType()) {
+                    case CIRCLE:
+                        final Geoshape.Point center = shape.getPoint();
+                        geo = ImmutableMap.of(ES_TYPE_KEY, "circle",
+                                ES_GEO_COORDS_KEY, ImmutableList.of(center.getLongitude(), center.getLatitude()),
+                                "radius", shape.getRadius() + "km");
+                        break;
+                    case BOX:
+                        final Geoshape.Point southwest = shape.getPoint(0);
+                        final Geoshape.Point northeast = shape.getPoint(1);
+                        geo = ImmutableMap.of(ES_TYPE_KEY, "envelope",
+                                ES_GEO_COORDS_KEY,
+                                ImmutableList.of(
+                                        ImmutableList.of(southwest.getLongitude(), northeast.getLatitude()),
+                                        ImmutableList.of(northeast.getLongitude(), southwest.getLatitude())));
+                        break;
+                    case LINE:
+                        final List lineCoords = IntStream.range(0, shape.size())
+                                .mapToObj(i -> ImmutableList.of(shape.getPoint(i).getLongitude(),
+                                        shape.getPoint(i).getLatitude()))
+                                .collect(Collectors.toList());
+                        geo = ImmutableMap.of(ES_TYPE_KEY, "linestring", ES_GEO_COORDS_KEY, lineCoords);
+                        break;
+                    case POLYGON:
+                        final List polyCoords = IntStream.range(0, shape.size())
+                                .mapToObj(i -> ImmutableList.of(shape.getPoint(i).getLongitude(),
+                                        shape.getPoint(i).getLatitude()))
+                                .collect(Collectors.toList());
+                        geo = ImmutableMap.of(ES_TYPE_KEY, "polygon", ES_GEO_COORDS_KEY,
+                                ImmutableList.of(polyCoords));
+                        break;
+                    case POINT:
+                        geo = ImmutableMap.of(ES_TYPE_KEY, "point",
+                                ES_GEO_COORDS_KEY, ImmutableList.of(shape.getPoint().getLongitude(),
+                                        shape.getPoint().getLatitude()));
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Unsupported or invalid search shape type: "
+                                + shape.getType());
+                }
+
+                return compat.geoShape(key, geo, (Geo) predicate);
+            } else if (value instanceof Date || value instanceof Instant) {
+                Preconditions.checkArgument(predicate instanceof Cmp,
+                        "Relation not supported on date types: %s", predicate);
+
+                if (value instanceof Instant) {
+                    value = Date.from((Instant) value);
+                }
+                return getRelationFromCmp((Cmp) predicate, key, value);
+            } else if (value instanceof Boolean) {
+                // Validate before casting, as the numeric and date branches do, so that a
+                // non-Cmp predicate is reported as an argument error and not a ClassCastException.
+                Preconditions.checkArgument(predicate instanceof Cmp,
+                        "Relation not supported on boolean types: %s", predicate);
+                final Cmp numRel = (Cmp) predicate;
+                switch (numRel) {
+                    case EQUAL:
+                        return compat.term(key, value);
+                    case NOT_EQUAL:
+                        return compat.boolMustNot(compat.term(key, value));
+                    default:
+                        throw new IllegalArgumentException("Boolean types only support EQUAL or NOT_EQUAL");
+                }
+            } else if (value instanceof UUID) {
+                if (predicate == Cmp.EQUAL) {
+                    return compat.term(key, value);
+                } else if (predicate == Cmp.NOT_EQUAL) {
+                    return compat.boolMustNot(compat.term(key, value));
+                } else {
+                    throw new IllegalArgumentException("Only equal or not equal is supported for UUIDs: "
+                            + predicate);
+                }
+            } else throw new IllegalArgumentException("Unsupported type: " + value);
+        } else if (condition instanceof Not) {
+            return compat.boolMustNot(getFilter(((Not) condition).getChild(),information));
+        } else if (condition instanceof And) {
+            final List queries = StreamSupport.stream(condition.getChildren().spliterator(), false)
+                    .map(c -> getFilter(c,information)).collect(Collectors.toList());
+            return compat.boolMust(queries);
+        } else if (condition instanceof Or) {
+            final List queries = StreamSupport.stream(condition.getChildren().spliterator(), false)
+                    .map(c -> getFilter(c,information)).collect(Collectors.toList());
+            return compat.boolShould(queries);
+        } else throw new IllegalArgumentException("Invalid condition: " + condition);
+    }
+
+    /**
+     * Runs an index query and returns the matching results as a stream of strings.
+     * <p>
+     * The request size is the query limit capped at {@code batchSize}, or {@code batchSize}
+     * when the query has no limit. When the request size reaches {@code batchSize} the search
+     * is run as a scroll; otherwise a plain search with total-hit tracking disabled is used.
+     *
+     * @param query        the index query (condition, orders, optional limit)
+     * @param informations key information lookup, by store name
+     * @param tx           transaction handle; not used by this method
+     * @return the results, truncated to the query limit if one is set
+     * @throws BackendException a {@link PermanentBackendException} wrapping I/O failures
+     */
+    @Override
+    public Stream<String> query(IndexQuery query, KeyInformation.IndexRetriever informations,
+                                BaseTransaction tx) throws BackendException {
+        final ElasticSearchRequest request = new ElasticSearchRequest();
+        request.setQuery(compat.prepareQuery(getFilter(query.getCondition(), informations.get(query.getStore()))));
+        if (!query.getOrder().isEmpty()) {
+            addOrderToQuery(informations, request, query.getOrder(), query.getStore());
+        }
+        request.setFrom(0);
+        request.setSize(query.hasLimit() ? Math.min(query.getLimit(), batchSize) : batchSize);
+        request.setDisableSourceRetrieval(true);
+
+        try {
+            final String indexStoreName = getIndexStoreName(query.getStore());
+            // a full batch means there may be more results than one page can hold
+            final boolean useScroll = request.getSize() >= batchSize;
+            final ElasticSearchResponse response = client.search(indexStoreName,
+                    compat.createRequestBody(request, useScroll? NULL_PARAMETERS : TRACK_TOTAL_HITS_DISABLED_PARAMETERS),
+                    useScroll);
+            log.debug("First Executed query [{}] in {} ms", query.getCondition(), response.getTook());
+
+            final Iterator<RawQuery.Result<String>> hits = getResultsIterator(useScroll, response, request.getSize());
+            Stream<RawQuery.Result<String>> results
+                    = StreamSupport.stream(Spliterators.spliteratorUnknownSize(hits, Spliterator.ORDERED), false);
+            if (query.hasLimit()) {
+                results = results.limit(query.getLimit());
+            }
+            return results.map(RawQuery.Result::getResult);
+        } catch (final IOException | UncheckedIOException e) {
+            throw new PermanentBackendException(e);
+        }
+    }
+
+    /**
+     * Picks the iterator over a search response: an {@link ElasticSearchScroll} built from the
+     * client, the response and the window size when scrolling, otherwise the iterator of the
+     * results already contained in the response.
+     *
+     * @param useScroll  whether the search was started as a scroll
+     * @param response   the response of the initial search
+     * @param windowSize size passed on to the scroll iterator
+     * @return iterator over the query results
+     */
+    private Iterator<RawQuery.Result<String>> getResultsIterator(boolean useScroll, ElasticSearchResponse response, int windowSize){
+        if (useScroll) {
+            return new ElasticSearchScroll(client, response, windowSize);
+        }
+        return response.getResults().iterator();
+    }
+
+    private String convertToEsDataType(Class<?> dataType, Mapping mapping) {
+        if(String.class.isAssignableFrom(dataType)) {
+            return "string";
+        }
+        else if (Integer.class.isAssignableFrom(dataType)) {
+            return "integer";
+        }
+        else if (Long.class.isAssignableFrom(dataType)) {
+            return "long";
+        }
+        else if (Float.class.isAssignableFrom(dataType)) {
+            return "float";
+        }
+        else if (Double.class.isAssignableFrom(dataType)) {
+            return "double";
+        }
+        else if (Boolean.class.isAssignableFrom(dataType)) {
+            return "boolean";
+        }
+        else if (Date.class.isAssignableFrom(dataType)) {
+            return "date";
+        }
+        else if (Instant.class.isAssignableFrom(dataType)) {
+            return "date";
+        }
+        else if (Geoshape.class.isAssignableFrom(dataType)) {
+            return mapping == Mapping.DEFAULT ? "geo_point" : "geo_shape";
+        }
+
+        return null;
+    }
+
+    /**
+     * Builds the search request for a raw query-string query and executes it.
+     *
+     * @param query        raw query providing the query string, orders, store and parameters
+     * @param informations key information used to resolve the sort fields
+     * @param tx           transaction handle (not used by this method)
+     * @param size         number of hits requested from the backend
+     * @param useScroll    whether the search is issued as a scroll request; when {@code false}
+     *                     the total-hits tracking flag is set to {@code false} in the request body
+     * @return the response of the search call
+     * @throws BackendException wrapping any I/O failure raised while searching
+     */
+    private ElasticSearchResponse runCommonQuery(RawQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx, int size,
+                                                 boolean useScroll) throws BackendException{
+        final ElasticSearchRequest request = new ElasticSearchRequest();
+        request.setQuery(compat.queryString(query.getQuery()));
+        final boolean ordered = !query.getOrders().isEmpty();
+        if (ordered) {
+            addOrderToQuery(informations, request, query.getOrders(), query.getStore());
+        }
+        request.setFrom(0);
+        request.setSize(size);
+        request.setDisableSourceRetrieval(true);
+        try {
+            Map<String, Object> body = compat.createRequestBody(request, query.getParameters());
+            if (!useScroll) {
+                if (body != null) {
+                    body.put(TRACK_TOTAL_HITS_PARAMETER, false);
+                } else {
+                    // no body was produced: fall back to the shared body that only carries the flag
+                    body = TRACK_TOTAL_HITS_DISABLED_REQUEST_BODY;
+                }
+            }
+            final String storeName = getIndexStoreName(query.getStore());
+            return client.search(storeName, body, useScroll);
+        } catch (final IOException | UncheckedIOException searchFailure) {
+            throw new PermanentBackendException(searchFailure);
+        }
+    }
+
+    private long runCountQuery(RawQuery query) throws BackendException{
+        try {
+            return client.countTotal(
+                    getIndexStoreName(query.getStore()),
+                    compat.createRequestBody(compat.queryString(query.getQuery()), query.getParameters()));
+        } catch (final IOException | UncheckedIOException e) {
+            throw new PermanentBackendException(e);
+        }
+    }
+
+    /**
+     * Adds one sort entry to the request for every requested order.
+     *
+     * @param informations key information used to look up each ordered key in {@code store}
+     * @param sr           request that receives the sort entries
+     * @param orders       requested orders, applied in iteration order
+     * @param store        store whose key information is consulted
+     */
+    private void addOrderToQuery(KeyInformation.IndexRetriever informations, ElasticSearchRequest sr, final List<IndexQuery.OrderEntry> orders,
+                                 String store) {
+        for (final IndexQuery.OrderEntry entry : orders) {
+            final String direction = entry.getOrder().name();
+            final KeyInformation keyInfo = informations.get(store).get(entry.getKey());
+            final Mapping keyMapping = Mapping.getMapping(keyInfo);
+            final Class<?> valueType = entry.getDatatype();
+            final String sortField;
+            if (hasDualStringMapping(keyInfo)) {
+                // keys with a dual string mapping are sorted on their dual-mapping field name
+                sortField = getDualMappingName(entry.getKey());
+            } else {
+                sortField = entry.getKey();
+            }
+            sr.addSort(sortField, direction.toLowerCase(), convertToEsDataType(valueType, keyMapping));
+        }
+    }
+
+    /**
+     * Executes a raw query-string query and streams the matching results.
+     *
+     * <p>The first request asks for {@code limit + offset} hits, capped at {@code batchSize};
+     * when that size reaches {@code batchSize} the results are read through a scroll. The
+     * offset is skipped on the returned stream and the limit, if any, is applied afterwards.
+     *
+     * @param query       raw query with query string, optional limit and offset
+     * @param information key information used to resolve sort fields
+     * @param tx          transaction handle, passed through to the search helper
+     * @return stream of results after offset and limit have been applied
+     * @throws BackendException if the search fails
+     */
+    @Override
+    public Stream<RawQuery.Result<String>> query(RawQuery query, KeyInformation.IndexRetriever information,
+                                                 BaseTransaction tx) throws BackendException {
+        final int size;
+        if (query.hasLimit()) {
+            // Sum in long: limit + offset may exceed Integer.MAX_VALUE, and an int sum would wrap
+            // to a negative request size (and wrongly disable scrolling).
+            final long wanted = (long) query.getLimit() + query.getOffset();
+            // the minimum never exceeds batchSize, so narrowing back to int is lossless
+            size = (int) Math.min(wanted, (long) batchSize);
+        } else {
+            size = batchSize;
+        }
+        final boolean useScroll = size >= batchSize;
+        final ElasticSearchResponse response = runCommonQuery(query, information, tx, size, useScroll);
+        log.debug("First Executed query [{}] in {} ms", query.getQuery(), response.getTook());
+        final Iterator<RawQuery.Result<String>> resultIterator = getResultsIterator(useScroll, response, size);
+        final Stream<RawQuery.Result<String>> toReturn
+                = StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED),
+                false).skip(query.getOffset());
+        return query.hasLimit() ? toReturn.limit(query.getLimit()) : toReturn;
+    }
+
+    /**
+     * Counts the documents matching an index query.
+     *
+     * @param query       index query providing the condition and the store
+     * @param information key information of the queried store, used to build the filter
+     * @param tx          transaction handle (not used by this method)
+     * @return the total reported by the client
+     * @throws BackendException wrapping any I/O failure raised by the count call
+     */
+    @Override
+    public Long queryCount(IndexQuery query, KeyInformation.IndexRetriever information, BaseTransaction tx) throws BackendException {
+        final ElasticSearchRequest countRequest = new ElasticSearchRequest();
+        final Map<String,Object> filter = getFilter(query.getCondition(), information.get(query.getStore()));
+        countRequest.setQuery(compat.prepareQuery(filter));
+        try {
+            final String storeName = getIndexStoreName(query.getStore());
+            return client.countTotal(storeName, compat.createRequestBody(countRequest, null));
+        } catch (final IOException | UncheckedIOException countFailure) {
+            throw new PermanentBackendException(countFailure);
+        }
+    }
+
+    /**
+     * Returns the total number of hits for a raw query and logs the elapsed time at debug level.
+     *
+     * @param query       raw query to count
+     * @param information key information (not used by this method)
+     * @param tx          transaction handle (not used by this method)
+     * @return the count produced by the count query
+     * @throws BackendException if the count query fails
+     */
+    @Override
+    public Long totals(RawQuery query, KeyInformation.IndexRetriever information,
+                       BaseTransaction tx) throws BackendException {
+        final long startedAt = System.currentTimeMillis();
+        final long total = runCountQuery(query);
+        if (log.isDebugEnabled()) {
+            final long elapsedMs = System.currentTimeMillis() - startedAt;
+            log.debug("Executed count query [{}] in {} ms", query.getQuery(), elapsedMs);
+        }
+        return total;
+    }
+
+    /**
+     * Tells whether the given predicate can be evaluated on a key with the given data type and mapping.
+     *
+     * <p>The decision is taken per data type (numbers, geoshapes, strings, dates/instants, booleans,
+     * UUIDs); for geoshapes and strings it additionally depends on the key's mapping. Any combination
+     * not explicitly listed yields {@code false}.
+     *
+     * @param information         key information supplying the data type and the mapping
+     * @param janusgraphPredicate predicate to check
+     * @return {@code true} if the predicate is accepted for this key, {@code false} otherwise
+     */
+    @Override
+    public boolean supports(KeyInformation information, JanusGraphPredicate janusgraphPredicate) {
+        final Class<?> dataType = information.getDataType();
+        final Mapping mapping = Mapping.getMapping(information);
+        // A non-default mapping is only accepted for string types, or as PREFIX_TREE on geo types.
+        if (mapping!=Mapping.DEFAULT && !AttributeUtils.isString(dataType) &&
+                !(mapping==Mapping.PREFIX_TREE && AttributeUtils.isGeo(dataType))) return false;
+
+        if (Number.class.isAssignableFrom(dataType)) {
+            return janusgraphPredicate instanceof Cmp;
+        } else if (dataType == Geoshape.class) {
+            // A mapping without a case below leaves the switch and ends at the final "return false".
+            switch(mapping) {
+                case DEFAULT:
+                    // every Geo predicate except CONTAINS
+                    return janusgraphPredicate instanceof Geo && janusgraphPredicate != Geo.CONTAINS;
+                case PREFIX_TREE:
+                    return janusgraphPredicate instanceof Geo;
+            }
+        } else if (AttributeUtils.isString(dataType)) {
+            // A mapping without a case below leaves the switch and ends at the final "return false".
+            switch(mapping) {
+                case DEFAULT:
+                case TEXT:
+                    // DEFAULT and TEXT: only the CONTAINS* family of text predicates and their negations
+                    return janusgraphPredicate == Text.CONTAINS || janusgraphPredicate == Text.NOT_CONTAINS
+                            || janusgraphPredicate == Text.CONTAINS_FUZZY || janusgraphPredicate == Text.NOT_CONTAINS_FUZZY
+                            || janusgraphPredicate == Text.CONTAINS_PREFIX || janusgraphPredicate == Text.NOT_CONTAINS_PREFIX
+                            || janusgraphPredicate == Text.CONTAINS_REGEX || janusgraphPredicate == Text.NOT_CONTAINS_REGEX
+                            || janusgraphPredicate == Text.CONTAINS_PHRASE || janusgraphPredicate == Text.NOT_CONTAINS_PHRASE;
+                case STRING:
+                    // STRING: comparisons plus REGEX, PREFIX and FUZZY with their negations
+                    return janusgraphPredicate instanceof Cmp
+                            || janusgraphPredicate==Text.REGEX || janusgraphPredicate==Text.NOT_REGEX
+                            || janusgraphPredicate==Text.PREFIX || janusgraphPredicate==Text.NOT_PREFIX
+                            || janusgraphPredicate == Text.FUZZY || janusgraphPredicate == Text.NOT_FUZZY;
+                case TEXTSTRING:
+                    // TEXTSTRING: any Text predicate or any comparison
+                    return janusgraphPredicate instanceof Text || janusgraphPredicate instanceof Cmp;
+            }
+        } else if (dataType == Date.class || dataType == Instant.class) {
+            return janusgraphPredicate instanceof Cmp;
+        } else if (dataType == Boolean.class) {
+            return janusgraphPredicate == Cmp.EQUAL || janusgraphPredicate == Cmp.NOT_EQUAL;
+        } else if (dataType == UUID.class) {
+            return janusgraphPredicate == Cmp.EQUAL || janusgraphPredicate==Cmp.NOT_EQUAL;
+        }
+        return false;
+    }
+
+
+    /**
+     * Tells whether a key with the given data type and mapping can be indexed.
+     *
+     * @param information key information supplying the data type and the mapping
+     * @return {@code true} for numbers, dates, instants, booleans and UUIDs with the default mapping,
+     *         for strings with the DEFAULT, STRING, TEXT or TEXTSTRING mapping, and for geo types
+     *         with the DEFAULT or PREFIX_TREE mapping; {@code false} otherwise
+     */
+    @Override
+    public boolean supports(KeyInformation information) {
+        final Class<?> type = information.getDataType();
+        final Mapping keyMapping = Mapping.getMapping(information);
+        final boolean defaultMapping = keyMapping == Mapping.DEFAULT;
+
+        if (Number.class.isAssignableFrom(type) || type == Date.class || type == Instant.class
+                || type == Boolean.class || type == UUID.class) {
+            return defaultMapping;
+        }
+        if (AttributeUtils.isString(type)) {
+            return defaultMapping || keyMapping == Mapping.STRING
+                    || keyMapping == Mapping.TEXT || keyMapping == Mapping.TEXTSTRING;
+        }
+        if (AttributeUtils.isGeo(type)) {
+            return defaultMapping || keyMapping == Mapping.PREFIX_TREE;
+        }
+        return false;
+    }
+
+    /**
+     * Derives the index field name for a key.
+     *
+     * @param key         key name; checked with {@link IndexProvider#checkKeyValidity} first
+     * @param information key information (not used by this method)
+     * @return {@code key} with every space replaced by {@code IndexProvider.REPLACEMENT_CHAR}
+     */
+    @Override
+    public String mapKey2Field(String key, KeyInformation information) {
+        IndexProvider.checkKeyValidity(key);
+        final String fieldName = key.replace(' ', IndexProvider.REPLACEMENT_CHAR);
+        return fieldName;
+    }
+
+    /**
+     * Returns the index features reported by the compatibility layer.
+     *
+     * @return the value of {@code compat.getIndexFeatures()}
+     */
+    @Override
+    public IndexFeatures getFeatures() {
+        final IndexFeatures features = compat.getIndexFeatures();
+        return features;
+    }
+
+    /**
+     * Opens a transaction for this index.
+     *
+     * @param config configuration handed to the new transaction
+     * @return a new {@link DefaultTransaction} created from {@code config}
+     * @throws BackendException declared by the interface; not thrown by this implementation's own code
+     */
+    @Override
+    public BaseTransactionConfigurable beginTransaction(BaseTransactionConfig config) throws BackendException {
+        final DefaultTransaction transaction = new DefaultTransaction(config);
+        return transaction;
+    }
+
+    /**
+     * Closes the underlying client.
+     *
+     * @throws BackendException wrapping the {@link IOException} raised while closing the client
+     */
+    @Override
+    public void close() throws BackendException {
+        try {
+            client.close();
+        } catch (final IOException closeFailure) {
+            throw new PermanentBackendException(closeFailure);
+        }
+    }
+
+    /**
+     * Deletes the index named {@code indexName} and then closes this index provider.
+     *
+     * <p>{@link #close()} is invoked whether or not the deletion succeeded.
+     *
+     * @throws BackendException if the index cannot be deleted or closing fails
+     */
+    @Override
+    public void clearStorage() throws BackendException {
+        try {
+            client.deleteIndex(indexName);
+        } catch (final Exception deleteFailure) {
+            final String message = "Could not delete index " + indexName;
+            throw new PermanentBackendException(message, deleteFailure);
+        } finally {
+            close();
+        }
+    }
+
+    /**
+     * Checks whether the index named {@code indexName} exists.
+     *
+     * @return the answer given by the client
+     * @throws BackendException wrapping the {@link IOException} raised by the existence check
+     */
+    @Override
+    public boolean exists() throws BackendException {
+        final boolean present;
+        try {
+            present = client.indexExists(indexName);
+        } catch (final IOException checkFailure) {
+            throw new PermanentBackendException("Could not check if index " + indexName + " exists", checkFailure);
+        }
+        return present;
+    }
+
+    /**
+     * @return the major version reported by the client
+     */
+    ElasticMajorVersion getVersion() {
+        return this.client.getMajorVersion();
+    }
+
+    /**
+     * @return the current value of the {@code useMappingForES7} setting
+     */
+    boolean isUseMappingForES7(){
+        return this.useMappingForES7;
+    }
+
+    private static String parameterizedScriptPrepare(String ... lines){
+        return Arrays.stream(lines).map(String::trim).collect(Collectors.joining(""));
+    }
+}
\ No newline at end of file
diff --git a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
index 4a37c77..f83dff6 100644
--- a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
+++ b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
@@ -50,14 +50,17 @@ public final class ApplicationProperties extends PropertiesConfiguration {
     public static final String  INDEX_BACKEND_CONF              = "atlas.graph.index.search.backend";
     public static final String  INDEX_MAP_NAME_CONF             = "atlas.graph.index.search.map-name";
     public static final String  SOLR_WAIT_SEARCHER_CONF         = "atlas.graph.index.search.solr.wait-searcher";
+    public static final String  ELASTICSEARCH_INDEX_NAME_CONF   = "atlas.graph.index.search.elasticsearch.index-name";
     public static final String  INDEX_RECOVERY_CONF             = "atlas.index.recovery.enable";
     public static final String  ENABLE_FULLTEXT_SEARCH_CONF     = "atlas.search.fulltext.enable";
     public static final String  ENABLE_FREETEXT_SEARCH_CONF     = "atlas.search.freetext.enable";
     public static final String  ATLAS_RUN_MODE                  = "atlas.run.mode";
     public static final String  GRAPHBD_BACKEND_JANUS           = "janus";
+    public static final String  DEFAULT_INDEX_NAME              = "janusgraph";
     public static final String  STORAGE_BACKEND_HBASE           = "hbase";
     public static final String  STORAGE_BACKEND_HBASE2          = "hbase2";
     public static final String  INDEX_BACKEND_SOLR              = "solr";
+    public static final String  INDEX_BACKEND_ELASTICSEARCH     = "elasticsearch";
     public static final String  LDAP_TYPE                       =  "atlas.authentication.method.ldap.type";
     public static final String  LDAP                            =  "LDAP";
     public static final String  AD                              =  "AD";
@@ -355,6 +358,10 @@ public final class ApplicationProperties extends PropertiesConfiguration {
                 addPropertyDirect(INDEX_MAP_NAME_CONF, DEFAULT_INDEX_MAP_NAME);
                 LOG.info("Setting index.search.map-name property '" + DEFAULT_INDEX_MAP_NAME + "'");
             }
+        } else if (indexBackend.equalsIgnoreCase(INDEX_BACKEND_ELASTICSEARCH)){
+           addPropertyDirect(ELASTICSEARCH_INDEX_NAME_CONF, DEFAULT_INDEX_NAME);
+
+           LOG.info("Setting elasticsearch.index-name property '" + DEFAULT_INDEX_NAME + "'");
         }
 
         // setting value for 'atlas.graph.index.search.max-result-set-size' (default = 500000)
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
index b316354..57964d5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
@@ -166,13 +166,13 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
 
             while (shouldRun.get()) {
                 try {
-                    boolean solrHealthy = isSolrHealthy();
+                    boolean isIdxHealthy = isIndexBackendHealthy();
 
-                    if (this.txRecoveryObject == null && solrHealthy) {
+                    if (this.txRecoveryObject == null && isIdxHealthy) {
                         startMonitoring();
                     }
 
-                    if (this.txRecoveryObject != null && !solrHealthy) {
+                    if (this.txRecoveryObject != null && !isIdxHealthy) {
                         stopMonitoring();
                     }
                 } catch (Exception e) {
@@ -197,7 +197,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
             }
         }
 
-        private boolean isSolrHealthy() throws AtlasException, InterruptedException {
+        private boolean isIndexBackendHealthy() throws AtlasException, InterruptedException {
             Thread.sleep(indexStatusCheckRetryMillis);
 
             return this.graph.getGraphIndexClient().isHealthy();