You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by dj...@apache.org on 2021/08/11 18:55:41 UTC

[gora] branch master updated: GORA-664 Fix testcases / formatting (#247)

This is an automated email from the ASF dual-hosted git repository.

djkevincr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gora.git


The following commit(s) were added to refs/heads/master by this push:
     new ffc3a13  GORA-664 Fix testcases / formatting (#247)
ffc3a13 is described below

commit ffc3a13b778dce17db823dbcda8f5ab44640ab85
Author: Kevin Ratnasekera <dj...@yahoo.com>
AuthorDate: Thu Aug 12 00:25:02 2021 +0530

    GORA-664 Fix testcases / formatting (#247)
---
 .../mapping/ElasticsearchMapping.java              |   82 +-
 .../mapping/ElasticsearchMappingBuilder.java       |  314 ++--
 .../apache/gora/elasticsearch/mapping/Field.java   |  280 ++--
 .../elasticsearch/query/ElasticsearchQuery.java    |   22 +-
 .../elasticsearch/query/ElasticsearchResult.java   |   64 +-
 .../elasticsearch/store/ElasticsearchStore.java    | 1494 ++++++++++----------
 .../ElasticsearchStoreCollectionMetadata.java      |   50 +-
 .../store/ElasticsearchStoreMetadataAnalyzer.java  |  106 +-
 .../elasticsearch/utils/AuthenticationType.java    |   24 +-
 .../utils/ElasticsearchConstants.java              |   48 +-
 .../utils/ElasticsearchParameters.java             |  464 +++---
 .../elasticsearch/GoraElasticsearchTestDriver.java |   64 +-
 .../mapreduce/ElasticsearchStoreMapReduceTest.java |   58 +-
 .../store/TestElasticsearchStore.java              |  265 ++--
 14 files changed, 1676 insertions(+), 1659 deletions(-)

diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java
index d31e64f..9c6704f 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java
@@ -24,50 +24,50 @@ import java.util.Map;
  */
 public class ElasticsearchMapping {
 
-    private String indexName;
-    private Map<String, Field> fields;
+  private String indexName;
+  private Map<String, Field> fields;
 
-    /**
-     * Empty constructor for the ElasticsearchMapping class.
-     */
-    public ElasticsearchMapping() {
-        fields = new HashMap<>();
-    }
+  /**
+   * Empty constructor for the ElasticsearchMapping class.
+   */
+  public ElasticsearchMapping() {
+    fields = new HashMap<>();
+  }
 
-    /**
-     * Returns the name of Elasticsearch index linked to the mapping.
-     *
-     * @return Index's name
-     */
-    public String getIndexName() {
-        return indexName;
-    }
+  /**
+   * Returns the name of Elasticsearch index linked to the mapping.
+   *
+   * @return Index's name
+   */
+  public String getIndexName() {
+    return indexName;
+  }
 
-    /**
-     * Sets the index name of the Elasticsearch mapping.
-     *
-     * @param indexName Index's name
-     */
-    public void setIndexName(String indexName) {
-        this.indexName = indexName;
-    }
+  /**
+   * Sets the index name of the Elasticsearch mapping.
+   *
+   * @param indexName Index's name
+   */
+  public void setIndexName(String indexName) {
+    this.indexName = indexName;
+  }
 
-    /**
-     * Returns a map with all mapped fields.
-     *
-     * @return Map containing mapped fields
-     */
-    public Map<String, Field> getFields() {
-        return fields;
-    }
+  /**
+   * Returns a map with all mapped fields.
+   *
+   * @return Map containing mapped fields
+   */
+  public Map<String, Field> getFields() {
+    return fields;
+  }
 
-    /**
-     * Add a new field to the mapped fields.
-     *
-     * @param classFieldName Field name in the persisted class
-     * @param field Mapped field from Elasticsearch index
-     */
-    public void addField(String classFieldName, Field field) {
-        fields.put(classFieldName, field);
-    }
+  /**
+   * Add a new field to the mapped fields.
+   *
+   * @param classFieldName Field name in the persisted class
+   * @param field          Mapped field from Elasticsearch index
+   */
+  public void addField(String classFieldName, Field field) {
+    fields.put(classFieldName, field);
+  }
 }
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java
index 30dba7b..31ad474 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java
@@ -45,166 +45,166 @@ import java.util.Locale;
  */
 public class ElasticsearchMappingBuilder<K, T extends PersistentBase> {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchMappingBuilder.class);
-
-    /**
-     * XSD validation file for the XML mapping.
-     */
-    private static final String XSD_MAPPING_FILE = "gora-elasticsearch.xsd";
-
-    // Index description
-    static final String ATT_NAME = "name";
-
-    static final String ATT_TYPE = "type";
-
-    // Class description
-    static final String TAG_CLASS = "class";
-
-    static final String ATT_KEYCLASS = "keyClass";
-
-    static final String ATT_INDEX = "index";
-
-    static final String TAG_FIELD = "field";
-
-    static final String ATT_DOCFIELD = "docfield";
-
-    static final String ATT_SCALINGFACTOR = "scalingFactor";
-
-    /**
-     * Mapping instance being built.
-     */
-    private ElasticsearchMapping elasticsearchMapping;
-
-    private final ElasticsearchStore<K, T> dataStore;
-
-    /**
-     * Constructor for ElasticsearchMappingBuilder.
-     *
-     * @param store ElasticsearchStore instance
-     */
-    public ElasticsearchMappingBuilder(final ElasticsearchStore<K, T> store) {
-        this.elasticsearchMapping = new ElasticsearchMapping();
-        this.dataStore = store;
-    }
-
-    /**
-     * Returns the Elasticsearch Mapping being built.
-     *
-     * @return Elasticsearch Mapping instance
-     */
-    public ElasticsearchMapping getElasticsearchMapping() {
-        return elasticsearchMapping;
-    }
-
-    /**
-     * Sets the Elasticsearch Mapping.
-     *
-     * @param elasticsearchMapping Elasticsearch Mapping instance
-     */
-    public void setElasticsearchMapping(ElasticsearchMapping elasticsearchMapping) {
-        this.elasticsearchMapping = elasticsearchMapping;
-    }
-
-    /**
-     * Reads Elasticsearch mappings from file.
-     *
-     * @param inputStream   Mapping input stream
-     * @param xsdValidation Parameter for enabling XSD validation
-     */
-    public void readMappingFile(InputStream inputStream, boolean xsdValidation) {
-        try {
-            SAXBuilder saxBuilder = new SAXBuilder();
-            if (inputStream == null) {
-                LOG.error("The mapping input stream is null!");
-                throw new GoraException("The mapping input stream is null!");
-            }
-
-            // Convert input stream to a string to use it a few times
-            String mappingStream = IOUtils.toString(inputStream, Charset.defaultCharset());
-
-            // XSD validation for XML file
-            if (xsdValidation) {
-                Source xmlSource = new StreamSource(IOUtils.toInputStream(mappingStream, Charset.defaultCharset()));
-                Schema schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
-                        .newSchema(new StreamSource(getClass().getClassLoader().getResourceAsStream(XSD_MAPPING_FILE)));
-                schema.newValidator().validate(xmlSource);
-                LOG.info("Mapping file is valid.");
-            }
-
-            Document document = saxBuilder.build(IOUtils.toInputStream(mappingStream, Charset.defaultCharset()));
-            if (document == null) {
-                LOG.error("The mapping document is null!");
-                throw new GoraException("The mapping document is null!");
-            }
-
-            Element root = document.getRootElement();
-            // Extract class descriptions
-            @SuppressWarnings("unchecked")
-            List<Element> classElements = root.getChildren(TAG_CLASS);
-            for (Element classElement : classElements) {
-                final Class<T> persistentClass = dataStore.getPersistentClass();
-                final Class<K> keyClass = dataStore.getKeyClass();
-                if (haveKeyClass(keyClass, classElement)
-                        && havePersistentClass(persistentClass, classElement)) {
-                    loadPersistentClass(classElement, persistentClass);
-                    break;
-                }
-            }
-        } catch (IOException | JDOMException | ConfigurationException | SAXException ex) {
-            throw new RuntimeException(ex);
+  private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchMappingBuilder.class);
+
+  /**
+   * XSD validation file for the XML mapping.
+   */
+  private static final String XSD_MAPPING_FILE = "gora-elasticsearch.xsd";
+
+  // Index description
+  static final String ATT_NAME = "name";
+
+  static final String ATT_TYPE = "type";
+
+  // Class description
+  static final String TAG_CLASS = "class";
+
+  static final String ATT_KEYCLASS = "keyClass";
+
+  static final String ATT_INDEX = "index";
+
+  static final String TAG_FIELD = "field";
+
+  static final String ATT_DOCFIELD = "docfield";
+
+  static final String ATT_SCALINGFACTOR = "scalingFactor";
+
+  /**
+   * Mapping instance being built.
+   */
+  private ElasticsearchMapping elasticsearchMapping;
+
+  private final ElasticsearchStore<K, T> dataStore;
+
+  /**
+   * Constructor for ElasticsearchMappingBuilder.
+   *
+   * @param store ElasticsearchStore instance
+   */
+  public ElasticsearchMappingBuilder(final ElasticsearchStore<K, T> store) {
+    this.elasticsearchMapping = new ElasticsearchMapping();
+    this.dataStore = store;
+  }
+
+  /**
+   * Returns the Elasticsearch Mapping being built.
+   *
+   * @return Elasticsearch Mapping instance
+   */
+  public ElasticsearchMapping getElasticsearchMapping() {
+    return elasticsearchMapping;
+  }
+
+  /**
+   * Sets the Elasticsearch Mapping.
+   *
+   * @param elasticsearchMapping Elasticsearch Mapping instance
+   */
+  public void setElasticsearchMapping(ElasticsearchMapping elasticsearchMapping) {
+    this.elasticsearchMapping = elasticsearchMapping;
+  }
+
+  /**
+   * Reads Elasticsearch mappings from file.
+   *
+   * @param inputStream   Mapping input stream
+   * @param xsdValidation Parameter for enabling XSD validation
+   */
+  public void readMappingFile(InputStream inputStream, boolean xsdValidation) {
+    try {
+      SAXBuilder saxBuilder = new SAXBuilder();
+      if (inputStream == null) {
+        LOG.error("The mapping input stream is null!");
+        throw new GoraException("The mapping input stream is null!");
+      }
+
+      // Convert input stream to a string to use it a few times
+      String mappingStream = IOUtils.toString(inputStream, Charset.defaultCharset());
+
+      // XSD validation for XML file
+      if (xsdValidation) {
+        Source xmlSource = new StreamSource(IOUtils.toInputStream(mappingStream, Charset.defaultCharset()));
+        Schema schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
+                .newSchema(new StreamSource(getClass().getClassLoader().getResourceAsStream(XSD_MAPPING_FILE)));
+        schema.newValidator().validate(xmlSource);
+        LOG.info("Mapping file is valid.");
+      }
+
+      Document document = saxBuilder.build(IOUtils.toInputStream(mappingStream, Charset.defaultCharset()));
+      if (document == null) {
+        LOG.error("The mapping document is null!");
+        throw new GoraException("The mapping document is null!");
+      }
+
+      Element root = document.getRootElement();
+      // Extract class descriptions
+      @SuppressWarnings("unchecked")
+      List<Element> classElements = root.getChildren(TAG_CLASS);
+      for (Element classElement : classElements) {
+        final Class<T> persistentClass = dataStore.getPersistentClass();
+        final Class<K> keyClass = dataStore.getKeyClass();
+        if (haveKeyClass(keyClass, classElement)
+                && havePersistentClass(persistentClass, classElement)) {
+          loadPersistentClass(classElement, persistentClass);
+          break;
         }
-        LOG.info("Gora Elasticsearch mapping file was read successfully.");
-    }
-
-    private boolean haveKeyClass(final Class<K> keyClass,
-                                 final Element classElement) {
-        return classElement.getAttributeValue(ATT_KEYCLASS).equals(
-                keyClass.getName());
-    }
-
-    private boolean havePersistentClass(final Class<T> persistentClass,
-                                        final Element classElement) {
-        return classElement.getAttributeValue(ATT_NAME).equals(
-                persistentClass.getName());
+      }
+    } catch (IOException | JDOMException | ConfigurationException | SAXException ex) {
+      throw new RuntimeException(ex);
     }
-
-    /**
-     * Handle the XML parsing of the class definition.
-     *
-     * @param classElement the XML node containing the class definition
-     */
-    protected void loadPersistentClass(Element classElement,
-                                       Class<T> pPersistentClass) {
-
-        String indexNameFromMapping = classElement.getAttributeValue(ATT_INDEX);
-        String indexName = dataStore.getSchemaName(indexNameFromMapping, pPersistentClass);
-
+    LOG.info("Gora Elasticsearch mapping file was read successfully.");
+  }
+
+  private boolean haveKeyClass(final Class<K> keyClass,
+                               final Element classElement) {
+    return classElement.getAttributeValue(ATT_KEYCLASS).equals(
+            keyClass.getName());
+  }
+
+  private boolean havePersistentClass(final Class<T> persistentClass,
+                                      final Element classElement) {
+    return classElement.getAttributeValue(ATT_NAME).equals(
+            persistentClass.getName());
+  }
+
+  /**
+   * Handle the XML parsing of the class definition.
+   *
+   * @param classElement the XML node containing the class definition
+   */
+  protected void loadPersistentClass(Element classElement,
+                                     Class<T> pPersistentClass) {
+
+    String indexNameFromMapping = classElement.getAttributeValue(ATT_INDEX);
+    String indexName = dataStore.getSchemaName(indexNameFromMapping, pPersistentClass);
+
+    elasticsearchMapping.setIndexName(indexName);
+    // docNameFromMapping could be null here
+    if (!indexName.equals(indexNameFromMapping)) {
+      ElasticsearchStore.LOG
+              .info("Keyclass and nameclass match, but mismatching index names. "
+                              + "Mappingfile schema is '{}' vs actual schema '{}', assuming they are the same.",
+                      indexNameFromMapping, indexName);
+      if (indexNameFromMapping != null) {
         elasticsearchMapping.setIndexName(indexName);
-        // docNameFromMapping could be null here
-        if (!indexName.equals(indexNameFromMapping)) {
-            ElasticsearchStore.LOG
-                    .info("Keyclass and nameclass match, but mismatching index names. "
-                            + "Mappingfile schema is '{}' vs actual schema '{}', assuming they are the same.",
-                            indexNameFromMapping, indexName);
-            if (indexNameFromMapping != null) {
-                elasticsearchMapping.setIndexName(indexName);
-            }
-        }
+      }
+    }
 
-        // Process fields declaration
-        @SuppressWarnings("unchecked")
-        List<Element> fields = classElement.getChildren(TAG_FIELD);
-        for (Element fieldElement : fields) {
-            String fieldTypeName = fieldElement.getAttributeValue(ATT_TYPE).toUpperCase(Locale.getDefault());
-            Field.FieldType fieldType = new Field.FieldType(Field.DataType.valueOf(fieldTypeName));
-            Field field;
-            if (fieldType.getType() == Field.DataType.SCALED_FLOAT) {
-                int scalingFactor = Integer.parseInt(fieldElement.getAttributeValue(ATT_SCALINGFACTOR));
-                field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), new Field.FieldType(scalingFactor));
-            } else {
-                field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), fieldType);
-            }
-            elasticsearchMapping.addField(fieldElement.getAttributeValue(ATT_NAME), field);
-        }
+    // Process fields declaration
+    @SuppressWarnings("unchecked")
+    List<Element> fields = classElement.getChildren(TAG_FIELD);
+    for (Element fieldElement : fields) {
+      String fieldTypeName = fieldElement.getAttributeValue(ATT_TYPE).toUpperCase(Locale.getDefault());
+      Field.FieldType fieldType = new Field.FieldType(Field.DataType.valueOf(fieldTypeName));
+      Field field;
+      if (fieldType.getType() == Field.DataType.SCALED_FLOAT) {
+        int scalingFactor = Integer.parseInt(fieldElement.getAttributeValue(ATT_SCALINGFACTOR));
+        field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), new Field.FieldType(scalingFactor));
+      } else {
+        field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), fieldType);
+      }
+      elasticsearchMapping.addField(fieldElement.getAttributeValue(ATT_NAME), field);
     }
