You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@atlas.apache.org by ma...@apache.org on 2022/02/16 19:10:35 UTC

[atlas] branch branch-2.0 updated (4eb4462 -> 57880d3)

This is an automated email from the ASF dual-hosted git repository.

madhan pushed a change to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git.


    from 4eb4462  ATLAS-4552: added network name for all Atlas docker images
     new b9aebb2  Updated committer info for Radhika Kundam
     new 57880d3  ATLAS-4554: updated monitoring to support elasticsearch index backend

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../graphdb/janus/AtlasJanusGraphDatabase.java     |   24 +
 .../graphdb/janus/AtlasJanusGraphIndexClient.java  |   27 +-
 .../diskstorage/es/ElasticSearch7Index.java        | 1299 ++++++++++++++++++++
 .../org/apache/atlas/ApplicationProperties.java    |    7 +
 pom.xml                                            |   11 +
 .../repository/graph/IndexRecoveryService.java     |    8 +-
 6 files changed, 1366 insertions(+), 10 deletions(-)
 create mode 100644 graphdb/janus/src/main/java/org/janusgraph/diskstorage/es/ElasticSearch7Index.java

[atlas] 01/02: Updated committer info for Radhika Kundam

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit b9aebb24dd45530ad4ba450ad97ad415cf2130e6
Author: Radhika Kundam <rk...@cloudera.com>
AuthorDate: Tue Feb 8 10:41:53 2022 -0800

    Updated committer info for Radhika Kundam
    
    (cherry picked from commit 7290ffbe9a2390464dc45fed633f15b051a9d64f)
---
 pom.xml | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/pom.xml b/pom.xml
index 3afd687..9aa9da7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -450,6 +450,17 @@
             <organization>Freestone Infotech</organization>
         </developer>
         <developer>
+            <id>radhikakundam</id>
+            <name>Radhika Kundam</name>
+            <email>radhikakundam@apache.org</email>
+            <timezone>America/Los_Angeles</timezone>
+            <roles>
+                <role>committer</role>
+            </roles>
+            <organization>Cloudera Inc.</organization>
+            <!--<organizationUrl>https://www.cloudera.com</organizationUrl>-->
+        </developer>
+        <developer>
             <id>rmani</id>
             <name>Ramesh Mani</name>
             <email>rmani@apache.org</email>

[atlas] 02/02: ATLAS-4554: updated monitoring to support elasticsearch index backend

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit 57880d33428ba516148f41addb9e9cd7a9ec36e5
Author: xingyu_zha <xi...@intsig.net>
AuthorDate: Mon Feb 14 17:56:13 2022 +0800

    ATLAS-4554: updated monitoring to support elasticsearch index backend
    
    Signed-off-by: Madhan Neethiraj <ma...@apache.org>
    (cherry picked from commit 0847d6c371a4482924ff642b525c633d7e0f5e6a)
---
 .../graphdb/janus/AtlasJanusGraphDatabase.java     |   24 +
 .../graphdb/janus/AtlasJanusGraphIndexClient.java  |   27 +-
 .../diskstorage/es/ElasticSearch7Index.java        | 1299 ++++++++++++++++++++
 .../org/apache/atlas/ApplicationProperties.java    |    7 +
 .../repository/graph/IndexRecoveryService.java     |    8 +-
 5 files changed, 1355 insertions(+), 10 deletions(-)

diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
index 3995cf9..cb3e8d9 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
@@ -37,6 +37,7 @@ import org.janusgraph.core.JanusGraphFactory;
 import org.janusgraph.core.schema.JanusGraphManagement;
 import org.janusgraph.diskstorage.StandardIndexProvider;
 import org.janusgraph.diskstorage.StandardStoreManager;
+import org.janusgraph.diskstorage.es.ElasticSearch7Index;
 import org.janusgraph.diskstorage.solr.Solr6Index;
 import org.janusgraph.graphdb.database.serialize.attribute.SerializableSerializer;
 import org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry;
@@ -122,6 +123,8 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
         addHBase2Support();
 
         addSolr6Index();
+
+        addElasticSearch7Index();
     }
 
     private static void addHBase2Support() {
@@ -164,6 +167,27 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex,
         }
     }
 
+    private static void addElasticSearch7Index() {
+        try {
+            Field field = StandardIndexProvider.class.getDeclaredField("ALL_MANAGER_CLASSES");
+            field.setAccessible(true);
+
+            Field modifiersField = Field.class.getDeclaredField("modifiers");
+            modifiersField.setAccessible(true);
+            modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+
+            Map<String, String> customMap = new HashMap<>(StandardIndexProvider.getAllProviderClasses());
+            customMap.put("elasticsearch", ElasticSearch7Index.class.getName());
+            ImmutableMap<String, String> immap = ImmutableMap.copyOf(customMap);
+            field.set(null, immap);
+
+            LOG.debug("Injected es7 index - {}", ElasticSearch7Index.class.getName());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+
     public static JanusGraph getGraphInstance() {
         if (graphInstance == null) {
             synchronized (AtlasJanusGraphDatabase.class) {
diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
index 9e9fdd8..f9f3772 100644
--- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
+++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
@@ -18,6 +18,7 @@
 package org.apache.atlas.repository.graphdb.janus;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.model.discovery.AtlasAggregationEntry;
 import org.apache.atlas.repository.Constants;
@@ -42,6 +43,8 @@ import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.client.solrj.response.TermsResponse;
 import org.apache.solr.common.params.CommonParams;
 import org.apache.solr.common.util.NamedList;
+import org.janusgraph.diskstorage.es.ElasticSearch7Index;
+import org.janusgraph.diskstorage.es.ElasticSearchClient;
 import org.janusgraph.diskstorage.solr.Solr6Index;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,7 +73,7 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
     private static final String         TERMS_FIELD                  = "terms.fl";
     private static final int            SOLR_HEALTHY_STATUS          = 0;
     private static final long           SOLR_STATUS_LOG_FREQUENCY_MS = 60000;//Prints SOLR DOWN status for every 1 min
-    private static long                 prevSolrHealthCheckTime;
+    private static       long           prevIdxHealthCheckTime;
 
 
     private final Configuration configuration;
@@ -83,21 +86,26 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
     public boolean isHealthy() {
         boolean isHealthy   = false;
         long    currentTime = System.currentTimeMillis();
+        String  idxBackEnd  = configuration.getString(ApplicationProperties.INDEX_BACKEND_CONF);
 
         try {
-            if (isSolrHealthy()) {
-                isHealthy = true;
+            if (ApplicationProperties.INDEX_BACKEND_SOLR.equals(idxBackEnd)) {
+                isHealthy = isSolrHealthy();
+            } else if (ApplicationProperties.INDEX_BACKEND_ELASTICSEARCH.equals(idxBackEnd)) {
+                isHealthy = isElasticsearchHealthy();
             }
+
+            LOG.info("indexBackEnd={}; isHealthy={}", idxBackEnd, isHealthy);
         } catch (Exception exception) {
             if (LOG.isDebugEnabled()) {
                 LOG.error("Error: isHealthy", exception);
             }
         }
 
-        if (!isHealthy && (prevSolrHealthCheckTime == 0 || currentTime - prevSolrHealthCheckTime > SOLR_STATUS_LOG_FREQUENCY_MS)) {
-            LOG.info("Solr Health: Unhealthy!");
+        if (!isHealthy && (prevIdxHealthCheckTime == 0 || currentTime - prevIdxHealthCheckTime > SOLR_STATUS_LOG_FREQUENCY_MS)) {
+            LOG.info("Backend Health: Unhealthy!");
 
-            prevSolrHealthCheckTime = currentTime;
+            prevIdxHealthCheckTime = currentTime;
         }
 
         return isHealthy;
@@ -379,6 +387,13 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient {
         return client != null && client.ping(Constants.VERTEX_INDEX).getStatus() == SOLR_HEALTHY_STATUS;
     }
 
+    private boolean isElasticsearchHealthy() throws IOException {
+        ElasticSearchClient client           = ElasticSearch7Index.getElasticSearchClient();
+        String              janusVertexIndex = ApplicationProperties.DEFAULT_INDEX_NAME + "_" + Constants.VERTEX_INDEX;
+
+        return client != null && client.indexExists(janusVertexIndex);
+    }
+
     private void graphManagementCommit(AtlasGraphManagement management) {
         try {
             management.commit();
diff --git a/graphdb/janus/src/main/java/org/janusgraph/diskstorage/es/ElasticSearch7Index.java b/graphdb/janus/src/main/java/org/janusgraph/diskstorage/es/ElasticSearch7Index.java
new file mode 100644
index 0000000..51a87f5
--- /dev/null
+++ b/graphdb/janus/src/main/java/org/janusgraph/diskstorage/es/ElasticSearch7Index.java
@@ -0,0 +1,1299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.janusgraph.diskstorage.es;
+
+import static org.janusgraph.diskstorage.es.ElasticSearchIndex.*;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import org.janusgraph.core.Cardinality;
+import org.janusgraph.core.JanusGraphException;
+import org.janusgraph.core.attribute.Cmp;
+import org.janusgraph.core.attribute.Geo;
+import org.janusgraph.core.attribute.Geoshape;
+import org.janusgraph.core.attribute.Text;
+import org.janusgraph.core.schema.Mapping;
+import org.janusgraph.core.schema.Parameter;
+import org.janusgraph.diskstorage.BackendException;
+import org.janusgraph.diskstorage.BaseTransaction;
+import org.janusgraph.diskstorage.BaseTransactionConfig;
+import org.janusgraph.diskstorage.BaseTransactionConfigurable;
+import org.janusgraph.diskstorage.PermanentBackendException;
+import org.janusgraph.diskstorage.TemporaryBackendException;
+import org.janusgraph.diskstorage.configuration.ConfigOption;
+import org.janusgraph.diskstorage.configuration.Configuration;
+import org.janusgraph.diskstorage.es.compat.AbstractESCompat;
+import org.janusgraph.diskstorage.es.compat.ESCompatUtils;
+import org.janusgraph.diskstorage.es.mapping.IndexMapping;
+import org.janusgraph.diskstorage.es.script.ESScriptResponse;
+import org.janusgraph.diskstorage.indexing.IndexEntry;
+import org.janusgraph.diskstorage.indexing.IndexFeatures;
+import org.janusgraph.diskstorage.indexing.IndexMutation;
+import org.janusgraph.diskstorage.indexing.IndexProvider;
+import org.janusgraph.diskstorage.indexing.IndexQuery;
+import org.janusgraph.diskstorage.indexing.KeyInformation;
+import org.janusgraph.diskstorage.indexing.RawQuery;
+import org.janusgraph.diskstorage.util.DefaultTransaction;
+import org.janusgraph.graphdb.configuration.PreInitializeConfigOptions;
+import org.janusgraph.graphdb.database.serialize.AttributeUtils;
+import org.janusgraph.graphdb.query.JanusGraphPredicate;
+import org.janusgraph.graphdb.query.condition.And;
+import org.janusgraph.graphdb.query.condition.Condition;
+import org.janusgraph.graphdb.query.condition.Not;
+import org.janusgraph.graphdb.query.condition.Or;
+import org.janusgraph.graphdb.query.condition.PredicateCondition;
+import org.janusgraph.graphdb.types.ParameterType;
+import org.locationtech.spatial4j.shape.Rectangle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.Spliterators;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_DOC_KEY;
+import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_GEO_COORDS_KEY;
+import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_LANG_KEY;
+import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_SCRIPT_KEY;
+import static org.janusgraph.diskstorage.es.ElasticSearchConstants.ES_TYPE_KEY;
+import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_MAX_RESULT_SET_SIZE;
+import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_NAME;
+
+/**
+ * Do not change.
+ * This is a copy of ElasticSearchIndex.java from org.janusgraph.diskstorage.es,
+ * with added static methods to obtain and release an ElasticSearchClient instance.
+ */
+
+@PreInitializeConfigOptions
+public class ElasticSearch7Index implements IndexProvider {
+
+    private static final Logger log = LoggerFactory.getLogger(ElasticSearchIndex.class);
+
+    // most recently constructed ElasticSearch7Index instance; set at the end of the constructor and read by getElasticSearchClient()
+    private static ElasticSearch7Index instance = null;
+
+    private static final String STRING_MAPPING_SUFFIX = "__STRING";
+    // when true, getElasticSearchClient() creates a new client per request instead of sharing this index's client
+    public static final ConfigOption<Boolean> CREATE_ELASTICSEARCH_CLIENT_PER_REQUEST = new ConfigOption(ELASTICSEARCH_NS, "create-client-per-request", "when false, allows the sharing of es client across other components.", org.janusgraph.diskstorage.configuration.ConfigOption.Type.LOCAL, false);
+
+    private static final String PARAMETERIZED_DELETION_SCRIPT = parameterizedScriptPrepare("",
+            "for (field in params.fields) {",
+            "    if (field.cardinality == 'SINGLE') {",
+            "        ctx._source.remove(field.name);",
+            "    } else if (ctx._source.containsKey(field.name)) {",
+            "        def fieldIndex = ctx._source[field.name].indexOf(field.value);",
+            "        if (fieldIndex >= 0 && fieldIndex < ctx._source[field.name].size()) {",
+            "            ctx._source[field.name].remove(fieldIndex);",
+            "        }",
+            "    }",
+            "}");
+
+    private static final String PARAMETERIZED_ADDITION_SCRIPT = parameterizedScriptPrepare("",
+            "for (field in params.fields) {",
+            "    if (ctx._source[field.name] == null) {",
+            "        ctx._source[field.name] = [];",
+            "    }",
+            "    if (field.cardinality != 'SET' || ctx._source[field.name].indexOf(field.value) == -1) {",
+            "        ctx._source[field.name].add(field.value);",
+            "    }",
+            "}");
+
+    static final String INDEX_NAME_SEPARATOR = "_";
+    private static final String SCRIPT_ID_SEPARATOR = "-";
+
+    private static final String MAX_OPEN_SCROLL_CONTEXT_PARAMETER = "search.max_open_scroll_context";
+    private static final Map<String, Object> MAX_RESULT_WINDOW = ImmutableMap.of("index.max_result_window", Integer.MAX_VALUE);
+
+    private static final Parameter[] NULL_PARAMETERS = null;
+
+    private static final String TRACK_TOTAL_HITS_PARAMETER = "track_total_hits";
+    private static final Parameter[] TRACK_TOTAL_HITS_DISABLED_PARAMETERS = new Parameter[]{new Parameter<>(TRACK_TOTAL_HITS_PARAMETER, false)};
+    private static final Map<String, Object> TRACK_TOTAL_HITS_DISABLED_REQUEST_BODY = ImmutableMap.of(TRACK_TOTAL_HITS_PARAMETER, false);
+
+    private final Function<String, String> generateIndexStoreNameFunction = this::generateIndexStoreName;
+    private final Map<String, String> indexStoreNamesCache = new ConcurrentHashMap<>();
+    private final boolean indexStoreNameCacheEnabled;
+
+    private final AbstractESCompat compat;
+    private final ElasticSearchClient client;
+    private final Configuration configuration;
+    private final String indexName;
+    private final int batchSize;
+    private final boolean useExternalMappings;
+    private final boolean allowMappingUpdate;
+    private final Map<String, Object> indexSetting;
+    private final long createSleep;
+    private final boolean useAllField;
+    private final Map<String, Object> ingestPipelines;
+    private final boolean useMappingForES7;
+    private final String parameterizedAdditionScriptId;
+    private final String parameterizedDeletionScriptId;
+
+    private static boolean createElasticSearchClientPerRequest;
+
+    public ElasticSearch7Index(Configuration config) throws BackendException {
+        // fetch configuration
+        this.configuration = config;
+        indexName = config.get(INDEX_NAME);
+        parameterizedAdditionScriptId = generateScriptId("add");
+        parameterizedDeletionScriptId = generateScriptId("del");
+        useAllField = config.get(USE_ALL_FIELD);
+        useExternalMappings = config.get(USE_EXTERNAL_MAPPINGS);
+        allowMappingUpdate = config.get(ALLOW_MAPPING_UPDATE);
+        createSleep = config.get(CREATE_SLEEP);
+        ingestPipelines = config.getSubset(ES_INGEST_PIPELINES);
+        useMappingForES7 = config.get(USE_MAPPING_FOR_ES7);
+        indexStoreNameCacheEnabled = config.get(ENABLE_INDEX_STORE_NAMES_CACHE);
+        batchSize = config.get(INDEX_MAX_RESULT_SET_SIZE);
+        log.debug("Configured ES query nb result by query to {}", batchSize);
+
+        client = createElasticSearchClient();
+        createElasticSearchClientPerRequest = config.get(CREATE_ELASTICSEARCH_CLIENT_PER_REQUEST);
+
+        checkClusterHealth(config.get(HEALTH_REQUEST_TIMEOUT));
+
+        compat = ESCompatUtils.acquireCompatForVersion(client.getMajorVersion());
+
+        indexSetting = ElasticSearchSetup.getSettingsFromJanusGraphConf(config);
+
+        setupMaxOpenScrollContextsIfNeeded(config);
+
+        setupStoredScripts();
+
+        //set instance
+        ElasticSearch7Index.instance = this;
+    }
+
+
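+    // returns a new client when create-client-per-request is enabled, otherwise the index's own client; null if no index instance exists yet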
+    public static ElasticSearchClient getElasticSearchClient() {
+        ElasticSearchClient ret     = null;
+        ElasticSearch7Index esIndex = ElasticSearch7Index.instance;
+
+        if (esIndex != null) {
+            if (createElasticSearchClientPerRequest) {
+                log.debug("Creating a new ElasticSearch Client.");
+
+                ret = esIndex.createElasticSearchClient();
+            } else {
+                log.debug("Returning the elasticSearch client owned by ElasticSearchIndex.");
+
+                ret = esIndex.client;
+            }
+        } else {
+            log.debug("No ElasticSearchIndex available. Will return null");
+        }
+
+        return ret;
+    }
+
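+    // closes the given client only when create-client-per-request is enabled; the client owned by the index is left open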
+    public static void releaseElasticSearchClient(ElasticSearchClient elasticSearchClient) {
+        if(createElasticSearchClientPerRequest) {
+            if (elasticSearchClient != null) {
+                try {
+                    elasticSearchClient.close();
+
+                    if(log.isDebugEnabled()) {
+                        log.debug("Closed the elasticSearch client successfully.");
+                    }
+                } catch (IOException excp) {
+                    log.warn("Failed to close elasticSearchClient.", excp);
+                }
+            }
+        } else {
+            if(log.isDebugEnabled()) {
+                log.debug("Ignoring the closing of elasticSearch client as it is owned by ElasticSearchIndex.");
+            }
+        }
+    }
+
+    //create client
+    private ElasticSearchClient createElasticSearchClient() {
+        return interfaceConfiguration(configuration).getClient();
+    }
+
+
+
+    private void checkClusterHealth(String healthCheck) throws BackendException {
+        try {
+            client.clusterHealthRequest(healthCheck);
+        } catch (final IOException e) {
+            throw new PermanentBackendException(e.getMessage(), e);
+        }
+    }
+
+    private void setupStoredScripts() throws PermanentBackendException {
+        setupStoredScriptIfNeeded(parameterizedAdditionScriptId, PARAMETERIZED_ADDITION_SCRIPT);
+        setupStoredScriptIfNeeded(parameterizedDeletionScriptId, PARAMETERIZED_DELETION_SCRIPT);
+    }
+
+    private void setupStoredScriptIfNeeded(String storedScriptId, String source) throws PermanentBackendException {
+
+        ImmutableMap<String, Object> preparedScript = compat.prepareScript(source).build();
+
+        String lang = (String) ((ImmutableMap<String, Object>) preparedScript.get(ES_SCRIPT_KEY)).get(ES_LANG_KEY);
+
+        try {
+            ESScriptResponse esScriptResponse = client.getStoredScript(storedScriptId);
+
+            if(Boolean.FALSE.equals(esScriptResponse.getFound()) || !Objects.equals(lang, esScriptResponse.getScript().getLang()) ||
+                    !Objects.equals(source, esScriptResponse.getScript().getSource())){
+                client.createStoredScript(storedScriptId, preparedScript);
+            }
+
+        } catch (final IOException e) {
+            throw new PermanentBackendException(e.getMessage(), e);
+        }
+    }
+
+    private void setupMaxOpenScrollContextsIfNeeded(Configuration config) throws PermanentBackendException {
+
+        if(client.getMajorVersion().getValue() > 6){
+
+            boolean setupMaxOpenScrollContexts;
+
+            if(config.has(SETUP_MAX_OPEN_SCROLL_CONTEXTS)){
+                setupMaxOpenScrollContexts = config.get(SETUP_MAX_OPEN_SCROLL_CONTEXTS);
+            } else {
+                setupMaxOpenScrollContexts = SETUP_MAX_OPEN_SCROLL_CONTEXTS.getDefaultValue();
+            }
+
+            if(setupMaxOpenScrollContexts){
+
+                Map<String, Object> settings = ImmutableMap.of("persistent",
+                        ImmutableMap.of(MAX_OPEN_SCROLL_CONTEXT_PARAMETER, Integer.MAX_VALUE));
+
+                try {
+                    client.updateClusterSettings(settings);
+                } catch (final IOException e) {
+                    throw new PermanentBackendException(e.getMessage(), e);
+                }
+            }
+        }
+    }
+
+    /**
+     * If ES already contains this instance's target index, then do nothing.
+     * Otherwise, create the index, then wait for it to settle.
+     * <p>
+     * The {@code client} field must point to a live, connected client.
+     * The {@code indexName} field must be non-null and point to the name
+     * of the index to check for existence or create.
+     *
+     * @param index index name
+     * @throws IOException if the index status could not be checked or index could not be created
+     */
+    private void checkForOrCreateIndex(String index) throws IOException {
+        Objects.requireNonNull(client);
+        Objects.requireNonNull(index);
+
+        // Create the index only if external mappings are not in use and the index does not already exist
+        if (!useExternalMappings && !client.indexExists(index)) {
+            client.createIndex(index, indexSetting);
+            client.updateIndexSettings(index, MAX_RESULT_WINDOW);
+            try {
+                log.debug("Sleeping {} ms after {} index creation returned from actionGet()", createSleep, index);
+                Thread.sleep(createSleep);
+            } catch (final InterruptedException e) {
+                throw new JanusGraphException("Interrupted while waiting for index to settle in", e);
+            }
+        }
+        Preconditions.checkState(client.indexExists(index), "Could not create index: %s",index);
+        client.addAlias(indexName, index);
+    }
+
+
+    /**
+     * Configure ElasticSearchIndex's ES client. See {@link org.janusgraph.diskstorage.es.ElasticSearchSetup} for more
+     * information.
+     *
+     * @param config a config passed to ElasticSearchIndex's constructor
+     * @return a client object open and ready for use
+     */
+    private ElasticSearchSetup.Connection interfaceConfiguration(Configuration config) {
+        final ElasticSearchSetup clientMode = ConfigOption.getEnumValue(config.get(INTERFACE), ElasticSearchSetup.class);
+
+        try {
+            return clientMode.connect(config);
+        } catch (final IOException e) {
+            throw new JanusGraphException(e);
+        }
+    }
+
+    private BackendException convert(Exception esException) {
+        if (esException instanceof InterruptedException) {
+            return new TemporaryBackendException("Interrupted while waiting for response", esException);
+        } else {
+            return new PermanentBackendException("Unknown exception while executing index operation", esException);
+        }
+    }
+
+    private static String getDualMappingName(String key) {
+        return key + STRING_MAPPING_SUFFIX;
+    }
+
+    private String generateScriptId(String uniqueScriptSuffix){
+        return indexName + SCRIPT_ID_SEPARATOR + uniqueScriptSuffix;
+    }
+
+    private String generateIndexStoreName(String store){
+        return indexName + INDEX_NAME_SEPARATOR + store.toLowerCase();
+    }
+
+    private String getIndexStoreName(String store) {
+
+        if(indexStoreNameCacheEnabled){
+            return indexStoreNamesCache.computeIfAbsent(store, generateIndexStoreNameFunction);
+        }
+
+        return generateIndexStoreName(store);
+    }
+
+    @Override
+    public void register(String store, String key, KeyInformation information,
+                         BaseTransaction tx) throws BackendException {
+        final Class<?> dataType = information.getDataType();
+        final Mapping map = Mapping.getMapping(information);
+        Preconditions.checkArgument(map==Mapping.DEFAULT || AttributeUtils.isString(dataType) ||
+                        (map==Mapping.PREFIX_TREE && AttributeUtils.isGeo(dataType)),
+                "Specified illegal mapping [%s] for data type [%s]",map,dataType);
+        final String indexStoreName = getIndexStoreName(store);
+        if (useExternalMappings) {
+            try {
+                // Check whether the external mapping has the property 'key'
+                final IndexMapping mappings = client.getMapping(indexStoreName, store);
+                if (mappings == null || (!mappings.isDynamic() && !mappings.getProperties().containsKey(key))) {
+                    // Fail if there is no mapping, or it is not dynamic and does not contain the property 'key'
+                    throw new PermanentBackendException("The external mapping for index '"+ indexStoreName + "' and type '" + store + "' do not have property '" + key + "'");
+                } else if (allowMappingUpdate && mappings.isDynamic()) {
+                    // If mapping updates are allowed and the mapping is dynamic, push the mapping for property 'key'
+                    this.pushMapping(store, key, information);
+                }
+            } catch (final IOException e) {
+                throw new PermanentBackendException(e);
+            }
+        } else {
+            try {
+                checkForOrCreateIndex(indexStoreName);
+            } catch (final IOException e) {
+                throw new PermanentBackendException(e);
+            }
+            this.pushMapping(store, key, information);
+        }
+    }
+
+    /**
+     * Push mapping to ElasticSearch
+     * @param store the type in the index
+     * @param key the name of the property in the index
+     * @param information information of the key
+     */
+    private void pushMapping(String store, String key,
+                             KeyInformation information) throws AssertionError, BackendException {
+        final Class<?> dataType = information.getDataType();
+        Mapping map = Mapping.getMapping(information);
+        final Map<String,Object> properties = new HashMap<>();
+        if (AttributeUtils.isString(dataType)) {
+            if (map==Mapping.DEFAULT) map=Mapping.TEXT;
+            log.debug("Registering string type for {} with mapping {}", key, map);
+            final String stringAnalyzer
+                    = ParameterType.STRING_ANALYZER.findParameter(information.getParameters(), null);
+            final String textAnalyzer = ParameterType.TEXT_ANALYZER.findParameter(information.getParameters(), null);
+            // use keyword type for string mappings unless custom string analyzer is provided
+            final Map<String,Object> stringMapping
+                    = stringAnalyzer == null ? compat.createKeywordMapping() : compat.createTextMapping(stringAnalyzer);
+            switch (map) {
+                case STRING:
+                    properties.put(key, stringMapping);
+                    break;
+                case TEXT:
+                    properties.put(key, compat.createTextMapping(textAnalyzer));
+                    break;
+                case TEXTSTRING:
+                    properties.put(key, compat.createTextMapping(textAnalyzer));
+                    properties.put(getDualMappingName(key), stringMapping);
+                    break;
+                default: throw new AssertionError("Unexpected mapping: "+map);
+            }
+        } else if (dataType == Float.class) {
+            log.debug("Registering float type for {}", key);
+            properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "float"));
+        } else if (dataType == Double.class) {
+            log.debug("Registering double type for {}", key);
+            properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "double"));
+        } else if (dataType == Byte.class) {
+            log.debug("Registering byte type for {}", key);
+            properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "byte"));
+        } else if (dataType == Short.class) {
+            log.debug("Registering short type for {}", key);
+            properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "short"));
+        } else if (dataType == Integer.class) {
+            log.debug("Registering integer type for {}", key);
+            properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "integer"));
+        } else if (dataType == Long.class) {
+            log.debug("Registering long type for {}", key);
+            properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "long"));
+        } else if (dataType == Boolean.class) {
+            log.debug("Registering boolean type for {}", key);
+            properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "boolean"));
+        } else if (dataType == Geoshape.class) {
+            switch (map) {
+                case PREFIX_TREE:
+                    final int maxLevels = ParameterType.INDEX_GEO_MAX_LEVELS.findParameter(information.getParameters(),
+                            DEFAULT_GEO_MAX_LEVELS);
+                    final double distErrorPct
+                            = ParameterType.INDEX_GEO_DIST_ERROR_PCT.findParameter(information.getParameters(),
+                            DEFAULT_GEO_DIST_ERROR_PCT);
+                    log.debug("Registering geo_shape type for {} with tree_levels={} and distance_error_pct={}", key,
+                            maxLevels, distErrorPct);
+                    properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "geo_shape",
+                            "tree", "quadtree",
+                            "tree_levels", maxLevels,
+                            "distance_error_pct", distErrorPct));
+                    break;
+                default:
+                    log.debug("Registering geo_point type for {}", key);
+                    properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "geo_point"));
+            }
+        } else if (dataType == Date.class || dataType == Instant.class) {
+            log.debug("Registering date type for {}", key);
+            properties.put(key, ImmutableMap.of(ES_TYPE_KEY, "date"));
+        } else if (dataType == UUID.class) {
+            log.debug("Registering uuid type for {}", key);
+            properties.put(key, compat.createKeywordMapping());
+        }
+
+        if (useAllField) {
+            // add custom all field mapping if it doesn't exist
+            properties.put(ElasticSearchConstants.CUSTOM_ALL_FIELD, compat.createTextMapping(null));
+
+            // add copy_to for custom all field mapping
+            if (properties.containsKey(key) && dataType != Geoshape.class) {
+                final Map<String, Object> mapping = new HashMap<>(((Map<String, Object>) properties.get(key)));
+                mapping.put("copy_to", ElasticSearchConstants.CUSTOM_ALL_FIELD);
+                properties.put(key, mapping);
+            }
+        }
+
+        final List<Parameter> customParameters = ParameterType.getCustomParameters(information.getParameters());
+
+        if (properties.containsKey(key) && !customParameters.isEmpty()) {
+            final Map<String, Object> mapping = new HashMap<>(((Map<String, Object>) properties.get(key)));
+            customParameters.forEach(p -> mapping.put(p.key(), p.value()));
+            properties.put(key, mapping);
+        }
+
+        final Map<String,Object> mapping = ImmutableMap.of("properties", properties);
+
+        try {
+            client.createMapping(getIndexStoreName(store), store, mapping);
+        } catch (final Exception e) {
+            throw convert(e);
+        }
+    }
+
+    /**
+     * Resolves the effective mapping of a string-typed key.
+     *
+     * @param information key information; its data type is asserted to be a string type
+     * @return the mapping declared on the key, with {@code Mapping.DEFAULT} replaced by {@code Mapping.TEXT}
+     */
+    private static Mapping getStringMapping(KeyInformation information) {
+        assert AttributeUtils.isString(information.getDataType());
+        final Mapping declared = Mapping.getMapping(information);
+        return declared == Mapping.DEFAULT ? Mapping.TEXT : declared;
+    }
+
+    /**
+     * Tells whether a key is a string key mapped as {@code Mapping.TEXTSTRING}, i.e. one that is
+     * written under both its own name and its dual-mapping name.
+     *
+     * @param information key information to inspect
+     * @return {@code true} only for string-typed keys whose effective mapping is {@code TEXTSTRING}
+     */
+    private static boolean hasDualStringMapping(KeyInformation information) {
+        if (!AttributeUtils.isString(information.getDataType())) {
+            return false;
+        }
+        return getStringMapping(information) == Mapping.TEXTSTRING;
+    }
+
+    /**
+     * Builds the source document for a document that is indexed as a whole.
+     *
+     * <p>Entries are grouped by field first. A {@code SINGLE} cardinality field keeps only the
+     * last entry supplied for it; {@code SET} and {@code LIST} fields are written as an array of
+     * all converted values. Keys with a dual string mapping are additionally written under their
+     * dual-mapping field name.
+     *
+     * @param additions   index entries that make up the document
+     * @param information key information of the store the document belongs to
+     * @return field name to Elasticsearch-ready value
+     * @throws BackendException declared for callers; no statement visible in this method throws it directly
+     * @throws IllegalArgumentException if a value has an unsupported type, or a collection element
+     *                                  converts to {@code byte[]}
+     */
+    public Map<String, Object> getNewDocument(final List<IndexEntry> additions,
+                                              KeyInformation.StoreRetriever information) throws BackendException {
+        // JSON writes duplicate fields one after another, which forces us
+        // at this stage to make de-duplication on the IndexEntry list. We don't want to pay the
+        // price map storage on the Mutation level because none of other backends need that.
+
+        final Multimap<String, IndexEntry> unique = LinkedListMultimap.create();
+        for (final IndexEntry e : additions) {
+            unique.put(e.field, e);
+        }
+
+        final Map<String, Object> doc = new HashMap<>();
+        for (final Map.Entry<String, Collection<IndexEntry>> add : unique.asMap().entrySet()) {
+            final String field = add.getKey();
+            final Collection<IndexEntry> entries = add.getValue();
+            final KeyInformation keyInformation = information.get(field);
+            // Resolve the mapping once per field instead of once per element.
+            final Mapping mapping = Mapping.getMapping(keyInformation);
+            final Object value;
+            switch (keyInformation.getCardinality()) {
+                case SINGLE:
+                    // Last write wins for single-valued fields.
+                    value = convertToEsType(Iterators.getLast(entries.iterator()).value, mapping);
+                    break;
+                case SET:
+                case LIST: {
+                    final List<Object> converted = new ArrayList<>(entries.size());
+                    for (final IndexEntry entry : entries) {
+                        final Object element = convertToEsType(entry.value, mapping);
+                        Preconditions.checkArgument(!(element instanceof byte[]),
+                                "Collections not supported for %s", field);
+                        converted.add(element);
+                    }
+                    value = converted.toArray();
+                    break;
+                }
+                default:
+                    value = null;
+                    break;
+            }
+
+            doc.put(field, value);
+            // Reuse the key information already fetched above rather than querying the retriever again.
+            if (hasDualStringMapping(keyInformation) && keyInformation.getDataType() == String.class) {
+                doc.put(getDualMappingName(field), value);
+            }
+        }
+
+        return doc;
+    }
+
+    /**
+     * Converts a property value into the representation that is written to Elasticsearch.
+     *
+     * <p>Whole numbers become {@code long}, other numbers {@code double}; strings, dates and
+     * booleans are passed through; {@code Instant} becomes {@code Date}; {@code UUID} becomes its
+     * string form; geoshapes are converted by {@code convertGeoshape}.
+     *
+     * @param value   the value to convert
+     * @param mapping mapping of the key; only consulted for {@code Geoshape} values
+     * @return the converted value
+     * @throws IllegalArgumentException if {@code value} is {@code null} or of an unsupported type
+     */
+    private static Object convertToEsType(Object value, Mapping mapping) {
+        if (value == null) {
+            // Without this guard the fall-through branch fails with a NullPointerException while
+            // building its own error message (value.getClass()).
+            throw new IllegalArgumentException("Unsupported type: null");
+        }
+        if (value instanceof Number) {
+            if (AttributeUtils.isWholeNumber((Number) value)) {
+                return ((Number) value).longValue();
+            } else { //double or float
+                return ((Number) value).doubleValue();
+            }
+        } else if (AttributeUtils.isString(value)) {
+            return value;
+        } else if (value instanceof Geoshape) {
+            return convertGeoshape((Geoshape) value, mapping);
+        } else if (value instanceof Date) {
+            return value;
+        } else if (value instanceof Instant) {
+            return Date.from((Instant) value);
+        } else if (value instanceof Boolean) {
+            return value;
+        } else if (value instanceof UUID) {
+            return value.toString();
+        } else {
+            throw new IllegalArgumentException("Unsupported type: " + value.getClass() + " (value: " + value + ")");
+        }
+    }
+
+    /**
+     * Converts a geoshape into the structure written to Elasticsearch.
+     *
+     * <ul>
+     *   <li>a point whose mapping is not {@code PREFIX_TREE}: a {@code double[]} of longitude, latitude</li>
+     *   <li>a box: a map with {@code type=envelope} and the corner coordinates
+     *       {@code [[minX, maxY], [maxX, minY]]} of its bounding box</li>
+     *   <li>a circle: {@code geoshape.toMap()} with the {@code properties} entry removed and its
+     *       {@code radius_units} appended to {@code radius}</li>
+     *   <li>anything else: {@code geoshape.toMap()} unchanged</li>
+     * </ul>
+     *
+     * @param geoshape the shape to convert
+     * @param mapping  mapping of the key holding the shape
+     * @return the converted shape
+     * @throws IllegalArgumentException if {@code toMap()} fails with an {@code IOException}
+     */
+    @SuppressWarnings("unchecked")
+    private static Object convertGeoshape(Geoshape geoshape, Mapping mapping) {
+        final Geoshape.Type shapeType = geoshape.getType();
+
+        if (shapeType == Geoshape.Type.POINT && Mapping.PREFIX_TREE != mapping) {
+            final Geoshape.Point point = geoshape.getPoint();
+            return new double[]{point.getLongitude(), point.getLatitude()};
+        }
+
+        if (shapeType == Geoshape.Type.BOX) {
+            final Rectangle bounds = geoshape.getShape().getBoundingBox();
+            final Map<String,Object> envelope = new HashMap<>();
+            envelope.put("type", "envelope");
+            envelope.put("coordinates", new double[][] {
+                    {bounds.getMinX(), bounds.getMaxY()},
+                    {bounds.getMaxX(), bounds.getMinY()}});
+            return envelope;
+        }
+
+        // Circles and every remaining shape start from the generic map form.
+        try {
+            final Map<String,Object> shapeMap = geoshape.toMap();
+            if (shapeType == Geoshape.Type.CIRCLE) {
+                final Map<String, String> shapeProperties = (Map<String, String>) shapeMap.remove("properties");
+                shapeMap.put("radius", shapeMap.get("radius") + shapeProperties.get("radius_units"));
+            }
+            return shapeMap;
+        } catch (final IOException e) {
+            throw new IllegalArgumentException("Invalid geoshape: " + geoshape, e);
+        }
+    }
+
+    /**
+     * Translates the given index mutations into Elasticsearch bulk requests and executes them.
+     *
+     * <p>For every document, deletions are queued before additions. Requests of a store that has
+     * an entry in {@code ingestPipelines} are sent in their own bulk request with that pipeline;
+     * the requests of all other stores are collected and sent together in one final bulk request.
+     *
+     * @param mutations   store name to (document id to mutation)
+     * @param information key information for the affected stores
+     * @param tx          transaction handle; not used by the code in this method
+     * @throws BackendException the result of {@code convert(e)} for any exception raised while
+     *                          building or executing the requests
+     */
+    @Override
+    public void mutate(Map<String, Map<String, IndexMutation>> mutations, KeyInformation.IndexRetriever information,
+                       BaseTransaction tx) throws BackendException {
+        final List<ElasticSearchMutation> requests = new ArrayList<>();
+        try {
+            for (final Map.Entry<String, Map<String, IndexMutation>> stores : mutations.entrySet()) {
+                final List<ElasticSearchMutation> requestByStore = new ArrayList<>();
+                final String storeName = stores.getKey();
+                final String indexStoreName = getIndexStoreName(storeName);
+                for (final Map.Entry<String, IndexMutation> entry : stores.getValue().entrySet()) {
+                    final String documentId = entry.getKey();
+                    final IndexMutation mutation = entry.getValue();
+                    assert mutation.isConsolidated();
+                    // A document cannot be new and deleted at once, a new document carries no
+                    // deletions, and a deleted document carries no additions.
+                    Preconditions.checkArgument(!(mutation.isNew() && mutation.isDeleted()));
+                    Preconditions.checkArgument(!mutation.isNew() || !mutation.hasDeletions());
+                    Preconditions.checkArgument(!mutation.isDeleted() || !mutation.hasAdditions());
+                    //Deletions first
+                    if (mutation.hasDeletions()) {
+                        if (mutation.isDeleted()) {
+                            log.trace("Deleting entire document {}", documentId);
+                            requestByStore.add(ElasticSearchMutation.createDeleteRequest(indexStoreName, storeName,
+                                    documentId));
+                        } else {
+                            // Partial deletion: run the stored deletion script with one parameter
+                            // map per deleted entry.
+                            List<Map<String, Object>> params = getParameters(information.get(storeName),
+                                    mutation.getDeletions(), true);
+                            Map doc = compat.prepareStoredScript(parameterizedDeletionScriptId, params).build();
+                            log.trace("Deletion script {} with params {}", PARAMETERIZED_DELETION_SCRIPT, params);
+                            requestByStore.add(ElasticSearchMutation.createUpdateRequest(indexStoreName, storeName,
+                                    documentId, doc));
+                        }
+                    }
+                    if (mutation.hasAdditions()) {
+                        if (mutation.isNew()) { //Index
+                            log.trace("Adding entire document {}", documentId);
+                            final Map<String, Object> source = getNewDocument(mutation.getAdditions(),
+                                    information.get(storeName));
+                            requestByStore.add(ElasticSearchMutation.createIndexRequest(indexStoreName, storeName,
+                                    documentId, source));
+                        } else {
+                            // An upsert document is only supplied when the mutation has no deletions.
+                            final Map upsert;
+                            if (!mutation.hasDeletions()) {
+                                upsert = getNewDocument(mutation.getAdditions(), information.get(storeName));
+                            } else {
+                                upsert = null;
+                            }
+
+                            // Entries with SINGLE cardinality are skipped here; they are written
+                            // through the plain doc update below.
+                            List<Map<String, Object>> params = getParameters(information.get(storeName),
+                                    mutation.getAdditions(), false, Cardinality.SINGLE);
+                            if (!params.isEmpty()) {
+                                ImmutableMap.Builder builder = compat.prepareStoredScript(parameterizedAdditionScriptId, params);
+                                requestByStore.add(ElasticSearchMutation.createUpdateRequest(indexStoreName, storeName,
+                                        documentId, builder, upsert));
+                                log.trace("Adding script {} with params {}", PARAMETERIZED_ADDITION_SCRIPT, params);
+                            }
+
+                            final Map<String, Object> doc = getAdditionDoc(information, storeName, mutation);
+                            if (!doc.isEmpty()) {
+                                final ImmutableMap.Builder builder = ImmutableMap.builder().put(ES_DOC_KEY, doc);
+                                requestByStore.add(ElasticSearchMutation.createUpdateRequest(indexStoreName, storeName,
+                                        documentId, builder, upsert));
+                                log.trace("Adding update {}", doc);
+                            }
+                        }
+                    }
+                }
+                // Stores with an ingest pipeline are flushed immediately with that pipeline;
+                // everything else is deferred to the shared bulk request after the loop.
+                if (!requestByStore.isEmpty() && ingestPipelines.containsKey(storeName)) {
+                    client.bulkRequest(requestByStore, String.valueOf(ingestPipelines.get(storeName)));
+                } else if (!requestByStore.isEmpty()) {
+                    requests.addAll(requestByStore);
+                }
+            }
+            if (!requests.isEmpty()) {
+                client.bulkRequest(requests, null);
+            }
+        } catch (final Exception e) {
+            log.error("Failed to execute bulk Elasticsearch mutation", e);
+            throw convert(e);
+        }
+    }
+
+    /**
+     * Builds the parameter maps for the stored addition/deletion scripts, one map per entry with
+     * the keys {@code name}, {@code value} and {@code cardinality}. Keys with a dual string
+     * mapping contribute a second map under their dual-mapping field name.
+     *
+     * @param storeRetriever      key information of the store the entries belong to
+     * @param entries             index entries to translate
+     * @param deletion            when {@code true}, entries of {@code SINGLE} cardinality get the
+     *                            empty string as value instead of their converted value
+     * @param cardinalitiesToSkip entries whose key has one of these cardinalities are left out
+     * @return the script parameters, in entry order
+     */
+    private List<Map<String, Object>> getParameters(KeyInformation.StoreRetriever storeRetriever,
+                                                    List<IndexEntry> entries,
+                                                    boolean deletion,
+                                                    Cardinality... cardinalitiesToSkip) {
+        final Set<Cardinality> skipped = Sets.newHashSet(cardinalitiesToSkip);
+        final List<Map<String, Object>> scriptParams = new ArrayList<>();
+
+        for (final IndexEntry entry : entries) {
+            final KeyInformation keyInfo = storeRetriever.get(entry.field);
+            final Cardinality cardinality = keyInfo.getCardinality();
+            if (skipped.contains(cardinality)) {
+                continue;
+            }
+
+            final Object scriptValue;
+            if (deletion && cardinality == Cardinality.SINGLE) {
+                scriptValue = "";
+            } else {
+                scriptValue = convertToEsType(entry.value, Mapping.getMapping(keyInfo));
+            }
+
+            scriptParams.add(ImmutableMap.of("name", entry.field,
+                    "value", scriptValue,
+                    "cardinality", cardinality.name()));
+
+            if (hasDualStringMapping(keyInfo)) {
+                scriptParams.add(ImmutableMap.of("name", getDualMappingName(entry.field),
+                        "value", scriptValue,
+                        "cardinality", cardinality.name()));
+            }
+        }
+
+        return scriptParams;
+    }
+
+    /**
+     * Collects the {@code SINGLE} cardinality additions of a mutation into a partial document.
+     * Keys with a dual string mapping are also written under their dual-mapping field name.
+     *
+     * @param information key information for all stores
+     * @param store       name of the store the mutation belongs to
+     * @param mutation    mutation whose additions are inspected
+     * @return field name to converted value; empty when no addition has {@code SINGLE} cardinality
+     * @throws PermanentBackendException declared for callers; no statement visible in this method throws it directly
+     */
+    private Map<String,Object> getAdditionDoc(KeyInformation.IndexRetriever information,
+                                              String store, IndexMutation mutation) throws PermanentBackendException {
+        // The store lookup does not depend on the entry, so resolve it once up front.
+        final KeyInformation.StoreRetriever storeInformation = information.get(store);
+        final Map<String,Object> doc = new HashMap<>();
+        for (final IndexEntry e : mutation.getAdditions()) {
+            final KeyInformation keyInformation = storeInformation.get(e.field);
+            if (keyInformation.getCardinality() != Cardinality.SINGLE) {
+                continue;
+            }
+            // Convert once and reuse the result for the dual-mapped field.
+            final Object converted = convertToEsType(e.value, Mapping.getMapping(keyInformation));
+            doc.put(e.field, converted);
+            if (hasDualStringMapping(keyInformation)) {
+                doc.put(getDualMappingName(e.field), converted);
+            }
+        }
+
+        return doc;
+    }
+
+    /**
+     * Re-creates or removes whole documents in the index.
+     *
+     * <p>A document with {@code null} or empty content is deleted; any other document is indexed
+     * from scratch. Requests of a store that has an entry in {@code ingestPipelines} are sent in
+     * their own bulk request with that pipeline; all remaining requests are sent in one final
+     * bulk request.
+     *
+     * @param documents   store name to (document id to full document content)
+     * @param information key information for the affected stores
+     * @param tx          transaction handle; not used by the code in this method
+     * @throws BackendException the result of {@code convert(e)} for any exception raised while
+     *                          building or executing the requests
+     */
+    @Override
+    public void restore(Map<String,Map<String, List<IndexEntry>>> documents, KeyInformation.IndexRetriever information,
+                        BaseTransaction tx) throws BackendException {
+        final List<ElasticSearchMutation> deferredRequests = new ArrayList<>();
+        try {
+            for (final Map.Entry<String, Map<String, List<IndexEntry>>> storeEntry : documents.entrySet()) {
+                final List<ElasticSearchMutation> storeRequests = new ArrayList<>();
+                final String store = storeEntry.getKey();
+                final String indexStoreName = getIndexStoreName(store);
+
+                for (final Map.Entry<String, List<IndexEntry>> docEntry : storeEntry.getValue().entrySet()) {
+                    final String docID = docEntry.getKey();
+                    final List<IndexEntry> content = docEntry.getValue();
+                    final boolean hasContent = content != null && !content.isEmpty();
+
+                    if (hasContent) {
+                        log.trace("Adding entire document {}", docID);
+                        final Map<String, Object> source = getNewDocument(content, information.get(store));
+                        storeRequests.add(ElasticSearchMutation.createIndexRequest(indexStoreName, store, docID,
+                                source));
+                    } else {
+                        log.trace("Deleting entire document {}", docID);
+                        storeRequests.add(ElasticSearchMutation.createDeleteRequest(indexStoreName, store, docID));
+                    }
+                }
+
+                if (storeRequests.isEmpty()) {
+                    continue;
+                }
+                if (ingestPipelines.containsKey(store)) {
+                    client.bulkRequest(storeRequests, String.valueOf(ingestPipelines.get(store)));
+                } else {
+                    deferredRequests.addAll(storeRequests);
+                }
+            }
+
+            if (!deferredRequests.isEmpty()) {
+                client.bulkRequest(deferredRequests, null);
+            }
+        } catch (final Exception e) {
+            throw convert(e);
+        }
+    }
+
+    /**
+     * Maps a comparison predicate onto the corresponding query clause built by {@code compat}.
+     *
+     * @param cmp   comparison predicate
+     * @param key   field the clause applies to
+     * @param value value to compare against
+     * @return the query clause for {@code key <cmp> value}
+     * @throws IllegalArgumentException if {@code cmp} is not one of the six handled relations
+     */
+    private Map<String, Object> getRelationFromCmp(final Cmp cmp, String key, final Object value) {
+        final Map<String, Object> relation;
+        switch (cmp) {
+            case EQUAL:
+                relation = compat.term(key, value);
+                break;
+            case NOT_EQUAL:
+                relation = compat.boolMustNot(compat.term(key, value));
+                break;
+            case LESS_THAN:
+                relation = compat.lt(key, value);
+                break;
+            case LESS_THAN_EQUAL:
+                relation = compat.lte(key, value);
+                break;
+            case GREATER_THAN:
+                relation = compat.gt(key, value);
+                break;
+            case GREATER_THAN_EQUAL:
+                relation = compat.gte(key, value);
+                break;
+            default:
+                throw new IllegalArgumentException("Unexpected relation: " + cmp);
+        }
+        return relation;
+    }
+
+    /**
+     * Translates a query condition tree into an Elasticsearch query clause.
+     *
+     * <p>{@code Not}, {@code And} and {@code Or} are translated recursively into
+     * {@code boolMustNot}, {@code boolMust} and {@code boolShould}. A {@code PredicateCondition}
+     * is translated according to the runtime type of its value (number, string, geoshape,
+     * date/instant, boolean, UUID); a {@code null} value is only supported together with
+     * {@code Cmp.NOT_EQUAL}, which becomes an {@code exists} clause.
+     *
+     * @param condition   condition to translate
+     * @param information key information of the store being queried
+     * @return the query clause
+     * @throws IllegalArgumentException if the condition, the value type, or the combination of
+     *                                  predicate and value type/mapping is not supported
+     */
+    public Map<String,Object> getFilter(Condition<?> condition, KeyInformation.StoreRetriever information) {
+        if (condition instanceof PredicateCondition) {
+            final PredicateCondition<String, ?> atom = (PredicateCondition) condition;
+            Object value = atom.getValue();
+            final String key = atom.getKey();
+            final JanusGraphPredicate predicate = atom.getPredicate();
+            if (value == null && predicate == Cmp.NOT_EQUAL) {
+                return compat.exists(key);
+            } else if (value instanceof Number) {
+                Preconditions.checkArgument(predicate instanceof Cmp,
+                        "Relation not supported on numeric types: %s", predicate);
+                return getRelationFromCmp((Cmp) predicate, key, value);
+            } else if (value instanceof String) {
+
+                final Mapping mapping = getStringMapping(information.get(key));
+                final String fieldName;
+                if (mapping==Mapping.TEXT && !(Text.HAS_CONTAINS.contains(predicate) || predicate instanceof Cmp))
+                    throw new IllegalArgumentException("Text mapped string values only support CONTAINS and Compare queries and not: " + predicate);
+                if (mapping==Mapping.STRING && Text.HAS_CONTAINS.contains(predicate))
+                    throw new IllegalArgumentException("String mapped string values do not support CONTAINS queries: " + predicate);
+                // For TEXTSTRING keys, CONTAINS-style predicates and non-EQUAL comparisons query the
+                // key's own field; every other predicate queries the dual-mapping field.
+                if (mapping==Mapping.TEXTSTRING && !(Text.HAS_CONTAINS.contains(predicate) || (predicate instanceof Cmp && predicate != Cmp.EQUAL))) {
+                    fieldName = getDualMappingName(key);
+                } else {
+                    fieldName = key;
+                }
+
+                if (predicate == Text.CONTAINS || predicate == Cmp.EQUAL) {
+                    return compat.match(fieldName, value);
+                } else if (predicate == Text.NOT_CONTAINS) {
+                    return compat.boolMust(ImmutableList.of(compat.exists(fieldName),
+                            compat.boolMustNot(compat.match(fieldName, value))));
+                } else if (predicate == Text.CONTAINS_PHRASE) {
+                    return compat.matchPhrase(fieldName, value);
+                } else if (predicate == Text.NOT_CONTAINS_PHRASE) {
+                    return compat.boolMust(ImmutableList.of(compat.exists(fieldName),
+                            compat.boolMustNot(compat.matchPhrase(fieldName, value))));
+                } else if (predicate == Text.CONTAINS_PREFIX) {
+                    value = lowerCaseUnlessCustomTextAnalyzer(information, key, value);
+                    return compat.prefix(fieldName, value);
+                } else if (predicate == Text.NOT_CONTAINS_PREFIX) {
+                    value = lowerCaseUnlessCustomTextAnalyzer(information, key, value);
+                    return compat.boolMust(ImmutableList.of(compat.exists(fieldName),
+                            compat.boolMustNot(compat.prefix(fieldName, value))));
+                } else if (predicate == Text.CONTAINS_REGEX) {
+                    value = lowerCaseUnlessCustomTextAnalyzer(information, key, value);
+                    return compat.regexp(fieldName, value);
+                } else if (predicate == Text.NOT_CONTAINS_REGEX) {
+                    value = lowerCaseUnlessCustomTextAnalyzer(information, key, value);
+                    return compat.boolMust(ImmutableList.of(compat.exists(fieldName),
+                            compat.boolMustNot(compat.regexp(fieldName, value))));
+                } else if (predicate == Text.PREFIX) {
+                    return compat.prefix(fieldName, value);
+                } else if (predicate == Text.NOT_PREFIX) {
+                    return compat.boolMust(ImmutableList.of(compat.exists(fieldName),
+                            compat.boolMustNot(compat.prefix(fieldName, value))));
+                } else if (predicate == Text.REGEX) {
+                    return compat.regexp(fieldName, value);
+                } else if (predicate == Text.NOT_REGEX) {
+                    return compat.boolMust(ImmutableList.of(compat.exists(fieldName),
+                            compat.boolMustNot(compat.regexp(fieldName, value))));
+                } else if (predicate == Cmp.NOT_EQUAL) {
+                    return compat.boolMustNot(compat.match(fieldName, value));
+                } else if (predicate == Text.FUZZY || predicate == Text.CONTAINS_FUZZY) {
+                    return compat.fuzzyMatch(fieldName, value);
+                } else if (predicate == Text.NOT_FUZZY || predicate == Text.NOT_CONTAINS_FUZZY) {
+                    return compat.boolMust(ImmutableList.of(compat.exists(fieldName),
+                            compat.boolMustNot(compat.fuzzyMatch(fieldName, value))));
+                } else if (predicate == Cmp.LESS_THAN) {
+                    return compat.lt(fieldName, value);
+                } else if (predicate == Cmp.LESS_THAN_EQUAL) {
+                    return compat.lte(fieldName, value);
+                } else if (predicate == Cmp.GREATER_THAN) {
+                    return compat.gt(fieldName, value);
+                } else if (predicate == Cmp.GREATER_THAN_EQUAL) {
+                    return compat.gte(fieldName, value);
+                } else
+                    throw new IllegalArgumentException("Predicate is not supported for string value: " + predicate);
+            } else if (value instanceof Geoshape && Mapping.getMapping(information.get(key)) == Mapping.DEFAULT) {
+                // geopoint
+                final Geoshape shape = (Geoshape) value;
+                Preconditions.checkArgument(predicate instanceof Geo && predicate != Geo.CONTAINS,
+                        "Relation not supported on geopoint types: %s", predicate);
+
+                final Map<String,Object> query;
+                switch (shape.getType()) {
+                    case CIRCLE:
+                        final Geoshape.Point center = shape.getPoint();
+                        query = compat.geoDistance(key, center.getLatitude(), center.getLongitude(), shape.getRadius());
+                        break;
+                    case BOX:
+                        final Geoshape.Point southwest = shape.getPoint(0);
+                        final Geoshape.Point northeast = shape.getPoint(1);
+                        query = compat.geoBoundingBox(key, southwest.getLatitude(), southwest.getLongitude(),
+                                northeast.getLatitude(), northeast.getLongitude());
+                        break;
+                    case POLYGON:
+                        final List<List<Double>> points = IntStream.range(0, shape.size())
+                                .mapToObj(i -> ImmutableList.of(shape.getPoint(i).getLongitude(),
+                                        shape.getPoint(i).getLatitude()))
+                                .collect(Collectors.toList());
+                        query = compat.geoPolygon(key, points);
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Unsupported or invalid search shape type for geopoint: "
+                                + shape.getType());
+                }
+
+                return predicate == Geo.DISJOINT ?  compat.boolMustNot(query) : query;
+            } else if (value instanceof Geoshape) {
+                Preconditions.checkArgument(predicate instanceof Geo,
+                        "Relation not supported on geoshape types: %s", predicate);
+                final Geoshape shape = (Geoshape) value;
+                final Map<String,Object> geo;
+                switch (shape.getType()) {
+                    case CIRCLE:
+                        final Geoshape.Point center = shape.getPoint();
+                        geo = ImmutableMap.of(ES_TYPE_KEY, "circle",
+                                ES_GEO_COORDS_KEY, ImmutableList.of(center.getLongitude(), center.getLatitude()),
+                                "radius", shape.getRadius() + "km");
+                        break;
+                    case BOX:
+                        final Geoshape.Point southwest = shape.getPoint(0);
+                        final Geoshape.Point northeast = shape.getPoint(1);
+                        geo = ImmutableMap.of(ES_TYPE_KEY, "envelope",
+                                ES_GEO_COORDS_KEY,
+                                ImmutableList.of(
+                                        ImmutableList.of(southwest.getLongitude(), northeast.getLatitude()),
+                                        ImmutableList.of(northeast.getLongitude(), southwest.getLatitude())));
+                        break;
+                    case LINE:
+                        final List lineCoords = IntStream.range(0, shape.size())
+                                .mapToObj(i -> ImmutableList.of(shape.getPoint(i).getLongitude(),
+                                        shape.getPoint(i).getLatitude()))
+                                .collect(Collectors.toList());
+                        geo = ImmutableMap.of(ES_TYPE_KEY, "linestring", ES_GEO_COORDS_KEY, lineCoords);
+                        break;
+                    case POLYGON:
+                        final List polyCoords = IntStream.range(0, shape.size())
+                                .mapToObj(i -> ImmutableList.of(shape.getPoint(i).getLongitude(),
+                                        shape.getPoint(i).getLatitude()))
+                                .collect(Collectors.toList());
+                        geo = ImmutableMap.of(ES_TYPE_KEY, "polygon", ES_GEO_COORDS_KEY,
+                                ImmutableList.of(polyCoords));
+                        break;
+                    case POINT:
+                        geo = ImmutableMap.of(ES_TYPE_KEY, "point",
+                                ES_GEO_COORDS_KEY, ImmutableList.of(shape.getPoint().getLongitude(),
+                                        shape.getPoint().getLatitude()));
+                        break;
+                    default:
+                        throw new IllegalArgumentException("Unsupported or invalid search shape type: "
+                                + shape.getType());
+                }
+
+                return compat.geoShape(key, geo, (Geo) predicate);
+            } else if (value instanceof Date || value instanceof Instant) {
+                Preconditions.checkArgument(predicate instanceof Cmp,
+                        "Relation not supported on date types: %s", predicate);
+
+                if (value instanceof Instant) {
+                    value = Date.from((Instant) value);
+                }
+                return getRelationFromCmp((Cmp) predicate, key, value);
+            } else if (value instanceof Boolean) {
+                // Validate before casting so an unsupported predicate fails with
+                // IllegalArgumentException like the other value types, not ClassCastException.
+                Preconditions.checkArgument(predicate instanceof Cmp,
+                        "Relation not supported on boolean types: %s", predicate);
+                final Cmp numRel = (Cmp) predicate;
+                switch (numRel) {
+                    case EQUAL:
+                        return compat.term(key, value);
+                    case NOT_EQUAL:
+                        return compat.boolMustNot(compat.term(key, value));
+                    default:
+                        throw new IllegalArgumentException("Boolean types only support EQUAL or NOT_EQUAL");
+                }
+            } else if (value instanceof UUID) {
+                if (predicate == Cmp.EQUAL) {
+                    return compat.term(key, value);
+                } else if (predicate == Cmp.NOT_EQUAL) {
+                    return compat.boolMustNot(compat.term(key, value));
+                } else {
+                    throw new IllegalArgumentException("Only equal or not equal is supported for UUIDs: "
+                            + predicate);
+                }
+            } else throw new IllegalArgumentException("Unsupported type: " + value);
+        } else if (condition instanceof Not) {
+            return compat.boolMustNot(getFilter(((Not) condition).getChild(),information));
+        } else if (condition instanceof And) {
+            final List queries = StreamSupport.stream(condition.getChildren().spliterator(), false)
+                    .map(c -> getFilter(c,information)).collect(Collectors.toList());
+            return compat.boolMust(queries);
+        } else if (condition instanceof Or) {
+            final List queries = StreamSupport.stream(condition.getChildren().spliterator(), false)
+                    .map(c -> getFilter(c,information)).collect(Collectors.toList());
+            return compat.boolShould(queries);
+        } else throw new IllegalArgumentException("Invalid condition: " + condition);
+    }
+
+    /**
+     * Lower-cases a string query term unless the key declares a custom text analyzer.
+     * {@code Locale.ROOT} keeps the result independent of the JVM default locale.
+     *
+     * @param information key information of the store being queried
+     * @param key         key the term is matched against
+     * @param value       the query term; must be a {@code String}
+     * @return {@code value} unchanged when a text analyzer parameter is present, otherwise its lower-cased form
+     */
+    private static Object lowerCaseUnlessCustomTextAnalyzer(KeyInformation.StoreRetriever information,
+                                                            String key, Object value) {
+        if (ParameterType.TEXT_ANALYZER.hasParameter(information.get(key).getParameters())) {
+            return value;
+        }
+        return ((String) value).toLowerCase(java.util.Locale.ROOT);
+    }
+
+    /**
+     * Executes an index query and streams the ids of the matching documents.
+     *
+     * <p>The request starts at offset 0, disables source retrieval, and asks for
+     * {@code min(limit, batchSize)} hits, or {@code batchSize} when the query has no limit. When
+     * the request size reaches {@code batchSize} the search is run as a scroll; otherwise it is a
+     * single search sent with {@code TRACK_TOTAL_HITS_DISABLED_PARAMETERS}.
+     *
+     * @param query        the query to run
+     * @param informations key information for all stores
+     * @param tx           transaction handle; not used by the code in this method
+     * @return stream of result ids, truncated to the query limit when one is set
+     * @throws BackendException a {@code PermanentBackendException} wrapping any
+     *                          {@code IOException} or {@code UncheckedIOException} from the search
+     */
+    @Override
+    public Stream<String> query(IndexQuery query, KeyInformation.IndexRetriever informations,
+                                BaseTransaction tx) throws BackendException {
+        final String store = query.getStore();
+        final ElasticSearchRequest sr = new ElasticSearchRequest();
+
+        final Map<String,Object> esQuery = getFilter(query.getCondition(), informations.get(store));
+        sr.setQuery(compat.prepareQuery(esQuery));
+        if (!query.getOrder().isEmpty()) {
+            addOrderToQuery(informations, sr, query.getOrder(), store);
+        }
+        sr.setFrom(0);
+        sr.setSize(query.hasLimit() ? Math.min(query.getLimit(), batchSize) : batchSize);
+        sr.setDisableSourceRetrieval(true);
+
+        try {
+            final String indexStoreName = getIndexStoreName(store);
+            final boolean useScroll = sr.getSize() >= batchSize;
+            final Map<String, Object> requestBody = compat.createRequestBody(sr,
+                    useScroll ? NULL_PARAMETERS : TRACK_TOTAL_HITS_DISABLED_PARAMETERS);
+            final ElasticSearchResponse response = client.search(indexStoreName, requestBody, useScroll);
+            log.debug("First Executed query [{}] in {} ms", query.getCondition(), response.getTook());
+
+            final Iterator<RawQuery.Result<String>> resultIterator = getResultsIterator(useScroll, response, sr.getSize());
+            Stream<RawQuery.Result<String>> results
+                    = StreamSupport.stream(Spliterators.spliteratorUnknownSize(resultIterator, Spliterator.ORDERED), false);
+            if (query.hasLimit()) {
+                results = results.limit(query.getLimit());
+            }
+            return results.map(RawQuery.Result::getResult);
+        } catch (final IOException | UncheckedIOException e) {
+            throw new PermanentBackendException(e);
+        }
+    }
+
+    /**
+     * Picks the iterator used to walk the hits of a search response.
+     *
+     * @param useScroll  when {@code true}, an {@code ElasticSearchScroll} built from the client,
+     *                   the response and {@code windowSize} is returned
+     * @param response   the response of the search that was already executed
+     * @param windowSize passed to {@code ElasticSearchScroll}; ignored when not scrolling
+     * @return a scroll-backed iterator, or the iterator over the results held by {@code response}
+     */
+    private Iterator<RawQuery.Result<String>> getResultsIterator(boolean useScroll, ElasticSearchResponse response, int windowSize){
+        if (useScroll) {
+            return new ElasticSearchScroll(client, response, windowSize);
+        }
+        return response.getResults().iterator();
+    }
+
+    /**
+     * Translates a Java attribute class into the type name that {@code addOrderToQuery} hands to
+     * {@code addSort}.
+     *
+     * @param dataType the attribute class to translate
+     * @param mapping  only consulted for {@code Geoshape}: {@code Mapping.DEFAULT} yields
+     *                 {@code "geo_point"}, any other mapping yields {@code "geo_shape"}
+     * @return the type name, or {@code null} when {@code dataType} matches none of the handled types
+     */
+    private String convertToEsDataType(Class<?> dataType, Mapping mapping) {
+        if (String.class.isAssignableFrom(dataType)) {
+            return "string";
+        }
+        if (Integer.class.isAssignableFrom(dataType)) {
+            return "integer";
+        }
+        if (Long.class.isAssignableFrom(dataType)) {
+            return "long";
+        }
+        if (Float.class.isAssignableFrom(dataType)) {
+            return "float";
+        }
+        if (Double.class.isAssignableFrom(dataType)) {
+            return "double";
+        }
+        if (Boolean.class.isAssignableFrom(dataType)) {
+            return "boolean";
+        }
+        // Both temporal representations share the same type name.
+        if (Date.class.isAssignableFrom(dataType) || Instant.class.isAssignableFrom(dataType)) {
+            return "date";
+        }
+        if (Geoshape.class.isAssignableFrom(dataType)) {
+            if (mapping == Mapping.DEFAULT) {
+                return "geo_point";
+            }
+            return "geo_shape";
+        }
+
+        return null;
+    }
+
+    /**
+     * Builds and executes the search request for a {@link RawQuery}.
+     *
+     * <p>The request starts at offset 0, asks for {@code size} hits and has source retrieval
+     * disabled. When scrolling is not used, {@code TRACK_TOTAL_HITS_PARAMETER} is set to
+     * {@code false} on the request body, or {@code TRACK_TOTAL_HITS_DISABLED_REQUEST_BODY} is used
+     * if no body was produced.
+     *
+     * @param query        the raw query providing query string, orders, parameters and store
+     * @param informations retriever for key information, used when orders are present
+     * @param tx           not used by this method
+     * @param size         number of hits requested
+     * @param useScroll    forwarded to the client's search call
+     * @return the response of the search
+     * @throws BackendException a {@link PermanentBackendException} wrapping any {@link IOException}
+     *                          or {@link UncheckedIOException} raised while building or sending the request
+     */
+    private ElasticSearchResponse runCommonQuery(RawQuery query, KeyInformation.IndexRetriever informations, BaseTransaction tx, int size,
+                                                 boolean useScroll) throws BackendException{
+        final ElasticSearchRequest request = new ElasticSearchRequest();
+        request.setQuery(compat.queryString(query.getQuery()));
+        if (!query.getOrders().isEmpty()) {
+            addOrderToQuery(informations, request, query.getOrders(), query.getStore());
+        }
+        request.setFrom(0);
+        request.setSize(size);
+        request.setDisableSourceRetrieval(true);
+        try {
+            Map<String, Object> body = compat.createRequestBody(request, query.getParameters());
+            if (!useScroll) {
+                if (body != null) {
+                    body.put(TRACK_TOTAL_HITS_PARAMETER, false);
+                } else {
+                    body = TRACK_TOTAL_HITS_DISABLED_REQUEST_BODY;
+                }
+            }
+            final String storeName = getIndexStoreName(query.getStore());
+            return client.search(storeName, body, useScroll);
+        } catch (final IOException | UncheckedIOException failure) {
+            throw new PermanentBackendException(failure);
+        }
+    }
+
+    /**
+     * Asks the client for the total number of hits of a {@link RawQuery}.
+     *
+     * @param query the raw query providing query string, parameters and store
+     * @return the total reported by the client's {@code countTotal}
+     * @throws BackendException a {@link PermanentBackendException} wrapping any {@link IOException}
+     *                          or {@link UncheckedIOException} raised by the count request
+     */
+    private long runCountQuery(RawQuery query) throws BackendException{
+        try {
+            final String storeName = getIndexStoreName(query.getStore());
+            return client.countTotal(storeName,
+                    compat.createRequestBody(compat.queryString(query.getQuery()), query.getParameters()));
+        } catch (final IOException | UncheckedIOException failure) {
+            throw new PermanentBackendException(failure);
+        }
+    }
+
+    /**
+     * Adds one sort entry to the request for every order entry.
+     *
+     * <p>For keys with a dual string mapping the sort is placed on the name returned by
+     * {@code getDualMappingName}; otherwise the key itself is used. The sort direction is the
+     * lower-cased name of the order constant.
+     *
+     * @param informations retriever for the key information of {@code store}
+     * @param sr           the request receiving the sort entries
+     * @param orders       the order entries to translate
+     * @param store        the store whose key information is looked up
+     */
+    private void addOrderToQuery(KeyInformation.IndexRetriever informations, ElasticSearchRequest sr, final List<IndexQuery.OrderEntry> orders,
+                                 String store) {
+        for (final IndexQuery.OrderEntry entry : orders) {
+            final String direction = entry.getOrder().name();
+            final KeyInformation keyInfo = informations.get(store).get(entry.getKey());
+            final Mapping keyMapping = Mapping.getMapping(keyInfo);
+            final Class<?> valueType = entry.getDatatype();
+
+            final String sortField;
+            if (hasDualStringMapping(keyInfo)) {
+                sortField = getDualMappingName(entry.getKey());
+            } else {
+                sortField = entry.getKey();
+            }
+            sr.addSort(sortField, direction.toLowerCase(), convertToEsDataType(valueType, keyMapping));
+        }
+    }
+
+    /**
+     * Executes a {@link RawQuery} and streams back its results.
+     *
+     * <p>The number of hits requested is {@code limit + offset} capped at {@code batchSize}, or
+     * {@code batchSize} when the query has no limit. Scrolling is used once that number reaches
+     * {@code batchSize}. The offset is applied on the resulting stream with {@code skip}, then the
+     * limit (if any) with {@code limit}.
+     *
+     * @param query       the raw query to run
+     * @param information retriever for key information, forwarded to {@code runCommonQuery}
+     * @param tx          forwarded to {@code runCommonQuery}
+     * @return stream of results after offset and limit have been applied
+     * @throws BackendException if {@code runCommonQuery} fails
+     */
+    @Override
+    public Stream<RawQuery.Result<String>> query(RawQuery query, KeyInformation.IndexRetriever information,
+                                                 BaseTransaction tx) throws BackendException {
+        final int windowSize;
+        if (query.hasLimit()) {
+            windowSize = Math.min(query.getLimit() + query.getOffset(), batchSize);
+        } else {
+            windowSize = batchSize;
+        }
+        final boolean scrolling = windowSize >= batchSize;
+        final ElasticSearchResponse searchResponse = runCommonQuery(query, information, tx, windowSize, scrolling);
+        log.debug("First Executed query [{}] in {} ms", query.getQuery(), searchResponse.getTook());
+
+        final Iterator<RawQuery.Result<String>> hits = getResultsIterator(scrolling, searchResponse, windowSize);
+        final Spliterator<RawQuery.Result<String>> spliterator =
+                Spliterators.spliteratorUnknownSize(hits, Spliterator.ORDERED);
+        final Stream<RawQuery.Result<String>> afterOffset =
+                StreamSupport.stream(spliterator, false).skip(query.getOffset());
+        if (query.hasLimit()) {
+            return afterOffset.limit(query.getLimit());
+        }
+        return afterOffset;
+    }
+
+    /**
+     * Counts the hits of a structured {@link IndexQuery}.
+     *
+     * @param query       the query whose condition and store are used
+     * @param information retriever for the key information of the queried store
+     * @param tx          not used by this method
+     * @return the total reported by the client's {@code countTotal}
+     * @throws BackendException a {@link PermanentBackendException} wrapping any {@link IOException}
+     *                          or {@link UncheckedIOException} raised by the count request
+     */
+    @Override
+    public Long queryCount(IndexQuery query, KeyInformation.IndexRetriever information, BaseTransaction tx) throws BackendException {
+        final Map<String,Object> filter = getFilter(query.getCondition(), information.get(query.getStore()));
+        final ElasticSearchRequest request = new ElasticSearchRequest();
+        request.setQuery(compat.prepareQuery(filter));
+        try {
+            final String storeName = getIndexStoreName(query.getStore());
+            return client.countTotal(storeName, compat.createRequestBody(request, null));
+        } catch (final IOException | UncheckedIOException failure) {
+            throw new PermanentBackendException(failure);
+        }
+    }
+
+    /**
+     * Returns the number of hits of a {@link RawQuery}, logging the elapsed time at debug level.
+     *
+     * @param query       the raw query to count
+     * @param information not used by this method
+     * @param tx          not used by this method
+     * @return the count produced by {@code runCountQuery}
+     * @throws BackendException if {@code runCountQuery} fails
+     */
+    @Override
+    public Long totals(RawQuery query, KeyInformation.IndexRetriever information,
+                       BaseTransaction tx) throws BackendException {
+        final long begin = System.currentTimeMillis();
+        final long total = runCountQuery(query);
+        if (log.isDebugEnabled()) {
+            final Object loggedQuery = query.getQuery();
+            final long elapsedMillis = System.currentTimeMillis() - begin;
+            log.debug("Executed count query [{}] in {} ms", loggedQuery, elapsedMillis);
+        }
+        return total;
+    }
+
+    /**
+     * Tells whether the given predicate can be evaluated for a key with the given data type and
+     * mapping.
+     *
+     * <p>A non-default mapping is rejected up front unless the data type is a string, or the
+     * mapping is {@code PREFIX_TREE} on a geo type. After that the answer depends on the data type:
+     * numbers, {@code Date} and {@code Instant} accept any {@link Cmp}; {@code Boolean} and
+     * {@code UUID} accept only {@code Cmp.EQUAL} and {@code Cmp.NOT_EQUAL}; {@code Geoshape} and
+     * strings depend on the mapping as listed in the switch statements below.
+     *
+     * @param information         key information providing the data type and mapping
+     * @param janusgraphPredicate the predicate to check
+     * @return {@code true} if the combination is supported, {@code false} otherwise
+     */
+    @Override
+    public boolean supports(KeyInformation information, JanusGraphPredicate janusgraphPredicate) {
+        final Class<?> dataType = information.getDataType();
+        final Mapping mapping = Mapping.getMapping(information);
+        // Non-default mappings are only meaningful for strings and for PREFIX_TREE on geo types.
+        if (mapping!=Mapping.DEFAULT && !AttributeUtils.isString(dataType) &&
+                !(mapping==Mapping.PREFIX_TREE && AttributeUtils.isGeo(dataType))) return false;
+
+        if (Number.class.isAssignableFrom(dataType)) {
+            return janusgraphPredicate instanceof Cmp;
+        } else if (dataType == Geoshape.class) {
+            // A mapping matching no case leaves the switch and reaches the final "return false".
+            switch(mapping) {
+                case DEFAULT:
+                    return janusgraphPredicate instanceof Geo && janusgraphPredicate != Geo.CONTAINS;
+                case PREFIX_TREE:
+                    return janusgraphPredicate instanceof Geo;
+            }
+        } else if (AttributeUtils.isString(dataType)) {
+            // A mapping matching no case leaves the switch and reaches the final "return false".
+            switch(mapping) {
+                case DEFAULT:
+                case TEXT:
+                    return janusgraphPredicate == Text.CONTAINS || janusgraphPredicate == Text.NOT_CONTAINS
+                            || janusgraphPredicate == Text.CONTAINS_FUZZY || janusgraphPredicate == Text.NOT_CONTAINS_FUZZY
+                            || janusgraphPredicate == Text.CONTAINS_PREFIX || janusgraphPredicate == Text.NOT_CONTAINS_PREFIX
+                            || janusgraphPredicate == Text.CONTAINS_REGEX || janusgraphPredicate == Text.NOT_CONTAINS_REGEX
+                            || janusgraphPredicate == Text.CONTAINS_PHRASE || janusgraphPredicate == Text.NOT_CONTAINS_PHRASE;
+                case STRING:
+                    return janusgraphPredicate instanceof Cmp
+                            || janusgraphPredicate==Text.REGEX || janusgraphPredicate==Text.NOT_REGEX
+                            || janusgraphPredicate==Text.PREFIX || janusgraphPredicate==Text.NOT_PREFIX
+                            || janusgraphPredicate == Text.FUZZY || janusgraphPredicate == Text.NOT_FUZZY;
+                case TEXTSTRING:
+                    return janusgraphPredicate instanceof Text || janusgraphPredicate instanceof Cmp;
+            }
+        } else if (dataType == Date.class || dataType == Instant.class) {
+            return janusgraphPredicate instanceof Cmp;
+        } else if (dataType == Boolean.class) {
+            return janusgraphPredicate == Cmp.EQUAL || janusgraphPredicate == Cmp.NOT_EQUAL;
+        } else if (dataType == UUID.class) {
+            return janusgraphPredicate == Cmp.EQUAL || janusgraphPredicate==Cmp.NOT_EQUAL;
+        }
+        return false;
+    }
+
+
+    /**
+     * Tells whether a key with the given data type and mapping can be indexed.
+     *
+     * <p>Numbers, {@code Date}, {@code Instant}, {@code Boolean} and {@code UUID} require the
+     * default mapping. Strings accept {@code DEFAULT}, {@code STRING}, {@code TEXT} and
+     * {@code TEXTSTRING}. Geo types accept {@code DEFAULT} and {@code PREFIX_TREE}.
+     *
+     * @param information key information providing the data type and mapping
+     * @return {@code true} if the data type / mapping combination is supported
+     */
+    @Override
+    public boolean supports(KeyInformation information) {
+        final Class<?> dataType = information.getDataType();
+        final Mapping mapping = Mapping.getMapping(information);
+        final boolean defaultMapping = mapping == Mapping.DEFAULT;
+
+        final boolean scalarType = Number.class.isAssignableFrom(dataType)
+                || dataType == Date.class
+                || dataType == Instant.class
+                || dataType == Boolean.class
+                || dataType == UUID.class;
+        if (scalarType) {
+            return defaultMapping;
+        }
+        if (AttributeUtils.isString(dataType)) {
+            return defaultMapping
+                    || mapping == Mapping.STRING
+                    || mapping == Mapping.TEXT
+                    || mapping == Mapping.TEXTSTRING;
+        }
+        if (AttributeUtils.isGeo(dataType)) {
+            return defaultMapping || mapping == Mapping.PREFIX_TREE;
+        }
+        return false;
+    }
+
+    /**
+     * Derives the index field name for a key: the key is validated with
+     * {@code IndexProvider.checkKeyValidity} and every space is replaced by
+     * {@code IndexProvider.REPLACEMENT_CHAR}.
+     *
+     * @param key         the key to convert
+     * @param information not used by this method
+     * @return the key with spaces replaced
+     */
+    @Override
+    public String mapKey2Field(String key, KeyInformation information) {
+        IndexProvider.checkKeyValidity(key);
+        final String fieldName = key.replace(' ', IndexProvider.REPLACEMENT_CHAR);
+        return fieldName;
+    }
+
+    /**
+     * Returns the index features reported by the compatibility layer.
+     *
+     * @return the result of {@code compat.getIndexFeatures()}
+     */
+    @Override
+    public IndexFeatures getFeatures() {
+        final IndexFeatures features = compat.getIndexFeatures();
+        return features;
+    }
+
+    /**
+     * Starts a transaction by wrapping the given configuration in a {@code DefaultTransaction}.
+     *
+     * @param config the transaction configuration
+     * @return a new {@code DefaultTransaction} for {@code config}
+     * @throws BackendException declared by the interface; not thrown by this implementation directly
+     */
+    @Override
+    public BaseTransactionConfigurable beginTransaction(BaseTransactionConfig config) throws BackendException {
+        final BaseTransactionConfigurable transaction = new DefaultTransaction(config);
+        return transaction;
+    }
+
+    /**
+     * Closes the underlying client.
+     *
+     * @throws BackendException a {@link PermanentBackendException} wrapping the
+     *                          {@link IOException} raised while closing the client
+     */
+    @Override
+    public void close() throws BackendException {
+        try {
+            client.close();
+        } catch (final IOException closeFailure) {
+            throw new PermanentBackendException(closeFailure);
+        }
+    }
+
+    /**
+     * Deletes the index named {@code indexName} and then closes this provider. {@code close()}
+     * runs in a {@code finally} block, so it is invoked whether or not the deletion succeeded.
+     *
+     * @throws BackendException a {@link PermanentBackendException} if the deletion fails, or
+     *                          whatever {@code close()} throws
+     */
+    @Override
+    public void clearStorage() throws BackendException {
+        try {
+            client.deleteIndex(indexName);
+        } catch (final Exception deleteFailure) {
+            final String message = "Could not delete index " + indexName;
+            throw new PermanentBackendException(message, deleteFailure);
+        } finally {
+            close();
+        }
+    }
+
+    /**
+     * Checks whether the index named {@code indexName} exists.
+     *
+     * @return the result of the client's {@code indexExists} call
+     * @throws BackendException a {@link PermanentBackendException} wrapping the
+     *                          {@link IOException} raised by the check
+     */
+    @Override
+    public boolean exists() throws BackendException {
+        try {
+            return client.indexExists(indexName);
+        } catch (final IOException checkFailure) {
+            final String message = "Could not check if index " + indexName + " exists";
+            throw new PermanentBackendException(message, checkFailure);
+        }
+    }
+
+    /**
+     * Returns the major version reported by the client.
+     *
+     * @return the result of {@code client.getMajorVersion()}
+     */
+    ElasticMajorVersion getVersion() {
+        final ElasticMajorVersion majorVersion = client.getMajorVersion();
+        return majorVersion;
+    }
+
+    /**
+     * Returns the value of the {@code useMappingForES7} field.
+     *
+     * @return the current {@code useMappingForES7} flag
+     */
+    boolean isUseMappingForES7(){
+        final boolean flag = useMappingForES7;
+        return flag;
+    }
+
+    /**
+     * Collapses script lines into a single string: each line is trimmed and the pieces are
+     * concatenated without any separator.
+     *
+     * @param lines the script lines
+     * @return the concatenation of the trimmed lines
+     */
+    private static String parameterizedScriptPrepare(String ... lines){
+        final StringBuilder script = new StringBuilder();
+        for (final String line : lines) {
+            script.append(line.trim());
+        }
+        return script.toString();
+    }
+}
\ No newline at end of file
diff --git a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
index 4a37c77..f83dff6 100644
--- a/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
+++ b/intg/src/main/java/org/apache/atlas/ApplicationProperties.java
@@ -50,14 +50,17 @@ public final class ApplicationProperties extends PropertiesConfiguration {
     public static final String  INDEX_BACKEND_CONF              = "atlas.graph.index.search.backend";
     public static final String  INDEX_MAP_NAME_CONF             = "atlas.graph.index.search.map-name";
     public static final String  SOLR_WAIT_SEARCHER_CONF         = "atlas.graph.index.search.solr.wait-searcher";
+    public static final String  ELASTICSEARCH_INDEX_NAME_CONF   = "atlas.graph.index.search.elasticsearch.index-name";
     public static final String  INDEX_RECOVERY_CONF             = "atlas.index.recovery.enable";
     public static final String  ENABLE_FULLTEXT_SEARCH_CONF     = "atlas.search.fulltext.enable";
     public static final String  ENABLE_FREETEXT_SEARCH_CONF     = "atlas.search.freetext.enable";
     public static final String  ATLAS_RUN_MODE                  = "atlas.run.mode";
     public static final String  GRAPHBD_BACKEND_JANUS           = "janus";
+    public static final String  DEFAULT_INDEX_NAME              = "janusgraph";
     public static final String  STORAGE_BACKEND_HBASE           = "hbase";
     public static final String  STORAGE_BACKEND_HBASE2          = "hbase2";
     public static final String  INDEX_BACKEND_SOLR              = "solr";
+    public static final String  INDEX_BACKEND_ELASTICSEARCH     = "elasticsearch";
     public static final String  LDAP_TYPE                       =  "atlas.authentication.method.ldap.type";
     public static final String  LDAP                            =  "LDAP";
     public static final String  AD                              =  "AD";
@@ -355,6 +358,10 @@ public final class ApplicationProperties extends PropertiesConfiguration {
                 addPropertyDirect(INDEX_MAP_NAME_CONF, DEFAULT_INDEX_MAP_NAME);
                 LOG.info("Setting index.search.map-name property '" + DEFAULT_INDEX_MAP_NAME + "'");
             }
+        } else if (indexBackend.equalsIgnoreCase(INDEX_BACKEND_ELASTICSEARCH)){
+           addPropertyDirect(ELASTICSEARCH_INDEX_NAME_CONF, DEFAULT_INDEX_NAME);
+
+           LOG.info("Setting elasticsearch.index-name property '" + DEFAULT_INDEX_NAME + "'");
         }
 
         // setting value for 'atlas.graph.index.search.max-result-set-size' (default = 500000)
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
index b316354..57964d5 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
@@ -166,13 +166,13 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
 
             while (shouldRun.get()) {
                 try {
-                    boolean solrHealthy = isSolrHealthy();
+                    boolean isIdxHealthy = isIndexBackendHealthy();
 
-                    if (this.txRecoveryObject == null && solrHealthy) {
+                    if (this.txRecoveryObject == null && isIdxHealthy) {
                         startMonitoring();
                     }
 
-                    if (this.txRecoveryObject != null && !solrHealthy) {
+                    if (this.txRecoveryObject != null && !isIdxHealthy) {
                         stopMonitoring();
                     }
                 } catch (Exception e) {
@@ -197,7 +197,7 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
             }
         }
 
-        private boolean isSolrHealthy() throws AtlasException, InterruptedException {
+        private boolean isIndexBackendHealthy() throws AtlasException, InterruptedException {
             Thread.sleep(indexStatusCheckRetryMillis);
 
             return this.graph.getGraphIndexClient().isHealthy();