Posted to commits@gora.apache.org by dj...@apache.org on 2021/08/11 18:22:27 UTC

[gora] branch master updated: GORA-664 Add datastore for Elasticsearch (#234)

This is an automated email from the ASF dual-hosted git repository.

djkevincr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gora.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d0d910  GORA-664 Add datastore for Elasticsearch (#234)
2d0d910 is described below

commit 2d0d91093dd7bceb07a4d64fb6b075bdc2117abd
Author: Maria Podorvanova <36...@users.noreply.github.com>
AuthorDate: Thu Aug 12 04:22:22 2021 +1000

    GORA-664 Add datastore for Elasticsearch (#234)
    
    * Create basic gora-elasticsearch module
    
    * Bump Elasticsearch version and remove redundant dependency
    
    * Implement connection and basic schema management
    
    - Create ElasticsearchStore class with connection initialization
    - Create basic Elasticsearch types mapping
    - Implement the necessary files for mapping representation (ElasticsearchMapping, ElasticsearchMappingBuilder)
    - Read schema from mapping file
    - Cover initialization with test
    
    * Set up Elasticsearch client parameters
    
    - Created gora.properties file with configuration properties
    - Loaded connection parameters from configuration
    - Implemented connection to Elasticsearch cluster with ElasticsearchParameters
    - Covered ElasticsearchParameters with tests
    - Added javadoc descriptions
    
    * Add a property for choosing the authentication method
    
    * Implement testing with Elasticsearch container
    
    - Added testing dependencies
    - Added GoraElasticsearchTestDriver with Elasticsearch container
    - Added javadoc descriptions to GoraElasticsearchTestDriver class
    - Fixed two existing tests in accordance with the Elasticsearch container
    
    * Implement some methods for schema management
    
    Implemented schemaExists, createSchema, deleteSchema and flush methods
    
    * Add XSD validation file for the XML mapping
    
    * Fix XSD validation
    
    - Relocated gora-elasticsearch.xsd file to main resources
    - Covered XSD validation with test
    - Added gora-elasticsearch-mapping-invalid.xml file for test
    
    * Set up Elasticsearch container's authentication parameters
    
    * Implement exists method
    
    * Add comments for the connection parameters
    
    * Fix authentication
    
    - Set up password to Elasticsearch container properly
    - Set default Elasticsearch container server’s username in gora.properties
    - Added exceptions for missing arguments in authentication
    
    * Add parameter for the XSD validation
    
    - Defined a parameter for the XSD validation
    - Added a test case for the parameter
    - Made ElasticsearchStore read mapping file from properties, not configuration
    
    * Implement some basic Input-Output operations for schema management
    
    - Implemented delete, get and put methods
    - Implemented newInstance and getUnionSchema utility methods
    - Implemented basic serialization/deserialization for primitive AVRO types
    
    * Fix createSchema method
    
    - Added mappings while creating an Elasticsearch index
    - Added getter and setter to Datatype enum
    
    * Implement serialization/deserialization for some Avro data types
    
    - Implemented serializeFieldValue and deserializeFieldValue methods for ARRAY, BOOLEAN, BYTES and FIXED Avro data types
    - Fixed deserialization for STRING Avro data type
    - Added javadoc descriptions
    
    * Fix NPE when getting a non-existent Elasticsearch document
    
    * Implement serialization/deserialization for MAP Avro data type
    
    * Refactor serialization/deserialization to have better javadocs and arguments
    
    * Implement serialization/deserialization for RECORD Avro data type
    
    * Implement serialization/deserialization for UNION Avro data type
    
    * Fix passed Schema argument for ARRAY deserialization
    
    * Fix BYTES deserialization for Base64 encoded String
    
    * Ignore testGet3UnionField test
    
    * Add javadoc descriptions to serialization and deserialization methods
    
    * Implement newQuery method
    
    * Implement deleteByQuery method
    
    * Use an Enum instead of literal strings for the Authentication Type parameter
    
    * Use parameterized logging instead of string concatenation
    
    * Implement execute method
    
    * Implement getPartitions method
    
    * Add scaling_factor support
    
    * Remove unsupported Elasticsearch data types
    
    * Implement Metadata Analyzer for Elasticsearch Store
    
    * Try to fix range query by “_id” field
    
    * Fix execute method by adding a special "gora_id" field
    
    * Implement deleting specific fields of the records in deleteByQuery method
    
    * Implement MapReduce test
    
    * Fix flush method by using refresh
    
    * Address reviewer's comments
    
    * Add Elasticsearch specific logging dependency
---
 gora-elasticsearch/pom.xml                         | 177 +++++
 .../mapping/ElasticsearchMapping.java              |  73 ++
 .../mapping/ElasticsearchMappingBuilder.java       | 210 +++++
 .../apache/gora/elasticsearch/mapping/Field.java   | 200 +++++
 .../gora/elasticsearch/mapping/package-info.java   |  20 +
 .../apache/gora/elasticsearch/package-info.java    |  20 +
 .../elasticsearch/query/ElasticsearchQuery.java    |  41 +
 .../elasticsearch/query/ElasticsearchResult.java   |  72 ++
 .../gora/elasticsearch/query/package-info.java     |  20 +
 .../elasticsearch/store/ElasticsearchStore.java    | 864 +++++++++++++++++++++
 .../ElasticsearchStoreCollectionMetadata.java      |  53 ++
 .../store/ElasticsearchStoreMetadataAnalyzer.java  | 102 +++
 .../gora/elasticsearch/store/package-info.java     |  20 +
 .../elasticsearch/utils/AuthenticationType.java    |  35 +
 .../utils/ElasticsearchConstants.java              |  49 ++
 .../utils/ElasticsearchParameters.java             | 274 +++++++
 .../gora/elasticsearch/utils/package-info.java     |  20 +
 .../src/main/resources/gora-elasticsearch.xsd      |  61 ++
 .../elasticsearch/GoraElasticsearchTestDriver.java |  71 ++
 .../mapreduce/ElasticsearchStoreMapReduceTest.java |  59 ++
 .../gora/elasticsearch/mapreduce/package-info.java |  20 +
 .../apache/gora/elasticsearch/package-info.java    |  20 +
 .../store/TestElasticsearchStore.java              | 173 +++++
 .../gora/elasticsearch/store/package-info.java     |  20 +
 .../gora-elasticsearch-mapping-invalid.xml         |  31 +
 .../test/resources/gora-elasticsearch-mapping.xml  |  48 ++
 .../src/test/resources/gora.properties             |  40 +
 pom.xml                                            |  10 +
 28 files changed, 2803 insertions(+)

diff --git a/gora-elasticsearch/pom.xml b/gora-elasticsearch/pom.xml
new file mode 100644
index 0000000..28ba932
--- /dev/null
+++ b/gora-elasticsearch/pom.xml
@@ -0,0 +1,177 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+	Unless required by applicable law or agreed to in writing,
+	software distributed under the License is distributed on an
+	"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+	KIND, either express or implied.  See the License for the
+	specific language governing permissions and limitations
+	under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.gora</groupId>
+        <artifactId>gora</artifactId>
+        <version>1.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <artifactId>gora-elasticsearch</artifactId>
+    <packaging>bundle</packaging>
+
+    <name>Apache Gora :: Elasticsearch</name>
+    <url>http://gora.apache.org</url>
+    <description>The Apache Gora open source framework provides an in-memory data model and
+        persistence for big data. Gora supports persisting to column stores, key value stores,
+        document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce
+        support.</description>
+    <inceptionYear>2010</inceptionYear>
+    <organization>
+        <name>The Apache Software Foundation</name>
+        <url>http://www.apache.org/</url>
+    </organization>
+    <issueManagement>
+        <system>JIRA</system>
+        <url>https://issues.apache.org/jira/browse/GORA</url>
+    </issueManagement>
+    <ciManagement>
+        <system>Jenkins</system>
+        <url>https://builds.apache.org/job/Gora-trunk/</url>
+    </ciManagement>
+
+    <properties>
+        <osgi.import>*</osgi.import>
+        <osgi.export>org.apache.gora.elasticsearch*;version="${project.version}";-noimport:=true</osgi.export>
+    </properties>
+
+    <build>
+        <directory>target</directory>
+        <outputDirectory>target/classes</outputDirectory>
+        <finalName>${project.artifactId}-${project.version}</finalName>
+        <testOutputDirectory>target/test-classes</testOutputDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testResources>
+            <testResource>
+                <directory>${project.basedir}/src/test/resources</directory>
+                <includes>
+                    <include>**/*</include>
+                </includes>
+                <!--targetPath>${project.basedir}/target/classes/</targetPath -->
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>${build-helper-maven-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>src/examples/java</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <!-- Gora Internal Dependencies -->
+        <dependency>
+            <groupId>org.apache.gora</groupId>
+            <artifactId>gora-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.gora</groupId>
+            <artifactId>gora-core</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Elasticsearch dependencies -->
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-high-level-client</artifactId>
+            <version>${elasticsearch.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.jdom</groupId>
+            <artifactId>jdom</artifactId>
+            <scope>compile</scope>
+        </dependency>
+
+        <!-- Logging Dependencies -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-to-slf4j</artifactId>
+            <version>2.8.2</version>
+        </dependency>
+
+        <!-- Testing Dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+        </dependency>
+
+    </dependencies>
+
+</project>
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java
new file mode 100644
index 0000000..d31e64f
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.mapping;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Mapping definitions for Elasticsearch.
+ */
+public class ElasticsearchMapping {
+
+    private String indexName;
+    private Map<String, Field> fields;
+
+    /**
+     * Empty constructor for the ElasticsearchMapping class.
+     */
+    public ElasticsearchMapping() {
+        fields = new HashMap<>();
+    }
+
+    /**
+     * Returns the name of the Elasticsearch index linked to the mapping.
+     *
+     * @return Index's name
+     */
+    public String getIndexName() {
+        return indexName;
+    }
+
+    /**
+     * Sets the index name of the Elasticsearch mapping.
+     *
+     * @param indexName Index's name
+     */
+    public void setIndexName(String indexName) {
+        this.indexName = indexName;
+    }
+
+    /**
+     * Returns a map with all mapped fields.
+     *
+     * @return Map containing mapped fields
+     */
+    public Map<String, Field> getFields() {
+        return fields;
+    }
+
+    /**
+     * Adds a new field to the mapped fields.
+     *
+     * @param classFieldName Field name in the persisted class
+     * @param field Mapped field from Elasticsearch index
+     */
+    public void addField(String classFieldName, Field field) {
+        fields.put(classFieldName, field);
+    }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java
new file mode 100644
index 0000000..30dba7b
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.mapping;
+
+import com.google.inject.ConfigurationException;
+import org.apache.commons.io.IOUtils;
+import org.apache.gora.elasticsearch.store.ElasticsearchStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.util.GoraException;
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.jdom.input.SAXBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import javax.xml.XMLConstants;
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Builder for Mapping definitions of Elasticsearch.
+ */
+public class ElasticsearchMappingBuilder<K, T extends PersistentBase> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchMappingBuilder.class);
+
+    /**
+     * XSD validation file for the XML mapping.
+     */
+    private static final String XSD_MAPPING_FILE = "gora-elasticsearch.xsd";
+
+    // Index description
+    static final String ATT_NAME = "name";
+
+    static final String ATT_TYPE = "type";
+
+    // Class description
+    static final String TAG_CLASS = "class";
+
+    static final String ATT_KEYCLASS = "keyClass";
+
+    static final String ATT_INDEX = "index";
+
+    static final String TAG_FIELD = "field";
+
+    static final String ATT_DOCFIELD = "docfield";
+
+    static final String ATT_SCALINGFACTOR = "scalingFactor";
+
+    /**
+     * Mapping instance being built.
+     */
+    private ElasticsearchMapping elasticsearchMapping;
+
+    private final ElasticsearchStore<K, T> dataStore;
+
+    /**
+     * Constructor for ElasticsearchMappingBuilder.
+     *
+     * @param store ElasticsearchStore instance
+     */
+    public ElasticsearchMappingBuilder(final ElasticsearchStore<K, T> store) {
+        this.elasticsearchMapping = new ElasticsearchMapping();
+        this.dataStore = store;
+    }
+
+    /**
+     * Returns the Elasticsearch Mapping being built.
+     *
+     * @return Elasticsearch Mapping instance
+     */
+    public ElasticsearchMapping getElasticsearchMapping() {
+        return elasticsearchMapping;
+    }
+
+    /**
+     * Sets the Elasticsearch Mapping.
+     *
+     * @param elasticsearchMapping Elasticsearch Mapping instance
+     */
+    public void setElasticsearchMapping(ElasticsearchMapping elasticsearchMapping) {
+        this.elasticsearchMapping = elasticsearchMapping;
+    }
+
+    /**
+     * Reads Elasticsearch mappings from file.
+     *
+     * @param inputStream   Mapping input stream
+     * @param xsdValidation Parameter for enabling XSD validation
+     */
+    public void readMappingFile(InputStream inputStream, boolean xsdValidation) {
+        try {
+            SAXBuilder saxBuilder = new SAXBuilder();
+            if (inputStream == null) {
+                LOG.error("The mapping input stream is null!");
+                throw new GoraException("The mapping input stream is null!");
+            }
+
+            // Read the input stream into a string so it can be consumed twice (validation, then parsing)
+            String mappingStream = IOUtils.toString(inputStream, Charset.defaultCharset());
+
+            // XSD validation for XML file
+            if (xsdValidation) {
+                Source xmlSource = new StreamSource(IOUtils.toInputStream(mappingStream, Charset.defaultCharset()));
+                Schema schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
+                        .newSchema(new StreamSource(getClass().getClassLoader().getResourceAsStream(XSD_MAPPING_FILE)));
+                schema.newValidator().validate(xmlSource);
+                LOG.info("Mapping file is valid.");
+            }
+
+            Document document = saxBuilder.build(IOUtils.toInputStream(mappingStream, Charset.defaultCharset()));
+            if (document == null) {
+                LOG.error("The mapping document is null!");
+                throw new GoraException("The mapping document is null!");
+            }
+
+            Element root = document.getRootElement();
+            // Extract class descriptions
+            @SuppressWarnings("unchecked")
+            List<Element> classElements = root.getChildren(TAG_CLASS);
+            for (Element classElement : classElements) {
+                final Class<T> persistentClass = dataStore.getPersistentClass();
+                final Class<K> keyClass = dataStore.getKeyClass();
+                if (haveKeyClass(keyClass, classElement)
+                        && havePersistentClass(persistentClass, classElement)) {
+                    loadPersistentClass(classElement, persistentClass);
+                    break;
+                }
+            }
+        } catch (IOException | JDOMException | ConfigurationException | SAXException ex) {
+            throw new RuntimeException(ex);
+        }
+        LOG.info("Gora Elasticsearch mapping file was read successfully.");
+    }
+
+    private boolean haveKeyClass(final Class<K> keyClass,
+                                 final Element classElement) {
+        return keyClass.getName().equals(
+                classElement.getAttributeValue(ATT_KEYCLASS));
+    }
+
+    private boolean havePersistentClass(final Class<T> persistentClass,
+                                        final Element classElement) {
+        return persistentClass.getName().equals(
+                classElement.getAttributeValue(ATT_NAME));
+    }
+
+    /**
+     * Handle the XML parsing of the class definition.
+     *
+     * @param classElement the XML node containing the class definition
+     */
+    protected void loadPersistentClass(Element classElement,
+                                       Class<T> pPersistentClass) {
+
+        String indexNameFromMapping = classElement.getAttributeValue(ATT_INDEX);
+        String indexName = dataStore.getSchemaName(indexNameFromMapping, pPersistentClass);
+
+        elasticsearchMapping.setIndexName(indexName);
+        // indexNameFromMapping could be null here
+        if (!indexName.equals(indexNameFromMapping)) {
+            ElasticsearchStore.LOG
+                    .info("Keyclass and nameclass match, but mismatching index names. "
+                            + "Mappingfile schema is '{}' vs actual schema '{}', assuming they are the same.",
+                            indexNameFromMapping, indexName);
+            if (indexNameFromMapping != null) {
+                elasticsearchMapping.setIndexName(indexName);
+            }
+        }
+
+        // Process fields declaration
+        @SuppressWarnings("unchecked")
+        List<Element> fields = classElement.getChildren(TAG_FIELD);
+        for (Element fieldElement : fields) {
+            String fieldTypeName = fieldElement.getAttributeValue(ATT_TYPE).toUpperCase(Locale.ROOT);
+            Field.FieldType fieldType = new Field.FieldType(Field.DataType.valueOf(fieldTypeName));
+            Field field;
+            if (fieldType.getType() == Field.DataType.SCALED_FLOAT) {
+                int scalingFactor = Integer.parseInt(fieldElement.getAttributeValue(ATT_SCALINGFACTOR));
+                field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), new Field.FieldType(scalingFactor));
+            } else {
+                field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), fieldType);
+            }
+            elasticsearchMapping.addField(fieldElement.getAttributeValue(ATT_NAME), field);
+        }
+    }
+}
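
The XSD validation step in readMappingFile above can be tried in isolation with nothing but the JDK. The following is a minimal sketch, not the code from this commit: the class name XsdValidationSketch, the root element name "mapping", and the inline schema are all hypothetical stand-ins (the real gora-elasticsearch.xsd is not reproduced here). Only the attribute names name, keyClass and index are taken from the builder.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.xml.sax.SAXException;

final class XsdValidationSketch {

    // Hypothetical, heavily reduced schema. It is NOT gora-elasticsearch.xsd;
    // it only requires "name" and "keyClass" on each <class> element.
    static final String XSD =
        "<xs:schema xmlns:xs=\"http://www.w3.org/2001/XMLSchema\">"
      + "<xs:element name=\"mapping\">"
      + "<xs:complexType><xs:sequence>"
      + "<xs:element name=\"class\" maxOccurs=\"unbounded\">"
      + "<xs:complexType>"
      + "<xs:attribute name=\"name\" type=\"xs:string\" use=\"required\"/>"
      + "<xs:attribute name=\"keyClass\" type=\"xs:string\" use=\"required\"/>"
      + "<xs:attribute name=\"index\" type=\"xs:string\"/>"
      + "</xs:complexType>"
      + "</xs:element>"
      + "</xs:sequence></xs:complexType>"
      + "</xs:element>"
      + "</xs:schema>";

    // Returns true when the XML text validates against XSD, false when the
    // validator (or the XML parser) rejects it.
    static boolean isValid(String xml) {
        try {
            Schema schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
                    .newSchema(new StreamSource(new StringReader(XSD)));
            schema.newValidator().validate(new StreamSource(new StringReader(xml)));
            return true;
        } catch (SAXException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String complete = "<mapping><class name=\"a.B\" keyClass=\"java.lang.String\" index=\"b\"/></mapping>";
        String missingKeyClass = "<mapping><class name=\"a.B\"/></mapping>";
        System.out.println("complete mapping valid: " + isValid(complete));
        System.out.println("mapping without keyClass valid: " + isValid(missingKeyClass));
    }
}
```

Because the mapping text is held as a String, it can be handed to the validator and then to the XML parser without re-reading the original stream, which is the same reason readMappingFile buffers its input first.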
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java
new file mode 100644
index 0000000..73016cb
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.mapping;
+
+import java.util.Objects;
+import java.util.StringJoiner;
+
+/**
+ * Field definition for the Elasticsearch index.
+ */
+public class Field {
+
+    private String name;
+    private FieldType dataType;
+
+    /**
+     * Constructor for Field.
+     *
+     * @param name Field's name
+     * @param dataType Field's data type
+     */
+    public Field(String name, FieldType dataType) {
+        this.name = name;
+        this.dataType = dataType;
+    }
+
+    /**
+     * Returns the field's name.
+     *
+     * @return Field's name
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Sets the field's name.
+     *
+     * @param name Field's name
+     */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * Returns the field's data-type.
+     *
+     * @return Field's data-type
+     */
+    public FieldType getDataType() {
+        return dataType;
+    }
+
+    /**
+     * Sets the field's data-type.
+     *
+     * @param dataType Field's data-type
+     */
+    public void setDataType(FieldType dataType) {
+        this.dataType = dataType;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Field field = (Field) o;
+        return Objects.equals(name, field.name) && Objects.equals(dataType, field.dataType);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, dataType);
+    }
+
+    @Override
+    public String toString() {
+        return new StringJoiner(", ", Field.class.getSimpleName() + "[", "]")
+                .add("name='" + name + "'")
+                .add("dataType=" + dataType)
+                .toString();
+    }
+
+    /**
+     * Elasticsearch supported data-type enumeration. For a more detailed list of data
+     * types supported by Elasticsearch refer to
+     * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
+     */
+    public enum DataType {
+        BINARY,
+        BOOLEAN,
+        KEYWORD,
+        CONSTANT_KEYWORD,
+        WILDCARD,
+        LONG,
+        INTEGER,
+        SHORT,
+        BYTE,
+        DOUBLE,
+        FLOAT,
+        HALF_FLOAT,
+        SCALED_FLOAT,
+        OBJECT,
+        FLATTENED,
+        NESTED,
+        TEXT,
+        COMPLETION,
+        SEARCH_AS_YOU_TYPE,
+        TOKEN_COUNT
+    }
+
+    public static class FieldType {
+
+        private DataType type;
+
+        // Parameter for scaled_float type.
+        private int scalingFactor;
+
+        /**
+         * Constructor for FieldType.
+         *
+         * @param type Elasticsearch data type
+         */
+        public FieldType(DataType type) {
+            this.type = type;
+        }
+
+        /**
+         * Constructor for FieldType. Implicitly uses the scaled_float Elasticsearch
+         * data type with the given scaling factor.
+         *
+         * @param scalingFactor scaled_float field's scaling factor
+         */
+        public FieldType(int scalingFactor) {
+            this.type = DataType.SCALED_FLOAT;
+            this.scalingFactor = scalingFactor;
+        }
+
+        /**
+         * Returns the Elasticsearch data type.
+         * @return Elasticsearch data type
+         */
+        public DataType getType() {
+            return type;
+        }
+
+        /**
+         * Sets the Elasticsearch data type.
+         * @param type Elasticsearch data type
+         */
+        public void setType(DataType type) {
+            this.type = type;
+        }
+
+        /**
+         * Returns the scaling factor of scaled_float type.
+         *
+         * @return scaled_float field's scaling factor
+         */
+        public int getScalingFactor() {
+            return scalingFactor;
+        }
+
+        /**
+         * Sets the scaling factor of scaled_float type.
+         *
+         * @param scalingFactor scaled_float field's scaling factor
+         */
+        public void setScalingFactor(int scalingFactor) {
+            this.scalingFactor = scalingFactor;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            FieldType fieldType = (FieldType) o;
+            return scalingFactor == fieldType.scalingFactor && type == fieldType.type;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(type, scalingFactor);
+        }
+    }
+}
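
Two details of Field.java may be easier to see in a standalone form: how a lower-case type name from the mapping file becomes a DataType constant, and what the scaling factor of a scaled_float means. The sketch below is illustrative only. The class FieldTypeParsingSketch and its methods parse, encode and decode are hypothetical names, the enum is a three-value subset of the one above, and upper-casing with Locale.ROOT is a choice made in this sketch to keep the lookup independent of the JVM default locale. The encode/decode pair mirrors Elasticsearch's documented behaviour for scaled_float (the value is multiplied by the scaling factor and rounded to the closest long); it is not code from this commit.

```java
import java.util.Locale;

final class FieldTypeParsingSketch {

    // Subset of the DataType enum in Field.java, repeated here so the
    // sketch compiles on its own.
    enum DataType { KEYWORD, INTEGER, SCALED_FLOAT }

    // Maps a type name as written in a mapping file (e.g. "scaled_float")
    // to the enum constant. Locale.ROOT avoids locale-specific case rules,
    // such as the Turkish dotted capital I.
    static DataType parse(String typeName) {
        return DataType.valueOf(typeName.toUpperCase(Locale.ROOT));
    }

    // scaled_float stores value * scalingFactor rounded to the closest long.
    static long encode(double value, int scalingFactor) {
        return Math.round(value * scalingFactor);
    }

    // Recovers the (rounded) floating-point value from the stored long.
    static double decode(long stored, int scalingFactor) {
        return (double) stored / scalingFactor;
    }

    public static void main(String[] args) {
        System.out.println(parse("scaled_float"));
        long stored = encode(19.99, 100);
        System.out.println(stored + " -> " + decode(stored, 100));
    }
}
```

With a scaling factor of 100, any value is kept to two decimal places; precision beyond that is lost at index time, which is why the factor has to be supplied alongside the type in the mapping.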
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/package-info.java
new file mode 100644
index 0000000..c5dccd1
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains mapping-related classes.
+ */
+package org.apache.gora.elasticsearch.mapping;
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/package-info.java
new file mode 100644
index 0000000..8d2313a
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains all Elasticsearch datastore related classes.
+ */
+package org.apache.gora.elasticsearch;
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java
new file mode 100644
index 0000000..f491006
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.query;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.impl.QueryBase;
+import org.apache.gora.store.DataStore;
+
+/**
+ * Elasticsearch specific implementation of the {@link Query} interface.
+ */
+public class ElasticsearchQuery<K, T extends PersistentBase> extends QueryBase<K, T> {
+
+    /**
+     * Constructor for the query.
+     *
+     * @param dataStore data store used
+     */
+    public ElasticsearchQuery(DataStore<K, T> dataStore) {
+        super(dataStore);
+    }
+
+    public ElasticsearchQuery() {
+        super(null);
+    }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java
new file mode 100644
index 0000000..e7f8180
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.query;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.impl.ResultBase;
+import org.apache.gora.store.DataStore;
+
+import java.util.List;
+
+/**
+ * Elasticsearch specific implementation of the
+ * {@link org.apache.gora.query.Result} interface.
+ */
+public class ElasticsearchResult<K, T extends PersistentBase> extends ResultBase<K, T> {
+
+    /**
+     * List of resulting persistent objects.
+     */
+    private List<T> persistentObjects;
+
+    /**
+     * List of the resulting objects' keys.
+     */
+    private List<K> persistentKeys;
+
+    public ElasticsearchResult(DataStore<K, T> dataStore, Query<K, T> query, List<K> persistentKeys, List<T> persistentObjects) {
+        super(dataStore, query);
+        this.persistentKeys = persistentKeys;
+        this.persistentObjects = persistentObjects;
+    }
+
+    @Override
+    public float getProgress() {
+        if (persistentObjects.size() == 0) {
+            return 1;
+        }
+
+        return offset / (float) persistentObjects.size();
+    }
+
+    @Override
+    public int size() {
+        return persistentObjects.size();
+    }
+
+    @Override
+    protected boolean nextInner() {
+        if ((int) offset == persistentObjects.size()) {
+            return false;
+        }
+
+        persistent = persistentObjects.get((int) offset);
+        key = persistentKeys.get((int) offset);
+        return persistent != null;
+    }
+}
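Note on ElasticsearchResult above: it walks two parallel lists (keys and objects) with a single offset and reports progress as offset divided by the list size. A minimal, dependency-free sketch of that pattern follows. ListCursorSketch is a hypothetical stand-in, not the Gora class, and it assumes that the base class advances the offset after each successful step, which this diff does not show.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class ListCursorSketch<K, T> {
    private final List<K> keys;
    private final List<T> values;
    private long offset = 0; // number of entries already returned
    private K key;
    private T value;

    ListCursorSketch(List<K> keys, List<T> values) {
        this.keys = keys;
        this.values = values;
    }

    // Moves to the next entry; returns false once the lists are exhausted.
    boolean next() {
        if ((int) offset == values.size()) {
            return false;
        }
        key = keys.get((int) offset);
        value = values.get((int) offset);
        offset++; // assumption: done by the base class in the real store
        return true;
    }

    // Fraction of entries consumed so far; an empty result counts as complete.
    float getProgress() {
        if (values.isEmpty()) {
            return 1f;
        }
        return offset / (float) values.size();
    }

    K getKey() {
        return key;
    }

    T get() {
        return value;
    }

    public static void main(String[] args) {
        ListCursorSketch<String, Integer> cursor =
                new ListCursorSketch<>(Arrays.asList("a", "b"), Arrays.asList(1, 2));
        while (cursor.next()) {
            System.out.println(cursor.getKey() + " -> " + cursor.get()
                    + " (progress " + cursor.getProgress() + ")");
        }
        System.out.println(new ListCursorSketch<String, Integer>(
                Collections.emptyList(), Collections.emptyList()).getProgress());
    }
}
```

Because the whole result set is already in memory, size() and getProgress() are cheap here. The trade-off is that large result sets are fully materialised before iteration starts.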
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/package-info.java
new file mode 100644
index 0000000..ad7b9d7
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains query-related classes.
+ */
+package org.apache.gora.elasticsearch.query;
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java
new file mode 100644
index 0000000..a82de7f
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.store;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.elasticsearch.mapping.ElasticsearchMapping;
+import org.apache.gora.elasticsearch.mapping.ElasticsearchMappingBuilder;
+import org.apache.gora.elasticsearch.mapping.Field;
+import org.apache.gora.elasticsearch.query.ElasticsearchQuery;
+import org.apache.gora.elasticsearch.query.ElasticsearchResult;
+import org.apache.gora.elasticsearch.utils.ElasticsearchParameters;
+import org.apache.gora.filter.Filter;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.BeanFactoryImpl;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.util.AvroUtils;
+import org.apache.gora.util.ClassLoadingUtils;
+import org.apache.gora.util.GoraException;
+import org.apache.http.Header;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.message.BasicHeader;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.index.reindex.UpdateByQueryRequest;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptType;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+/**
+ * Implementation of an Elasticsearch data store to be used by Apache Gora.
+ *
+ * @param <K> class to be used for the key
+ * @param <T> class to be persisted within the store
+ */
+public class ElasticsearchStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
+
+    public static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStore.class);
+    private static final String DEFAULT_MAPPING_FILE = "gora-elasticsearch-mapping.xml";
+    public static final String PARSE_MAPPING_FILE_KEY = "gora.elasticsearch.mapping.file";
+    private static final String XML_MAPPING_DEFINITION = "gora.mapping";
+    public static final String XSD_VALIDATION = "gora.xsd_validation";
+
+    /**
+     * Elasticsearch client
+     */
+    private RestHighLevelClient client;
+
+    /**
+     * Mapping definition for Elasticsearch
+     */
+    private ElasticsearchMapping elasticsearchMapping;
+
+    @Override
+    public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException {
+        try {
+            LOG.debug("Initializing Elasticsearch store");
+            ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf());
+            super.initialize(keyClass, persistentClass, properties);
+            ElasticsearchMappingBuilder<K, T> builder = new ElasticsearchMappingBuilder<>(this);
+            InputStream mappingStream;
+            if (properties.containsKey(XML_MAPPING_DEFINITION)) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("{} = {}", XML_MAPPING_DEFINITION, properties.getProperty(XML_MAPPING_DEFINITION));
+                }
+                mappingStream = org.apache.commons.io.IOUtils.toInputStream(properties.getProperty(XML_MAPPING_DEFINITION), (Charset) null);
+            } else {
+                mappingStream = getClass().getClassLoader().getResourceAsStream(properties.getProperty(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
+            }
+            String xsdValidation = properties.getProperty(XSD_VALIDATION, "false");
+            builder.readMappingFile(mappingStream, Boolean.parseBoolean(xsdValidation));
+            elasticsearchMapping = builder.getElasticsearchMapping();
+            client = createClient(parameters);
+            LOG.info("Elasticsearch store was successfully initialized.");
+        } catch (Exception ex) {
+            LOG.error("Error while initializing Elasticsearch store", ex);
+            throw new GoraException(ex);
+        }
+    }
+
+    public static RestHighLevelClient createClient(ElasticsearchParameters parameters) {
+        RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(parameters.getHost(), parameters.getPort()));
+
+        // Choosing the authentication method.
+        switch (parameters.getAuthenticationType()) {
+            case BASIC:
+                if (parameters.getUsername() != null && parameters.getPassword() != null) {
+                    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+                    credentialsProvider.setCredentials(AuthScope.ANY,
+                            new UsernamePasswordCredentials(parameters.getUsername(), parameters.getPassword()));
+                    clientBuilder.setHttpClientConfigCallback(
+                            httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+                } else {
+                    throw new IllegalArgumentException("Missing username or password for BASIC authentication.");
+                }
+                break;
+            case TOKEN:
+                if (parameters.getAuthorizationToken() != null) {
+                    Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization",
+                            parameters.getAuthorizationToken())};
+                    clientBuilder.setDefaultHeaders(defaultHeaders);
+                } else {
+                    throw new IllegalArgumentException("Missing authorization token for TOKEN authentication.");
+                }
+                break;
+            case APIKEY:
+                if (parameters.getApiKeyId() != null && parameters.getApiKeySecret() != null) {
+                    String apiKeyAuth = Base64.getEncoder()
+                            .encodeToString((parameters.getApiKeyId() + ":" + parameters.getApiKeySecret())
+                                    .getBytes(StandardCharsets.UTF_8));
+                    Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization", "ApiKey " + apiKeyAuth)};
+                    clientBuilder.setDefaultHeaders(defaultHeaders);
+                } else {
+                    throw new IllegalArgumentException("Missing API Key ID or API Key Secret for APIKEY authentication.");
+                }
+                break;
+        }
+
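+        // Configure both timeouts in a single callback: RestClientBuilder keeps only the
+        // most recently set request config callback, so separate calls would drop one timeout.
+        if (parameters.getConnectTimeout() != 0 || parameters.getSocketTimeout() != 0) {
+            clientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
+                if (parameters.getConnectTimeout() != 0) requestConfigBuilder.setConnectTimeout(parameters.getConnectTimeout());
+                if (parameters.getSocketTimeout() != 0) requestConfigBuilder.setSocketTimeout(parameters.getSocketTimeout());
+                return requestConfigBuilder;
+            });
+        }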
+
+        if (parameters.getIoThreadCount() != 0) {
+            clientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
+                    httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom()
+                            .setIoThreadCount(parameters.getIoThreadCount()).build()));
+        }
+        return new RestHighLevelClient(clientBuilder);
+    }
+
+    public ElasticsearchMapping getMapping() {
+        return elasticsearchMapping;
+    }
+
+    @Override
+    public String getSchemaName() {
+        return elasticsearchMapping.getIndexName();
+    }
+
+    @Override
+    public String getSchemaName(final String mappingSchemaName, final Class<?> persistentClass) {
+        return super.getSchemaName(mappingSchemaName, persistentClass);
+    }
+
+    @Override
+    public void createSchema() throws GoraException {
+        CreateIndexRequest request = new CreateIndexRequest(elasticsearchMapping.getIndexName());
+        Map<String, Object> properties = new HashMap<>();
+        for (Map.Entry<String, Field> entry : elasticsearchMapping.getFields().entrySet()) {
+            Map<String, Object> fieldType = new HashMap<>();
+            fieldType.put("type", entry.getValue().getDataType().getType().name().toLowerCase(Locale.ROOT));
+            if (entry.getValue().getDataType().getType() == Field.DataType.SCALED_FLOAT) {
+                fieldType.put("scaling_factor", entry.getValue().getDataType().getScalingFactor());
+            }
+            properties.put(entry.getKey(), fieldType);
+        }
+        // Special field for range query
+        properties.put("gora_id", new HashMap<String, Object>() {{
+            put("type", "keyword");
+        }});
+        Map<String, Object> mapping = new HashMap<>();
+        mapping.put("properties", properties);
+        request.mapping(mapping);
+        try {
+            if (!client.indices().exists(
+                    new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) {
+                client.indices().create(request, RequestOptions.DEFAULT);
+            }
+        } catch (IOException ex) {
+            throw new GoraException(ex);
+        }
+    }
+
+    @Override
+    public void deleteSchema() throws GoraException {
+        DeleteIndexRequest request = new DeleteIndexRequest(elasticsearchMapping.getIndexName());
+        try {
+            if (client.indices().exists(
+                    new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) {
+                client.indices().delete(request, RequestOptions.DEFAULT);
+            }
+        } catch (IOException ex) {
+            throw new GoraException(ex);
+        }
+    }
+
+    @Override
+    public boolean schemaExists() throws GoraException {
+        try {
+            return client.indices().exists(
+                    new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
+        } catch (IOException ex) {
+            throw new GoraException(ex);
+        }
+    }
+
+    @Override
+    public boolean exists(K key) throws GoraException {
+        GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key);
+        getRequest.fetchSourceContext(new FetchSourceContext(false)).storedFields("_none_");
+        try {
+            return client.exists(getRequest, RequestOptions.DEFAULT);
+        } catch (IOException ex) {
+            throw new GoraException(ex);
+        }
+    }
+
+    @Override
+    public T get(K key, String[] fields) throws GoraException {
+        String[] requestedFields = getFieldsToQuery(fields);
+        List<String> documentFields = new ArrayList<>();
+        for (String requestedField : requestedFields) {
+            documentFields.add(elasticsearchMapping.getFields().get(requestedField).getName());
+        }
+        try {
+            // Prepare the Elasticsearch request
+            GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key);
+            GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
+            if (getResponse.isExists()) {
+                Map<String, Object> sourceMap = getResponse.getSourceAsMap();
+
+                // Map of field's name and its value from the Document
+                Map<String, Object> fieldsAndValues = new HashMap<>();
+                for (String field : documentFields) {
+                    fieldsAndValues.put(field, sourceMap.get(field));
+                }
+
+                // Build the corresponding persistent
+                return newInstance(fieldsAndValues, requestedFields);
+            } else {
+                return null;
+            }
+        } catch (IOException ex) {
+            throw new GoraException(ex);
+        }
+    }
+
+    @Override
+    public void put(K key, T obj) throws GoraException {
+        if (obj.isDirty()) {
+            Schema schemaObj = obj.getSchema();
+            List<Schema.Field> fields = schemaObj.getFields();
+            Map<String, Object> jsonMap = new HashMap<>();
+            for (Schema.Field field : fields) {
+                Field mappedField = elasticsearchMapping.getFields().get(field.name());
+                if (mappedField != null) {
+                    Object fieldValue = obj.get(field.pos());
+                    if (fieldValue != null) {
+                        Schema fieldSchema = field.schema();
+                        Object serializedObj = serializeFieldValue(fieldSchema, fieldValue);
+                        jsonMap.put(mappedField.getName(), serializedObj);
+                    }
+                }
+            }
+            // Special field for range query
+            jsonMap.put("gora_id", key);
+            // Prepare the Elasticsearch request
+            IndexRequest request = new IndexRequest(elasticsearchMapping.getIndexName()).id((String) key).source(jsonMap);
+            try {
+                client.index(request, RequestOptions.DEFAULT);
+            } catch (IOException ex) {
+                throw new GoraException(ex);
+            }
+        } else {
+            LOG.info("Ignored putting object {} in the store as it is neither "
+                    + "new nor dirty.", new Object[]{obj});
+        }
+    }
+
+    @Override
+    public boolean delete(K key) throws GoraException {
+        DeleteRequest request = new DeleteRequest(elasticsearchMapping.getIndexName(), (String) key);
+        try {
+            DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
+            return deleteResponse.getResult() != DocWriteResponse.Result.NOT_FOUND;
+        } catch (IOException ex) {
+            throw new GoraException(ex);
+        }
+    }
+
+    @Override
+    public long deleteByQuery(Query<K, T> query) throws GoraException {
+        try {
+            BulkByScrollResponse bulkResponse;
+            if (query.getFields() != null && query.getFields().length < elasticsearchMapping.getFields().size()) {
+                UpdateByQueryRequest updateRequest = new UpdateByQueryRequest(elasticsearchMapping.getIndexName());
+                QueryBuilder matchDocumentsWithinRange = QueryBuilders
+                        .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey());
+                updateRequest.setQuery(matchDocumentsWithinRange);
+
+                // Create a script for deleting fields
+                StringBuilder toDelete = new StringBuilder();
+                String[] fieldsToDelete = query.getFields();
+                for (String field : fieldsToDelete) {
+                    String elasticsearchField = elasticsearchMapping.getFields().get(field).getName();
+                    toDelete.append(String.format("ctx._source.remove('%s');", elasticsearchField));
+                }
+                //toDelete.deleteCharAt(toDelete.length() - 1);
+                updateRequest.setScript(new Script(ScriptType.INLINE, "painless", toDelete.toString(), Collections.emptyMap()));
+                bulkResponse = client.updateByQuery(updateRequest, RequestOptions.DEFAULT);
+                return bulkResponse.getUpdated();
+            } else {
+                DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(elasticsearchMapping.getIndexName());
+                QueryBuilder matchDocumentsWithinRange = QueryBuilders
+                        .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey());
+                deleteRequest.setQuery(matchDocumentsWithinRange);
+                bulkResponse = client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
+                return bulkResponse.getDeleted();
+            }
+        } catch (IOException ex) {
+            throw new GoraException(ex);
+        }
+    }
+
+    @Override
+    public Result<K, T> execute(Query<K, T> query) throws GoraException {
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+
+        // Set the query result limit
+        int size = (int) query.getLimit();
+        if (size != -1) {
+            searchSourceBuilder.size(size);
+        }
+        try {
+            // Build the actual Elasticsearch range query
+            QueryBuilder rangeQueryBuilder = QueryBuilders
+                    .rangeQuery("gora_id").gte(query.getStartKey()).lte(query.getEndKey());
+            searchSourceBuilder.query(rangeQueryBuilder);
+            SearchRequest searchRequest = new SearchRequest(elasticsearchMapping.getIndexName());
+            searchRequest.source(searchSourceBuilder);
+            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
+
+            String[] avroFields = getFieldsToQuery(query.getFields());
+
+            SearchHits hits = searchResponse.getHits();
+            SearchHit[] searchHits = hits.getHits();
+            List<K> hitId = new ArrayList<>();
+
+            // Check filter
+            Filter<K, T> queryFilter = query.getFilter();
+            List<T> filteredObjects = new ArrayList<>();
+            for (SearchHit hit : searchHits) {
+                Map<String, Object> sourceAsMap = hit.getSourceAsMap();
+                if (queryFilter == null || !queryFilter.filter((K) hit.getId(), newInstance(sourceAsMap, avroFields))) {
+                    filteredObjects.add(newInstance(sourceAsMap, avroFields));
+                    hitId.add((K) hit.getId());
+                }
+            }
+            return new ElasticsearchResult<>(this, query, hitId, filteredObjects);
+        } catch (IOException ex) {
+            throw new GoraException(ex);
+        }
+    }
+
+    @Override
+    public Query<K, T> newQuery() {
+        ElasticsearchQuery<K, T> query = new ElasticsearchQuery<>(this);
+        query.setFields(getFieldsToQuery(null));
+        return query;
+    }
+
+    @Override
+    public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
+        List<PartitionQuery<K, T>> partitions = new ArrayList<>();
+        PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>(
+                query);
+        partitionQuery.setConf(getConf());
+        partitions.add(partitionQuery);
+        return partitions;
+    }
+
+    @Override
+    public void flush() throws GoraException {
+        try {
+            client.indices().refresh(new RefreshRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
+            client.indices().flush(new FlushRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
+        } catch (IOException ex) {
+            throw new GoraException(ex);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            client.close();
+            LOG.info("Elasticsearch datastore destroyed successfully.");
+        } catch (IOException ex) {
+            LOG.error(ex.getMessage(), ex);
+        }
+    }
+
+    /**
+     * Build a new instance of the persisted class from the Document retrieved from the database.
+     *
+     * @param fieldsAndValues Map of field's name and its value from the Document
+     *                        that results from the query to the database
+     * @param requestedFields the list of fields to be mapped to the persistence class instance
+     * @return a persistence class instance whose content was deserialized from the Document
+     * @throws IOException when a field value cannot be deserialized
+     */
+    public T newInstance(Map<String, Object> fieldsAndValues, String[] requestedFields) throws IOException {
+        // Create new empty persistent bean instance
+        T persistent = newPersistent();
+
+        requestedFields = getFieldsToQuery(requestedFields);
+        // Populate each field
+        for (String objField : requestedFields) {
+            Schema.Field field = fieldMap.get(objField);
+            Schema fieldSchema = field.schema();
+            String docFieldName = elasticsearchMapping.getFields().get(objField).getName();
+            Object fieldValue = fieldsAndValues.get(docFieldName);
+
+            Object result = deserializeFieldValue(field, fieldSchema, fieldValue);
+            persistent.put(field.pos(), result);
+        }
+        persistent.clearDirty();
+        return persistent;
+    }
+
+    /**
+     * Deserialize an Elasticsearch object to a persistent Avro object.
+     *
+     * @param avroField          persistent Avro class field to which the value will be deserialized
+     * @param avroFieldSchema    schema for the persistent Avro class field
+     * @param elasticsearchValue Elasticsearch field value to be deserialized
+     * @return deserialized Avro object from the Elasticsearch object
+     * @throws GoraException when the given Elasticsearch value cannot be deserialized
+     */
+    private Object deserializeFieldValue(Schema.Field avroField, Schema avroFieldSchema,
+                                         Object elasticsearchValue) throws GoraException {
+        Object fieldValue;
+        switch (avroFieldSchema.getType()) {
+            case MAP:
+                fieldValue = fromElasticsearchMap(avroField, avroFieldSchema.getValueType(), (Map<String, Object>) elasticsearchValue);
+                break;
+            case RECORD:
+                fieldValue = fromElasticsearchRecord(avroFieldSchema, (Map<String, Object>) elasticsearchValue);
+                break;
+            case ARRAY:
+                fieldValue = fromElasticsearchList(avroField, avroFieldSchema.getElementType(), elasticsearchValue);
+                break;
+            case BOOLEAN:
+                fieldValue = Boolean.parseBoolean(elasticsearchValue.toString());
+                break;
+            case BYTES:
+                fieldValue = ByteBuffer.wrap(Base64.getDecoder().decode(elasticsearchValue.toString()));
+                break;
+            case FIXED:
+            case NULL:
+                fieldValue = null;
+                break;
+            case UNION:
+                fieldValue = fromElasticsearchUnion(avroField, avroFieldSchema, elasticsearchValue);
+                break;
+            case DOUBLE:
+                fieldValue = Double.parseDouble(elasticsearchValue.toString());
+                break;
+            case ENUM:
+                fieldValue = AvroUtils.getEnumValue(avroFieldSchema, elasticsearchValue.toString());
+                break;
+            case FLOAT:
+                fieldValue = Float.parseFloat(elasticsearchValue.toString());
+                break;
+            case INT:
+                fieldValue = Integer.parseInt(elasticsearchValue.toString());
+                break;
+            case LONG:
+                fieldValue = Long.parseLong(elasticsearchValue.toString());
+                break;
+            case STRING:
+                fieldValue = new Utf8(elasticsearchValue.toString());
+                break;
+            default:
+                fieldValue = elasticsearchValue;
+        }
+        return fieldValue;
+    }
+
+    /**
+     * Deserialize an Elasticsearch List to an Avro List as used in Gora generated classes
+     * that can safely be written into an Avro persistent object.
+     *
+     * @param avroField          persistent Avro class field to which the value will be deserialized
+     * @param avroFieldSchema    schema for the persistent Avro class field
+     * @param elasticsearchValue Elasticsearch field value to be deserialized
+     * @return deserialized Avro List from the given Elasticsearch value
+     * @throws GoraException when one of the underlying values cannot be deserialized
+     */
+    private Object fromElasticsearchList(Schema.Field avroField, Schema avroFieldSchema,
+                                         Object elasticsearchValue) throws GoraException {
+        List<Object> list = new ArrayList<>();
+        if (elasticsearchValue != null) {
+            for (Object item : (List<Object>) elasticsearchValue) {
+                Object result = deserializeFieldValue(avroField, avroFieldSchema, item);
+                list.add(result);
+            }
+        }
+        return new DirtyListWrapper<>(list);
+    }
+
+    /**
+     * Deserialize an Elasticsearch Map to an Avro Map as used in Gora generated classes
+     * that can safely be written into an Avro persistent object.
+     *
+     * @param avroField        persistent Avro class field to which the value will be deserialized
+     * @param avroFieldSchema  schema for the persistent Avro class field
+     * @param elasticsearchMap Elasticsearch Map value to be deserialized
+     * @return deserialized Avro Map from the given Elasticsearch Map value
+     * @throws GoraException when one of the underlying values cannot be deserialized
+     */
+    private Object fromElasticsearchMap(Schema.Field avroField, Schema avroFieldSchema,
+                                        Map<String, Object> elasticsearchMap) throws GoraException {
+        Map<Utf8, Object> deserializedMap = new HashMap<>();
+        if (elasticsearchMap != null) {
+            for (Map.Entry<String, Object> entry : elasticsearchMap.entrySet()) {
+                String mapKey = entry.getKey();
+                Object mapValue = deserializeFieldValue(avroField, avroFieldSchema, entry.getValue());
+                deserializedMap.put(new Utf8(mapKey), mapValue);
+            }
+        }
+        return new DirtyMapWrapper<>(deserializedMap);
+    }
+
+    /**
+     * Deserialize an Elasticsearch Record to an Avro Object as used in Gora generated classes
+     * that can safely be written into an Avro persistent object.
+     *
+     * @param avroFieldSchema     schema for the persistent Avro class field
+     * @param elasticsearchRecord Elasticsearch Record value to be deserialized
+     * @return deserialized Avro Object from the given Elasticsearch Record value
+     * @throws GoraException when one of the underlying values cannot be deserialized
+     */
+    private Object fromElasticsearchRecord(Schema avroFieldSchema,
+                                           Map<String, Object> elasticsearchRecord) throws GoraException {
+        Class<?> clazz;
+        try {
+            clazz = ClassLoadingUtils.loadClass(avroFieldSchema.getFullName());
+        } catch (ClassNotFoundException ex) {
+            throw new GoraException(ex);
+        }
+        PersistentBase record = (PersistentBase) new BeanFactoryImpl(keyClass, clazz).newPersistent();
+        for (Schema.Field recField : avroFieldSchema.getFields()) {
+            Schema innerSchema = recField.schema();
+            Field innerDocField = elasticsearchMapping.getFields().getOrDefault(recField.name(), new Field(recField.name(), null));
+            record.put(recField.pos(), deserializeFieldValue(recField, innerSchema, elasticsearchRecord.get(innerDocField.getName())));
+        }
+        return record;
+    }
+
+    /**
+     * Deserialize an Elasticsearch Union to an Avro Object as used in Gora generated classes
+     * that can safely be written into an Avro persistent object.
+     *
+     * @param avroField          persistent Avro class field to which the value will be deserialized
+     * @param avroFieldSchema    schema for the persistent Avro class field
+     * @param elasticsearchUnion Elasticsearch Union value to be deserialized
+     * @return deserialized Avro Object from the given Elasticsearch Union value
+     * @throws GoraException when one of the underlying values cannot be deserialized
+     */
+    private Object fromElasticsearchUnion(Schema.Field avroField, Schema avroFieldSchema, Object elasticsearchUnion) throws GoraException {
+        Object deserializedUnion;
+        Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType();
+        Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType();
+        if (avroFieldSchema.getTypes().size() == 2 &&
+                (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) &&
+                !type0.equals(type1)) {
+            int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema);
+            Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
+            deserializedUnion = deserializeFieldValue(avroField, unionSchema, elasticsearchUnion);
+        } else if (avroFieldSchema.getTypes().size() == 3) {
+            Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType();
+            if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) &&
+                    (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) {
+                if (elasticsearchUnion == null) {
+                    deserializedUnion = null;
+                } else if (elasticsearchUnion instanceof String) {
+                    throw new GoraException("Elasticsearch supports Union data type only represented as Record or Null.");
+                } else {
+                    int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema);
+                    Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
+                    deserializedUnion = fromElasticsearchRecord(unionSchema, (Map<String, Object>) elasticsearchUnion);
+                }
+            } else {
+                throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+            }
+        } else {
+            throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+        }
+        return deserializedUnion;
+    }
+
+    /**
+     * Serialize a persistent Avro object as used in Gora generated classes to
+     * an object that can be written into Elasticsearch.
+     *
+     * @param avroFieldSchema schema for the persistent Avro class field
+     * @param avroFieldValue  persistent Avro field value to be serialized
+     * @return serialized field value
+     * @throws GoraException when the given Avro object cannot be serialized
+     */
+    private Object serializeFieldValue(Schema avroFieldSchema, Object avroFieldValue) throws GoraException {
+        Object output = avroFieldValue;
+        switch (avroFieldSchema.getType()) {
+            case ARRAY:
+                output = arrayToElasticsearch((List<?>) avroFieldValue, avroFieldSchema.getElementType());
+                break;
+            case MAP:
+                output = mapToElasticsearch((Map<CharSequence, ?>) avroFieldValue, avroFieldSchema.getValueType());
+                break;
+            case RECORD:
+                output = recordToElasticsearch(avroFieldValue, avroFieldSchema);
+                break;
+            case BYTES:
+                output = Base64.getEncoder().encodeToString(((ByteBuffer) avroFieldValue).array());
+                break;
+            case UNION:
+                output = unionToElasticsearch(avroFieldValue, avroFieldSchema);
+                break;
+            case BOOLEAN:
+            case DOUBLE:
+            case ENUM:
+            case FLOAT:
+            case INT:
+            case LONG:
+            case STRING:
+                output = avroFieldValue.toString();
+                break;
+            case FIXED:
+                break;
+            case NULL:
+                output = null;
+                break;
+        }
+        return output;
+    }
+
+    /**
+     * Serialize a Java collection of persistent Avro objects as used in Gora generated classes to a
+     * List that can safely be written into Elasticsearch.
+     *
+     * @param collection      the collection to be serialized
+     * @param avroFieldSchema field schema for the underlying type
+     * @return a List version of the collection that can be safely written into Elasticsearch
+     * @throws GoraException when one of the underlying values cannot be serialized
+     */
+    private List<Object> arrayToElasticsearch(Collection<?> collection, Schema avroFieldSchema) throws GoraException {
+        List<Object> list = new ArrayList<>();
+        for (Object item : collection) {
+            Object result = serializeFieldValue(avroFieldSchema, item);
+            list.add(result);
+        }
+        return list;
+    }
+
+    /**
+     * Serialize a Java map of persistent Avro objects as used in Gora generated classes to a
+     * map that can safely be written into Elasticsearch.
+     *
+     * @param map             the map to be serialized
+     * @param avroFieldSchema field schema for the underlying type
+     * @return a Map version of the Java map that can be safely written into Elasticsearch
+     * @throws GoraException when one of the underlying values cannot be serialized
+     */
+    private Map<CharSequence, ?> mapToElasticsearch(Map<CharSequence, ?> map, Schema avroFieldSchema) throws GoraException {
+        Map<CharSequence, Object> serializedMap = new HashMap<>();
+        for (Map.Entry<CharSequence, ?> entry : map.entrySet()) {
+            String mapKey = entry.getKey().toString();
+            Object mapValue = entry.getValue();
+            Object result = serializeFieldValue(avroFieldSchema, mapValue);
+            serializedMap.put(mapKey, result);
+        }
+        return serializedMap;
+    }
+
+    /**
+     * Serialize a persistent Avro record as used in Gora generated classes to a
+     * record (Map) that can safely be written into Elasticsearch.
+     *
+     * @param record          the object to be serialized
+     * @param avroFieldSchema field schema for the underlying type
+     * @return a record version of the Java object that can be safely written into Elasticsearch
+     * @throws GoraException when one of the underlying values cannot be serialized
+     */
+    private Map<CharSequence, Object> recordToElasticsearch(Object record, Schema avroFieldSchema) throws GoraException {
+        Map<CharSequence, Object> serializedRecord = new HashMap<>();
+        for (Schema.Field member : avroFieldSchema.getFields()) {
+            Object innerValue = ((PersistentBase) record).get(member.pos());
+            serializedRecord.put(member.name(), serializeFieldValue(member.schema(), innerValue));
+        }
+        return serializedRecord;
+    }
+
+    /**
+     * Serialize a persistent Avro union value as used in Gora generated classes to an
+     * object that can safely be written into Elasticsearch.
+     *
+     * @param union           the object to be serialized
+     * @param avroFieldSchema field schema for the underlying type
+     * @return an object version of the Java object that can be safely written into Elasticsearch
+     * @throws GoraException when one of the underlying values cannot be serialized
+     */
+    private Object unionToElasticsearch(Object union, Schema avroFieldSchema) throws GoraException {
+        Object serializedUnion;
+        Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType();
+        Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType();
+        if (avroFieldSchema.getTypes().size() == 2 &&
+                (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) &&
+                !type0.equals(type1)) {
+            int schemaPos = getUnionSchema(union, avroFieldSchema);
+            Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
+            serializedUnion = serializeFieldValue(unionSchema, union);
+        } else if (avroFieldSchema.getTypes().size() == 3) {
+            Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType();
+            if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) &&
+                    (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) {
+                if (union == null) {
+                    serializedUnion = null;
+                } else if (union instanceof String) {
+                    throw new GoraException("Elasticsearch does not support foreign key IDs in Union data type.");
+                } else {
+                    int schemaPos = getUnionSchema(union, avroFieldSchema);
+                    Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
+                    serializedUnion = recordToElasticsearch(union, unionSchema);
+                }
+            } else {
+                throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+            }
+        } else {
+            throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+        }
+        return serializedUnion;
+    }
+
+    /**
+     * Retrieve the index of the schema type that matches a particular value
+     * of a UNION schema. A UNION can declare one or more types, but a given
+     * value holds an object of only one of them, so this method determines
+     * which declared type the value corresponds to. The first matching type
+     * wins.
+     *
+     * @param instanceValue value that the object holds
+     * @param unionSchema   union schema containing all of the data types
+     * @return the position of the matching schema in the union, or 0 if no type matches
+     */
+    private int getUnionSchema(Object instanceValue, Schema unionSchema) {
+        int unionSchemaPos = 0;
+        for (Schema currentSchema : unionSchema.getTypes()) {
+            Schema.Type schemaType = currentSchema.getType();
+            if (instanceValue instanceof CharSequence && schemaType.equals(Schema.Type.STRING)) {
+                return unionSchemaPos;
+            }
+            if (instanceValue instanceof ByteBuffer && schemaType.equals(Schema.Type.BYTES)) {
+                return unionSchemaPos;
+            }
+            if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.BYTES)) {
+                return unionSchemaPos;
+            }
+            if (instanceValue instanceof String && schemaType.equals(Schema.Type.BYTES)) {
+                return unionSchemaPos;
+            }
+            if (instanceValue instanceof Integer && schemaType.equals(Schema.Type.INT)) {
+                return unionSchemaPos;
+            }
+            if (instanceValue instanceof Long && schemaType.equals(Schema.Type.LONG)) {
+                return unionSchemaPos;
+            }
+            if (instanceValue instanceof Double && schemaType.equals(Schema.Type.DOUBLE)) {
+                return unionSchemaPos;
+            }
+            if (instanceValue instanceof Float && schemaType.equals(Schema.Type.FLOAT)) {
+                return unionSchemaPos;
+            }
+            if (instanceValue instanceof Boolean && schemaType.equals(Schema.Type.BOOLEAN)) {
+                return unionSchemaPos;
+            }
+            if (instanceValue instanceof Map && schemaType.equals(Schema.Type.MAP)) {
+                return unionSchemaPos;
+            }
+            if (instanceValue instanceof List && schemaType.equals(Schema.Type.ARRAY)) {
+                return unionSchemaPos;
+            }
+            if (instanceValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD)) {
+                return unionSchemaPos;
+            }
+            if (instanceValue instanceof Map && schemaType.equals(Schema.Type.RECORD)) {
+                return unionSchemaPos;
+            }
+            if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.MAP)) {
+                return unionSchemaPos;
+            }
+            if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.RECORD)) {
+                return unionSchemaPos;
+            }
+            if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.ARRAY)) {
+                return unionSchemaPos;
+            }
+            unionSchemaPos++;
+        }
+        return 0;
+    }
+}
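
Note on the BYTES case of serializeFieldValue() above: it encodes ByteBuffer.array(), which returns the whole backing array regardless of the buffer's position and limit, and throws for read-only or direct buffers. The following is a minimal sketch of a round trip that encodes only the readable window, assuming nothing beyond the JDK. The class and method names are hypothetical and are not part of this commit.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of the commit: Base64 round trip for Avro BYTES values.
final class BytesFieldCodecSketch {

    // Encode only the readable window (position..limit) of the buffer.
    static String encode(ByteBuffer value) {
        ByteBuffer view = value.duplicate();          // keep the caller's position untouched
        byte[] window = new byte[view.remaining()];
        view.get(window);                             // also works for read-only and direct buffers
        return Base64.getEncoder().encodeToString(window);
    }

    // Decode the stored string back into a ByteBuffer, mirroring the BYTES case
    // of deserializeFieldValue().
    static ByteBuffer decode(String stored) {
        return ByteBuffer.wrap(Base64.getDecoder().decode(stored));
    }

    public static void main(String[] args) {
        ByteBuffer whole = ByteBuffer.wrap("xxgoraxx".getBytes(StandardCharsets.UTF_8));
        whole.position(2);
        whole.limit(6);                               // readable window is "gora"
        String stored = encode(whole);
        ByteBuffer restored = decode(stored);
        System.out.println(StandardCharsets.UTF_8.decode(restored)); // prints "gora"
    }
}
```

Working on a duplicate() keeps the persistent object's buffer state unchanged, which matters if the same field is serialized more than once.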
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java
new file mode 100644
index 0000000..01466ee
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.store;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Collection metadata returned by ElasticsearchStoreMetadataAnalyzer, describing the keys
+ * and types of an ElasticsearchStore collection.
+ */
+public class ElasticsearchStoreCollectionMetadata {
+
+    /**
+     * Document keys present in a given ElasticsearchStore collection.
+     */
+    private List<String> documentKeys = new ArrayList<>();
+
+    /**
+     * Document types present in a given ElasticsearchStore collection.
+     */
+    private List<String> documentTypes = new ArrayList<>();
+
+    public List<String> getDocumentKeys() {
+        return documentKeys;
+    }
+
+    public void setDocumentKeys(List<String> documentKeys) {
+        this.documentKeys = documentKeys;
+    }
+
+    public List<String> getDocumentTypes() {
+        return documentTypes;
+    }
+
+    public void setDocumentTypes(List<String> documentTypes) {
+        this.documentTypes = documentTypes;
+    }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java
new file mode 100644
index 0000000..6806aa3
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.store;
+
+import org.apache.gora.elasticsearch.utils.ElasticsearchParameters;
+import org.apache.gora.store.impl.DataStoreMetadataAnalyzer;
+import org.apache.gora.util.GoraException;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.client.indices.GetIndexResponse;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticsearchStoreMetadataAnalyzer extends DataStoreMetadataAnalyzer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStoreMetadataAnalyzer.class);
+
+    private RestHighLevelClient elasticsearchClient;
+
+    @Override
+    public void initialize() throws GoraException {
+        ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf());
+        elasticsearchClient = ElasticsearchStore.createClient(parameters);
+    }
+
+    @Override
+    public String getType() {
+        return "ELASTICSEARCH";
+    }
+
+    @Override
+    public List<String> getTablesNames() throws GoraException {
+        GetIndexRequest request = new GetIndexRequest("*");
+        GetIndexResponse response;
+        try {
+            response = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT);
+        } catch (IOException ex) {
+            throw new GoraException(ex);
+        }
+        if (response == null) {
+            LOG.error("Could not find indices.");
+            throw new GoraException("Could not find indices.");
+        }
+        return Arrays.asList(response.getIndices());
+    }
+
+    @Override
+    public ElasticsearchStoreCollectionMetadata getTableInfo(String tableName) throws GoraException {
+        GetIndexRequest request = new GetIndexRequest(tableName);
+        GetIndexResponse getIndexResponse;
+        try {
+            getIndexResponse = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT);
+        } catch (IOException ex) {
+            throw new GoraException(ex);
+        }
+        MappingMetadata indexMappings = getIndexResponse.getMappings().get(tableName);
+        Map<String, Object> indexKeysAndTypes = (Map<String, Object>) indexMappings.getSourceAsMap().get("properties");
+
+        List<String> documentTypes = new ArrayList<>();
+        List<String> documentKeys = new ArrayList<>();
+
+        for (Map.Entry<String, Object> entry : indexKeysAndTypes.entrySet()) {
+            Map<String, Object> subEntry = (Map<String, Object>) entry.getValue();
+            documentTypes.add((String) subEntry.get("type"));
+            documentKeys.add(entry.getKey());
+        }
+
+        ElasticsearchStoreCollectionMetadata collectionMetadata = new ElasticsearchStoreCollectionMetadata();
+        collectionMetadata.setDocumentKeys(documentKeys);
+        collectionMetadata.setDocumentTypes(documentTypes);
+        return collectionMetadata;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (elasticsearchClient != null) {
+            this.elasticsearchClient.close();
+        }
+    }
+}
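
Note on getTableInfo() above: it reads the "type" entry of every mapping property. Object fields in an Elasticsearch mapping may carry nested "properties" without an explicit "type", in which case the stored type would be null. The following is a minimal sketch of the same traversal over a plain Map shaped like the result of MappingMetadata.getSourceAsMap(), with "object" as an assumed fallback label. The class and method names are hypothetical and are not part of this commit.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch, not part of the commit: collect field names and types from a mapping source map.
final class MappingPropertiesSketch {

    @SuppressWarnings("unchecked")
    static Map<String, String> keysAndTypes(Map<String, Object> mappingSource) {
        Map<String, String> result = new LinkedHashMap<>();
        Object properties = mappingSource.get("properties");
        if (!(properties instanceof Map)) {
            return result;                            // index without mapped fields
        }
        for (Map.Entry<String, Object> entry : ((Map<String, Object>) properties).entrySet()) {
            Object type = null;
            if (entry.getValue() instanceof Map) {
                type = ((Map<String, Object>) entry.getValue()).get("type");
            }
            // Assumption: a field without an explicit type is reported as "object".
            result.put(entry.getKey(), type == null ? "object" : type.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> name = new HashMap<>();
        name.put("type", "keyword");
        Map<String, Object> address = new HashMap<>();
        address.put("properties", new HashMap<String, Object>());

        Map<String, Object> properties = new LinkedHashMap<>();
        properties.put("name", name);
        properties.put("address", address);

        Map<String, Object> source = new HashMap<>();
        source.put("properties", properties);

        System.out.println(keysAndTypes(source));
    }
}
```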
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/package-info.java
new file mode 100644
index 0000000..66f575f
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains the core classes of the Elasticsearch datastore.
+ */
+package org.apache.gora.elasticsearch.store;
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java
new file mode 100644
index 0000000..dbbd45b
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.utils;
+
+/**
+ * Authentication type to connect to the Elasticsearch server.
+ */
+public enum AuthenticationType {
+    /**
+     * Basic authentication requires a username and password.
+     */
+    BASIC,
+    /**
+     * Token authentication requires an Elasticsearch access token.
+     */
+    TOKEN,
+    /**
+     * API Key authentication requires an Elasticsearch API Key ID and Elasticsearch API Key Secret.
+     */
+    APIKEY
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java
new file mode 100644
index 0000000..fa9bdc2
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.utils;
+
+/**
+ * Constants file for Elasticsearch.
+ */
+public class ElasticsearchConstants {
+    /**
+     * Property indicating whether the Hadoop configuration has priority.
+     */
+    public static final String PROP_OVERRIDING = "gora.elasticsearch.override.hadoop.configuration";
+
+    /**
+     * Default configurations for Elasticsearch.
+     */
+    public static final String DEFAULT_HOST = "localhost";
+    public static final int DEFAULT_PORT = 9200;
+
+    /**
+     * List of keys used in the configuration file of Elasticsearch.
+     */
+    public static final String PROP_HOST = "gora.datastore.elasticsearch.host";
+    public static final String PROP_PORT = "gora.datastore.elasticsearch.port";
+    public static final String PROP_SCHEME = "gora.datastore.elasticsearch.scheme";
+    public static final String PROP_AUTHENTICATIONTYPE = "gora.datastore.elasticsearch.authenticationType";
+    public static final String PROP_USERNAME = "gora.datastore.elasticsearch.username";
+    public static final String PROP_PASSWORD = "gora.datastore.elasticsearch.password";
+    public static final String PROP_AUTHORIZATIONTOKEN = "gora.datastore.elasticsearch.authorizationToken";
+    public static final String PROP_APIKEYID = "gora.datastore.elasticsearch.apiKeyId";
+    public static final String PROP_APIKEYSECRET = "gora.datastore.elasticsearch.apiKeySecret";
+    public static final String PROP_CONNECTTIMEOUT = "gora.datastore.elasticsearch.connectTimeout";
+    public static final String PROP_SOCKETTIMEOUT = "gora.datastore.elasticsearch.socketTimeout";
+    public static final String PROP_IOTHREADCOUNT = "gora.datastore.elasticsearch.ioThreadCount";
+}
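
Note on the constants above: the property keys and the DEFAULT_HOST and DEFAULT_PORT values are taken from ElasticsearchConstants, but how ElasticsearchParameters.load() resolves them is not shown at this point in the diff. The following is only a sketch of one plausible lookup with fallbacks, using java.util.Properties. The class and method names are hypothetical and are not part of this commit.

```java
import java.util.Properties;

// Hypothetical sketch, not part of the commit: resolve host and port with the documented defaults.
final class ConnectionSettingsSketch {

    // Keys and defaults copied from ElasticsearchConstants.
    static final String PROP_HOST = "gora.datastore.elasticsearch.host";
    static final String PROP_PORT = "gora.datastore.elasticsearch.port";
    static final String DEFAULT_HOST = "localhost";
    static final int DEFAULT_PORT = 9200;

    static String host(Properties properties) {
        return properties.getProperty(PROP_HOST, DEFAULT_HOST);
    }

    static int port(Properties properties) {
        // Throws NumberFormatException if the configured port is not a number.
        return Integer.parseInt(properties.getProperty(PROP_PORT, String.valueOf(DEFAULT_PORT)));
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(PROP_HOST, "es.example.org"); // placeholder host name
        System.out.println(host(properties) + ":" + port(properties));
    }
}
```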
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java
new file mode 100644
index 0000000..4b0cef4
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.utils;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Properties;
+
+/**
+ * Parameter definitions for Elasticsearch.
+ */
+public class ElasticsearchParameters {
+
+    /**
+     * Elasticsearch server host.
+     */
+    private String host;
+
+    /**
+     * Elasticsearch server port.
+     */
+    private int port;
+
+    /**
+     * Elasticsearch server scheme.
+     * Optional. If not provided, defaults to http.
+     */
+    private String scheme;
+
+    /**
+     * Authentication type to connect to the server.
+     * Can be BASIC, TOKEN or APIKEY.
+     */
+    private AuthenticationType authenticationType;
+
+    /**
+     * Username to use for server authentication.
+     * Required for BASIC authentication to connect to the server.
+     */
+    private String username;
+
+    /**
+     * Password to use for server authentication.
+     * Required for BASIC authentication to connect to the server.
+     */
+    private String password;
+
+    /**
+     * Authorization token to use for server authentication.
+     * Required for TOKEN authentication to connect to the server.
+     */
+    private String authorizationToken;
+
+    /**
+     * API Key ID to use for server authentication.
+     * Required for APIKEY authentication to connect to the server.
+     */
+    private String apiKeyId;
+
+    /**
+     * API Key Secret to use for server authentication.
+     * Required for APIKEY authentication to connect to the server.
+     */
+    private String apiKeySecret;
+
+    /**
+     * Timeout in milliseconds used for establishing the connection to the server.
+     * Optional. If not provided, defaults to 5000 ms.
+     *
+     */
+    private int connectTimeout;
+
+    /**
+     * Timeout in milliseconds used for waiting for data – after establishing the connection to the server.
+     * Optional. If not provided, defaults to 60000 ms.
+     */
+    private int socketTimeout;
+
+    /**
+     * Number of worker threads used by the connection manager.
+     * Optional. If not provided, defaults to 1.
+     */
+    private int ioThreadCount;
+
+    public ElasticsearchParameters(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public void setHost(String host) {
+        this.host = host;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    public String getScheme() {
+        return scheme;
+    }
+
+    public void setScheme(String scheme) {
+        this.scheme = scheme;
+    }
+
+    public AuthenticationType getAuthenticationType() {
+        return authenticationType;
+    }
+
+    public void setAuthenticationType(AuthenticationType authenticationType) {
+        this.authenticationType = authenticationType;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    public String getAuthorizationToken() {
+        return authorizationToken;
+    }
+
+    public void setAuthorizationToken(String authorizationToken) {
+        this.authorizationToken = authorizationToken;
+    }
+
+    public String getApiKeyId() {
+        return apiKeyId;
+    }
+
+    public void setApiKeyId(String apiKeyId) {
+        this.apiKeyId = apiKeyId;
+    }
+
+    public String getApiKeySecret() {
+        return apiKeySecret;
+    }
+
+    public void setApiKeySecret(String apiKeySecret) {
+        this.apiKeySecret = apiKeySecret;
+    }
+
+    public int getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    public void setConnectTimeout(int connectTimeout) {
+        this.connectTimeout = connectTimeout;
+    }
+
+    public int getSocketTimeout() {
+        return socketTimeout;
+    }
+
+    public void setSocketTimeout(int socketTimeout) {
+        this.socketTimeout = socketTimeout;
+    }
+
+    public int getIoThreadCount() {
+        return ioThreadCount;
+    }
+
+    public void setIoThreadCount(int ioThreadCount) {
+        this.ioThreadCount = ioThreadCount;
+    }
+
+    /**
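+     * Reads Elasticsearch parameters from a properties list and a Hadoop configuration.
+     * @param properties Properties list
+     * @param conf Hadoop configuration; supplies host and port unless the overriding property is true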
+     * @return Elasticsearch parameters instance
+     */
+    public static ElasticsearchParameters load(Properties properties, Configuration conf) {
+        ElasticsearchParameters elasticsearchParameters;
+
+        if (!Boolean.parseBoolean(properties.getProperty(ElasticsearchConstants.PROP_OVERRIDING))) {
+            elasticsearchParameters = new ElasticsearchParameters(
+                    conf.get(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST),
+                    conf.getInt(ElasticsearchConstants.PROP_PORT, ElasticsearchConstants.DEFAULT_PORT));
+        } else {
+            elasticsearchParameters = new ElasticsearchParameters(
+                    properties.getProperty(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST),
+                    Integer.parseInt(properties.getProperty(ElasticsearchConstants.PROP_PORT,
+                            String.valueOf(ElasticsearchConstants.DEFAULT_PORT))));
+        }
+
+        String schemeProperty = properties.getProperty(ElasticsearchConstants.PROP_SCHEME);
+        if (schemeProperty != null) {
+            elasticsearchParameters.setScheme(schemeProperty);
+        }
+
+        String authenticationTypeProperty =
+                properties.getProperty(ElasticsearchConstants.PROP_AUTHENTICATIONTYPE);
+        if (authenticationTypeProperty != null) {
+            elasticsearchParameters.setAuthenticationType(AuthenticationType.valueOf(authenticationTypeProperty));
+        }
+
+        String usernameProperty = properties.getProperty(ElasticsearchConstants.PROP_USERNAME);
+        if (usernameProperty != null) {
+            elasticsearchParameters.setUsername(usernameProperty);
+        }
+
+        String passwordProperty = properties.getProperty(ElasticsearchConstants.PROP_PASSWORD);
+        if (passwordProperty != null) {
+            elasticsearchParameters.setPassword(passwordProperty);
+        }
+
+        String authorizationTokenProperty = properties.getProperty(ElasticsearchConstants.PROP_AUTHORIZATIONTOKEN);
+        if (authorizationTokenProperty != null) {
+            elasticsearchParameters.setAuthorizationToken(authorizationTokenProperty);
+        }
+
+        String apiKeyIdProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYID);
+        if (apiKeyIdProperty != null) {
+            elasticsearchParameters.setApiKeyId(apiKeyIdProperty);
+        }
+
+        String apiKeySecretProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYSECRET);
+        if (apiKeySecretProperty != null) {
+            elasticsearchParameters.setApiKeySecret(apiKeySecretProperty);
+        }
+
+        String connectTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_CONNECTTIMEOUT);
+        if (connectTimeoutProperty != null) {
+            elasticsearchParameters.setConnectTimeout(Integer.parseInt(connectTimeoutProperty));
+        }
+
+        String socketTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_SOCKETTIMEOUT);
+        if (socketTimeoutProperty != null) {
+            elasticsearchParameters.setSocketTimeout(Integer.parseInt(socketTimeoutProperty));
+        }
+
+        String ioThreadCountProperty = properties.getProperty(ElasticsearchConstants.PROP_IOTHREADCOUNT);
+        if (ioThreadCountProperty != null) {
+            elasticsearchParameters.setIoThreadCount(Integer.parseInt(ioThreadCountProperty));
+        }
+
+        return elasticsearchParameters;
+    }
+}
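
The loader above reads each optional property and applies it only when the key is present. The following is a minimal sketch of that pattern using only java.util.Properties. The class name OptionalPropertySketch, the AuthType enum, and the short key names are hypothetical stand-ins for illustration and are not part of this commit.

```java
import java.util.Properties;

public class OptionalPropertySketch {

    // Hypothetical stand-in for the commit's AuthenticationType enum.
    enum AuthType { BASIC, TOKEN, APIKEY }

    // Returns the parsed enum constant, or null when the key is absent.
    // Enum.valueOf throws NullPointerException for a null name, so the
    // null check has to happen before the conversion, not after it.
    static AuthType readAuthType(Properties props, String key) {
        String raw = props.getProperty(key);
        return raw == null ? null : AuthType.valueOf(raw);
    }

    // Returns the integer value stored under the key, or the fallback
    // when the key is absent.
    static int readInt(Properties props, String key, int fallback) {
        String raw = props.getProperty(key);
        return raw == null ? fallback : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("auth", "BASIC");
        System.out.println(readAuthType(props, "auth"));
        System.out.println(readAuthType(props, "missing"));
        System.out.println(readInt(props, "port", 9200));
    }
}
```

Converting the string only after the null check keeps a missing authentication type from failing the whole load; an unknown value such as "DIGEST" would still raise IllegalArgumentException from Enum.valueOf.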
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/package-info.java
new file mode 100644
index 0000000..49fcb93
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains utility classes.
+ */
+package org.apache.gora.elasticsearch.utils;
diff --git a/gora-elasticsearch/src/main/resources/gora-elasticsearch.xsd b/gora-elasticsearch/src/main/resources/gora-elasticsearch.xsd
new file mode 100644
index 0000000..0c7c011
--- /dev/null
+++ b/gora-elasticsearch/src/main/resources/gora-elasticsearch.xsd
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
+    <xs:element name="gora-otd" type="elasticsearch-mapping"/>
+
+    <xs:complexType name="elasticsearch-mapping">
+        <xs:sequence>
+            <xs:element name="class" type="class-mapping" maxOccurs="unbounded"/>
+        </xs:sequence>
+    </xs:complexType>
+
+    <xs:complexType name="class-mapping">
+        <xs:sequence>
+            <xs:element name="field" type="field-mapping" maxOccurs="unbounded"/>
+        </xs:sequence>
+        <xs:attribute name="name" type="nameClass-types" use="required"/>
+        <xs:attribute name="keyClass" type="keyClass-types" use="required"/>
+        <xs:attribute name="index" type="xs:string" use="required"/>
+    </xs:complexType>
+
+    <xs:complexType name="field-mapping">
+        <xs:attribute name="name" type="fieldName-types" use="required"/>
+        <xs:attribute name="docfield" type="xs:string" use="required"/>
+        <xs:attribute name="type" type="xs:string" use="required"/>
+        <xs:attribute name="scalingFactor" type="xs:string" use="optional"/>
+    </xs:complexType>
+
+    <xs:simpleType name="keyClass-types">
+        <xs:restriction base="xs:string">
+            <xs:enumeration value="java.lang.String"/>
+        </xs:restriction>
+    </xs:simpleType>
+
+    <xs:simpleType name="nameClass-types">
+        <xs:restriction base="xs:string">
+            <xs:pattern value="([\p{L}_$][\p{L}\p{N}_$]*\.)*[\p{L}_$][\p{L}\p{N}_$]*"/>
+        </xs:restriction>
+    </xs:simpleType>
+
+    <xs:simpleType name="fieldName-types">
+        <xs:restriction base="xs:string">
+            <xs:pattern value="[a-zA-Z][a-zA-Z0-9]*"/>
+        </xs:restriction>
+    </xs:simpleType>
+
+</xs:schema>
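
The schema above marks the name, docfield and type attributes of a field as required. The following sketch shows how such a rule can be checked with the JDK's javax.xml.validation API. It assumes a trimmed, inline copy of the field rule rather than the full gora-elasticsearch.xsd, and the class name MappingXsdSketch is hypothetical; how ElasticsearchStore itself performs validation is not shown in this part of the patch.

```java
import java.io.StringReader;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;

public class MappingXsdSketch {

    // Trimmed, illustrative copy of the field-mapping rule:
    // all three attributes are required.
    static final String XSD =
        "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'>"
      + "<xs:element name='field'>"
      + "<xs:complexType>"
      + "<xs:attribute name='name' type='xs:string' use='required'/>"
      + "<xs:attribute name='docfield' type='xs:string' use='required'/>"
      + "<xs:attribute name='type' type='xs:string' use='required'/>"
      + "</xs:complexType>"
      + "</xs:element>"
      + "</xs:schema>";

    // Returns true when the XML document satisfies the inline schema.
    static boolean isValid(String xml) {
        try {
            SchemaFactory factory =
                SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            Schema schema = factory.newSchema(new StreamSource(new StringReader(XSD)));
            Validator validator = schema.newValidator();
            // With no ErrorHandler set, validate() throws on schema violations.
            validator.validate(new StreamSource(new StringReader(xml)));
            return true;
        } catch (Exception e) {
            // SAXException for violations, IOException for read failures.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("<field name='name' docfield='name' type='text'/>"));
        System.out.println(isValid("<field name='name' docfield='name'/>"));
    }
}
```

The second document omits the type attribute, which is the same defect that gora-elasticsearch-mapping-invalid.xml introduces for the XSD validation test.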
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java
new file mode 100644
index 0000000..ef17dbc
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch;
+
+import org.apache.gora.GoraTestDriver;
+import org.apache.gora.elasticsearch.store.ElasticsearchStore;
+import org.apache.gora.elasticsearch.utils.ElasticsearchConstants;
+import org.apache.gora.store.DataStoreFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.util.Properties;
+
+/**
+ * Helper class for third-party tests using the gora-elasticsearch backend.
+ *
+ * @see GoraTestDriver
+ */
+public class GoraElasticsearchTestDriver extends GoraTestDriver {
+
+    private static final String DOCKER_IMAGE = "docker.elastic.co/elasticsearch/elasticsearch:7.10.1";
+    private ElasticsearchContainer elasticsearchContainer;
+
+    /**
+     * Constructor for this class.
+     */
+    public GoraElasticsearchTestDriver() {
+        super(ElasticsearchStore.class);
+        Properties properties = DataStoreFactory.createProps();
+        elasticsearchContainer = new ElasticsearchContainer(DOCKER_IMAGE)
+                .withEnv("ELASTIC_PASSWORD", properties.getProperty(ElasticsearchConstants.PROP_PASSWORD))
+                .withEnv("xpack.security.enabled", "true");
+    }
+
+    /**
+     * Start the Elasticsearch container and store its mapped host and port in the configuration.
+     */
+    @Override
+    public void setUpClass() throws Exception {
+        elasticsearchContainer.start();
+        log.info("Setting up Elasticsearch test driver");
+
+        int port = elasticsearchContainer.getMappedPort(ElasticsearchConstants.DEFAULT_PORT);
+        String host = elasticsearchContainer.getContainerIpAddress();
+        conf.set(ElasticsearchConstants.PROP_PORT, String.valueOf(port));
+        conf.set(ElasticsearchConstants.PROP_HOST, host);
+    }
+
+    /**
+     * Tear the server down.
+     */
+    @Override
+    public void tearDownClass() throws Exception {
+        elasticsearchContainer.close();
+        log.info("Tearing down Elasticsearch test driver");
+    }
+}
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java
new file mode 100644
index 0000000..d5aa9e6
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.mapreduce;
+
+import org.apache.gora.elasticsearch.GoraElasticsearchTestDriver;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.mapreduce.DataStoreMapReduceTestBase;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+
+/**
+ * Executes tests for MR jobs over Elasticsearch dataStore.
+ */
+public class ElasticsearchStoreMapReduceTest extends DataStoreMapReduceTestBase {
+
+    private GoraElasticsearchTestDriver driver;
+
+    public ElasticsearchStoreMapReduceTest() throws IOException {
+        super();
+        driver = new GoraElasticsearchTestDriver();
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        driver.setUpClass();
+        super.setUp();
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        driver.tearDownClass();
+    }
+
+    @Override
+    protected DataStore<String, WebPage> createWebPageDataStore() throws IOException {
+        return DataStoreFactory.getDataStore(String.class, WebPage.class, driver.getConfiguration());
+    }
+}
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/package-info.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/package-info.java
new file mode 100644
index 0000000..781ce61
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains map reduce tests.
+ */
+package org.apache.gora.elasticsearch.mapreduce;
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/package-info.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/package-info.java
new file mode 100644
index 0000000..406f829
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Elasticsearch datastore test utilities.
+ */
+package org.apache.gora.elasticsearch;
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java
new file mode 100644
index 0000000..7c95593
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.store;
+
+import org.apache.gora.elasticsearch.GoraElasticsearchTestDriver;
+import org.apache.gora.elasticsearch.mapping.ElasticsearchMapping;
+import org.apache.gora.elasticsearch.mapping.Field;
+import org.apache.gora.elasticsearch.utils.AuthenticationType;
+import org.apache.gora.elasticsearch.utils.ElasticsearchParameters;
+import org.apache.gora.examples.generated.EmployeeInt;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.store.DataStoreMetadataFactory;
+import org.apache.gora.store.DataStoreTestBase;
+import org.apache.gora.store.impl.DataStoreMetadataAnalyzer;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Test case for ElasticsearchStore.
+ */
+public class TestElasticsearchStore extends DataStoreTestBase {
+
+    static {
+        setTestDriver(new GoraElasticsearchTestDriver());
+    }
+
+    @Test
+    public void testInitialize() throws GoraException {
+        log.info("test method: testInitialize");
+
+        ElasticsearchMapping mapping = ((ElasticsearchStore) employeeStore).getMapping();
+
+        Map<String, Field> fields = new HashMap<String, Field>() {{
+            put("name", new Field("name", new Field.FieldType(Field.DataType.TEXT)));
+            put("dateOfBirth", new Field("dateOfBirth", new Field.FieldType(Field.DataType.LONG)));
+            put("ssn", new Field("ssn", new Field.FieldType(Field.DataType.TEXT)));
+            put("value", new Field("value", new Field.FieldType(Field.DataType.TEXT)));
+            put("salary", new Field("salary", new Field.FieldType(Field.DataType.INTEGER)));
+            put("boss", new Field("boss", new Field.FieldType(Field.DataType.OBJECT)));
+            put("webpage", new Field("webpage", new Field.FieldType(Field.DataType.OBJECT)));
+        }};
+
+        Assert.assertEquals("frontier", employeeStore.getSchemaName());
+        Assert.assertEquals("frontier", mapping.getIndexName());
+        Assert.assertEquals(fields, mapping.getFields());
+    }
+
+    @Test
+    public void testLoadElasticsearchParameters() throws IOException {
+        log.info("test method: testLoadElasticsearchParameters");
+
+        Properties properties = DataStoreFactory.createProps();
+
+        ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, testDriver.getConfiguration());
+
+        Assert.assertEquals("localhost", parameters.getHost());
+        Assert.assertEquals(AuthenticationType.BASIC, parameters.getAuthenticationType());
+        Assert.assertEquals("elastic", parameters.getUsername());
+        Assert.assertEquals("password", parameters.getPassword());
+    }
+
+    @Test(expected = GoraException.class)
+    public void testInvalidXmlFile() throws Exception {
+        log.info("test method: testInvalidXmlFile");
+
+        Properties properties = DataStoreFactory.createProps();
+        properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml");
+        properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "true");
+        testDriver.createDataStore(String.class, EmployeeInt.class, properties);
+    }
+
+    @Test
+    public void testXsdValidationParameter() throws GoraException {
+        log.info("test method: testXsdValidationParameter");
+
+        Properties properties = DataStoreFactory.createProps();
+        properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml");
+        properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "false");
+        testDriver.createDataStore(String.class, EmployeeInt.class, properties);
+    }
+
+    @Test
+    public void testGetType() throws GoraException, ClassNotFoundException {
+        Configuration conf = testDriver.getConfiguration();
+        DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
+
+        String actualType = storeMetadataAnalyzer.getType();
+        String expectedType = "ELASTICSEARCH";
+        Assert.assertEquals(expectedType, actualType);
+    }
+
+    @Test
+    public void testGetTablesNames() throws GoraException, ClassNotFoundException {
+        Configuration conf = testDriver.getConfiguration();
+        DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
+
+        List<String> actualTablesNames = new ArrayList<>(storeMetadataAnalyzer.getTablesNames());
+        List<String> expectedTablesNames = new ArrayList<String>() {
+            {
+                add("frontier");
+                add("webpage");
+            }
+        };
+        Assert.assertEquals(expectedTablesNames, actualTablesNames);
+    }
+
+    @Test
+    public void testGetTableInfo() throws GoraException, ClassNotFoundException {
+        Configuration conf = testDriver.getConfiguration();
+        DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
+
+        ElasticsearchStoreCollectionMetadata actualCollectionMetadata =
+                (ElasticsearchStoreCollectionMetadata) storeMetadataAnalyzer.getTableInfo("frontier");
+
+        List<String> expectedDocumentKeys = new ArrayList<String>() {
+            {
+                add("name");
+                add("dateOfBirth");
+                add("ssn");
+                add("value");
+                add("salary");
+                add("boss");
+                add("webpage");
+                add("gora_id");
+            }
+        };
+
+        List<String> expectedDocumentTypes = new ArrayList<String>() {
+            {
+                add("text");
+                add("long");
+                add("text");
+                add("text");
+                add("integer");
+                add("object");
+                add("object");
+                add("keyword");
+            }
+        };
+
+        Assert.assertEquals(expectedDocumentKeys.size(), actualCollectionMetadata.getDocumentKeys().size());
+        Assert.assertTrue(expectedDocumentKeys.containsAll(actualCollectionMetadata.getDocumentKeys()));
+
+        Assert.assertEquals(expectedDocumentTypes.size(), actualCollectionMetadata.getDocumentTypes().size());
+        Assert.assertTrue(expectedDocumentTypes.containsAll(actualCollectionMetadata.getDocumentTypes()));
+    }
+
+    @Ignore("Elasticsearch doesn't support 3 types union field yet")
+    @Override
+    public void testGet3UnionField() {
+    }
+}
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/package-info.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/package-info.java
new file mode 100644
index 0000000..cd77199
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Elasticsearch datastore tests.
+ */
+package org.apache.gora.elasticsearch.store;
diff --git a/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping-invalid.xml b/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping-invalid.xml
new file mode 100644
index 0000000..c1a8ccd
--- /dev/null
+++ b/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping-invalid.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<gora-otd xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:noNamespaceSchemaLocation="gora-elasticsearch.xsd">
+
+    <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" index="frontier">
+        <!--
+            Remove type to test XSD validation.
+            <field name="name" docfield="name" type="text"/>
+        -->
+        <field name="name" docfield="name"/>
+    </class>
+
+</gora-otd>
diff --git a/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping.xml b/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping.xml
new file mode 100644
index 0000000..13a8d08
--- /dev/null
+++ b/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<gora-otd xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:noNamespaceSchemaLocation="gora-elasticsearch.xsd">
+
+    <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" index="frontier">
+        <field name="name" docfield="name" type="text"/>
+        <field name="dateOfBirth" docfield="dateOfBirth" type="long"/>
+        <field name="ssn" docfield="ssn" type="text"/>
+        <field name="value" docfield="value" type="text"/>
+        <field name="salary" docfield="salary" type="integer"/>
+        <field name="boss" docfield="boss" type="object"/>
+        <field name="webpage" docfield="webpage" type="object"/>
+    </class>
+
+    <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" index="webpage">
+        <field name="url" docfield="url" type="text"/>
+        <field name="content" docfield="content" type="binary"/>
+        <field name="parsedContent" docfield="pContent" type="text"/>
+        <field name="outlinks" docfield="links.out" type="object"/>
+        <field name="headers" docfield="headers" type="object"/>
+        <field name="metadata" docfield="metadata" type="object"/>
+        <field name="byteData" docfield="byteData" type="object"/>
+        <field name="stringData" docfield="stringData" type="object"/>
+    </class>
+
+    <class name="org.apache.gora.examples.generated.TokenDatum" keyClass="java.lang.String" index="TokenDatum">
+        <field name="count" docfield="count" type="integer"/>
+    </class>
+
+</gora-otd>
diff --git a/gora-elasticsearch/src/test/resources/gora.properties b/gora-elasticsearch/src/test/resources/gora.properties
new file mode 100644
index 0000000..99842a9
--- /dev/null
+++ b/gora-elasticsearch/src/test/resources/gora.properties
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+gora.datastore.default=org.apache.gora.elasticsearch.store.ElasticsearchStore
+gora.elasticsearch.override.hadoop.configuration=false
+gora.datastore.elasticsearch.host=localhost
+gora.datastore.elasticsearch.port=9200
+#gora.datastore.elasticsearch.scheme=
+
+#Basic authentication requires username and password.
+gora.datastore.elasticsearch.authenticationType=BASIC
+#Default username to connect to the Elasticsearch Docker container.
+gora.datastore.elasticsearch.username=elastic
+gora.datastore.elasticsearch.password=password
+
+#Authentication with an Elasticsearch access token.
+#gora.datastore.elasticsearch.authenticationType=TOKEN
+#gora.datastore.elasticsearch.authorizationToken=
+
+#Authentication with an Elasticsearch API Key requires API Key ID and API Key Secret.
+#gora.datastore.elasticsearch.authenticationType=APIKEY
+#gora.datastore.elasticsearch.apiKeyId=
+#gora.datastore.elasticsearch.apiKeySecret=
+
+#Additional configuration parameters for timeouts and threads.
+#gora.datastore.elasticsearch.connectTimeout=
+#gora.datastore.elasticsearch.socketTimeout=
+#gora.datastore.elasticsearch.ioThreadCount=
diff --git a/pom.xml b/pom.xml
index eeb8182..b3b27f9 100755
--- a/pom.xml
+++ b/pom.xml
@@ -816,6 +816,7 @@
     <module>gora-redis</module>
     <module>gora-jet</module>
     <module>gora-rethinkdb</module>
+    <module>gora-elasticsearch</module>
     <module>gora-tutorial</module>
     <module>gora-benchmark</module>
     <module>sources-dist</module>
@@ -855,6 +856,8 @@
     <restlet.version>2.4.0</restlet.version>
     <!-- Flink Dependencies -->
     <flink.version>1.6.2</flink.version>
+    <!-- Elasticsearch Dependencies -->
+    <elasticsearch.version>7.10.1</elasticsearch.version>
 
     <spark.version>2.2.1</spark.version>
     <aerospike.version>5.0.6</aerospike.version>
@@ -1201,6 +1204,13 @@
         <version>${redisson.version}</version>
       </dependency>
 
+      <!-- Elasticsearch dependencies -->
+      <dependency>
+        <groupId>org.elasticsearch.client</groupId>
+        <artifactId>elasticsearch-rest-high-level-client</artifactId>
+        <version>${elasticsearch.version}</version>
+      </dependency>
+
       <!--Hadoop dependencies -->
       <dependency>
         <groupId>org.apache.hadoop</groupId>