+  }
 }
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java
index 73016cb..e5021fb 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java
@@ -24,177 +24,175 @@ import java.util.StringJoiner;
  */
 public class Field {
 
-    private String name;
-    private FieldType dataType;
+  private String name;
+  private FieldType dataType;
+
+  /**
+   * Constructor for Field.
+   *
+   * @param name     Field's name
+   * @param dataType Field's data type
+   */
+  public Field(String name, FieldType dataType) {
+    this.name = name;
+    this.dataType = dataType;
+  }
+
+  /**
+   * Returns the field's name.
+   *
+   * @return Field's name
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Sets the field's name.
+   *
+   * @param name Field's name
+   */
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Returns the field's data-type.
+   *
+   * @return Field's data-type
+   */
+  public FieldType getDataType() {
+    return dataType;
+  }
+
+  /**
+   * Sets the field's data-type.
+   *
+   * @param dataType Field's data-type
+   */
+  public void setDataType(FieldType dataType) {
+    this.dataType = dataType;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    Field field = (Field) o;
+    return Objects.equals(name, field.name) && Objects.equals(dataType, field.dataType);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, dataType);
+  }
+
+  @Override
+  public String toString() {
+    return new StringJoiner(", ", Field.class.getSimpleName() + "[", "]")
+            .add("name='" + name + "'")
+            .add("dataType=" + dataType)
+            .toString();
+  }
+
+  /**
+   * Elasticsearch supported data-type enumeration. For a more detailed list of data
+   * types supported by Elasticsearch refer to
+   * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
+   */
+  public enum DataType {
+    BINARY,
+    BOOLEAN,
+    KEYWORD,
+    CONSTANT_KEYWORD,
+    WILDCARD,
+    LONG,
+    INTEGER,
+    SHORT,
+    BYTE,
+    DOUBLE,
+    FLOAT,
+    HALF_FLOAT,
+    SCALED_FLOAT,
+    OBJECT,
+    FLATTENED,
+    NESTED,
+    TEXT,
+    COMPLETION,
+    SEARCH_AS_YOU_TYPE,
+    TOKEN_COUNT
+  }
+
+  public static class FieldType {
+
+    private DataType type;
+
+    // Parameter for scaled_float type.
+    private int scalingFactor;
 
     /**
-     * Constructor for Field.
+     * Constructor for FieldType.
      *
-     * @param name Field's name
-     * @param dataType Field's data type
+     * @param type Elasticsearch data type
      */
-    public Field(String name, FieldType dataType) {
-        this.name = name;
-        this.dataType = dataType;
+    public FieldType(DataType type) {
+      this.type = type;
     }
 
     /**
-     * Returns the field's name.
+     * Constructor for FieldType Implicitly uses scaled_float Elasticsearch data type
+     * with scaling factor parameter.
      *
-     * @return Field's name
+     * @param scalingFactor scaled_float field's scaling factor
      */
-    public String getName() {
-        return name;
+    public FieldType(int scalingFactor) {
+      this.type = DataType.SCALED_FLOAT;
+      this.scalingFactor = scalingFactor;
     }
 
     /**
-     * Sets the field's name.
-     *
-     * @param name Field's name
+     * @return Elasticsearch data type
      */
-    public void setName(String name) {
-        this.name = name;
+    public DataType getType() {
+      return type;
     }
 
     /**
-     * Returns the field's data-type.
+     * @param type Elasticsearch data type
+     */
+    public void setType(DataType type) {
+      this.type = type;
+    }
+
+    /**
+     * Returns the scaling factor of scaled_float type.
      *
-     * @return Field's data-type
+     * @return scaled_float field's scaling factor
      */
-    public FieldType getDataType() {
-        return dataType;
+    public int getScalingFactor() {
+      return scalingFactor;
     }
 
     /**
-     * Sets the field's data-type.
+     * Sets the scaling factor of scaled_float type.
      *
-     * @param dataType Field's data-type
+     * @param scalingFactor scaled_float field's scaling factor
      */
-    public void setDataType(FieldType dataType) {
-        this.dataType = dataType;
+    public void setScalingFactor(int scalingFactor) {
+      this.scalingFactor = scalingFactor;
     }
 
     @Override
     public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        Field field = (Field) o;
-        return Objects.equals(name, field.name) && Objects.equals(dataType, field.dataType);
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      FieldType fieldType = (FieldType) o;
+      return scalingFactor == fieldType.scalingFactor && type == fieldType.type;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, dataType);
-    }
-
-    @Override
-    public String toString() {
-        return new StringJoiner(", ", Field.class.getSimpleName() + "[", "]")
-                .add("name='" + name + "'")
-                .add("dataType=" + dataType)
-                .toString();
-    }
-
-    /**
-     * Elasticsearch supported data-type enumeration. For a more detailed list of data
-     * types supported by Elasticsearch refer to
-     * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
-     */
-    public enum DataType {
-        BINARY,
-        BOOLEAN,
-        KEYWORD,
-        CONSTANT_KEYWORD,
-        WILDCARD,
-        LONG,
-        INTEGER,
-        SHORT,
-        BYTE,
-        DOUBLE,
-        FLOAT,
-        HALF_FLOAT,
-        SCALED_FLOAT,
-        OBJECT,
-        FLATTENED,
-        NESTED,
-        TEXT,
-        COMPLETION,
-        SEARCH_AS_YOU_TYPE,
-        TOKEN_COUNT
-    }
-
-    public static class FieldType {
-
-        private DataType type;
-
-        // Parameter for scaled_float type.
-        private int scalingFactor;
-
-        /**
-         * Constructor for FieldType.
-         *
-         * @param type Elasticsearch data type
-         */
-        public FieldType(DataType type) {
-            this.type = type;
-        }
-
-        /**
-         * Constructor for FieldType Implicitly uses scaled_float Elasticsearch data type
-         * with scaling factor parameter.
-         *
-         * @param scalingFactor scaled_float field's scaling factor
-         */
-        public FieldType(int scalingFactor) {
-            this.type = DataType.SCALED_FLOAT;
-            this.scalingFactor = scalingFactor;
-        }
-
-        /**
-         *
-         * @return Elasticsearch data type
-         */
-        public DataType getType() {
-            return type;
-        }
-
-        /**
-         *
-         * @param type Elasticsearch data type
-         */
-        public void setType(DataType type) {
-            this.type = type;
-        }
-
-        /**
-         * Returns the scaling factor of scaled_float type.
-         *
-         * @return scaled_float field's scaling factor
-         */
-        public int getScalingFactor() {
-            return scalingFactor;
-        }
-
-        /**
-         * Sets the scaling factor of scaled_float type.
-         *
-         * @param scalingFactor scaled_float field's scaling factor
-         */
-        public void setScalingFactor(int scalingFactor) {
-            this.scalingFactor = scalingFactor;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            FieldType fieldType = (FieldType) o;
-            return scalingFactor == fieldType.scalingFactor && type == fieldType.type;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(type, scalingFactor);
-        }
+      return Objects.hash(type, scalingFactor);
     }
+  }
 }
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java
index f491006..214380e 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java
@@ -26,16 +26,16 @@ import org.apache.gora.store.DataStore;
  */
 public class ElasticsearchQuery<K, T extends PersistentBase> extends QueryBase<K, T> {
 
-    /**
-     * Constructor for the query.
-     *
-     * @param dataStore data store used
-     */
-    public ElasticsearchQuery(DataStore<K, T> dataStore) {
-        super(dataStore);
-    }
+  /**
+   * Constructor for the query.
+   *
+   * @param dataStore data store used
+   */
+  public ElasticsearchQuery(DataStore<K, T> dataStore) {
+    super(dataStore);
+  }
 
-    public ElasticsearchQuery() {
-        super(null);
-    }
+  public ElasticsearchQuery() {
+    super(null);
+  }
 }
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java
index e7f8180..01d5d8e 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java
@@ -29,44 +29,44 @@ import java.util.List;
  */
 public class ElasticsearchResult<K, T extends PersistentBase> extends ResultBase<K, T> {
 
-    /**
-     * List of resulting persistent objects.
-     */
-    private List<T> persistentObjects;
+  /**
+   * List of resulting persistent objects.
+   */
+  private List<T> persistentObjects;
 
-    /**
-     * List of resulting objects keys.
-     */
-    private List<K> persistentKeys;
+  /**
+   * List of resulting objects keys.
+   */
+  private List<K> persistentKeys;
 
-    public ElasticsearchResult(DataStore<K, T> dataStore, Query<K, T> query, List<K> persistentKeys, List<T> persistentObjects) {
-        super(dataStore, query);
-        this.persistentKeys = persistentKeys;
-        this.persistentObjects = persistentObjects;
-    }
-
-    @Override
-    public float getProgress() {
-        if (persistentObjects.size() == 0) {
-            return 1;
-        }
+  public ElasticsearchResult(DataStore<K, T> dataStore, Query<K, T> query, List<K> persistentKeys, List<T> persistentObjects) {
+    super(dataStore, query);
+    this.persistentKeys = persistentKeys;
+    this.persistentObjects = persistentObjects;
+  }
 
-        return offset / (float) persistentObjects.size();
+  @Override
+  public float getProgress() {
+    if (persistentObjects.size() == 0) {
+      return 1;
     }
 
-    @Override
-    public int size() {
-        return persistentObjects.size();
-    }
+    return offset / (float) persistentObjects.size();
+  }
 
-    @Override
-    protected boolean nextInner() {
-        if ((int) offset == persistentObjects.size()) {
-            return false;
-        }
+  @Override
+  public int size() {
+    return persistentObjects.size();
+  }
 
-        persistent = persistentObjects.get((int) offset);
-        key = persistentKeys.get((int) offset);
-        return persistent != null;
+  @Override
+  protected boolean nextInner() {
+    if ((int) offset == persistentObjects.size()) {
+      return false;
     }
+
+    persistent = persistentObjects.get((int) offset);
+    key = persistentKeys.get((int) offset);
+    return persistent != null;
+  }
 }
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java
index a82de7f..610f0fd 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java
@@ -82,7 +82,15 @@ import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
 
 /**
  * Implementation of a Apache Elasticsearch data store to be used by Apache Gora.
@@ -92,773 +100,773 @@ import java.util.*;
  */
 public class ElasticsearchStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
 
-    public static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStore.class);
-    private static final String DEFAULT_MAPPING_FILE = "gora-elasticsearch-mapping.xml";
-    public static final String PARSE_MAPPING_FILE_KEY = "gora.elasticsearch.mapping.file";
-    private static final String XML_MAPPING_DEFINITION = "gora.mapping";
-    public static final String XSD_VALIDATION = "gora.xsd_validation";
-
-    /**
-     * Elasticsearch client
-     */
-    private RestHighLevelClient client;
-
-    /**
-     * Mapping definition for Elasticsearch
-     */
-    private ElasticsearchMapping elasticsearchMapping;
-
-    @Override
-    public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException {
-        try {
-            LOG.debug("Initializing Elasticsearch store");
-            ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf());
-            super.initialize(keyClass, persistentClass, properties);
-            ElasticsearchMappingBuilder<K, T> builder = new ElasticsearchMappingBuilder<>(this);
-            InputStream mappingStream;
-            if (properties.containsKey(XML_MAPPING_DEFINITION)) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("{} = {}", XML_MAPPING_DEFINITION, properties.getProperty(XML_MAPPING_DEFINITION));
-                }
-                mappingStream = org.apache.commons.io.IOUtils.toInputStream(properties.getProperty(XML_MAPPING_DEFINITION), (Charset) null);
-            } else {
-                mappingStream = getClass().getClassLoader().getResourceAsStream(properties.getProperty(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
-            }
-            String xsdValidation = properties.getProperty(XSD_VALIDATION, "false");
-            builder.readMappingFile(mappingStream, Boolean.parseBoolean(xsdValidation));
-            elasticsearchMapping = builder.getElasticsearchMapping();
-            client = createClient(parameters);
-            LOG.info("Elasticsearch store was successfully initialized.");
-        } catch (Exception ex) {
-            LOG.error("Error while initializing Elasticsearch store", ex);
-            throw new GoraException(ex);
-        }
+  public static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStore.class);
+  private static final String DEFAULT_MAPPING_FILE = "gora-elasticsearch-mapping.xml";
+  public static final String PARSE_MAPPING_FILE_KEY = "gora.elasticsearch.mapping.file";
+  private static final String XML_MAPPING_DEFINITION = "gora.mapping";
+  public static final String XSD_VALIDATION = "gora.xsd_validation";
+
+  /**
+   * Elasticsearch client
+   */
+  private RestHighLevelClient client;
+
+  /**
+   * Mapping definition for Elasticsearch
+   */
+  private ElasticsearchMapping elasticsearchMapping;
+
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException {
+    try {
+      LOG.debug("Initializing Elasticsearch store");
+      ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf());
+      super.initialize(keyClass, persistentClass, properties);
+      ElasticsearchMappingBuilder<K, T> builder = new ElasticsearchMappingBuilder<>(this);
+      InputStream mappingStream;
+      if (properties.containsKey(XML_MAPPING_DEFINITION)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{} = {}", XML_MAPPING_DEFINITION, properties.getProperty(XML_MAPPING_DEFINITION));
+        }
+        mappingStream = org.apache.commons.io.IOUtils.toInputStream(properties.getProperty(XML_MAPPING_DEFINITION), (Charset) null);
+      } else {
+        mappingStream = getClass().getClassLoader().getResourceAsStream(properties.getProperty(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
+      }
+      String xsdValidation = properties.getProperty(XSD_VALIDATION, "false");
+      builder.readMappingFile(mappingStream, Boolean.parseBoolean(xsdValidation));
+      elasticsearchMapping = builder.getElasticsearchMapping();
+      client = createClient(parameters);
+      LOG.info("Elasticsearch store was successfully initialized.");
+    } catch (Exception ex) {
+      LOG.error("Error while initializing Elasticsearch store", ex);
+      throw new GoraException(ex);
     }
-
-    public static RestHighLevelClient createClient(ElasticsearchParameters parameters) {
-        RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(parameters.getHost(), parameters.getPort()));
-
-        // Choosing the authentication method.
-        switch (parameters.getAuthenticationType()) {
-            case BASIC:
-                if (parameters.getUsername() != null && parameters.getPassword() != null) {
-                    final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-                    credentialsProvider.setCredentials(AuthScope.ANY,
-                            new UsernamePasswordCredentials(parameters.getUsername(), parameters.getPassword()));
-                    clientBuilder.setHttpClientConfigCallback(
-                            httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
-                } else {
-                    throw new IllegalArgumentException("Missing username or password for BASIC authentication.");
-                }
-                break;
-            case TOKEN:
-                if (parameters.getAuthorizationToken() != null) {
-                    Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization",
-                            parameters.getAuthorizationToken())};
-                    clientBuilder.setDefaultHeaders(defaultHeaders);
-                } else {
-                    throw new IllegalArgumentException("Missing authorization token for TOKEN authentication.");
-                }
-                break;
-            case APIKEY:
-                if (parameters.getApiKeyId() != null && parameters.getApiKeySecret() != null) {
-                    String apiKeyAuth = Base64.getEncoder()
-                            .encodeToString((parameters.getApiKeyId() + ":" + parameters.getApiKeySecret())
-                                    .getBytes(StandardCharsets.UTF_8));
-                    Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization", "ApiKey " + apiKeyAuth)};
-                    clientBuilder.setDefaultHeaders(defaultHeaders);
-                } else {
-                    throw new IllegalArgumentException("Missing API Key ID or API Key Secret for APIKEY authentication.");
-                }
-                break;
-        }
-
-        if (parameters.getConnectTimeout() != 0) {
-            clientBuilder.setRequestConfigCallback(requestConfigBuilder ->
-                    requestConfigBuilder.setConnectTimeout(parameters.getConnectTimeout()));
-        }
-
-        if (parameters.getSocketTimeout() != 0) {
-            clientBuilder.setRequestConfigCallback(requestConfigBuilder ->
-                    requestConfigBuilder.setSocketTimeout(parameters.getSocketTimeout()));
-        }
-
-        if (parameters.getIoThreadCount() != 0) {
-            clientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
-                    httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom()
-                            .setIoThreadCount(parameters.getIoThreadCount()).build()));
+  }
+
+  public static RestHighLevelClient createClient(ElasticsearchParameters parameters) {
+    RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(parameters.getHost(), parameters.getPort()));
+
+    // Choosing the authentication method.
+    switch (parameters.getAuthenticationType()) {
+      case BASIC:
+        if (parameters.getUsername() != null && parameters.getPassword() != null) {
+          final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+          credentialsProvider.setCredentials(AuthScope.ANY,
+                  new UsernamePasswordCredentials(parameters.getUsername(), parameters.getPassword()));
+          clientBuilder.setHttpClientConfigCallback(
+                  httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+        } else {
+          throw new IllegalArgumentException("Missing username or password for BASIC authentication.");
+        }
+        break;
+      case TOKEN:
+        if (parameters.getAuthorizationToken() != null) {
+          Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization",
+                  parameters.getAuthorizationToken())};
+          clientBuilder.setDefaultHeaders(defaultHeaders);
+        } else {
+          throw new IllegalArgumentException("Missing authorization token for TOKEN authentication.");
+        }
+        break;
+      case APIKEY:
+        if (parameters.getApiKeyId() != null && parameters.getApiKeySecret() != null) {
+          String apiKeyAuth = Base64.getEncoder()
+                  .encodeToString((parameters.getApiKeyId() + ":" + parameters.getApiKeySecret())
+                          .getBytes(StandardCharsets.UTF_8));
+          Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization", "ApiKey " + apiKeyAuth)};
+          clientBuilder.setDefaultHeaders(defaultHeaders);
+        } else {
+          throw new IllegalArgumentException("Missing API Key ID or API Key Secret for APIKEY authentication.");
         }
-        return new RestHighLevelClient(clientBuilder);
+        break;
     }
 
-    public ElasticsearchMapping getMapping() {
-        return elasticsearchMapping;
+    if (parameters.getConnectTimeout() != 0) {
+      clientBuilder.setRequestConfigCallback(requestConfigBuilder ->
+              requestConfigBuilder.setConnectTimeout(parameters.getConnectTimeout()));
     }
 
-    @Override
-    public String getSchemaName() {
-        return elasticsearchMapping.getIndexName();
+    if (parameters.getSocketTimeout() != 0) {
+      clientBuilder.setRequestConfigCallback(requestConfigBuilder ->
+              requestConfigBuilder.setSocketTimeout(parameters.getSocketTimeout()));
     }
 
-    @Override
-    public String getSchemaName(final String mappingSchemaName, final Class<?> persistentClass) {
-        return super.getSchemaName(mappingSchemaName, persistentClass);
+    if (parameters.getIoThreadCount() != 0) {
+      clientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
+              httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom()
+                      .setIoThreadCount(parameters.getIoThreadCount()).build()));
     }
-
-    @Override
-    public void createSchema() throws GoraException {
-        CreateIndexRequest request = new CreateIndexRequest(elasticsearchMapping.getIndexName());
-        Map<String, Object> properties = new HashMap<>();
-        for (Map.Entry<String, Field> entry : elasticsearchMapping.getFields().entrySet()) {
-            Map<String, Object> fieldType = new HashMap<>();
-            fieldType.put("type", entry.getValue().getDataType().getType().name().toLowerCase(Locale.ROOT));
-            if (entry.getValue().getDataType().getType() == Field.DataType.SCALED_FLOAT) {
-                fieldType.put("scaling_factor", entry.getValue().getDataType().getScalingFactor());
-            }
-            properties.put(entry.getKey(), fieldType);
-        }
-        // Special field for range query
-        properties.put("gora_id", new HashMap<String, Object>() {{
-            put("type", "keyword");
-        }});
-        Map<String, Object> mapping = new HashMap<>();
-        mapping.put("properties", properties);
-        request.mapping(mapping);
-        try {
-            if (!client.indices().exists(
-                    new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) {
-                client.indices().create(request, RequestOptions.DEFAULT);
-            }
-        } catch (IOException ex) {
-            throw new GoraException(ex);
-        }
+    return new RestHighLevelClient(clientBuilder);
+  }
+
+  public ElasticsearchMapping getMapping() {
+    return elasticsearchMapping;
+  }
+
+  @Override
+  public String getSchemaName() {
+    return elasticsearchMapping.getIndexName();
+  }
+
+  @Override
+  public String getSchemaName(final String mappingSchemaName, final Class<?> persistentClass) {
+    return super.getSchemaName(mappingSchemaName, persistentClass);
+  }
+
+  @Override
+  public void createSchema() throws GoraException {
+    CreateIndexRequest request = new CreateIndexRequest(elasticsearchMapping.getIndexName());
+    Map<String, Object> properties = new HashMap<>();
+    for (Map.Entry<String, Field> entry : elasticsearchMapping.getFields().entrySet()) {
+      Map<String, Object> fieldType = new HashMap<>();
+      fieldType.put("type", entry.getValue().getDataType().getType().name().toLowerCase(Locale.ROOT));
+      if (entry.getValue().getDataType().getType() == Field.DataType.SCALED_FLOAT) {
+        fieldType.put("scaling_factor", entry.getValue().getDataType().getScalingFactor());
+      }
+      properties.put(entry.getKey(), fieldType);
     }
-
-    @Override
-    public void deleteSchema() throws GoraException {
-        DeleteIndexRequest request = new DeleteIndexRequest(elasticsearchMapping.getIndexName());
-        try {
-            if (client.indices().exists(
-                    new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) {
-                client.indices().delete(request, RequestOptions.DEFAULT);
-            }
-        } catch (IOException ex) {
-            throw new GoraException(ex);
-        }
+    // Special field for range query
+    properties.put("gora_id", new HashMap<String, Object>() {{
+      put("type", "keyword");
+    }});
+    Map<String, Object> mapping = new HashMap<>();
+    mapping.put("properties", properties);
+    request.mapping(mapping);
+    try {
+      if (!client.indices().exists(
+              new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) {
+        client.indices().create(request, RequestOptions.DEFAULT);
+      }
+    } catch (IOException ex) {
+      throw new GoraException(ex);
     }
-
-    @Override
-    public boolean schemaExists() throws GoraException {
-        try {
-            return client.indices().exists(
-                    new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
-        } catch (IOException ex) {
-            throw new GoraException(ex);
-        }
+  }
+
+  @Override
+  public void deleteSchema() throws GoraException {
+    DeleteIndexRequest request = new DeleteIndexRequest(elasticsearchMapping.getIndexName());
+    try {
+      if (client.indices().exists(
+              new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) {
+        client.indices().delete(request, RequestOptions.DEFAULT);
+      }
+    } catch (IOException ex) {
+      throw new GoraException(ex);
     }
-
-    @Override
-    public boolean exists(K key) throws GoraException {
-        GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key);
-        getRequest.fetchSourceContext(new FetchSourceContext(false)).storedFields("_none_");
-        try {
-            return client.exists(getRequest, RequestOptions.DEFAULT);
-        } catch (IOException ex) {
-            throw new GoraException(ex);
-        }
+  }
+
+  @Override
+  public boolean schemaExists() throws GoraException {
+    try {
+      return client.indices().exists(
+              new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
+    } catch (IOException ex) {
+      throw new GoraException(ex);
     }
-
-    @Override
-    public T get(K key, String[] fields) throws GoraException {
-        String[] requestedFields = getFieldsToQuery(fields);
-        List<String> documentFields = new ArrayList<>();
-        for (String requestedField : requestedFields) {
-            documentFields.add(elasticsearchMapping.getFields().get(requestedField).getName());
-        }
-        try {
-            // Prepare the Elasticsearch request
-            GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key);
-            GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
-            if (getResponse.isExists()) {
-                Map<String, Object> sourceMap = getResponse.getSourceAsMap();
-
-                // Map of field's name and its value from the Document
-                Map<String, Object> fieldsAndValues = new HashMap<>();
-                for (String field : documentFields) {
-                    fieldsAndValues.put(field, sourceMap.get(field));
-                }
-
-                // Build the corresponding persistent
-                return newInstance(fieldsAndValues, requestedFields);
-            } else {
-                return null;
-            }
-        } catch (IOException ex) {
-            throw new GoraException(ex);
-        }
+  }
+
+  @Override
+  public boolean exists(K key) throws GoraException {
+    GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key);
+    getRequest.fetchSourceContext(new FetchSourceContext(false)).storedFields("_none_");
+    try {
+      return client.exists(getRequest, RequestOptions.DEFAULT);
+    } catch (IOException ex) {
+      throw new GoraException(ex);
     }
-
-    @Override
-    public void put(K key, T obj) throws GoraException {
-        if (obj.isDirty()) {
-            Schema schemaObj = obj.getSchema();
-            List<Schema.Field> fields = schemaObj.getFields();
-            Map<String, Object> jsonMap = new HashMap<>();
-            for (Schema.Field field : fields) {
-                Field mappedField = elasticsearchMapping.getFields().get(field.name());
-                if (mappedField != null) {
-                    Object fieldValue = obj.get(field.pos());
-                    if (fieldValue != null) {
-                        Schema fieldSchema = field.schema();
-                        Object serializedObj = serializeFieldValue(fieldSchema, fieldValue);
-                        jsonMap.put(mappedField.getName(), serializedObj);
-                    }
-                }
-            }
-            // Special field for range query
-            jsonMap.put("gora_id", key);
-            // Prepare the Elasticsearch request
-            IndexRequest request = new IndexRequest(elasticsearchMapping.getIndexName()).id((String) key).source(jsonMap);
-            try {
-                client.index(request, RequestOptions.DEFAULT);
-            } catch (IOException ex) {
-                throw new GoraException(ex);
-            }
-        } else {
-            LOG.info("Ignored putting object {} in the store as it is neither "
-                    + "new, neither dirty.", new Object[]{obj});
-        }
+  }
+
+  @Override
+  public T get(K key, String[] fields) throws GoraException {
+    String[] requestedFields = getFieldsToQuery(fields);
+    List<String> documentFields = new ArrayList<>();
+    for (String requestedField : requestedFields) {
+      documentFields.add(elasticsearchMapping.getFields().get(requestedField).getName());
     }
-
-    @Override
-    public boolean delete(K key) throws GoraException {
-        DeleteRequest request = new DeleteRequest(elasticsearchMapping.getIndexName(), (String) key);
-        try {
-            DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
-            return deleteResponse.getResult() != DocWriteResponse.Result.NOT_FOUND;
-        } catch (IOException ex) {
-            throw new GoraException(ex);
-        }
+    try {
+      // Prepare the Elasticsearch request
+      GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key);
+      GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
+      if (getResponse.isExists()) {
+        Map<String, Object> sourceMap = getResponse.getSourceAsMap();
+
+        // Map of field's name and its value from the Document
+        Map<String, Object> fieldsAndValues = new HashMap<>();
+        for (String field : documentFields) {
+          fieldsAndValues.put(field, sourceMap.get(field));
+        }
+
+        // Build the corresponding persistent
+        return newInstance(fieldsAndValues, requestedFields);
+      } else {
+        return null;
+      }
+    } catch (IOException ex) {
+      throw new GoraException(ex);
     }
-
-    @Override
-    public long deleteByQuery(Query<K, T> query) throws GoraException {
-        try {
-            BulkByScrollResponse bulkResponse;
-            if (query.getFields() != null && query.getFields().length < elasticsearchMapping.getFields().size()) {
-                UpdateByQueryRequest updateRequest = new UpdateByQueryRequest(elasticsearchMapping.getIndexName());
-                QueryBuilder matchDocumentsWithinRange = QueryBuilders
-                        .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey());
-                updateRequest.setQuery(matchDocumentsWithinRange);
-
-                // Create a script for deleting fields
-                StringBuilder toDelete = new StringBuilder();
-                String[] fieldsToDelete = query.getFields();
-                for (String field : fieldsToDelete) {
-                    String elasticsearchField = elasticsearchMapping.getFields().get(field).getName();
-                    toDelete.append(String.format("ctx._source.remove('%s');", elasticsearchField));
-                }
-                //toDelete.deleteCharAt(toDelete.length() - 1);
-                updateRequest.setScript(new Script(ScriptType.INLINE, "painless", toDelete.toString(), Collections.emptyMap()));
-                bulkResponse = client.updateByQuery(updateRequest, RequestOptions.DEFAULT);
-                return bulkResponse.getUpdated();
-            } else {
-                DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(elasticsearchMapping.getIndexName());
-                QueryBuilder matchDocumentsWithinRange = QueryBuilders
-                        .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey());
-                deleteRequest.setQuery(matchDocumentsWithinRange);
-                bulkResponse = client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
-                return bulkResponse.getDeleted();
-            }
-        } catch (IOException ex) {
-            throw new GoraException(ex);
-        }
+  }
+
+  @Override
+  public void put(K key, T obj) throws GoraException {
+    if (obj.isDirty()) {
+      Schema schemaObj = obj.getSchema();
+      List<Schema.Field> fields = schemaObj.getFields();
+      Map<String, Object> jsonMap = new HashMap<>();
+      for (Schema.Field field : fields) {
+        Field mappedField = elasticsearchMapping.getFields().get(field.name());
+        if (mappedField != null) {
+          Object fieldValue = obj.get(field.pos());
+          if (fieldValue != null) {
+            Schema fieldSchema = field.schema();
+            Object serializedObj = serializeFieldValue(fieldSchema, fieldValue);
+            jsonMap.put(mappedField.getName(), serializedObj);
+          }
+        }
+      }
+      // Special field for range query
+      jsonMap.put("gora_id", key);
+      // Prepare the Elasticsearch request
+      IndexRequest request = new IndexRequest(elasticsearchMapping.getIndexName()).id((String) key).source(jsonMap);
+      try {
+        client.index(request, RequestOptions.DEFAULT);
+      } catch (IOException ex) {
+        throw new GoraException(ex);
+      }
+    } else {
+      LOG.info("Ignored putting object {} in the store as it is neither "
+              + "new, neither dirty.", new Object[]{obj});
+    }
+  }
+
+  @Override
+  public boolean delete(K key) throws GoraException {
+    DeleteRequest request = new DeleteRequest(elasticsearchMapping.getIndexName(), (String) key);
+    try {
+      DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
+      return deleteResponse.getResult() != DocWriteResponse.Result.NOT_FOUND;
+    } catch (IOException ex) {
+      throw new GoraException(ex);
     }
+  }
+
+  @Override
+  public long deleteByQuery(Query<K, T> query) throws GoraException {
+    try {
+      BulkByScrollResponse bulkResponse;
+      if (query.getFields() != null && query.getFields().length < elasticsearchMapping.getFields().size()) {
+        UpdateByQueryRequest updateRequest = new UpdateByQueryRequest(elasticsearchMapping.getIndexName());
+        QueryBuilder matchDocumentsWithinRange = QueryBuilders
+                .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey());
+        updateRequest.setQuery(matchDocumentsWithinRange);
+
+        // Create a script for deleting fields
+        StringBuilder toDelete = new StringBuilder();
+        String[] fieldsToDelete = query.getFields();
+        for (String field : fieldsToDelete) {
+          String elasticsearchField = elasticsearchMapping.getFields().get(field).getName();
+          toDelete.append(String.format(Locale.getDefault(), "ctx._source.remove('%s');", elasticsearchField));
+        }
+        //toDelete.deleteCharAt(toDelete.length() - 1);
+        updateRequest.setScript(new Script(ScriptType.INLINE, "painless", toDelete.toString(), Collections.emptyMap()));
+        bulkResponse = client.updateByQuery(updateRequest, RequestOptions.DEFAULT);
+        return bulkResponse.getUpdated();
+      } else {
+        DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(elasticsearchMapping.getIndexName());
+        QueryBuilder matchDocumentsWithinRange = QueryBuilders
+                .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey());
+        deleteRequest.setQuery(matchDocumentsWithinRange);
+        bulkResponse = client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
+        return bulkResponse.getDeleted();
+      }
+    } catch (IOException ex) {
+      throw new GoraException(ex);
+    }
+  }
 
-    @Override
-    public Result<K, T> execute(Query<K, T> query) throws GoraException {
-        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+  @Override
+  public Result<K, T> execute(Query<K, T> query) throws GoraException {
+    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
 
-        // Set the query result limit
-        int size = (int) query.getLimit();
-        if (size != -1) {
-            searchSourceBuilder.size(size);
-        }
-        try {
-            // Build the actual Elasticsearch range query
-            QueryBuilder rangeQueryBuilder = QueryBuilders
-                    .rangeQuery("gora_id").gte(query.getStartKey()).lte(query.getEndKey());
-            searchSourceBuilder.query(rangeQueryBuilder);
-            SearchRequest searchRequest = new SearchRequest(elasticsearchMapping.getIndexName());
-            searchRequest.source(searchSourceBuilder);
-            SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
-
-            String[] avroFields = getFieldsToQuery(query.getFields());
-
-            SearchHits hits = searchResponse.getHits();
-            SearchHit[] searchHits = hits.getHits();
-            List<K> hitId = new ArrayList<>();
-
-            // Check filter
-            Filter<K, T> queryFilter = query.getFilter();
-            List<T> filteredObjects = new ArrayList<>();
-            for (SearchHit hit : searchHits) {
-                Map<String, Object> sourceAsMap = hit.getSourceAsMap();
-                if (queryFilter == null || !queryFilter.filter((K) hit.getId(), newInstance(sourceAsMap, avroFields))) {
-                    filteredObjects.add(newInstance(sourceAsMap, avroFields));
-                    hitId.add((K) hit.getId());
-                }
-            }
-            return new ElasticsearchResult<>(this, query, hitId, filteredObjects);
-        } catch (IOException ex) {
-            throw new GoraException(ex);
-        }
+    // Set the query result limit
+    int size = (int) query.getLimit();
+    if (size != -1) {
+      searchSourceBuilder.size(size);
     }
-
-    @Override
-    public Query<K, T> newQuery() {
-        ElasticsearchQuery<K, T> query = new ElasticsearchQuery<>(this);
-        query.setFields(getFieldsToQuery(null));
-        return query;
+    try {
+      // Build the actual Elasticsearch range query
+      QueryBuilder rangeQueryBuilder = QueryBuilders
+              .rangeQuery("gora_id").gte(query.getStartKey()).lte(query.getEndKey());
+      searchSourceBuilder.query(rangeQueryBuilder);
+      SearchRequest searchRequest = new SearchRequest(elasticsearchMapping.getIndexName());
+      searchRequest.source(searchSourceBuilder);
+      SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
+
+      String[] avroFields = getFieldsToQuery(query.getFields());
+
+      SearchHits hits = searchResponse.getHits();
+      SearchHit[] searchHits = hits.getHits();
+      List<K> hitId = new ArrayList<>();
+
+      // Check filter
+      Filter<K, T> queryFilter = query.getFilter();
+      List<T> filteredObjects = new ArrayList<>();
+      for (SearchHit hit : searchHits) {
+        Map<String, Object> sourceAsMap = hit.getSourceAsMap();
+        if (queryFilter == null || !queryFilter.filter((K) hit.getId(), newInstance(sourceAsMap, avroFields))) {
+          filteredObjects.add(newInstance(sourceAsMap, avroFields));
+          hitId.add((K) hit.getId());
+        }
+      }
+      return new ElasticsearchResult<>(this, query, hitId, filteredObjects);
+    } catch (IOException ex) {
+      throw new GoraException(ex);
     }
-
-    @Override
-    public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
-        List<PartitionQuery<K, T>> partitions = new ArrayList<>();
-        PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>(
-                query);
-        partitionQuery.setConf(getConf());
-        partitions.add(partitionQuery);
-        return partitions;
+  }
+
+  @Override
+  public Query<K, T> newQuery() {
+    ElasticsearchQuery<K, T> query = new ElasticsearchQuery<>(this);
+    query.setFields(getFieldsToQuery(null));
+    return query;
+  }
+
+  @Override
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
+    List<PartitionQuery<K, T>> partitions = new ArrayList<>();
+    PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>(
+            query);
+    partitionQuery.setConf(getConf());
+    partitions.add(partitionQuery);
+    return partitions;
+  }
+
+  @Override
+  public void flush() throws GoraException {
+    try {
+      client.indices().refresh(new RefreshRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
+      client.indices().flush(new FlushRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
+    } catch (IOException ex) {
+      throw new GoraException(ex);
     }
-
-    @Override
-    public void flush() throws GoraException {
-        try {
-            client.indices().refresh(new RefreshRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
-            client.indices().flush(new FlushRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
-        } catch (IOException ex) {
-            throw new GoraException(ex);
-        }
+  }
+
+  @Override
+  public void close() {
+    try {
+      client.close();
+      LOG.info("Elasticsearch datastore destroyed successfully.");
+    } catch (IOException ex) {
+      LOG.error(ex.getMessage(), ex);
     }
-
-    @Override
-    public void close() {
-        try {
-            client.close();
-            LOG.info("Elasticsearch datastore destroyed successfully.");
-        } catch (IOException ex) {
-            LOG.error(ex.getMessage(), ex);
-        }
+  }
+
+  /**
+   * Build a new instance of the persisted class from the Document retrieved from the database.
+   *
+   * @param fieldsAndValues Map of field's name and its value from the Document
+   *                        that results from the query to the database
+   * @param requestedFields the list of fields to be mapped to the persistence class instance
+   * @return a persistence class instance which content was deserialized from the Document
+   * @throws IOException
+   */
+  public T newInstance(Map<String, Object> fieldsAndValues, String[] requestedFields) throws IOException {
+    // Create new empty persistent bean instance
+    T persistent = newPersistent();
+
+    requestedFields = getFieldsToQuery(requestedFields);
+    // Populate each field
+    for (String objField : requestedFields) {
+      Schema.Field field = fieldMap.get(objField);
+      Schema fieldSchema = field.schema();
+      String docFieldName = elasticsearchMapping.getFields().get(objField).getName();
+      Object fieldValue = fieldsAndValues.get(docFieldName);
+
+      Object result = deserializeFieldValue(field, fieldSchema, fieldValue);
+      persistent.put(field.pos(), result);
     }
-
-    /**
-     * Build a new instance of the persisted class from the Document retrieved from the database.
-     *
-     * @param fieldsAndValues Map of field's name and its value from the Document
-     *                        that results from the query to the database
-     * @param requestedFields the list of fields to be mapped to the persistence class instance
-     * @return a persistence class instance which content was deserialized from the Document
-     * @throws IOException
-     */
-    public T newInstance(Map<String, Object> fieldsAndValues, String[] requestedFields) throws IOException {
-        // Create new empty persistent bean instance
-        T persistent = newPersistent();
-
-        requestedFields = getFieldsToQuery(requestedFields);
-        // Populate each field
-        for (String objField : requestedFields) {
-            Schema.Field field = fieldMap.get(objField);
-            Schema fieldSchema = field.schema();
-            String docFieldName = elasticsearchMapping.getFields().get(objField).getName();
-            Object fieldValue = fieldsAndValues.get(docFieldName);
-
-            Object result = deserializeFieldValue(field, fieldSchema, fieldValue);
-            persistent.put(field.pos(), result);
-        }
-        persistent.clearDirty();
-        return persistent;
-    }
-
-    /**
-     * Deserialize an Elasticsearch object to a persistent Avro object.
-     *
-     * @param avroField          persistent Avro class field to which the value will be deserialized
-     * @param avroFieldSchema    schema for the persistent Avro class field
-     * @param elasticsearchValue Elasticsearch field value to be deserialized
-     * @return deserialized Avro object from the Elasticsearch object
-     * @throws GoraException when the given Elasticsearch value cannot be deserialized
-     */
-    private Object deserializeFieldValue(Schema.Field avroField, Schema avroFieldSchema,
-                                         Object elasticsearchValue) throws GoraException {
-        Object fieldValue;
-        switch (avroFieldSchema.getType()) {
-            case MAP:
-                fieldValue = fromElasticsearchMap(avroField, avroFieldSchema.getValueType(), (Map<String, Object>) elasticsearchValue);
-                break;
-            case RECORD:
-                fieldValue = fromElasticsearchRecord(avroFieldSchema, (Map<String, Object>) elasticsearchValue);
-                break;
-            case ARRAY:
-                fieldValue = fromElasticsearchList(avroField, avroFieldSchema.getElementType(), elasticsearchValue);
-                break;
-            case BOOLEAN:
-                fieldValue = Boolean.parseBoolean(elasticsearchValue.toString());
-                break;
-            case BYTES:
-                fieldValue = ByteBuffer.wrap(Base64.getDecoder().decode(elasticsearchValue.toString()));
-                break;
-            case FIXED:
-            case NULL:
-                fieldValue = null;
-                break;
-            case UNION:
-                fieldValue = fromElasticsearchUnion(avroField, avroFieldSchema, elasticsearchValue);
-                break;
-            case DOUBLE:
-                fieldValue = Double.parseDouble(elasticsearchValue.toString());
-                break;
-            case ENUM:
-                fieldValue = AvroUtils.getEnumValue(avroFieldSchema, elasticsearchValue.toString());
-                break;
-            case FLOAT:
-                fieldValue = Float.parseFloat(elasticsearchValue.toString());
-                break;
-            case INT:
-                fieldValue = Integer.parseInt(elasticsearchValue.toString());
-                break;
-            case LONG:
-                fieldValue = Long.parseLong(elasticsearchValue.toString());
-                break;
-            case STRING:
-                fieldValue = new Utf8(elasticsearchValue.toString());
-                break;
-            default:
-                fieldValue = elasticsearchValue;
-        }
-        return fieldValue;
-    }
-
-    /**
-     * Deserialize an Elasticsearch List to an Avro List as used in Gora generated classes
-     * that can safely be written into Avro persistent object.
-     *
-     * @param avroField          persistent Avro class field to which the value will be deserialized
-     * @param avroFieldSchema    schema for the persistent Avro class field
-     * @param elasticsearchValue Elasticsearch field value to be deserialized
-     * @return deserialized Avro List from the given Elasticsearch value
-     * @throws GoraException when one of the underlying values cannot be deserialized
-     */
-    private Object fromElasticsearchList(Schema.Field avroField, Schema avroFieldSchema,
-                                         Object elasticsearchValue) throws GoraException {
-        List<Object> list = new ArrayList<>();
-        if (elasticsearchValue != null) {
-            for (Object item : (List<Object>) elasticsearchValue) {
-                Object result = deserializeFieldValue(avroField, avroFieldSchema, item);
-                list.add(result);
-            }
-        }
-        return new DirtyListWrapper<>(list);
-    }
-
-    /**
-     * Deserialize an Elasticsearch Map to an Avro Map as used in Gora generated classes
-     * that can safely be written into Avro persistent object.
-     *
-     * @param avroField        persistent Avro class field to which the value will be deserialized
-     * @param avroFieldSchema  schema for the persistent Avro class field
-     * @param elasticsearchMap Elasticsearch Map value to be deserialized
-     * @return deserialized Avro Map from the given Elasticsearch Map value
-     * @throws GoraException when one of the underlying values cannot be deserialized
-     */
-    private Object fromElasticsearchMap(Schema.Field avroField, Schema avroFieldSchema,
-                                        Map<String, Object> elasticsearchMap) throws GoraException {
-        Map<Utf8, Object> deserializedMap = new HashMap<>();
-        if (elasticsearchMap != null) {
-            for (Map.Entry<String, Object> entry : elasticsearchMap.entrySet()) {
-                String mapKey = entry.getKey();
-                Object mapValue = deserializeFieldValue(avroField, avroFieldSchema, entry.getValue());
-                deserializedMap.put(new Utf8(mapKey), mapValue);
-            }
-        }
-        return new DirtyMapWrapper<>(deserializedMap);
-    }
-
-    /**
-     * Deserialize an Elasticsearch Record to an Avro Object as used in Gora generated classes
-     * that can safely be written into Avro persistent object.
-     *
-     * @param avroFieldSchema     schema for the persistent Avro class field
-     * @param elasticsearchRecord Elasticsearch Record value to be deserialized
-     * @return deserialized Avro Object from the given Elasticsearch Record value
-     * @throws GoraException when one of the underlying values cannot be deserialized
-     */
-    private Object fromElasticsearchRecord(Schema avroFieldSchema,
-                                           Map<String, Object> elasticsearchRecord) throws GoraException {
-        Class<?> clazz;
-        try {
-            clazz = ClassLoadingUtils.loadClass(avroFieldSchema.getFullName());
-        } catch (ClassNotFoundException ex) {
-            throw new GoraException(ex);
-        }
-        PersistentBase record = (PersistentBase) new BeanFactoryImpl(keyClass, clazz).newPersistent();
-        for (Schema.Field recField : avroFieldSchema.getFields()) {
-            Schema innerSchema = recField.schema();
-            Field innerDocField = elasticsearchMapping.getFields().getOrDefault(recField.name(), new Field(recField.name(), null));
-            record.put(recField.pos(), deserializeFieldValue(recField, innerSchema, elasticsearchRecord.get(innerDocField.getName())));
-        }
-        return record;
-    }
-
-    /**
-     * Deserialize an Elasticsearch Union to an Avro Object as used in Gora generated classes
-     * that can safely be written into Avro persistent object.
-     *
-     * @param avroField          persistent Avro class field to which the value will be deserialized
-     * @param avroFieldSchema    schema for the persistent Avro class field
-     * @param elasticsearchUnion Elasticsearch Union value to be deserialized
-     * @return deserialized Avro Object from the given Elasticsearch Union value
-     * @throws GoraException when one of the underlying values cannot be deserialized
-     */
-    private Object fromElasticsearchUnion(Schema.Field avroField, Schema avroFieldSchema, Object elasticsearchUnion) throws GoraException {
-        Object deserializedUnion;
-        Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType();
-        Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType();
-        if (avroFieldSchema.getTypes().size() == 2 &&
-                (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) &&
-                !type0.equals(type1)) {
-            int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema);
-            Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
-            deserializedUnion = deserializeFieldValue(avroField, unionSchema, elasticsearchUnion);
-        } else if (avroFieldSchema.getTypes().size() == 3) {
-            Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType();
-            if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) &&
-                    (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) {
-                if (elasticsearchUnion == null) {
-                    deserializedUnion = null;
-                } else if (elasticsearchUnion instanceof String) {
-                    throw new GoraException("Elasticsearch supports Union data type only represented as Record or Null.");
-                } else {
-                    int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema);
-                    Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
-                    deserializedUnion = fromElasticsearchRecord(unionSchema, (Map<String, Object>) elasticsearchUnion);
-                }
-            } else {
-                throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
-            }
+    persistent.clearDirty();
+    return persistent;
+  }
+
+  /**
+   * Deserialize an Elasticsearch object to a persistent Avro object.
+   *
+   * @param avroField          persistent Avro class field to which the value will be deserialized
+   * @param avroFieldSchema    schema for the persistent Avro class field
+   * @param elasticsearchValue Elasticsearch field value to be deserialized
+   * @return deserialized Avro object from the Elasticsearch object
+   * @throws GoraException when the given Elasticsearch value cannot be deserialized
+   */
+  private Object deserializeFieldValue(Schema.Field avroField, Schema avroFieldSchema,
+                                       Object elasticsearchValue) throws GoraException {
+    Object fieldValue;
+    switch (avroFieldSchema.getType()) {
+      case MAP:
+        fieldValue = fromElasticsearchMap(avroField, avroFieldSchema.getValueType(), (Map<String, Object>) elasticsearchValue);
+        break;
+      case RECORD:
+        fieldValue = fromElasticsearchRecord(avroFieldSchema, (Map<String, Object>) elasticsearchValue);
+        break;
+      case ARRAY:
+        fieldValue = fromElasticsearchList(avroField, avroFieldSchema.getElementType(), elasticsearchValue);
+        break;
+      case BOOLEAN:
+        fieldValue = Boolean.parseBoolean(elasticsearchValue.toString());
+        break;
+      case BYTES:
+        fieldValue = ByteBuffer.wrap(Base64.getDecoder().decode(elasticsearchValue.toString()));
+        break;
+      case FIXED:
+      case NULL:
+        fieldValue = null;
+        break;
+      case UNION:
+        fieldValue = fromElasticsearchUnion(avroField, avroFieldSchema, elasticsearchValue);
+        break;
+      case DOUBLE:
+        fieldValue = Double.parseDouble(elasticsearchValue.toString());
+        break;
+      case ENUM:
+        fieldValue = AvroUtils.getEnumValue(avroFieldSchema, elasticsearchValue.toString());
+        break;
+      case FLOAT:
+        fieldValue = Float.parseFloat(elasticsearchValue.toString());
+        break;
+      case INT:
+        fieldValue = Integer.parseInt(elasticsearchValue.toString());
+        break;
+      case LONG:
+        fieldValue = Long.parseLong(elasticsearchValue.toString());
+        break;
+      case STRING:
+        fieldValue = new Utf8(elasticsearchValue.toString());
+        break;
+      default:
+        fieldValue = elasticsearchValue;
+    }
+    return fieldValue;
+  }
+
+  /**
+   * Deserialize an Elasticsearch List to an Avro List as used in Gora generated classes
+   * that can safely be written into Avro persistent object.
+   *
+   * @param avroField          persistent Avro class field to which the value will be deserialized
+   * @param avroFieldSchema    schema for the persistent Avro class field
+   * @param elasticsearchValue Elasticsearch field value to be deserialized
+   * @return deserialized Avro List from the given Elasticsearch value
+   * @throws GoraException when one of the underlying values cannot be deserialized
+   */
+  private Object fromElasticsearchList(Schema.Field avroField, Schema avroFieldSchema,
+                                       Object elasticsearchValue) throws GoraException {
+    List<Object> list = new ArrayList<>();
+    if (elasticsearchValue != null) {
+      for (Object item : (List<Object>) elasticsearchValue) {
+        Object result = deserializeFieldValue(avroField, avroFieldSchema, item);
+        list.add(result);
+      }
+    }
+    return new DirtyListWrapper<>(list);
+  }
+
+  /**
+   * Deserialize an Elasticsearch Map to an Avro Map as used in Gora generated classes
+   * that can safely be written into Avro persistent object.
+   *
+   * @param avroField        persistent Avro class field to which the value will be deserialized
+   * @param avroFieldSchema  schema for the persistent Avro class field
+   * @param elasticsearchMap Elasticsearch Map value to be deserialized
+   * @return deserialized Avro Map from the given Elasticsearch Map value
+   * @throws GoraException when one of the underlying values cannot be deserialized
+   */
+  private Object fromElasticsearchMap(Schema.Field avroField, Schema avroFieldSchema,
+                                      Map<String, Object> elasticsearchMap) throws GoraException {
+    Map<Utf8, Object> deserializedMap = new HashMap<>();
+    if (elasticsearchMap != null) {
+      for (Map.Entry<String, Object> entry : elasticsearchMap.entrySet()) {
+        String mapKey = entry.getKey();
+        Object mapValue = deserializeFieldValue(avroField, avroFieldSchema, entry.getValue());
+        deserializedMap.put(new Utf8(mapKey), mapValue);
+      }
+    }
+    return new DirtyMapWrapper<>(deserializedMap);
+  }
+
+  /**
+   * Deserialize an Elasticsearch Record to an Avro Object as used in Gora generated classes
+   * that can safely be written into Avro persistent object.
+   *
+   * @param avroFieldSchema     schema for the persistent Avro class field
+   * @param elasticsearchRecord Elasticsearch Record value to be deserialized
+   * @return deserialized Avro Object from the given Elasticsearch Record value
+   * @throws GoraException when one of the underlying values cannot be deserialized
+   */
+  private Object fromElasticsearchRecord(Schema avroFieldSchema,
+                                         Map<String, Object> elasticsearchRecord) throws GoraException {
+    Class<?> clazz;
+    try {
+      clazz = ClassLoadingUtils.loadClass(avroFieldSchema.getFullName());
+    } catch (ClassNotFoundException ex) {
+      throw new GoraException(ex);
+    }
+    PersistentBase record = (PersistentBase) new BeanFactoryImpl(keyClass, clazz).newPersistent();
+    for (Schema.Field recField : avroFieldSchema.getFields()) {
+      Schema innerSchema = recField.schema();
+      Field innerDocField = elasticsearchMapping.getFields().getOrDefault(recField.name(), new Field(recField.name(), null));
+      record.put(recField.pos(), deserializeFieldValue(recField, innerSchema, elasticsearchRecord.get(innerDocField.getName())));
+    }
+    return record;
+  }
+
+  /**
+   * Deserialize an Elasticsearch Union to an Avro Object as used in Gora generated classes
+   * that can safely be written into Avro persistent object.
+   *
+   * @param avroField          persistent Avro class field to which the value will be deserialized
+   * @param avroFieldSchema    schema for the persistent Avro class field
+   * @param elasticsearchUnion Elasticsearch Union value to be deserialized
+   * @return deserialized Avro Object from the given Elasticsearch Union value
+   * @throws GoraException when one of the underlying values cannot be deserialized
+   */
+  private Object fromElasticsearchUnion(Schema.Field avroField, Schema avroFieldSchema, Object elasticsearchUnion) throws GoraException {
+    Object deserializedUnion;
+    Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType();
+    Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType();
+    if (avroFieldSchema.getTypes().size() == 2 &&
+            (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) &&
+            !type0.equals(type1)) {
+      int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema);
+      Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
+      deserializedUnion = deserializeFieldValue(avroField, unionSchema, elasticsearchUnion);
+    } else if (avroFieldSchema.getTypes().size() == 3) {
+      Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType();
+      if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) &&
+              (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) {
+        if (elasticsearchUnion == null) {
+          deserializedUnion = null;
+        } else if (elasticsearchUnion instanceof String) {
+          throw new GoraException("Elasticsearch supports Union data type only represented as Record or Null.");
         } else {
-            throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
-        }
-        return deserializedUnion;
-    }
-
-    /**
-     * Serialize a persistent Avro object as used in Gora generated classes to
-     * an object that can be written into Elasticsearch.
-     *
-     * @param avroFieldSchema schema for the persistent Avro class field
-     * @param avroFieldValue  persistent Avro field value to be serialized
-     * @return serialized field value
-     * @throws GoraException when the given Avro object cannot be serialized
-     */
-    private Object serializeFieldValue(Schema avroFieldSchema, Object avroFieldValue) throws GoraException {
-        Object output = avroFieldValue;
-        switch (avroFieldSchema.getType()) {
-            case ARRAY:
-                output = arrayToElasticsearch((List<?>) avroFieldValue, avroFieldSchema.getElementType());
-                break;
-            case MAP:
-                output = mapToElasticsearch((Map<CharSequence, ?>) avroFieldValue, avroFieldSchema.getValueType());
-                break;
-            case RECORD:
-                output = recordToElasticsearch(avroFieldValue, avroFieldSchema);
-                break;
-            case BYTES:
-                output = Base64.getEncoder().encodeToString(((ByteBuffer) avroFieldValue).array());
-                break;
-            case UNION:
-                output = unionToElasticsearch(avroFieldValue, avroFieldSchema);
-                break;
-            case BOOLEAN:
-            case DOUBLE:
-            case ENUM:
-            case FLOAT:
-            case INT:
-            case LONG:
-            case STRING:
-                output = avroFieldValue.toString();
-                break;
-            case FIXED:
-                break;
-            case NULL:
-                output = null;
-                break;
-        }
-        return output;
-    }
-
-    /**
-     * Serialize a Java collection of persistent Avro objects as used in Gora generated classes to a
-     * List that can safely be written into Elasticsearch.
-     *
-     * @param collection      the collection to be serialized
-     * @param avroFieldSchema field schema for the underlying type
-     * @return a List version of the collection that can be safely written into Elasticsearch
-     * @throws GoraException when one of the underlying values cannot be serialized
-     */
-    private List<Object> arrayToElasticsearch(Collection<?> collection, Schema avroFieldSchema) throws GoraException {
-        List<Object> list = new ArrayList<>();
-        for (Object item : collection) {
-            Object result = serializeFieldValue(avroFieldSchema, item);
-            list.add(result);
-        }
-        return list;
-    }
-
-    /**
-     * Serialize a Java map of persistent Avro objects as used in Gora generated classes to a
-     * map that can safely be written into Elasticsearch.
-     *
-     * @param map             the map to be serialized
-     * @param avroFieldSchema field schema for the underlying type
-     * @return a Map version of the Java map that can be safely written into Elasticsearch
-     * @throws GoraException when one of the underlying values cannot be serialized
-     */
-    private Map<CharSequence, ?> mapToElasticsearch(Map<CharSequence, ?> map, Schema avroFieldSchema) throws GoraException {
-        Map<CharSequence, Object> serializedMap = new HashMap<>();
-        for (Map.Entry<CharSequence, ?> entry : map.entrySet()) {
-            String mapKey = entry.getKey().toString();
-            Object mapValue = entry.getValue();
-            Object result = serializeFieldValue(avroFieldSchema, mapValue);
-            serializedMap.put(mapKey, result);
-        }
-        return serializedMap;
-    }
-
-    /**
-     * Serialize a Java object of persistent Avro objects as used in Gora generated classes to a
-     * record that can safely be written into Elasticsearch.
-     *
-     * @param record          the object to be serialized
-     * @param avroFieldSchema field schema for the underlying type
-     * @return a record version of the Java object that can be safely written into Elasticsearch
-     * @throws GoraException when one of the underlying values cannot be serialized
-     */
-    private Map<CharSequence, Object> recordToElasticsearch(Object record, Schema avroFieldSchema) throws GoraException {
-        Map<CharSequence, Object> serializedRecord = new HashMap<>();
-        for (Schema.Field member : avroFieldSchema.getFields()) {
-            Object innerValue = ((PersistentBase) record).get(member.pos());
-            serializedRecord.put(member.name(), serializeFieldValue(member.schema(), innerValue));
-        }
-        return serializedRecord;
-    }
-
-    /**
-     * Serialize a Java object of persistent Avro objects as used in Gora generated classes to a
-     * object that can safely be written into Elasticsearch.
-     *
-     * @param union           the object to be serialized
-     * @param avroFieldSchema field schema for the underlying type
-     * @return a object version of the Java object that can be safely written into Elasticsearch
-     * @throws GoraException when one of the underlying values cannot be serialized
-     */
-    private Object unionToElasticsearch(Object union, Schema avroFieldSchema) throws GoraException {
-        Object serializedUnion;
-        Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType();
-        Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType();
-        if (avroFieldSchema.getTypes().size() == 2 &&
-                (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) &&
-                !type0.equals(type1)) {
-            int schemaPos = getUnionSchema(union, avroFieldSchema);
-            Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
-            serializedUnion = serializeFieldValue(unionSchema, union);
-        } else if (avroFieldSchema.getTypes().size() == 3) {
-            Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType();
-            if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) &&
-                    (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) {
-                if (union == null) {
-                    serializedUnion = null;
-                } else if (union instanceof String) {
-                    throw new GoraException("Elasticsearch does not support foreign key IDs in Union data type.");
-                } else {
-                    int schemaPos = getUnionSchema(union, avroFieldSchema);
-                    Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
-                    serializedUnion = recordToElasticsearch(union, unionSchema);
-                }
-            } else {
-                throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
-            }
+          int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema);
+          Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
+          deserializedUnion = fromElasticsearchRecord(unionSchema, (Map<String, Object>) elasticsearchUnion);
+        }
+      } else {
+        throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+      }
+    } else {
+      throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+    }
+    return deserializedUnion;
+  }
+
+  /**
+   * Serialize a persistent Avro object as used in Gora generated classes to
+   * an object that can be written into Elasticsearch.
+   *
+   * @param avroFieldSchema schema for the persistent Avro class field
+   * @param avroFieldValue  persistent Avro field value to be serialized
+   * @return serialized field value
+   * @throws GoraException when the given Avro object cannot be serialized
+   */
+  private Object serializeFieldValue(Schema avroFieldSchema, Object avroFieldValue) throws GoraException {
+    Object output = avroFieldValue;
+    switch (avroFieldSchema.getType()) {
+      case ARRAY:
+        output = arrayToElasticsearch((List<?>) avroFieldValue, avroFieldSchema.getElementType());
+        break;
+      case MAP:
+        output = mapToElasticsearch((Map<CharSequence, ?>) avroFieldValue, avroFieldSchema.getValueType());
+        break;
+      case RECORD:
+        output = recordToElasticsearch(avroFieldValue, avroFieldSchema);
+        break;
+      case BYTES:
+        output = Base64.getEncoder().encodeToString(((ByteBuffer) avroFieldValue).array());
+        break;
+      case UNION:
+        output = unionToElasticsearch(avroFieldValue, avroFieldSchema);
+        break;
+      case BOOLEAN:
+      case DOUBLE:
+      case ENUM:
+      case FLOAT:
+      case INT:
+      case LONG:
+      case STRING:
+        output = avroFieldValue.toString();
+        break;
+      case FIXED:
+        break;
+      case NULL:
+        output = null;
+        break;
+    }
+    return output;
+  }
+
+  /**
+   * Serialize a Java collection of persistent Avro objects as used in Gora generated classes to a
+   * List that can safely be written into Elasticsearch.
+   *
+   * @param collection      the collection to be serialized
+   * @param avroFieldSchema field schema for the underlying type
+   * @return a List version of the collection that can be safely written into Elasticsearch
+   * @throws GoraException when one of the underlying values cannot be serialized
+   */
+  private List<Object> arrayToElasticsearch(Collection<?> collection, Schema avroFieldSchema) throws GoraException {
+    List<Object> list = new ArrayList<>();
+    for (Object item : collection) {
+      Object result = serializeFieldValue(avroFieldSchema, item);
+      list.add(result);
+    }
+    return list;
+  }
+
+  /**
+   * Serialize a Java map of persistent Avro objects as used in Gora generated classes to a
+   * map that can safely be written into Elasticsearch.
+   *
+   * @param map             the map to be serialized
+   * @param avroFieldSchema field schema for the underlying type
+   * @return a Map version of the Java map that can be safely written into Elasticsearch
+   * @throws GoraException when one of the underlying values cannot be serialized
+   */
+  private Map<CharSequence, ?> mapToElasticsearch(Map<CharSequence, ?> map, Schema avroFieldSchema) throws GoraException {
+    Map<CharSequence, Object> serializedMap = new HashMap<>();
+    for (Map.Entry<CharSequence, ?> entry : map.entrySet()) {
+      String mapKey = entry.getKey().toString();
+      Object mapValue = entry.getValue();
+      Object result = serializeFieldValue(avroFieldSchema, mapValue);
+      serializedMap.put(mapKey, result);
+    }
+    return serializedMap;
+  }
+
+  /**
+   * Serialize a Java object of persistent Avro objects as used in Gora generated classes to a
+   * record that can safely be written into Elasticsearch.
+   *
+   * @param record          the object to be serialized
+   * @param avroFieldSchema field schema for the underlying type
+   * @return a record version of the Java object that can be safely written into Elasticsearch
+   * @throws GoraException when one of the underlying values cannot be serialized
+   */
+  private Map<CharSequence, Object> recordToElasticsearch(Object record, Schema avroFieldSchema) throws GoraException {
+    Map<CharSequence, Object> serializedRecord = new HashMap<>();
+    for (Schema.Field member : avroFieldSchema.getFields()) {
+      Object innerValue = ((PersistentBase) record).get(member.pos());
+      serializedRecord.put(member.name(), serializeFieldValue(member.schema(), innerValue));
+    }
+    return serializedRecord;
+  }
+
+  /**
+   * Serialize a Java object of persistent Avro objects as used in Gora generated classes to a
+   * object that can safely be written into Elasticsearch.
+   *
+   * @param union           the object to be serialized
+   * @param avroFieldSchema field schema for the underlying type
+   * @return a object version of the Java object that can be safely written into Elasticsearch
+   * @throws GoraException when one of the underlying values cannot be serialized
+   */
+  private Object unionToElasticsearch(Object union, Schema avroFieldSchema) throws GoraException {
+    Object serializedUnion;
+    Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType();
+    Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType();
+    if (avroFieldSchema.getTypes().size() == 2 &&
+            (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) &&
+            !type0.equals(type1)) {
+      int schemaPos = getUnionSchema(union, avroFieldSchema);
+      Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
+      serializedUnion = serializeFieldValue(unionSchema, union);
+    } else if (avroFieldSchema.getTypes().size() == 3) {
+      Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType();
+      if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) &&
+              (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) {
+        if (union == null) {
+          serializedUnion = null;
+        } else if (union instanceof String) {
+          throw new GoraException("Elasticsearch does not support foreign key IDs in Union data type.");
         } else {
-            throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
-        }
-        return serializedUnion;
-    }
-
-    /**
-     * Method to retrieve the corresponding schema type index of a particular
-     * object having UNION schema. As UNION type can have one or more types and at
-     * a given instance, it holds an object of only one type of the defined types,
-     * this method is used to figure out the corresponding instance's schema type
-     * index.
-     *
-     * @param instanceValue value that the object holds
-     * @param unionSchema   union schema containing all of the data types
-     * @return the unionSchemaPosition corresponding schema position
-     */
-    private int getUnionSchema(Object instanceValue, Schema unionSchema) {
-        int unionSchemaPos = 0;
-        for (Schema currentSchema : unionSchema.getTypes()) {
-            Schema.Type schemaType = currentSchema.getType();
-            if (instanceValue instanceof CharSequence && schemaType.equals(Schema.Type.STRING)) {
-                return unionSchemaPos;
-            }
-            if (instanceValue instanceof ByteBuffer && schemaType.equals(Schema.Type.BYTES)) {
-                return unionSchemaPos;
-            }
-            if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.BYTES)) {
-                return unionSchemaPos;
-            }
-            if (instanceValue instanceof String && schemaType.equals(Schema.Type.BYTES)) {
-                return unionSchemaPos;
-            }
-            if (instanceValue instanceof Integer && schemaType.equals(Schema.Type.INT)) {
-                return unionSchemaPos;
-            }
-            if (instanceValue instanceof Long && schemaType.equals(Schema.Type.LONG)) {
-                return unionSchemaPos;
-            }
-            if (instanceValue instanceof Double && schemaType.equals(Schema.Type.DOUBLE)) {
-                return unionSchemaPos;
-            }
-            if (instanceValue instanceof Float && schemaType.equals(Schema.Type.FLOAT)) {
-                return unionSchemaPos;
-            }
-            if (instanceValue instanceof Boolean && schemaType.equals(Schema.Type.BOOLEAN)) {
-                return unionSchemaPos;
-            }
-            if (instanceValue instanceof Map && schemaType.equals(Schema.Type.MAP)) {
-                return unionSchemaPos;
-            }
-            if (instanceValue instanceof List && schemaType.equals(Schema.Type.ARRAY)) {
-                return unionSchemaPos;
-            }
-            if (instanceValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD)) {
-                return unionSchemaPos;
-            }
-            if (instanceValue instanceof Map && schemaType.equals(Schema.Type.RECORD)) {
-                return unionSchemaPos;
-            }
-            if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.MAP)) {
-                return unionSchemaPos;
-            }
-            if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.RECORD)) {
-                return unionSchemaPos;
-            }
-            if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.ARRAY)) {
-                return unionSchemaPos;
-            }
-            unionSchemaPos++;
-        }
-        return 0;
+          int schemaPos = getUnionSchema(union, avroFieldSchema);
+          Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
+          serializedUnion = recordToElasticsearch(union, unionSchema);
+        }
+      } else {
+        throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+      }
+    } else {
+      throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+    }
+    return serializedUnion;
+  }
+
+  /**
+   * Method to retrieve the corresponding schema type index of a particular
+   * object having UNION schema. As UNION type can have one or more types and at
+   * a given instance, it holds an object of only one type of the defined types,
+   * this method is used to figure out the corresponding instance's schema type
+   * index.
+   *
+   * @param instanceValue value that the object holds
+   * @param unionSchema   union schema containing all of the data types
+   * @return the unionSchemaPosition corresponding schema position
+   */
+  private int getUnionSchema(Object instanceValue, Schema unionSchema) {
+    int unionSchemaPos = 0;
+    for (Schema currentSchema : unionSchema.getTypes()) {
+      Schema.Type schemaType = currentSchema.getType();
+      if (instanceValue instanceof CharSequence && schemaType.equals(Schema.Type.STRING)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof ByteBuffer && schemaType.equals(Schema.Type.BYTES)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.BYTES)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof String && schemaType.equals(Schema.Type.BYTES)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof Integer && schemaType.equals(Schema.Type.INT)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof Long && schemaType.equals(Schema.Type.LONG)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof Double && schemaType.equals(Schema.Type.DOUBLE)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof Float && schemaType.equals(Schema.Type.FLOAT)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof Boolean && schemaType.equals(Schema.Type.BOOLEAN)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof Map && schemaType.equals(Schema.Type.MAP)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof List && schemaType.equals(Schema.Type.ARRAY)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof Map && schemaType.equals(Schema.Type.RECORD)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.MAP)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.RECORD)) {
+        return unionSchemaPos;
+      }
+      if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.ARRAY)) {
+        return unionSchemaPos;
+      }
+      unionSchemaPos++;
     }
+    return 0;
+  }
 }
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java
index 01466ee..c253e87 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java
@@ -25,29 +25,29 @@ import java.util.List;
  */
 public class ElasticsearchStoreCollectionMetadata {
 
-    /**
-     * Collection document keys present in a given collection at ElasticsearchStore.
-     */
-    private List<String> documentKeys = new ArrayList<>();
-
-    /**
-     * Collection document types present in a given collection at ElasticsearchStore.
-     */
-    private List<String> documentTypes = new ArrayList<>();
-
-    public List<String> getDocumentKeys() {
-        return documentKeys;
-    }
-
-    public void setDocumentKeys(List<String> documentKeys) {
-        this.documentKeys = documentKeys;
-    }
-
-    public List<String> getDocumentTypes() {
-        return documentTypes;
-    }
-
-    public void setDocumentTypes(List<String> documentTypes) {
-        this.documentTypes = documentTypes;
-    }
+  /**
+   * Collection document keys present in a given collection at ElasticsearchStore.
+   */
+  private List<String> documentKeys = new ArrayList<>();
+
+  /**
+   * Collection document types present in a given collection at ElasticsearchStore.
+   */
+  private List<String> documentTypes = new ArrayList<>();
+
+  public List<String> getDocumentKeys() {
+    return documentKeys;
+  }
+
+  public void setDocumentKeys(List<String> documentKeys) {
+    this.documentKeys = documentKeys;
+  }
+
+  public List<String> getDocumentTypes() {
+    return documentTypes;
+  }
+
+  public void setDocumentTypes(List<String> documentTypes) {
+    this.documentTypes = documentTypes;
+  }
 }
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java
index 6806aa3..465c268 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java
@@ -35,68 +35,68 @@ import java.util.Map;
 
 public class ElasticsearchStoreMetadataAnalyzer extends DataStoreMetadataAnalyzer {
 
-    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStoreMetadataAnalyzer.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStoreMetadataAnalyzer.class);
 
-    private RestHighLevelClient elasticsearchClient;
+  private RestHighLevelClient elasticsearchClient;
 
-    @Override
-    public void initialize() throws GoraException {
-        ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf());
-        elasticsearchClient = ElasticsearchStore.createClient(parameters);
-    }
+  @Override
+  public void initialize() throws GoraException {
+    ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf());
+    elasticsearchClient = ElasticsearchStore.createClient(parameters);
+  }
 
-    @Override
-    public String getType() {
-        return "ELASTICSEARCH";
-    }
+  @Override
+  public String getType() {
+    return "ELASTICSEARCH";
+  }
 
-    @Override
-    public List<String> getTablesNames() throws GoraException {
-        GetIndexRequest request = new GetIndexRequest("*");
-        GetIndexResponse response;
-        try {
-            response = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT);
-        } catch (IOException ex) {
-            throw new GoraException(ex);
-        }
-        if (response == null) {
-            LOG.error("Could not find indices.");
-            throw new GoraException("Could not find indices.");
-        }
-        return Arrays.asList(response.getIndices());
+  @Override
+  public List<String> getTablesNames() throws GoraException {
+    GetIndexRequest request = new GetIndexRequest("*");
+    GetIndexResponse response;
+    try {
+      response = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT);
+    } catch (IOException ex) {
+      throw new GoraException(ex);
     }
+    if (response == null) {
+      LOG.error("Could not find indices.");
+      throw new GoraException("Could not find indices.");
+    }
+    return Arrays.asList(response.getIndices());
+  }
 
-    @Override
-    public ElasticsearchStoreCollectionMetadata getTableInfo(String tableName) throws GoraException {
-        GetIndexRequest request = new GetIndexRequest(tableName);
-        GetIndexResponse getIndexResponse;
-        try {
-            getIndexResponse = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT);
-        } catch (IOException ex) {
-            throw new GoraException(ex);
-        }
-        MappingMetadata indexMappings = getIndexResponse.getMappings().get(tableName);
-        Map<String, Object> indexKeysAndTypes = (Map<String, Object>) indexMappings.getSourceAsMap().get("properties");
-
-        List<String> documentTypes = new ArrayList<>();
-        List<String> documentKeys = new ArrayList<>();
+  @Override
+  public ElasticsearchStoreCollectionMetadata getTableInfo(String tableName) throws GoraException {
+    GetIndexRequest request = new GetIndexRequest(tableName);
+    GetIndexResponse getIndexResponse;
+    try {
+      getIndexResponse = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT);
+    } catch (IOException ex) {
+      throw new GoraException(ex);
+    }
+    MappingMetadata indexMappings = getIndexResponse.getMappings().get(tableName);
+    Map<String, Object> indexKeysAndTypes = (Map<String, Object>) indexMappings.getSourceAsMap().get("properties");
 
-        for (Map.Entry<String, Object> entry : indexKeysAndTypes.entrySet()) {
-            Map<String, Object> subEntry = (Map<String, Object>) entry.getValue();
-            documentTypes.add((String) subEntry.get("type"));
-            documentKeys.add(entry.getKey());
-        }
+    List<String> documentTypes = new ArrayList<>();
+    List<String> documentKeys = new ArrayList<>();
 
-        ElasticsearchStoreCollectionMetadata collectionMetadata = new ElasticsearchStoreCollectionMetadata();
-        collectionMetadata.setDocumentKeys(documentKeys);
-        collectionMetadata.setDocumentTypes(documentTypes);
-        return collectionMetadata;
+    for (Map.Entry<String, Object> entry : indexKeysAndTypes.entrySet()) {
+      Map<String, Object> subEntry = (Map<String, Object>) entry.getValue();
+      documentTypes.add((String) subEntry.get("type"));
+      documentKeys.add(entry.getKey());
     }
 
-    @Override
-    public void close() throws IOException {
-        if (elasticsearchClient != null) {
-            this.elasticsearchClient.close();
-        }
+    ElasticsearchStoreCollectionMetadata collectionMetadata = new ElasticsearchStoreCollectionMetadata();
+    collectionMetadata.setDocumentKeys(documentKeys);
+    collectionMetadata.setDocumentTypes(documentTypes);
+    return collectionMetadata;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (elasticsearchClient != null) {
+      this.elasticsearchClient.close();
     }
+  }
 }
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java
index dbbd45b..236b306 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java
@@ -20,16 +20,16 @@ package org.apache.gora.elasticsearch.utils;
  * Authentication type to connect to the Elasticsearch server.
  */
 public enum AuthenticationType {
-    /**
-     * Basic authentication requires to provide a username and password.
-     */
-    BASIC,
-    /**
-     * Token authentication requires to provide an Elasticsearch access token.
-     */
-    TOKEN,
-    /**
-     * API Key authentication requires to provide an Elasticsearch API Key ID and Elasticsearch API Key Secret.
-     */
-    APIKEY
+  /**
+   * Basic authentication requires to provide a username and password.
+   */
+  BASIC,
+  /**
+   * Token authentication requires to provide an Elasticsearch access token.
+   */
+  TOKEN,
+  /**
+   * API Key authentication requires to provide an Elasticsearch API Key ID and Elasticsearch API Key Secret.
+   */
+  APIKEY
 }
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java
index fa9bdc2..4f0ca4a 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java
@@ -20,30 +20,30 @@ package org.apache.gora.elasticsearch.utils;
  * Constants file for Elasticsearch.
  */
 public class ElasticsearchConstants {
-    /**
-     * Property indicating if the hadoop configuration has priority or not.
-     */
-    public static final String PROP_OVERRIDING = "gora.elasticsearch.override.hadoop.configuration";
+  /**
+   * Property indicating if the hadoop configuration has priority or not.
+   */
+  public static final String PROP_OVERRIDING = "gora.elasticsearch.override.hadoop.configuration";
 
-    /**
-     * Default configurations for Elasticsearch.
-     */
-    public static final String DEFAULT_HOST = "localhost";
-    public static final int DEFAULT_PORT = 9200;
+  /**
+   * Default configurations for Elasticsearch.
+   */
+  public static final String DEFAULT_HOST = "localhost";
+  public static final int DEFAULT_PORT = 9200;
 
-    /**
-     * List of keys used in the configuration file of Elasticsearch.
-     */
-    public static final String PROP_HOST = "gora.datastore.elasticsearch.host";
-    public static final String PROP_PORT = "gora.datastore.elasticsearch.port";
-    public static final String PROP_SCHEME = "gora.datastore.elasticsearch.scheme";
-    public static final String PROP_AUTHENTICATIONTYPE = "gora.datastore.elasticsearch.authenticationType";
-    public static final String PROP_USERNAME = "gora.datastore.elasticsearch.username";
-    public static final String PROP_PASSWORD = "gora.datastore.elasticsearch.password";
-    public static final String PROP_AUTHORIZATIONTOKEN = "gora.datastore.elasticsearch.authorizationToken";
-    public static final String PROP_APIKEYID = "gora.datastore.elasticsearch.apiKeyId";
-    public static final String PROP_APIKEYSECRET = "gora.datastore.elasticsearch.apiKeySecret";
-    public static final String PROP_CONNECTTIMEOUT = "gora.datastore.elasticsearch.connectTimeout";
-    public static final String PROP_SOCKETTIMEOUT = "gora.datastore.elasticsearch.socketTimeout";
-    public static final String PROP_IOTHREADCOUNT = "gora.datastore.elasticsearch.ioThreadCount";
+  /**
+   * List of keys used in the configuration file of Elasticsearch.
+   */
+  public static final String PROP_HOST = "gora.datastore.elasticsearch.host";
+  public static final String PROP_PORT = "gora.datastore.elasticsearch.port";
+  public static final String PROP_SCHEME = "gora.datastore.elasticsearch.scheme";
+  public static final String PROP_AUTHENTICATIONTYPE = "gora.datastore.elasticsearch.authenticationType";
+  public static final String PROP_USERNAME = "gora.datastore.elasticsearch.username";
+  public static final String PROP_PASSWORD = "gora.datastore.elasticsearch.password";
+  public static final String PROP_AUTHORIZATIONTOKEN = "gora.datastore.elasticsearch.authorizationToken";
+  public static final String PROP_APIKEYID = "gora.datastore.elasticsearch.apiKeyId";
+  public static final String PROP_APIKEYSECRET = "gora.datastore.elasticsearch.apiKeySecret";
+  public static final String PROP_CONNECTTIMEOUT = "gora.datastore.elasticsearch.connectTimeout";
+  public static final String PROP_SOCKETTIMEOUT = "gora.datastore.elasticsearch.socketTimeout";
+  public static final String PROP_IOTHREADCOUNT = "gora.datastore.elasticsearch.ioThreadCount";
 }
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java
index 4b0cef4..187b02a 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java
@@ -1,14 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,249 +25,248 @@ import java.util.Properties;
  */
 public class ElasticsearchParameters {
 
-    /**
-     * Elasticsearch server host.
-     */
-    private String host;
-
-    /**
-     * Elasticsearch server port.
-     */
-    private int port;
-
-    /**
-     * Elasticsearch server scheme.
-     * Optional. If not provided, defaults to http.
-     */
-    private String scheme;
-
-    /**
-     * Authentication type to connect to the server.
-     * Can be BASIC, TOKEN or APIKEY.
-     */
-    private AuthenticationType authenticationType;
-
-    /**
-     * Username to use for server authentication.
-     * Required for BASIC authentication to connect to the server.
-     */
-    private String username;
-
-    /**
-     * Password to use for server authentication.
-     * Required for BASIC authentication to connect to the server.
-     */
-    private String password;
-
-    /**
-     * Authorization token to use for server authentication.
-     * Required for TOKEN authentication to connect to the server.
-     */
-    private String authorizationToken;
-
-    /**
-     * API Key ID to use for server authentication.
-     * Required for APIKEY authentication to connect to the server.
-     */
-    private String apiKeyId;
-
-    /**
-     * API Key Secret to use for server authentication.
-     * Required for APIKEY authentication to connect to the server.
-     */
-    private String apiKeySecret;
-
-    /**
-     * Timeout in milliseconds used for establishing the connection to the server.
-     * Optional. If not provided, defaults to 5000s.
-     *
-     */
-    private int connectTimeout;
-
-    /**
-     * Timeout in milliseconds used for waiting for data – after establishing the connection to the server.
-     * Optional. If not provided, defaults to 60000s.
-     */
-    private int socketTimeout;
-
-    /**
-     * Number of worker threads used by the connection manager.
-     * Optional. If not provided, defaults to 1.
-     */
-    private int ioThreadCount;
-
-    public ElasticsearchParameters(String host, int port) {
-        this.host = host;
-        this.port = port;
+  /**
+   * Elasticsearch server host.
+   */
+  private String host;
+
+  /**
+   * Elasticsearch server port.
+   */
+  private int port;
+
+  /**
+   * Elasticsearch server scheme.
+   * Optional. If not provided, defaults to http.
+   */
+  private String scheme;
+
+  /**
+   * Authentication type to connect to the server.
+   * Can be BASIC, TOKEN or APIKEY.
+   */
+  private AuthenticationType authenticationType;
+
+  /**
+   * Username to use for server authentication.
+   * Required for BASIC authentication to connect to the server.
+   */
+  private String username;
+
+  /**
+   * Password to use for server authentication.
+   * Required for BASIC authentication to connect to the server.
+   */
+  private String password;
+
+  /**
+   * Authorization token to use for server authentication.
+   * Required for TOKEN authentication to connect to the server.
+   */
+  private String authorizationToken;
+
+  /**
+   * API Key ID to use for server authentication.
+   * Required for APIKEY authentication to connect to the server.
+   */
+  private String apiKeyId;
+
+  /**
+   * API Key Secret to use for server authentication.
+   * Required for APIKEY authentication to connect to the server.
+   */
+  private String apiKeySecret;
+
+  /**
+   * Timeout in milliseconds used for establishing the connection to the server.
+   * Optional. If not provided, defaults to 5000s.
+   */
+  private int connectTimeout;
+
+  /**
+   * Timeout in milliseconds used for waiting for data – after establishing the connection to the server.
+   * Optional. If not provided, defaults to 60000s.
+   */
+  private int socketTimeout;
+
+  /**
+   * Number of worker threads used by the connection manager.
+   * Optional. If not provided, defaults to 1.
+   */
+  private int ioThreadCount;
+
+  public ElasticsearchParameters(String host, int port) {
+    this.host = host;
+    this.port = port;
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public void setHost(String host) {
+    this.host = host;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+  public String getScheme() {
+    return scheme;
+  }
+
+  public void setScheme(String scheme) {
+    this.scheme = scheme;
+  }
+
+  public AuthenticationType getAuthenticationType() {
+    return authenticationType;
+  }
+
+  public void setAuthenticationType(AuthenticationType authenticationType) {
+    this.authenticationType = authenticationType;
+  }
+
+  public String getUsername() {
+    return username;
+  }
+
+  public void setUsername(String username) {
+    this.username = username;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public String getAuthorizationToken() {
+    return authorizationToken;
+  }
+
+  public void setAuthorizationToken(String authorizationToken) {
+    this.authorizationToken = authorizationToken;
+  }
+
+  public String getApiKeyId() {
+    return apiKeyId;
+  }
+
+  public void setApiKeyId(String apiKeyId) {
+    this.apiKeyId = apiKeyId;
+  }
+
+  public String getApiKeySecret() {
+    return apiKeySecret;
+  }
+
+  public void setApiKeySecret(String apiKeySecret) {
+    this.apiKeySecret = apiKeySecret;
+  }
+
+  public int getConnectTimeout() {
+    return connectTimeout;
+  }
+
+  public void setConnectTimeout(int connectTimeout) {
+    this.connectTimeout = connectTimeout;
+  }
+
+  public int getSocketTimeout() {
+    return socketTimeout;
+  }
+
+  public void setSocketTimeout(int socketTimeout) {
+    this.socketTimeout = socketTimeout;
+  }
+
+  public int getIoThreadCount() {
+    return ioThreadCount;
+  }
+
+  public void setIoThreadCount(int ioThreadCount) {
+    this.ioThreadCount = ioThreadCount;
+  }
+
+  /**
+   * Reads Elasticsearch parameters from a properties list.
+   *
+   * @param properties Properties list
+   * @return Elasticsearch parameters instance
+   */
+  public static ElasticsearchParameters load(Properties properties, Configuration conf) {
+    ElasticsearchParameters elasticsearchParameters;
+
+    if (!Boolean.parseBoolean(properties.getProperty(ElasticsearchConstants.PROP_OVERRIDING))) {
+      elasticsearchParameters = new ElasticsearchParameters(
+              conf.get(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST),
+              conf.getInt(ElasticsearchConstants.PROP_PORT, ElasticsearchConstants.DEFAULT_PORT));
+    } else {
+      elasticsearchParameters = new ElasticsearchParameters(
+              properties.getProperty(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST),
+              Integer.parseInt(properties.getProperty(ElasticsearchConstants.PROP_PORT,
+                      String.valueOf(ElasticsearchConstants.DEFAULT_PORT))));
     }
 
-    public String getHost() {
-        return host;
+    String schemeProperty = properties.getProperty(ElasticsearchConstants.PROP_SCHEME);
+    if (schemeProperty != null) {
+      elasticsearchParameters.setScheme(schemeProperty);
     }
 
-    public void setHost(String host) {
-        this.host = host;
+    AuthenticationType authenticationTypeProperty =
+            AuthenticationType.valueOf(properties.getProperty(ElasticsearchConstants.PROP_AUTHENTICATIONTYPE));
+    if (authenticationTypeProperty != null) {
+      elasticsearchParameters.setAuthenticationType(authenticationTypeProperty);
     }
 
-    public int getPort() {
-        return port;
+    String usernameProperty = properties.getProperty(ElasticsearchConstants.PROP_USERNAME);
+    if (usernameProperty != null) {
+      elasticsearchParameters.setUsername(usernameProperty);
     }
 
-    public void setPort(int port) {
-        this.port = port;
+    String passwordProperty = properties.getProperty(ElasticsearchConstants.PROP_PASSWORD);
+    if (passwordProperty != null) {
+      elasticsearchParameters.setPassword(passwordProperty);
     }
 
-    public String getScheme() {
-        return scheme;
+    String authorizationTokenProperty = properties.getProperty(ElasticsearchConstants.PROP_AUTHORIZATIONTOKEN);
+    if (authorizationTokenProperty != null) {
+      elasticsearchParameters.setAuthorizationToken(authorizationTokenProperty);
     }
 
-    public void setScheme(String scheme) {
-        this.scheme = scheme;
+    String apiKeyIdProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYID);
+    if (apiKeyIdProperty != null) {
+      elasticsearchParameters.setApiKeyId(apiKeyIdProperty);
     }
 
-    public AuthenticationType getAuthenticationType() {
-        return authenticationType;
+    String apiKeySecretProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYSECRET);
+    if (apiKeySecretProperty != null) {
+      elasticsearchParameters.setApiKeySecret(apiKeySecretProperty);
     }
 
-    public void setAuthenticationType(AuthenticationType authenticationType) {
-        this.authenticationType = authenticationType;
+    String connectTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_CONNECTTIMEOUT);
+    if (connectTimeoutProperty != null) {
+      elasticsearchParameters.setConnectTimeout(Integer.parseInt(connectTimeoutProperty));
     }
 
-    public String getUsername() {
-        return username;
+    String socketTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_SOCKETTIMEOUT);
+    if (socketTimeoutProperty != null) {
+      elasticsearchParameters.setSocketTimeout(Integer.parseInt(socketTimeoutProperty));
     }
 
-    public void setUsername(String username) {
-        this.username = username;
+    String ioThreadCountProperty = properties.getProperty(ElasticsearchConstants.PROP_IOTHREADCOUNT);
+    if (ioThreadCountProperty != null) {
+      elasticsearchParameters.setIoThreadCount(Integer.parseInt(ioThreadCountProperty));
     }
 
-    public String getPassword() {
-        return password;
-    }
-
-    public void setPassword(String password) {
-        this.password = password;
-    }
-
-    public String getAuthorizationToken() {
-        return authorizationToken;
-    }
-
-    public void setAuthorizationToken(String authorizationToken) {
-        this.authorizationToken = authorizationToken;
-    }
-
-    public String getApiKeyId() {
-        return apiKeyId;
-    }
-
-    public void setApiKeyId(String apiKeyId) {
-        this.apiKeyId = apiKeyId;
-    }
-
-    public String getApiKeySecret() {
-        return apiKeySecret;
-    }
-
-    public void setApiKeySecret(String apiKeySecret) {
-        this.apiKeySecret = apiKeySecret;
-    }
-
-    public int getConnectTimeout() {
-        return connectTimeout;
-    }
-
-    public void setConnectTimeout(int connectTimeout) {
-        this.connectTimeout = connectTimeout;
-    }
-
-    public int getSocketTimeout() {
-        return socketTimeout;
-    }
-
-    public void setSocketTimeout(int socketTimeout) {
-        this.socketTimeout = socketTimeout;
-    }
-
-    public int getIoThreadCount() {
-        return ioThreadCount;
-    }
-
-    public void setIoThreadCount(int ioThreadCount) {
-        this.ioThreadCount = ioThreadCount;
-    }
-
-    /**
-     * Reads Elasticsearch parameters from a properties list.
-     *
-     * @param properties Properties list
-     * @return Elasticsearch parameters instance
-     */
-    public static ElasticsearchParameters load(Properties properties, Configuration conf) {
-        ElasticsearchParameters elasticsearchParameters;
-
-        if (!Boolean.parseBoolean(properties.getProperty(ElasticsearchConstants.PROP_OVERRIDING))) {
-            elasticsearchParameters = new ElasticsearchParameters(
-                    conf.get(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST),
-                    conf.getInt(ElasticsearchConstants.PROP_PORT, ElasticsearchConstants.DEFAULT_PORT));
-        } else {
-            elasticsearchParameters = new ElasticsearchParameters(
-                    properties.getProperty(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST),
-                    Integer.parseInt(properties.getProperty(ElasticsearchConstants.PROP_PORT,
-                            String.valueOf(ElasticsearchConstants.DEFAULT_PORT))));
-        }
-
-        String schemeProperty = properties.getProperty(ElasticsearchConstants.PROP_SCHEME);
-        if (schemeProperty != null) {
-            elasticsearchParameters.setScheme(schemeProperty);
-        }
-
-        AuthenticationType authenticationTypeProperty =
-                AuthenticationType.valueOf(properties.getProperty(ElasticsearchConstants.PROP_AUTHENTICATIONTYPE));
-        if (authenticationTypeProperty != null) {
-            elasticsearchParameters.setAuthenticationType(authenticationTypeProperty);
-        }
-
-        String usernameProperty = properties.getProperty(ElasticsearchConstants.PROP_USERNAME);
-        if (usernameProperty != null) {
-            elasticsearchParameters.setUsername(usernameProperty);
-        }
-
-        String passwordProperty = properties.getProperty(ElasticsearchConstants.PROP_PASSWORD);
-        if (passwordProperty != null) {
-            elasticsearchParameters.setPassword(passwordProperty);
-        }
-
-        String authorizationTokenProperty = properties.getProperty(ElasticsearchConstants.PROP_AUTHORIZATIONTOKEN);
-        if (authorizationTokenProperty != null) {
-            elasticsearchParameters.setAuthorizationToken(authorizationTokenProperty);
-        }
-
-        String apiKeyIdProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYID);
-        if (apiKeyIdProperty != null) {
-            elasticsearchParameters.setApiKeyId(apiKeyIdProperty);
-        }
-
-        String apiKeySecretProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYSECRET);
-        if (apiKeySecretProperty != null) {
-            elasticsearchParameters.setApiKeySecret(apiKeySecretProperty);
-        }
-
-        String connectTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_CONNECTTIMEOUT);
-        if (connectTimeoutProperty != null) {
-            elasticsearchParameters.setConnectTimeout(Integer.parseInt(connectTimeoutProperty));
-        }
-
-        String socketTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_SOCKETTIMEOUT);
-        if (socketTimeoutProperty != null) {
-            elasticsearchParameters.setSocketTimeout(Integer.parseInt(socketTimeoutProperty));
-        }
-
-        String ioThreadCountProperty = properties.getProperty(ElasticsearchConstants.PROP_IOTHREADCOUNT);
-        if (ioThreadCountProperty != null) {
-            elasticsearchParameters.setIoThreadCount(Integer.parseInt(ioThreadCountProperty));
-        }
-
-        return elasticsearchParameters;
-    }
+    return elasticsearchParameters;
+  }
 }
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java
index ef17dbc..6d4d4c3 100644
--- a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java
@@ -32,40 +32,40 @@ import java.util.Properties;
  */
 public class GoraElasticsearchTestDriver extends GoraTestDriver {
 
-    private static final String DOCKER_IMAGE = "docker.elastic.co/elasticsearch/elasticsearch:7.10.1";
-    private ElasticsearchContainer elasticsearchContainer;
+  private static final String DOCKER_IMAGE = "docker.elastic.co/elasticsearch/elasticsearch:7.10.1";
+  private ElasticsearchContainer elasticsearchContainer;
 
-    /**
-     * Constructor for this class.
-     */
-    public GoraElasticsearchTestDriver() {
-        super(ElasticsearchStore.class);
-        Properties properties = DataStoreFactory.createProps();
-        elasticsearchContainer = new ElasticsearchContainer(DOCKER_IMAGE)
-                .withEnv("ELASTIC_PASSWORD", properties.getProperty(ElasticsearchConstants.PROP_PASSWORD))
-                .withEnv("xpack.security.enabled", "true");
-    }
+  /**
+   * Constructor for this class.
+   */
+  public GoraElasticsearchTestDriver() {
+    super(ElasticsearchStore.class);
+    Properties properties = DataStoreFactory.createProps();
+    elasticsearchContainer = new ElasticsearchContainer(DOCKER_IMAGE)
+            .withEnv("ELASTIC_PASSWORD", properties.getProperty(ElasticsearchConstants.PROP_PASSWORD))
+            .withEnv("xpack.security.enabled", "true");
+  }
 
-    /**
-     * Initiate the Elasticsearch server on the default port.
-     */
-    @Override
-    public void setUpClass() throws Exception {
-        elasticsearchContainer.start();
-        log.info("Setting up Elasticsearch test driver");
+  /**
+   * Initiate the Elasticsearch server on the default port.
+   */
+  @Override
+  public void setUpClass() throws Exception {
+    elasticsearchContainer.start();
+    log.info("Setting up Elasticsearch test driver");
 
-        int port = elasticsearchContainer.getMappedPort(ElasticsearchConstants.DEFAULT_PORT);
-        String host = elasticsearchContainer.getContainerIpAddress();
-        conf.set(ElasticsearchConstants.PROP_PORT, String.valueOf(port));
-        conf.set(ElasticsearchConstants.PROP_HOST, host);
-    }
+    int port = elasticsearchContainer.getMappedPort(ElasticsearchConstants.DEFAULT_PORT);
+    String host = elasticsearchContainer.getContainerIpAddress();
+    conf.set(ElasticsearchConstants.PROP_PORT, String.valueOf(port));
+    conf.set(ElasticsearchConstants.PROP_HOST, host);
+  }
 
-    /**
-     * Tear the server down.
-     */
-    @Override
-    public void tearDownClass() throws Exception {
-        elasticsearchContainer.close();
-        log.info("Tearing down Elasticsearch test driver");
-    }
+  /**
+   * Tear the server down.
+   */
+  @Override
+  public void tearDownClass() throws Exception {
+    elasticsearchContainer.close();
+    log.info("Tearing down Elasticsearch test driver");
+  }
 }
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java
index d5aa9e6..c05c441 100644
--- a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java
@@ -19,10 +19,12 @@ package org.apache.gora.elasticsearch.mapreduce;
 import org.apache.gora.elasticsearch.GoraElasticsearchTestDriver;
 import org.apache.gora.examples.generated.WebPage;
 import org.apache.gora.mapreduce.DataStoreMapReduceTestBase;
+import org.apache.gora.mapreduce.MapReduceTestUtils;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.DataStoreFactory;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Test;
 
 import java.io.IOException;
 
@@ -31,29 +33,35 @@ import java.io.IOException;
  */
 public class ElasticsearchStoreMapReduceTest extends DataStoreMapReduceTestBase {
 
-    private GoraElasticsearchTestDriver driver;
-
-    public ElasticsearchStoreMapReduceTest() throws IOException {
-        super();
-        driver = new GoraElasticsearchTestDriver();
-    }
-
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        driver.setUpClass();
-        super.setUp();
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        super.tearDown();
-        driver.tearDownClass();
-    }
-
-    @Override
-    protected DataStore<String, WebPage> createWebPageDataStore() throws IOException {
-        return DataStoreFactory.getDataStore(String.class, WebPage.class, driver.getConfiguration());
-    }
+  private GoraElasticsearchTestDriver driver;
+
+  public ElasticsearchStoreMapReduceTest() throws IOException {
+    super();
+    driver = new GoraElasticsearchTestDriver();
+  }
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    driver.setUpClass();
+    super.setUp();
+  }
+
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    super.tearDown();
+    driver.tearDownClass();
+  }
+
+  @Override
+  protected DataStore<String, WebPage> createWebPageDataStore() throws IOException {
+    return DataStoreFactory.getDataStore(String.class, WebPage.class, driver.getConfiguration());
+  }
+
+  @Test
+  @Override
+  public void testCountQuery() throws Exception {
+    MapReduceTestUtils.testCountQuery(createWebPageDataStore(), driver.getConfiguration());
+  }
 }
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java
index 7c95593..fb7fd0d 100644
--- a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java
@@ -34,140 +34,145 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
 
 /**
  * Test case for ElasticsearchStore.
  */
 public class TestElasticsearchStore extends DataStoreTestBase {
 
-    static {
-        setTestDriver(new GoraElasticsearchTestDriver());
-    }
-
-    @Test
-    public void testInitialize() throws GoraException {
-        log.info("test method: testInitialize");
-
-        ElasticsearchMapping mapping = ((ElasticsearchStore) employeeStore).getMapping();
-
-        Map<String, Field> fields = new HashMap<String, Field>() {{
-            put("name", new Field("name", new Field.FieldType(Field.DataType.TEXT)));
-            put("dateOfBirth", new Field("dateOfBirth", new Field.FieldType(Field.DataType.LONG)));
-            put("ssn", new Field("ssn", new Field.FieldType(Field.DataType.TEXT)));
-            put("value", new Field("value", new Field.FieldType(Field.DataType.TEXT)));
-            put("salary", new Field("salary", new Field.FieldType(Field.DataType.INTEGER)));
-            put("boss", new Field("boss", new Field.FieldType(Field.DataType.OBJECT)));
-            put("webpage", new Field("webpage", new Field.FieldType(Field.DataType.OBJECT)));
-        }};
-
-        Assert.assertEquals("frontier", employeeStore.getSchemaName());
-        Assert.assertEquals("frontier", mapping.getIndexName());
-        Assert.assertEquals(fields, mapping.getFields());
-    }
-
-    @Test
-    public void testLoadElasticsearchParameters() throws IOException {
-        log.info("test method: testLoadElasticsearchParameters");
-
-        Properties properties = DataStoreFactory.createProps();
-
-        ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, testDriver.getConfiguration());
-
-        Assert.assertEquals("localhost", parameters.getHost());
-        Assert.assertEquals(AuthenticationType.BASIC, parameters.getAuthenticationType());
-        Assert.assertEquals("elastic", parameters.getUsername());
-        Assert.assertEquals("password", parameters.getPassword());
-    }
-
-    @Test(expected = GoraException.class)
-    public void testInvalidXmlFile() throws Exception {
-        log.info("test method: testInvalidXmlFile");
-
-        Properties properties = DataStoreFactory.createProps();
-        properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml");
-        properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "true");
-        testDriver.createDataStore(String.class, EmployeeInt.class, properties);
-    }
-
-    @Test
-    public void testXsdValidationParameter() throws GoraException {
-        log.info("test method: testXsdValidationParameter");
-
-        Properties properties = DataStoreFactory.createProps();
-        properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml");
-        properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "false");
-        testDriver.createDataStore(String.class, EmployeeInt.class, properties);
-    }
-
-    @Test
-    public void testGetType() throws GoraException, ClassNotFoundException {
-        Configuration conf = testDriver.getConfiguration();
-        DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
-
-        String actualType = storeMetadataAnalyzer.getType();
-        String expectedType = "ELASTICSEARCH";
-        Assert.assertEquals(expectedType, actualType);
-    }
-
-    @Test
-    public void testGetTablesNames() throws GoraException, ClassNotFoundException {
-        Configuration conf = testDriver.getConfiguration();
-        DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
-
-        List<String> actualTablesNames = new ArrayList<>(storeMetadataAnalyzer.getTablesNames());
-        List<String> expectedTablesNames = new ArrayList<String>() {
-            {
-                add("frontier");
-                add("webpage");
-            }
-        };
-        Assert.assertEquals(expectedTablesNames, actualTablesNames);
-    }
-
-    @Test
-    public void testGetTableInfo() throws GoraException, ClassNotFoundException {
-        Configuration conf = testDriver.getConfiguration();
-        DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
-
-        ElasticsearchStoreCollectionMetadata actualCollectionMetadata =
-                (ElasticsearchStoreCollectionMetadata) storeMetadataAnalyzer.getTableInfo("frontier");
-
-        List<String> expectedDocumentKeys = new ArrayList<String>() {
-            {
-                add("name");
-                add("dateOfBirth");
-                add("ssn");
-                add("value");
-                add("salary");
-                add("boss");
-                add("webpage");
-                add("gora_id");
-            }
-        };
-
-        List<String> expectedDocumentTypes = new ArrayList<String>() {
-            {
-                add("text");
-                add("long");
-                add("text");
-                add("text");
-                add("integer");
-                add("object");
-                add("object");
-                add("keyword");
-            }
-        };
-
-        Assert.assertEquals(expectedDocumentKeys.size(), actualCollectionMetadata.getDocumentTypes().size());
-        Assert.assertTrue(expectedDocumentKeys.containsAll(actualCollectionMetadata.getDocumentKeys()));
-
-        Assert.assertEquals(expectedDocumentTypes.size(), actualCollectionMetadata.getDocumentTypes().size());
-        Assert.assertTrue(expectedDocumentTypes.containsAll(actualCollectionMetadata.getDocumentTypes()));
-    }
-
-    @Ignore("Elasticsearch doesn't support 3 types union field yet")
-    @Override
-    public void testGet3UnionField() {
-    }
+  static {
+    setTestDriver(new GoraElasticsearchTestDriver());
+  }
+
+  @Test
+  public void testInitialize() throws GoraException {
+    log.info("test method: testInitialize");
+
+    ElasticsearchMapping mapping = ((ElasticsearchStore) employeeStore).getMapping();
+
+    Map<String, Field> fields = new HashMap<String, Field>() {{
+      put("name", new Field("name", new Field.FieldType(Field.DataType.TEXT)));
+      put("dateOfBirth", new Field("dateOfBirth", new Field.FieldType(Field.DataType.LONG)));
+      put("ssn", new Field("ssn", new Field.FieldType(Field.DataType.TEXT)));
+      put("value", new Field("value", new Field.FieldType(Field.DataType.TEXT)));
+      put("salary", new Field("salary", new Field.FieldType(Field.DataType.INTEGER)));
+      put("boss", new Field("boss", new Field.FieldType(Field.DataType.OBJECT)));
+      put("webpage", new Field("webpage", new Field.FieldType(Field.DataType.OBJECT)));
+    }};
+
+    Assert.assertEquals("frontier", employeeStore.getSchemaName());
+    Assert.assertEquals("frontier", mapping.getIndexName());
+    Assert.assertEquals(fields, mapping.getFields());
+  }
+
+  @Test
+  public void testLoadElasticsearchParameters() throws IOException {
+    log.info("test method: testLoadElasticsearchParameters");
+
+    Properties properties = DataStoreFactory.createProps();
+
+    ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, testDriver.getConfiguration());
+
+    Assert.assertEquals("localhost", parameters.getHost());
+    Assert.assertEquals(AuthenticationType.BASIC, parameters.getAuthenticationType());
+    Assert.assertEquals("elastic", parameters.getUsername());
+    Assert.assertEquals("password", parameters.getPassword());
+  }
+
+  @Test(expected = GoraException.class)
+  public void testInvalidXmlFile() throws Exception {
+    log.info("test method: testInvalidXmlFile");
+
+    Properties properties = DataStoreFactory.createProps();
+    properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml");
+    properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "true");
+    testDriver.createDataStore(String.class, EmployeeInt.class, properties);
+  }
+
+  @Test
+  public void testXsdValidationParameter() throws GoraException {
+    log.info("test method: testXsdValidationParameter");
+
+    Properties properties = DataStoreFactory.createProps();
+    properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml");
+    properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "false");
+    testDriver.createDataStore(String.class, EmployeeInt.class, properties);
+  }
+
+  @Test
+  public void testGetType() throws GoraException, ClassNotFoundException {
+    Configuration conf = testDriver.getConfiguration();
+    DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
+
+    String actualType = storeMetadataAnalyzer.getType();
+    String expectedType = "ELASTICSEARCH";
+    Assert.assertEquals(expectedType, actualType);
+  }
+
+  @Test
+  public void testGetTablesNames() throws GoraException, ClassNotFoundException {
+    Configuration conf = testDriver.getConfiguration();
+    DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
+
+    List<String> actualTablesNames = new ArrayList<>(storeMetadataAnalyzer.getTablesNames());
+    List<String> expectedTablesNames = new ArrayList<String>() {
+      {
+        add("frontier");
+        add("webpage");
+      }
+    };
+    Assert.assertEquals(expectedTablesNames, actualTablesNames);
+  }
+
+  @Test
+  public void testGetTableInfo() throws GoraException, ClassNotFoundException {
+    Configuration conf = testDriver.getConfiguration();
+    DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
+
+    ElasticsearchStoreCollectionMetadata actualCollectionMetadata =
+            (ElasticsearchStoreCollectionMetadata) storeMetadataAnalyzer.getTableInfo("frontier");
+
+    List<String> expectedDocumentKeys = new ArrayList<String>() {
+      {
+        add("name");
+        add("dateOfBirth");
+        add("ssn");
+        add("value");
+        add("salary");
+        add("boss");
+        add("webpage");
+        add("gora_id");
+      }
+    };
+
+    List<String> expectedDocumentTypes = new ArrayList<String>() {
+      {
+        add("text");
+        add("long");
+        add("text");
+        add("text");
+        add("integer");
+        add("object");
+        add("object");
+        add("keyword");
+      }
+    };
+
+    Assert.assertEquals(expectedDocumentKeys.size(), actualCollectionMetadata.getDocumentTypes().size());
+    Assert.assertTrue(expectedDocumentKeys.containsAll(actualCollectionMetadata.getDocumentKeys()));
+
+    Assert.assertEquals(expectedDocumentTypes.size(), actualCollectionMetadata.getDocumentTypes().size());
+    Assert.assertTrue(expectedDocumentTypes.containsAll(actualCollectionMetadata.getDocumentTypes()));
+  }
+
+  @Ignore("Elasticsearch doesn't support 3 types union field yet")
+  @Override
+  public void testGet3UnionField() {
+  }
 }