Posted to commits@gora.apache.org by dj...@apache.org on 2021/08/11 18:22:27 UTC
[gora] branch master updated: GORA-664 Add datastore for Elasticsearch (#234)
This is an automated email from the ASF dual-hosted git repository.
djkevincr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gora.git
The following commit(s) were added to refs/heads/master by this push:
new 2d0d910 GORA-664 Add datastore for Elasticsearch (#234)
2d0d910 is described below
commit 2d0d91093dd7bceb07a4d64fb6b075bdc2117abd
Author: Maria Podorvanova <36...@users.noreply.github.com>
AuthorDate: Thu Aug 12 04:22:22 2021 +1000
GORA-664 Add datastore for Elasticsearch (#234)
* Create basic gora-elasticsearch module
* Bump Elasticsearch version and remove redundant dependency
* Implement connection and basic schema management
- Create ElasticsearchStore class with connection initialization
- Create basic Elasticsearch types mapping
- Implement the necessary files for mapping representation (ElasticsearchMapping, ElasticsearchMappingBuilder)
- Read schema from mapping file
- Cover initialization with test
* Set up Elasticsearch client parameters
- Created gora.properties file with configuration properties
- Loaded connection parameters from configuration
- Implemented connection to Elasticsearch cluster with ElasticsearchParameters
- Covered ElasticsearchParameters with tests
- Added javadoc descriptions
* Add a property for choosing the authentication method
* Implement testing with Elasticsearch container
- Added testing dependencies
- Added GoraElasticsearchTestDriver with Elasticsearch container
- Added javadoc descriptions to GoraElasticsearchTestDriver class
- Fixed two existing tests in accordance with the Elasticsearch container
* Implement some methods for schema management
Implemented schemaExists, createSchema, deleteSchema and flush methods
* Add XSD validation file for the XML mapping
* Fix XSD validation
- Relocated gora-elasticsearch.xsd file to main resources
- Covered XSD validation with test
- Added gora-elasticsearch-mapping-invalid.xml file for test
* Set up Elasticsearch container's authentication parameters
* Implement exists method
* Add comments for the connection parameters
* Fix authentication
- Set up the Elasticsearch container's password properly
- Set default Elasticsearch container server’s username in gora.properties
- Added exceptions for missing arguments in authentication
* Add parameter for the XSD validation
- Defined a parameter for the XSD validation
- Added a test case for the parameter
- Made ElasticsearchStore read mapping file from properties, not configuration
* Implement some basic Input-Output operations for schema management
- Implemented delete, get and put methods
- Implemented newInstance and getUnionSchema utility methods
- Implemented basic serialization/deserialization for primitive AVRO types
* Fix createSchema method
- Added mappings while creating an Elasticsearch index
- Added getter and setter to Datatype enum
* Implement serialization/deserialization for some Avro data types
- Implemented serializeFieldValue and deserializeFieldValue methods for ARRAY, BOOLEAN, BYTES and FIXED Avro data types
- Fixed deserialization for STRING Avro data type
- Added javadoc descriptions
* Fix NPE when getting a non-existent Elasticsearch document
* Implement serialization/deserialization for MAP Avro data type
* Refactor serialization/deserialization to have better javadocs and arguments
* Implement serialization/deserialization for RECORD Avro data type
* Implement serialization/deserialization for UNION Avro data type
* Fix passed Schema argument for ARRAY deserialization
* Fix BYTES deserialization for Base64 encoded String
* Ignore testGet3UnionField test
* Add javadoc descriptions to serialization and deserialization methods
* Implement newQuery method
* Implement deleteByQuery method
* Use an Enum instead of literal strings for the Authentication Type parameter
* Use parameterized logging instead of string concatenation
* Implement execute method
* Implement getPartitions method
* Add scaling_factor support
* Remove unsupported Elasticsearch data types
* Implement Metadata Analyzer for Elasticsearch Store
* Try to fix range query by “_id” field
* Fix execute method by adding a special "gora_id" field
* Implement deleting specific fields of the records in deleteByQuery method
* Implement MapReduce test
* Fix flush method by using refresh
* Address reviewer's comments
* Add Elasticsearch-specific logging dependency
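The BYTES fix above ("Fix BYTES deserialization for Base64 encoded String") follows from how Elasticsearch handles its `binary` type: the value is sent as a Base64 string and comes back as a string in `_source`, so an Avro `BYTES` value (a `ByteBuffer`) has to be encoded on write and decoded on read. The sketch below shows that round trip with `java.util.Base64`. `BinaryFieldCodec` is a hypothetical name, and this is not the serialization code in `ElasticsearchStore`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not the ElasticsearchStore API: converts between an Avro BYTES
// value (a ByteBuffer) and the Base64 string stored in an Elasticsearch binary field.
final class BinaryFieldCodec {

  private BinaryFieldCodec() {
  }

  /** Encodes the remaining bytes of the buffer as Base64 without moving its position. */
  static String serialize(ByteBuffer value) {
    ByteBuffer copy = value.duplicate(); // leave the caller's position untouched
    byte[] bytes = new byte[copy.remaining()];
    copy.get(bytes);
    return Base64.getEncoder().encodeToString(bytes);
  }

  /** Decodes the Base64 string read back from the document into a ByteBuffer. */
  static ByteBuffer deserialize(String base64) {
    return ByteBuffer.wrap(Base64.getDecoder().decode(base64));
  }

  public static void main(String[] args) {
    ByteBuffer original = ByteBuffer.wrap("gora".getBytes(StandardCharsets.UTF_8));
    String stored = serialize(original);
    ByteBuffer restored = deserialize(stored);
    System.out.println(stored);                    // Z29yYQ==
    System.out.println(restored.equals(original)); // true
  }
}
```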
---
gora-elasticsearch/pom.xml | 177 +++++
.../mapping/ElasticsearchMapping.java | 73 ++
.../mapping/ElasticsearchMappingBuilder.java | 210 +++++
.../apache/gora/elasticsearch/mapping/Field.java | 200 +++++
.../gora/elasticsearch/mapping/package-info.java | 20 +
.../apache/gora/elasticsearch/package-info.java | 20 +
.../elasticsearch/query/ElasticsearchQuery.java | 41 +
.../elasticsearch/query/ElasticsearchResult.java | 72 ++
.../gora/elasticsearch/query/package-info.java | 20 +
.../elasticsearch/store/ElasticsearchStore.java | 864 +++++++++++++++++++++
.../ElasticsearchStoreCollectionMetadata.java | 53 ++
.../store/ElasticsearchStoreMetadataAnalyzer.java | 102 +++
.../gora/elasticsearch/store/package-info.java | 20 +
.../elasticsearch/utils/AuthenticationType.java | 35 +
.../utils/ElasticsearchConstants.java | 49 ++
.../utils/ElasticsearchParameters.java | 274 +++++++
.../gora/elasticsearch/utils/package-info.java | 20 +
.../src/main/resources/gora-elasticsearch.xsd | 61 ++
.../elasticsearch/GoraElasticsearchTestDriver.java | 71 ++
.../mapreduce/ElasticsearchStoreMapReduceTest.java | 59 ++
.../gora/elasticsearch/mapreduce/package-info.java | 20 +
.../apache/gora/elasticsearch/package-info.java | 20 +
.../store/TestElasticsearchStore.java | 173 +++++
.../gora/elasticsearch/store/package-info.java | 20 +
.../gora-elasticsearch-mapping-invalid.xml | 31 +
.../test/resources/gora-elasticsearch-mapping.xml | 48 ++
.../src/test/resources/gora.properties | 40 +
pom.xml | 10 +
28 files changed, 2803 insertions(+)
diff --git a/gora-elasticsearch/pom.xml b/gora-elasticsearch/pom.xml
new file mode 100644
index 0000000..28ba932
--- /dev/null
+++ b/gora-elasticsearch/pom.xml
@@ -0,0 +1,177 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <artifactId>gora-elasticsearch</artifactId>
+ <packaging>bundle</packaging>
+
+ <name>Apache Gora :: Elasticsearch</name>
+ <url>http://gora.apache.org</url>
+ <description>The Apache Gora open source framework provides an in-memory data model and
+ persistence for big data. Gora supports persisting to column stores, key value stores,
+ document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce
+ support.</description>
+ <inceptionYear>2010</inceptionYear>
+ <organization>
+ <name>The Apache Software Foundation</name>
+ <url>http://www.apache.org/</url>
+ </organization>
+ <issueManagement>
+ <system>JIRA</system>
+ <url>https://issues.apache.org/jira/browse/GORA</url>
+ </issueManagement>
+ <ciManagement>
+ <system>Jenkins</system>
+ <url>https://builds.apache.org/job/Gora-trunk/</url>
+ </ciManagement>
+
+ <properties>
+ <osgi.import>*</osgi.import>
+ <osgi.export>org.apache.gora.elasticsearch*;version="${project.version}";-noimport:=true</osgi.export>
+ </properties>
+
+ <build>
+ <directory>target</directory>
+ <outputDirectory>target/classes</outputDirectory>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ <testOutputDirectory>target/test-classes</testOutputDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testResources>
+ <testResource>
+ <directory>${project.basedir}/src/test/resources</directory>
+ <includes>
+ <include>**/*</include>
+ </includes>
+ <!--targetPath>${project.basedir}/target/classes/</targetPath -->
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>${build-helper-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>src/examples/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <!-- Gora Internal Dependencies -->
+ <dependency>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora-core</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.gora</groupId>
+ <artifactId>gora-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Elasticsearch dependencies -->
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>elasticsearch-rest-high-level-client</artifactId>
+ <version>${elasticsearch.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.jdom</groupId>
+ <artifactId>jdom</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
+ <!-- Logging Dependencies -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.jms</groupId>
+ <artifactId>jms</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-to-slf4j</artifactId>
+ <version>2.8.2</version>
+ </dependency>
+
+ <!-- Testing Dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ </dependency>
+
+ </dependencies>
+
+</project>
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java
new file mode 100644
index 0000000..d31e64f
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.mapping;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Mapping definitions for Elasticsearch.
+ */
+public class ElasticsearchMapping {
+
+ private String indexName;
+ private Map<String, Field> fields;
+
+ /**
+ * Empty constructor for the ElasticsearchMapping class.
+ */
+ public ElasticsearchMapping() {
+ fields = new HashMap<>();
+ }
+
+ /**
+ * Returns the name of Elasticsearch index linked to the mapping.
+ *
+ * @return Index's name
+ */
+ public String getIndexName() {
+ return indexName;
+ }
+
+ /**
+ * Sets the index name of the Elasticsearch mapping.
+ *
+ * @param indexName Index's name
+ */
+ public void setIndexName(String indexName) {
+ this.indexName = indexName;
+ }
+
+ /**
+ * Returns a map with all mapped fields.
+ *
+ * @return Map containing mapped fields
+ */
+ public Map<String, Field> getFields() {
+ return fields;
+ }
+
+ /**
+ * Adds a new field to the mapped fields.
+ *
+ * @param classFieldName Field name in the persisted class
+ * @param field Mapped field from Elasticsearch index
+ */
+ public void addField(String classFieldName, Field field) {
+ fields.put(classFieldName, field);
+ }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java
new file mode 100644
index 0000000..30dba7b
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.mapping;
+
+import com.google.inject.ConfigurationException;
+import org.apache.commons.io.IOUtils;
+import org.apache.gora.elasticsearch.store.ElasticsearchStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.util.GoraException;
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.jdom.input.SAXBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import javax.xml.XMLConstants;
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Builder for Mapping definitions of Elasticsearch.
+ */
+public class ElasticsearchMappingBuilder<K, T extends PersistentBase> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchMappingBuilder.class);
+
+ /**
+ * XSD validation file for the XML mapping.
+ */
+ private static final String XSD_MAPPING_FILE = "gora-elasticsearch.xsd";
+
+ // Index description
+ static final String ATT_NAME = "name";
+
+ static final String ATT_TYPE = "type";
+
+ // Class description
+ static final String TAG_CLASS = "class";
+
+ static final String ATT_KEYCLASS = "keyClass";
+
+ static final String ATT_INDEX = "index";
+
+ static final String TAG_FIELD = "field";
+
+ static final String ATT_DOCFIELD = "docfield";
+
+ static final String ATT_SCALINGFACTOR = "scalingFactor";
+
+ /**
+ * Mapping instance being built.
+ */
+ private ElasticsearchMapping elasticsearchMapping;
+
+ private final ElasticsearchStore<K, T> dataStore;
+
+ /**
+ * Constructor for ElasticsearchMappingBuilder.
+ *
+ * @param store ElasticsearchStore instance
+ */
+ public ElasticsearchMappingBuilder(final ElasticsearchStore<K, T> store) {
+ this.elasticsearchMapping = new ElasticsearchMapping();
+ this.dataStore = store;
+ }
+
+ /**
+ * Returns the Elasticsearch Mapping being built.
+ *
+ * @return Elasticsearch Mapping instance
+ */
+ public ElasticsearchMapping getElasticsearchMapping() {
+ return elasticsearchMapping;
+ }
+
+ /**
+ * Sets the Elasticsearch Mapping.
+ *
+ * @param elasticsearchMapping Elasticsearch Mapping instance
+ */
+ public void setElasticsearchMapping(ElasticsearchMapping elasticsearchMapping) {
+ this.elasticsearchMapping = elasticsearchMapping;
+ }
+
+ /**
+ * Reads Elasticsearch mappings from file.
+ *
+ * @param inputStream Mapping input stream
+ * @param xsdValidation Parameter for enabling XSD validation
+ */
+ public void readMappingFile(InputStream inputStream, boolean xsdValidation) {
+ try {
+ SAXBuilder saxBuilder = new SAXBuilder();
+ if (inputStream == null) {
+ LOG.error("The mapping input stream is null!");
+ throw new GoraException("The mapping input stream is null!");
+ }
+
+ // Convert input stream to a string to use it a few times
+ String mappingStream = IOUtils.toString(inputStream, Charset.defaultCharset());
+
+ // XSD validation for XML file
+ if (xsdValidation) {
+ Source xmlSource = new StreamSource(IOUtils.toInputStream(mappingStream, Charset.defaultCharset()));
+ Schema schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
+ .newSchema(new StreamSource(getClass().getClassLoader().getResourceAsStream(XSD_MAPPING_FILE)));
+ schema.newValidator().validate(xmlSource);
+ LOG.info("Mapping file is valid.");
+ }
+
+ Document document = saxBuilder.build(IOUtils.toInputStream(mappingStream, Charset.defaultCharset()));
+ if (document == null) {
+ LOG.error("The mapping document is null!");
+ throw new GoraException("The mapping document is null!");
+ }
+
+ Element root = document.getRootElement();
+ // Extract class descriptions
+ @SuppressWarnings("unchecked")
+ List<Element> classElements = root.getChildren(TAG_CLASS);
+ for (Element classElement : classElements) {
+ final Class<T> persistentClass = dataStore.getPersistentClass();
+ final Class<K> keyClass = dataStore.getKeyClass();
+ if (haveKeyClass(keyClass, classElement)
+ && havePersistentClass(persistentClass, classElement)) {
+ loadPersistentClass(classElement, persistentClass);
+ break;
+ }
+ }
+ } catch (IOException | JDOMException | ConfigurationException | SAXException ex) {
+ throw new RuntimeException(ex);
+ }
+ LOG.info("Gora Elasticsearch mapping file was read successfully.");
+ }
+
+ private boolean haveKeyClass(final Class<K> keyClass,
+ final Element classElement) {
+ return classElement.getAttributeValue(ATT_KEYCLASS).equals(
+ keyClass.getName());
+ }
+
+ private boolean havePersistentClass(final Class<T> persistentClass,
+ final Element classElement) {
+ return classElement.getAttributeValue(ATT_NAME).equals(
+ persistentClass.getName());
+ }
+
+ /**
+ * Handles the XML parsing of the class definition.
+ * @param classElement the XML node containing the class definition
+ * @param pPersistentClass the persistent class whose index and fields are being mapped
+ */
+ protected void loadPersistentClass(Element classElement,
+ Class<T> pPersistentClass) {
+
+ String indexNameFromMapping = classElement.getAttributeValue(ATT_INDEX);
+ String indexName = dataStore.getSchemaName(indexNameFromMapping, pPersistentClass);
+
+ elasticsearchMapping.setIndexName(indexName);
+ // indexNameFromMapping could be null here
+ if (!indexName.equals(indexNameFromMapping)) {
+ ElasticsearchStore.LOG
+ .info("Key class and persistent class match, but index names differ. "
+ + "Mapping file index is '{}' vs actual index '{}'; using the actual index.",
+ indexNameFromMapping, indexName);
+ if (indexNameFromMapping != null) {
+ elasticsearchMapping.setIndexName(indexName);
+ }
+ }
+
+ // Process fields declaration
+ @SuppressWarnings("unchecked")
+ List<Element> fields = classElement.getChildren(TAG_FIELD);
+ for (Element fieldElement : fields) {
+ String fieldTypeName = fieldElement.getAttributeValue(ATT_TYPE).toUpperCase(Locale.ROOT);
+ Field.FieldType fieldType = new Field.FieldType(Field.DataType.valueOf(fieldTypeName));
+ Field field;
+ if (fieldType.getType() == Field.DataType.SCALED_FLOAT) {
+ int scalingFactor = Integer.parseInt(fieldElement.getAttributeValue(ATT_SCALINGFACTOR));
+ field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), new Field.FieldType(scalingFactor));
+ } else {
+ field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), fieldType);
+ }
+ elasticsearchMapping.addField(fieldElement.getAttributeValue(ATT_NAME), field);
+ }
+ }
+}
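`readMappingFile` validates the XML against `gora-elasticsearch.xsd` when XSD validation is enabled, then turns each `type` attribute into a `Field.DataType` constant. The self-contained sketch below walks through the same two steps with the JDK's `javax.xml.validation` and DOM APIs instead of JDOM. The inline schema, the `gora-otd` root element and the reduced `DataType` enum are simplified assumptions and do not reproduce the real XSD (the `<class>` element, for example, is omitted). It upper-cases with `Locale.ROOT`, because under a Turkish default locale `"binary".toUpperCase()` yields `"BİNARY"` and `DataType.valueOf` would fail.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical sketch of "validate, then map field types"; not the Gora mapping builder.
final class MappingLoaderSketch {

  // Hypothetical subset of Field.DataType, enough for the example.
  enum DataType { BINARY, INTEGER, KEYWORD, TEXT }

  // Hypothetical, heavily simplified schema; the real one is gora-elasticsearch.xsd.
  static final String XSD =
      "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'>"
    + " <xs:element name='gora-otd'><xs:complexType><xs:sequence>"
    + "  <xs:element name='field' maxOccurs='unbounded'><xs:complexType>"
    + "   <xs:attribute name='name' type='xs:string' use='required'/>"
    + "   <xs:attribute name='docfield' type='xs:string' use='required'/>"
    + "   <xs:attribute name='type' type='xs:string' use='required'/>"
    + "  </xs:complexType></xs:element>"
    + " </xs:sequence></xs:complexType></xs:element>"
    + "</xs:schema>";

  /** Validates the XML, then maps each class field name to its Elasticsearch data type. */
  static Map<String, DataType> load(String xml) throws Exception {
    Schema schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
        .newSchema(new StreamSource(new StringReader(XSD)));
    // Throws org.xml.sax.SAXException if the document does not match the schema.
    schema.newValidator().validate(new StreamSource(new StringReader(xml)));

    Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
        .parse(new InputSource(new StringReader(xml)));
    NodeList fields = doc.getElementsByTagName("field");
    Map<String, DataType> result = new LinkedHashMap<>();
    for (int i = 0; i < fields.getLength(); i++) {
      Element field = (Element) fields.item(i);
      // Locale.ROOT keeps "binary" -> "BINARY" regardless of the default locale.
      String typeName = field.getAttribute("type").toUpperCase(Locale.ROOT);
      result.put(field.getAttribute("name"), DataType.valueOf(typeName));
    }
    return result;
  }

  public static void main(String[] args) throws Exception {
    String xml = "<gora-otd>"
        + "<field name='url' docfield='url' type='keyword'/>"
        + "<field name='content' docfield='content' type='binary'/>"
        + "</gora-otd>";
    System.out.println(load(xml)); // {url=KEYWORD, content=BINARY}
  }
}
```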
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java
new file mode 100644
index 0000000..73016cb
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.mapping;
+
+import java.util.Objects;
+import java.util.StringJoiner;
+
+/**
+ * Field definition for the Elasticsearch index.
+ */
+public class Field {
+
+ private String name;
+ private FieldType dataType;
+
+ /**
+ * Constructor for Field.
+ *
+ * @param name Field's name
+ * @param dataType Field's data type
+ */
+ public Field(String name, FieldType dataType) {
+ this.name = name;
+ this.dataType = dataType;
+ }
+
+ /**
+ * Returns the field's name.
+ *
+ * @return Field's name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Sets the field's name.
+ *
+ * @param name Field's name
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Returns the field's data-type.
+ *
+ * @return Field's data-type
+ */
+ public FieldType getDataType() {
+ return dataType;
+ }
+
+ /**
+ * Sets the field's data-type.
+ *
+ * @param dataType Field's data-type
+ */
+ public void setDataType(FieldType dataType) {
+ this.dataType = dataType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Field field = (Field) o;
+ return Objects.equals(name, field.name) && Objects.equals(dataType, field.dataType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, dataType);
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", Field.class.getSimpleName() + "[", "]")
+ .add("name='" + name + "'")
+ .add("dataType=" + dataType)
+ .toString();
+ }
+
+ /**
+ * Elasticsearch supported data-type enumeration. For a more detailed list of data
+ * types supported by Elasticsearch refer to
+ * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
+ */
+ public enum DataType {
+ BINARY,
+ BOOLEAN,
+ KEYWORD,
+ CONSTANT_KEYWORD,
+ WILDCARD,
+ LONG,
+ INTEGER,
+ SHORT,
+ BYTE,
+ DOUBLE,
+ FLOAT,
+ HALF_FLOAT,
+ SCALED_FLOAT,
+ OBJECT,
+ FLATTENED,
+ NESTED,
+ TEXT,
+ COMPLETION,
+ SEARCH_AS_YOU_TYPE,
+ TOKEN_COUNT
+ }
+
+ public static class FieldType {
+
+ private DataType type;
+
+ // Parameter for scaled_float type.
+ private int scalingFactor;
+
+ /**
+ * Constructor for FieldType.
+ *
+ * @param type Elasticsearch data type
+ */
+ public FieldType(DataType type) {
+ this.type = type;
+ }
+
+ /**
+ * Constructor for FieldType. Implicitly uses the scaled_float Elasticsearch data type
+ * with the given scaling factor.
+ *
+ * @param scalingFactor scaled_float field's scaling factor
+ */
+ public FieldType(int scalingFactor) {
+ this.type = DataType.SCALED_FLOAT;
+ this.scalingFactor = scalingFactor;
+ }
+
+ /**
+ * Returns the Elasticsearch data type.
+ * @return Elasticsearch data type
+ */
+ public DataType getType() {
+ return type;
+ }
+
+ /**
+ * Sets the Elasticsearch data type.
+ * @param type Elasticsearch data type
+ */
+ public void setType(DataType type) {
+ this.type = type;
+ }
+
+ /**
+ * Returns the scaling factor of scaled_float type.
+ *
+ * @return scaled_float field's scaling factor
+ */
+ public int getScalingFactor() {
+ return scalingFactor;
+ }
+
+ /**
+ * Sets the scaling factor of scaled_float type.
+ *
+ * @param scalingFactor scaled_float field's scaling factor
+ */
+ public void setScalingFactor(int scalingFactor) {
+ this.scalingFactor = scalingFactor;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ FieldType fieldType = (FieldType) o;
+ return scalingFactor == fieldType.scalingFactor && type == fieldType.type;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(type, scalingFactor);
+ }
+ }
+}
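`FieldType(int scalingFactor)` models Elasticsearch's `scaled_float`, which stores a floating-point value as a `long` after multiplying it by the scaling factor and rounding to the closest long; by default the original value is still kept in `_source`. A small sketch of that arithmetic, and of the mapping entry such a field corresponds to, follows. `ScaledFloatSketch` and the `price` field are hypothetical, and the JSON is built by hand only for illustration.

```java
import java.util.Locale;

// Hypothetical helper illustrating scaled_float semantics; not part of gora-elasticsearch.
final class ScaledFloatSketch {

  private ScaledFloatSketch() {
  }

  /** Value kept internally: the input multiplied by the factor, rounded to a long. */
  static long toStoredLong(double value, int scalingFactor) {
    return Math.round(value * scalingFactor);
  }

  /** Value seen after scaling, e.g. in doc values and aggregations. */
  static double fromStoredLong(long stored, int scalingFactor) {
    return (double) stored / scalingFactor;
  }

  /** Mapping entry corresponding to new FieldType(scalingFactor) for a document field. */
  static String mappingJson(String docField, int scalingFactor) {
    return String.format(Locale.ROOT,
        "{\"%s\":{\"type\":\"scaled_float\",\"scaling_factor\":%d}}", docField, scalingFactor);
  }

  public static void main(String[] args) {
    long stored = toStoredLong(3.14159, 100);
    System.out.println(stored);                      // 314
    System.out.println(fromStoredLong(stored, 100)); // 3.14
    System.out.println(mappingJson("price", 100));
  }
}
```

A larger scaling factor keeps more decimal places at the cost of a larger stored integer.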
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/package-info.java
new file mode 100644
index 0000000..c5dccd1
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Mapping related classes.
+ */
+package org.apache.gora.elasticsearch.mapping;
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/package-info.java
new file mode 100644
index 0000000..8d2313a
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains all Elasticsearch datastore related classes.
+ */
+package org.apache.gora.elasticsearch;
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java
new file mode 100644
index 0000000..f491006
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.query;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.impl.QueryBase;
+import org.apache.gora.store.DataStore;
+
+/**
+ * Elasticsearch specific implementation of the {@link Query} interface.
+ */
+public class ElasticsearchQuery<K, T extends PersistentBase> extends QueryBase<K, T> {
+
+ /**
+ * Constructor for the query.
+ *
+ * @param dataStore data store used
+ */
+ public ElasticsearchQuery(DataStore<K, T> dataStore) {
+ super(dataStore);
+ }
+
+ public ElasticsearchQuery() {
+ super(null);
+ }
+}
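The commit message notes that key-range queries are served by a dedicated `gora_id` field rather than by `_id`. A minimal sketch of how a Gora-style key range could translate into an Elasticsearch Query DSL `range` query on that field follows. It assumes both bounds are inclusive (`gte`/`lte`) and that `gora_id` is indexed as a keyword, so comparison is lexicographic. `KeyRangeQuerySketch` is hypothetical and is not the `execute` implementation in `ElasticsearchStore`.

```java
// Hypothetical sketch, not ElasticsearchStore.execute: turns a key range into a
// Query DSL request body on the "gora_id" field.
final class KeyRangeQuerySketch {

  private KeyRangeQuerySketch() {
  }

  /** Builds a request body; a null bound leaves the range open on that side. */
  static String searchBody(String startKey, String endKey) {
    if (startKey == null && endKey == null) {
      return "{\"query\":{\"match_all\":{}}}";
    }
    StringBuilder bounds = new StringBuilder();
    if (startKey != null) {
      bounds.append("\"gte\":").append(quote(startKey)); // inclusive lower bound
    }
    if (endKey != null) {
      if (bounds.length() > 0) {
        bounds.append(',');
      }
      bounds.append("\"lte\":").append(quote(endKey)); // inclusive upper bound
    }
    return "{\"query\":{\"range\":{\"gora_id\":{" + bounds + "}}}}";
  }

  /** Minimal JSON string quoting: escapes quotes, backslashes and control characters. */
  static String quote(String s) {
    StringBuilder out = new StringBuilder("\"");
    for (char c : s.toCharArray()) {
      if (c == '"' || c == '\\') {
        out.append('\\').append(c);
      } else if (c < 0x20) {
        out.append(String.format("\\u%04x", (int) c));
      } else {
        out.append(c);
      }
    }
    return out.append('"').toString();
  }

  public static void main(String[] args) {
    System.out.println(searchBody("com.example/a", "com.example/m"));
    // {"query":{"range":{"gora_id":{"gte":"com.example/a","lte":"com.example/m"}}}}
  }
}
```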
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java
new file mode 100644
index 0000000..e7f8180
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.query;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.impl.ResultBase;
+import org.apache.gora.store.DataStore;
+
+import java.util.List;
+
+/**
+ * Elasticsearch specific implementation of the
+ * {@link org.apache.gora.query.Result} interface.
+ */
+public class ElasticsearchResult<K, T extends PersistentBase> extends ResultBase<K, T> {
+
+ /**
+ * List of resulting persistent objects.
+ */
+ private List<T> persistentObjects;
+
+ /**
+ * List of keys of the resulting objects.
+ */
+ private List<K> persistentKeys;
+
+ public ElasticsearchResult(DataStore<K, T> dataStore, Query<K, T> query, List<K> persistentKeys, List<T> persistentObjects) {
+ super(dataStore, query);
+ this.persistentKeys = persistentKeys;
+ this.persistentObjects = persistentObjects;
+ }
+
+ @Override
+ public float getProgress() {
+ if (persistentObjects.size() == 0) {
+ return 1;
+ }
+
+ return offset / (float) persistentObjects.size();
+ }
+
+ @Override
+ public int size() {
+ return persistentObjects.size();
+ }
+
+ @Override
+ protected boolean nextInner() {
+ if ((int) offset == persistentObjects.size()) {
+ return false;
+ }
+
+ persistent = persistentObjects.get((int) offset);
+ key = persistentKeys.get((int) offset);
+ return persistent != null;
+ }
+}
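ElasticsearchResult holds every hit in memory, so iteration is an index walk over two parallel lists (keys and objects) and progress is the fraction of items consumed so far. The simplified stand-in below is hypothetical and is not Gora's ResultBase API. Because nextInner() above reads `offset` without changing it, the sketch assumes the caller (in Gora, presumably ResultBase.next()) advances it, and does that increment inline.

```java
import java.util.List;

/**
 * Hypothetical, simplified stand-in for ElasticsearchResult: all hits are
 * already in memory, so next() walks an index and getProgress() reports
 * the fraction of items returned so far.
 */
public class InMemoryResultSketch<K, T> {
  private final List<K> keys;
  private final List<T> values;
  private long offset = 0; // number of items already returned
  private K key;
  private T value;

  public InMemoryResultSketch(List<K> keys, List<T> values) {
    this.keys = keys;
    this.values = values;
  }

  // Moves to the next item; returns false once every item has been consumed.
  public boolean next() {
    if ((int) offset == values.size()) {
      return false;
    }
    value = values.get((int) offset);
    key = keys.get((int) offset);
    offset++; // assumed to be done by the caller of nextInner() in the real class
    return value != null;
  }

  // An empty result reports full progress, as getProgress() does above.
  public float getProgress() {
    if (values.isEmpty()) {
      return 1;
    }
    return offset / (float) values.size();
  }

  public K getKey() {
    return key;
  }

  public T get() {
    return value;
  }
}
```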
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/package-info.java
new file mode 100644
index 0000000..ad7b9d7
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Query related classes.
+ */
+package org.apache.gora.elasticsearch.query;
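In the ElasticsearchStore diff that follows, createClient handles APIKEY authentication by sending a default `Authorization` header whose value is `ApiKey ` followed by the Base64 encoding of `id:secret`. The sketch below isolates that encoding step. The class name and the placeholder id/secret are hypothetical, not real credentials.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
 * Hypothetical sketch: builds the Authorization header value used for
 * APIKEY authentication, i.e. "ApiKey " + Base64(id + ":" + secret).
 */
public class ApiKeyHeaderSketch {

  static String apiKeyHeaderValue(String apiKeyId, String apiKeySecret) {
    String raw = apiKeyId + ":" + apiKeySecret;
    return "ApiKey " + Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    // Placeholder credentials for illustration only.
    System.out.println(apiKeyHeaderValue("my-id", "my-secret")); // prints ApiKey bXktaWQ6bXktc2VjcmV0
  }
}
```

In the store, this value is wrapped in a BasicHeader and registered with RestClientBuilder.setDefaultHeaders, so it is sent with every request.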
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java
new file mode 100644
index 0000000..a82de7f
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java
@@ -0,0 +1,864 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.store;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.elasticsearch.mapping.ElasticsearchMapping;
+import org.apache.gora.elasticsearch.mapping.ElasticsearchMappingBuilder;
+import org.apache.gora.elasticsearch.mapping.Field;
+import org.apache.gora.elasticsearch.query.ElasticsearchQuery;
+import org.apache.gora.elasticsearch.query.ElasticsearchResult;
+import org.apache.gora.elasticsearch.utils.ElasticsearchParameters;
+import org.apache.gora.filter.Filter;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.BeanFactoryImpl;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.util.AvroUtils;
+import org.apache.gora.util.ClassLoadingUtils;
+import org.apache.gora.util.GoraException;
+import org.apache.http.Header;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.message.BasicHeader;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.index.reindex.UpdateByQueryRequest;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptType;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+
+/**
+ * Implementation of an Elasticsearch data store to be used by Apache Gora.
+ *
+ * @param <K> class to be used for the key
+ * @param <T> class to be persisted within the store
+ */
+public class ElasticsearchStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStore.class);
+ private static final String DEFAULT_MAPPING_FILE = "gora-elasticsearch-mapping.xml";
+ public static final String PARSE_MAPPING_FILE_KEY = "gora.elasticsearch.mapping.file";
+ private static final String XML_MAPPING_DEFINITION = "gora.mapping";
+ public static final String XSD_VALIDATION = "gora.xsd_validation";
+
+ /**
+ * Elasticsearch client
+ */
+ private RestHighLevelClient client;
+
+ /**
+ * Mapping definition for Elasticsearch
+ */
+ private ElasticsearchMapping elasticsearchMapping;
+
+ @Override
+ public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException {
+ try {
+ LOG.debug("Initializing Elasticsearch store");
+ ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf());
+ super.initialize(keyClass, persistentClass, properties);
+ ElasticsearchMappingBuilder<K, T> builder = new ElasticsearchMappingBuilder<>(this);
+ InputStream mappingStream;
+ if (properties.containsKey(XML_MAPPING_DEFINITION)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} = {}", XML_MAPPING_DEFINITION, properties.getProperty(XML_MAPPING_DEFINITION));
+ }
+ mappingStream = org.apache.commons.io.IOUtils.toInputStream(properties.getProperty(XML_MAPPING_DEFINITION), StandardCharsets.UTF_8);
+ } else {
+ mappingStream = getClass().getClassLoader().getResourceAsStream(properties.getProperty(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
+ }
+ String xsdValidation = properties.getProperty(XSD_VALIDATION, "false");
+ builder.readMappingFile(mappingStream, Boolean.parseBoolean(xsdValidation));
+ elasticsearchMapping = builder.getElasticsearchMapping();
+ client = createClient(parameters);
+ LOG.info("Elasticsearch store was successfully initialized.");
+ } catch (Exception ex) {
+ LOG.error("Error while initializing Elasticsearch store", ex);
+ throw new GoraException(ex);
+ }
+ }
+
+ public static RestHighLevelClient createClient(ElasticsearchParameters parameters) {
+ RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(parameters.getHost(), parameters.getPort()));
+
+ // Choosing the authentication method.
+ switch (parameters.getAuthenticationType()) {
+ case BASIC:
+ if (parameters.getUsername() != null && parameters.getPassword() != null) {
+ String basicAuth = Base64.getEncoder()
+ .encodeToString((parameters.getUsername() + ":" + parameters.getPassword())
+ .getBytes(StandardCharsets.UTF_8));
+ // A default header rather than an HttpClientConfigCallback, which the ioThreadCount setting below would replace.
+ clientBuilder.setDefaultHeaders(new Header[]{new BasicHeader("Authorization", "Basic " + basicAuth)});
+ } else {
+ throw new IllegalArgumentException("Missing username or password for BASIC authentication.");
+ }
+ break;
+ case TOKEN:
+ if (parameters.getAuthorizationToken() != null) {
+ Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization",
+ parameters.getAuthorizationToken())};
+ clientBuilder.setDefaultHeaders(defaultHeaders);
+ } else {
+ throw new IllegalArgumentException("Missing authorization token for TOKEN authentication.");
+ }
+ break;
+ case APIKEY:
+ if (parameters.getApiKeyId() != null && parameters.getApiKeySecret() != null) {
+ String apiKeyAuth = Base64.getEncoder()
+ .encodeToString((parameters.getApiKeyId() + ":" + parameters.getApiKeySecret())
+ .getBytes(StandardCharsets.UTF_8));
+ Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization", "ApiKey " + apiKeyAuth)};
+ clientBuilder.setDefaultHeaders(defaultHeaders);
+ } else {
+ throw new IllegalArgumentException("Missing API Key ID or API Key Secret for APIKEY authentication.");
+ }
+ break;
+ }
+ // Register a single callback: a second setRequestConfigCallback call would replace the first one.
+ clientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
+ if (parameters.getConnectTimeout() != 0) {
+ requestConfigBuilder.setConnectTimeout(parameters.getConnectTimeout());
+ }
+ if (parameters.getSocketTimeout() != 0) {
+ requestConfigBuilder.setSocketTimeout(parameters.getSocketTimeout());
+ }
+ return requestConfigBuilder;
+ });
+
+ if (parameters.getIoThreadCount() != 0) {
+ clientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
+ httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom()
+ .setIoThreadCount(parameters.getIoThreadCount()).build()));
+ }
+ return new RestHighLevelClient(clientBuilder);
+ }
+
+ public ElasticsearchMapping getMapping() {
+ return elasticsearchMapping;
+ }
+
+ @Override
+ public String getSchemaName() {
+ return elasticsearchMapping.getIndexName();
+ }
+
+ @Override
+ public String getSchemaName(final String mappingSchemaName, final Class<?> persistentClass) {
+ return super.getSchemaName(mappingSchemaName, persistentClass);
+ }
+
+ @Override
+ public void createSchema() throws GoraException {
+ CreateIndexRequest request = new CreateIndexRequest(elasticsearchMapping.getIndexName());
+ Map<String, Object> properties = new HashMap<>();
+ for (Map.Entry<String, Field> entry : elasticsearchMapping.getFields().entrySet()) {
+ Map<String, Object> fieldType = new HashMap<>();
+ fieldType.put("type", entry.getValue().getDataType().getType().name().toLowerCase(Locale.ROOT));
+ if (entry.getValue().getDataType().getType() == Field.DataType.SCALED_FLOAT) {
+ fieldType.put("scaling_factor", entry.getValue().getDataType().getScalingFactor());
+ }
+ properties.put(entry.getKey(), fieldType);
+ }
+ // Special field for range query
+ Map<String, Object> goraIdType = new HashMap<>();
+ goraIdType.put("type", "keyword");
+ properties.put("gora_id", goraIdType);
+ Map<String, Object> mapping = new HashMap<>();
+ mapping.put("properties", properties);
+ request.mapping(mapping);
+ try {
+ if (!client.indices().exists(
+ new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) {
+ client.indices().create(request, RequestOptions.DEFAULT);
+ }
+ } catch (IOException ex) {
+ throw new GoraException(ex);
+ }
+ }
+
+ @Override
+ public void deleteSchema() throws GoraException {
+ DeleteIndexRequest request = new DeleteIndexRequest(elasticsearchMapping.getIndexName());
+ try {
+ if (client.indices().exists(
+ new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) {
+ client.indices().delete(request, RequestOptions.DEFAULT);
+ }
+ } catch (IOException ex) {
+ throw new GoraException(ex);
+ }
+ }
+
+ @Override
+ public boolean schemaExists() throws GoraException {
+ try {
+ return client.indices().exists(
+ new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
+ } catch (IOException ex) {
+ throw new GoraException(ex);
+ }
+ }
+
+ @Override
+ public boolean exists(K key) throws GoraException {
+ GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key);
+ getRequest.fetchSourceContext(new FetchSourceContext(false)).storedFields("_none_");
+ try {
+ return client.exists(getRequest, RequestOptions.DEFAULT);
+ } catch (IOException ex) {
+ throw new GoraException(ex);
+ }
+ }
+
+ @Override
+ public T get(K key, String[] fields) throws GoraException {
+ String[] requestedFields = getFieldsToQuery(fields);
+ List<String> documentFields = new ArrayList<>();
+ for (String requestedField : requestedFields) {
+ documentFields.add(elasticsearchMapping.getFields().get(requestedField).getName());
+ }
+ try {
+ // Prepare the Elasticsearch request
+ GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key);
+ GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
+ if (getResponse.isExists()) {
+ Map<String, Object> sourceMap = getResponse.getSourceAsMap();
+
+ // Map of field's name and its value from the Document
+ Map<String, Object> fieldsAndValues = new HashMap<>();
+ for (String field : documentFields) {
+ fieldsAndValues.put(field, sourceMap.get(field));
+ }
+
+ // Build the corresponding persistent
+ return newInstance(fieldsAndValues, requestedFields);
+ } else {
+ return null;
+ }
+ } catch (IOException ex) {
+ throw new GoraException(ex);
+ }
+ }
+
+ @Override
+ public void put(K key, T obj) throws GoraException {
+ if (obj.isDirty()) {
+ Schema schemaObj = obj.getSchema();
+ List<Schema.Field> fields = schemaObj.getFields();
+ Map<String, Object> jsonMap = new HashMap<>();
+ for (Schema.Field field : fields) {
+ Field mappedField = elasticsearchMapping.getFields().get(field.name());
+ if (mappedField != null) {
+ Object fieldValue = obj.get(field.pos());
+ if (fieldValue != null) {
+ Schema fieldSchema = field.schema();
+ Object serializedObj = serializeFieldValue(fieldSchema, fieldValue);
+ jsonMap.put(mappedField.getName(), serializedObj);
+ }
+ }
+ }
+ // Special field for range query
+ jsonMap.put("gora_id", key);
+ // Prepare the Elasticsearch request
+ IndexRequest request = new IndexRequest(elasticsearchMapping.getIndexName()).id((String) key).source(jsonMap);
+ try {
+ client.index(request, RequestOptions.DEFAULT);
+ } catch (IOException ex) {
+ throw new GoraException(ex);
+ }
+ } else {
+ LOG.info("Ignored putting object {} in the store as it is neither "
+ + "new nor dirty.", obj);
+ }
+ }
+
+ @Override
+ public boolean delete(K key) throws GoraException {
+ DeleteRequest request = new DeleteRequest(elasticsearchMapping.getIndexName(), (String) key);
+ try {
+ DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
+ return deleteResponse.getResult() != DocWriteResponse.Result.NOT_FOUND;
+ } catch (IOException ex) {
+ throw new GoraException(ex);
+ }
+ }
+
+ @Override
+ public long deleteByQuery(Query<K, T> query) throws GoraException {
+ try {
+ BulkByScrollResponse bulkResponse;
+ if (query.getFields() != null && query.getFields().length < elasticsearchMapping.getFields().size()) {
+ UpdateByQueryRequest updateRequest = new UpdateByQueryRequest(elasticsearchMapping.getIndexName());
+ QueryBuilder matchDocumentsWithinRange = QueryBuilders
+ .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey());
+ updateRequest.setQuery(matchDocumentsWithinRange);
+
+ // Create a script for deleting fields
+ StringBuilder toDelete = new StringBuilder();
+ String[] fieldsToDelete = query.getFields();
+ for (String field : fieldsToDelete) {
+ String elasticsearchField = elasticsearchMapping.getFields().get(field).getName();
+ toDelete.append(String.format("ctx._source.remove('%s');", elasticsearchField));
+ }
+ //toDelete.deleteCharAt(toDelete.length() - 1);
+ updateRequest.setScript(new Script(ScriptType.INLINE, "painless", toDelete.toString(), Collections.emptyMap()));
+ bulkResponse = client.updateByQuery(updateRequest, RequestOptions.DEFAULT);
+ return bulkResponse.getUpdated();
+ } else {
+ DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(elasticsearchMapping.getIndexName());
+ QueryBuilder matchDocumentsWithinRange = QueryBuilders
+ .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey());
+ deleteRequest.setQuery(matchDocumentsWithinRange);
+ bulkResponse = client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
+ return bulkResponse.getDeleted();
+ }
+ } catch (IOException ex) {
+ throw new GoraException(ex);
+ }
+ }
+
+ @Override
+ public Result<K, T> execute(Query<K, T> query) throws GoraException {
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+
+ // Set the query result limit
+ int size = (int) query.getLimit();
+ if (size != -1) {
+ searchSourceBuilder.size(size);
+ }
+ try {
+ // Build the actual Elasticsearch range query
+ QueryBuilder rangeQueryBuilder = QueryBuilders
+ .rangeQuery("gora_id").gte(query.getStartKey()).lte(query.getEndKey());
+ searchSourceBuilder.query(rangeQueryBuilder);
+ SearchRequest searchRequest = new SearchRequest(elasticsearchMapping.getIndexName());
+ searchRequest.source(searchSourceBuilder);
+ SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
+
+ String[] avroFields = getFieldsToQuery(query.getFields());
+
+ SearchHits hits = searchResponse.getHits();
+ SearchHit[] searchHits = hits.getHits();
+ List<K> hitId = new ArrayList<>();
+
+ // Check filter
+ Filter<K, T> queryFilter = query.getFilter();
+ List<T> filteredObjects = new ArrayList<>();
+ for (SearchHit hit : searchHits) {
+ T persistent = newInstance(hit.getSourceAsMap(), avroFields);
+ if (queryFilter == null || !queryFilter.filter((K) hit.getId(), persistent)) {
+ filteredObjects.add(persistent);
+ hitId.add((K) hit.getId());
+ }
+ }
+ return new ElasticsearchResult<>(this, query, hitId, filteredObjects);
+ } catch (IOException ex) {
+ throw new GoraException(ex);
+ }
+ }
+
+ @Override
+ public Query<K, T> newQuery() {
+ ElasticsearchQuery<K, T> query = new ElasticsearchQuery<>(this);
+ query.setFields(getFieldsToQuery(null));
+ return query;
+ }
+
+ @Override
+ public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
+ List<PartitionQuery<K, T>> partitions = new ArrayList<>();
+ PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>(
+ query);
+ partitionQuery.setConf(getConf());
+ partitions.add(partitionQuery);
+ return partitions;
+ }
+
+ @Override
+ public void flush() throws GoraException {
+ try {
+ client.indices().refresh(new RefreshRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
+ client.indices().flush(new FlushRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
+ } catch (IOException ex) {
+ throw new GoraException(ex);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ client.close();
+ LOG.info("Elasticsearch datastore destroyed successfully.");
+ } catch (IOException ex) {
+ LOG.error(ex.getMessage(), ex);
+ }
+ }
+
+ /**
+ * Build a new instance of the persisted class from the Document retrieved from the database.
+ *
+ * @param fieldsAndValues Map of field's name and its value from the Document
+ * that results from the query to the database
+ * @param requestedFields the list of fields to be mapped to the persistence class instance
+ * @return a persistence class instance whose content was deserialized from the Document
+ * @throws IOException when a field value cannot be deserialized
+ */
+ public T newInstance(Map<String, Object> fieldsAndValues, String[] requestedFields) throws IOException {
+ // Create new empty persistent bean instance
+ T persistent = newPersistent();
+
+ requestedFields = getFieldsToQuery(requestedFields);
+ // Populate each field
+ for (String objField : requestedFields) {
+ Schema.Field field = fieldMap.get(objField);
+ Schema fieldSchema = field.schema();
+ String docFieldName = elasticsearchMapping.getFields().get(objField).getName();
+ Object fieldValue = fieldsAndValues.get(docFieldName);
+
+ Object result = deserializeFieldValue(field, fieldSchema, fieldValue);
+ persistent.put(field.pos(), result);
+ }
+ persistent.clearDirty();
+ return persistent;
+ }
+
+ /**
+ * Deserialize an Elasticsearch object to a persistent Avro object.
+ *
+ * @param avroField persistent Avro class field to which the value will be deserialized
+ * @param avroFieldSchema schema for the persistent Avro class field
+ * @param elasticsearchValue Elasticsearch field value to be deserialized
+ * @return deserialized Avro object from the Elasticsearch object
+ * @throws GoraException when the given Elasticsearch value cannot be deserialized
+ */
+ private Object deserializeFieldValue(Schema.Field avroField, Schema avroFieldSchema,
+ Object elasticsearchValue) throws GoraException {
+ Object fieldValue;
+ switch (avroFieldSchema.getType()) {
+ case MAP:
+ fieldValue = fromElasticsearchMap(avroField, avroFieldSchema.getValueType(), (Map<String, Object>) elasticsearchValue);
+ break;
+ case RECORD:
+ fieldValue = fromElasticsearchRecord(avroFieldSchema, (Map<String, Object>) elasticsearchValue);
+ break;
+ case ARRAY:
+ fieldValue = fromElasticsearchList(avroField, avroFieldSchema.getElementType(), elasticsearchValue);
+ break;
+ case BOOLEAN:
+ fieldValue = Boolean.parseBoolean(elasticsearchValue.toString());
+ break;
+ case BYTES:
+ fieldValue = ByteBuffer.wrap(Base64.getDecoder().decode(elasticsearchValue.toString()));
+ break;
+ case FIXED:
+ case NULL:
+ fieldValue = null;
+ break;
+ case UNION:
+ fieldValue = fromElasticsearchUnion(avroField, avroFieldSchema, elasticsearchValue);
+ break;
+ case DOUBLE:
+ fieldValue = Double.parseDouble(elasticsearchValue.toString());
+ break;
+ case ENUM:
+ fieldValue = AvroUtils.getEnumValue(avroFieldSchema, elasticsearchValue.toString());
+ break;
+ case FLOAT:
+ fieldValue = Float.parseFloat(elasticsearchValue.toString());
+ break;
+ case INT:
+ fieldValue = Integer.parseInt(elasticsearchValue.toString());
+ break;
+ case LONG:
+ fieldValue = Long.parseLong(elasticsearchValue.toString());
+ break;
+ case STRING:
+ fieldValue = new Utf8(elasticsearchValue.toString());
+ break;
+ default:
+ fieldValue = elasticsearchValue;
+ }
+ return fieldValue;
+ }
+
+ /**
+ * Deserialize an Elasticsearch List to an Avro List as used in Gora generated classes
+ * that can safely be written into Avro persistent object.
+ *
+ * @param avroField persistent Avro class field to which the value will be deserialized
+ * @param avroFieldSchema schema of the list elements
+ * @param elasticsearchValue Elasticsearch field value to be deserialized
+ * @return deserialized Avro List from the given Elasticsearch value
+ * @throws GoraException when one of the underlying values cannot be deserialized
+ */
+ private Object fromElasticsearchList(Schema.Field avroField, Schema avroFieldSchema,
+ Object elasticsearchValue) throws GoraException {
+ List<Object> list = new ArrayList<>();
+ if (elasticsearchValue != null) {
+ for (Object item : (List<Object>) elasticsearchValue) {
+ Object result = deserializeFieldValue(avroField, avroFieldSchema, item);
+ list.add(result);
+ }
+ }
+ return new DirtyListWrapper<>(list);
+ }
+
+ /**
+ * Deserialize an Elasticsearch Map to an Avro Map as used in Gora generated classes
+ * that can safely be written into Avro persistent object.
+ *
+ * @param avroField persistent Avro class field to which the value will be deserialized
+ * @param avroFieldSchema schema of the map values
+ * @param elasticsearchMap Elasticsearch Map value to be deserialized
+ * @return deserialized Avro Map from the given Elasticsearch Map value
+ * @throws GoraException when one of the underlying values cannot be deserialized
+ */
+ private Object fromElasticsearchMap(Schema.Field avroField, Schema avroFieldSchema,
+ Map<String, Object> elasticsearchMap) throws GoraException {
+ Map<Utf8, Object> deserializedMap = new HashMap<>();
+ if (elasticsearchMap != null) {
+ for (Map.Entry<String, Object> entry : elasticsearchMap.entrySet()) {
+ String mapKey = entry.getKey();
+ Object mapValue = deserializeFieldValue(avroField, avroFieldSchema, entry.getValue());
+ deserializedMap.put(new Utf8(mapKey), mapValue);
+ }
+ }
+ return new DirtyMapWrapper<>(deserializedMap);
+ }
+
+ /**
+ * Deserialize an Elasticsearch Record to an Avro Object as used in Gora generated classes
+ * that can safely be written into Avro persistent object.
+ *
+ * @param avroFieldSchema schema for the persistent Avro class field
+ * @param elasticsearchRecord Elasticsearch Record value to be deserialized
+ * @return deserialized Avro Object from the given Elasticsearch Record value
+ * @throws GoraException when one of the underlying values cannot be deserialized
+ */
+ private Object fromElasticsearchRecord(Schema avroFieldSchema,
+ Map<String, Object> elasticsearchRecord) throws GoraException {
+ Class<?> clazz;
+ try {
+ clazz = ClassLoadingUtils.loadClass(avroFieldSchema.getFullName());
+ } catch (ClassNotFoundException ex) {
+ throw new GoraException(ex);
+ }
+ PersistentBase record = (PersistentBase) new BeanFactoryImpl(keyClass, clazz).newPersistent();
+ for (Schema.Field recField : avroFieldSchema.getFields()) {
+ Schema innerSchema = recField.schema();
+ Field innerDocField = elasticsearchMapping.getFields().getOrDefault(recField.name(), new Field(recField.name(), null));
+ record.put(recField.pos(), deserializeFieldValue(recField, innerSchema, elasticsearchRecord.get(innerDocField.getName())));
+ }
+ return record;
+ }
+
+ /**
+ * Deserialize an Elasticsearch Union to an Avro Object as used in Gora generated classes
+ * that can safely be written into Avro persistent object.
+ *
+ * @param avroField persistent Avro class field to which the value will be deserialized
+ * @param avroFieldSchema schema for the persistent Avro class field
+ * @param elasticsearchUnion Elasticsearch Union value to be deserialized
+ * @return deserialized Avro Object from the given Elasticsearch Union value
+ * @throws GoraException when one of the underlying values cannot be deserialized
+ */
+ private Object fromElasticsearchUnion(Schema.Field avroField, Schema avroFieldSchema, Object elasticsearchUnion) throws GoraException {
+ Object deserializedUnion;
+ Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType();
+ Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType();
+ if (avroFieldSchema.getTypes().size() == 2 &&
+ (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) &&
+ !type0.equals(type1)) {
+ int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema);
+ Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
+ deserializedUnion = deserializeFieldValue(avroField, unionSchema, elasticsearchUnion);
+ } else if (avroFieldSchema.getTypes().size() == 3) {
+ Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType();
+ if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) &&
+ (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) {
+ if (elasticsearchUnion == null) {
+ deserializedUnion = null;
+ } else if (elasticsearchUnion instanceof String) {
+ throw new GoraException("Elasticsearch supports this Union field only when its value is a Record or Null.");
+ } else {
+ int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema);
+ Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
+ deserializedUnion = fromElasticsearchRecord(unionSchema, (Map<String, Object>) elasticsearchUnion);
+ }
+ } else {
+ throw new GoraException("Elasticsearch only supports Union fields of Null and one other type, or of Null, String and Record.");
+ }
+ } else {
+ throw new GoraException("Elasticsearch only supports Union fields of Null and one other type, or of Null, String and Record.");
+ }
+ return deserializedUnion;
+ }
+
+ /**
+ * Serialize a persistent Avro object as used in Gora generated classes to
+ * an object that can be written into Elasticsearch.
+ *
+ * @param avroFieldSchema schema for the persistent Avro class field
+ * @param avroFieldValue persistent Avro field value to be serialized
+ * @return serialized field value
+ * @throws GoraException when the given Avro object cannot be serialized
+ */
+ private Object serializeFieldValue(Schema avroFieldSchema, Object avroFieldValue) throws GoraException {
+ Object output = avroFieldValue;
+ switch (avroFieldSchema.getType()) {
+ case ARRAY:
+ output = arrayToElasticsearch((List<?>) avroFieldValue, avroFieldSchema.getElementType());
+ break;
+ case MAP:
+ output = mapToElasticsearch((Map<CharSequence, ?>) avroFieldValue, avroFieldSchema.getValueType());
+ break;
+ case RECORD:
+ output = recordToElasticsearch(avroFieldValue, avroFieldSchema);
+ break;
+ case BYTES:
+ output = Base64.getEncoder().encodeToString(((ByteBuffer) avroFieldValue).array());
+ break;
+ case UNION:
+ output = unionToElasticsearch(avroFieldValue, avroFieldSchema);
+ break;
+ case BOOLEAN:
+ case DOUBLE:
+ case ENUM:
+ case FLOAT:
+ case INT:
+ case LONG:
+ case STRING:
+ output = avroFieldValue.toString();
+ break;
+ case FIXED:
+ break;
+ case NULL:
+ output = null;
+ break;
+ }
+ return output;
+ }
+
+ /**
+ * Serialize a Java collection of persistent Avro objects as used in Gora generated classes to a
+ * List that can safely be written into Elasticsearch.
+ *
+ * @param collection the collection to be serialized
+ * @param avroFieldSchema field schema for the underlying type
+ * @return a List version of the collection that can be safely written into Elasticsearch
+ * @throws GoraException when one of the underlying values cannot be serialized
+ */
+ private List<Object> arrayToElasticsearch(Collection<?> collection, Schema avroFieldSchema) throws GoraException {
+ List<Object> list = new ArrayList<>();
+ for (Object item : collection) {
+ Object result = serializeFieldValue(avroFieldSchema, item);
+ list.add(result);
+ }
+ return list;
+ }
+
+ /**
+ * Serialize a Java map of persistent Avro objects as used in Gora generated classes to a
+ * map that can safely be written into Elasticsearch.
+ *
+ * @param map the map to be serialized
+ * @param avroFieldSchema field schema for the underlying type
+ * @return a Map version of the Java map that can be safely written into Elasticsearch
+ * @throws GoraException when one of the underlying values cannot be serialized
+ */
+ private Map<CharSequence, ?> mapToElasticsearch(Map<CharSequence, ?> map, Schema avroFieldSchema) throws GoraException {
+ Map<CharSequence, Object> serializedMap = new HashMap<>();
+ for (Map.Entry<CharSequence, ?> entry : map.entrySet()) {
+ String mapKey = entry.getKey().toString();
+ Object mapValue = entry.getValue();
+ Object result = serializeFieldValue(avroFieldSchema, mapValue);
+ serializedMap.put(mapKey, result);
+ }
+ return serializedMap;
+ }
+
+ /**
+ * Serialize a persistent Avro record as used in Gora generated classes to a
+ * map that can safely be written into Elasticsearch.
+ *
+ * @param record the record to be serialized
+ * @param avroFieldSchema record schema of the field
+ * @return a map from each field name to its serialized value
+ * @throws GoraException when one of the underlying values cannot be serialized
+ */
+ private Map<CharSequence, Object> recordToElasticsearch(Object record, Schema avroFieldSchema) throws GoraException {
+ Map<CharSequence, Object> serializedRecord = new HashMap<>();
+ for (Schema.Field member : avroFieldSchema.getFields()) {
+ Object innerValue = ((PersistentBase) record).get(member.pos());
+ serializedRecord.put(member.name(), serializeFieldValue(member.schema(), innerValue));
+ }
+ return serializedRecord;
+ }
+
+ /**
+ * Serialize the value of an Avro UNION field as used in Gora generated classes to an
+ * object that can safely be written into Elasticsearch.
+ *
+ * @param union the value to be serialized
+ * @param avroFieldSchema union schema of the field
+ * @return the serialized value
+ * @throws GoraException when the union type is not supported or the value cannot be serialized
+ */
+ private Object unionToElasticsearch(Object union, Schema avroFieldSchema) throws GoraException {
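+ Object serializedUnion;
+ if (avroFieldSchema.getTypes().size() < 2) {
+ throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+ }
+ Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType();
+ Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType();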
+ if (avroFieldSchema.getTypes().size() == 2 &&
+ (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) &&
+ !type0.equals(type1)) {
+ int schemaPos = getUnionSchema(union, avroFieldSchema);
+ Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
+ serializedUnion = serializeFieldValue(unionSchema, union);
+ } else if (avroFieldSchema.getTypes().size() == 3) {
+ Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType();
+ if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) &&
+ (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) {
+ if (union == null) {
+ serializedUnion = null;
+ } else if (union instanceof String) {
+ throw new GoraException("Elasticsearch does not support foreign key IDs in Union data type.");
+ } else {
+ int schemaPos = getUnionSchema(union, avroFieldSchema);
+ Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
+ serializedUnion = recordToElasticsearch(union, unionSchema);
+ }
+ } else {
+ throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+ }
+ } else {
+ throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+ }
+ return serializedUnion;
+ }
+
+ /**
+ * Method to retrieve the corresponding schema type index of a particular
+ * object having UNION schema. As UNION type can have one or more types and at
+ * a given instance, it holds an object of only one type of the defined types,
+ * this method is used to figure out the corresponding instance's schema type
+ * index.
+ *
+ * @param instanceValue value that the object holds
+ * @param unionSchema union schema containing all of the data types
+ * @return the index of the union branch matching the value's type, or 0 if no branch matches
+ */
+ private int getUnionSchema(Object instanceValue, Schema unionSchema) {
+ int unionSchemaPos = 0;
+ for (Schema currentSchema : unionSchema.getTypes()) {
+ Schema.Type schemaType = currentSchema.getType();
+ if (instanceValue == null && schemaType.equals(Schema.Type.NULL)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof CharSequence && schemaType.equals(Schema.Type.STRING)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof ByteBuffer && schemaType.equals(Schema.Type.BYTES)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.BYTES)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof String && schemaType.equals(Schema.Type.BYTES)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof Integer && schemaType.equals(Schema.Type.INT)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof Long && schemaType.equals(Schema.Type.LONG)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof Double && schemaType.equals(Schema.Type.DOUBLE)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof Float && schemaType.equals(Schema.Type.FLOAT)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof Boolean && schemaType.equals(Schema.Type.BOOLEAN)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof Map && schemaType.equals(Schema.Type.MAP)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof List && schemaType.equals(Schema.Type.ARRAY)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof Map && schemaType.equals(Schema.Type.RECORD)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.MAP)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.RECORD)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.ARRAY)) {
+ return unionSchemaPos;
+ }
+ unionSchemaPos++;
+ }
+ return 0;
+ }
+}
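The serialization rules in serializeFieldValue can be illustrated without Avro on the classpath. The sketch below is hypothetical. EsValueSketch and toEs are illustrative names and are not part of Gora. It dispatches on the runtime Java type rather than on the Avro schema, and it does not cover records (PersistentBase). It follows the same conventions:

- null stays null.
- Bytes become a Base64 string. The sketch encodes only the ByteBuffer's remaining bytes (position to limit) rather than the whole backing array.
- Lists and maps are converted element by element, and map keys are turned into strings.
- Scalars are written as strings.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, Avro-free illustration of the serialization conventions above. */
final class EsValueSketch {
  private EsValueSketch() {}

  static Object toEs(Object value) {
    if (value == null) {
      return null; // the NULL branch of a nullable union
    }
    if (value instanceof ByteBuffer) {
      // duplicate() leaves the caller's position untouched; copy only the readable bytes.
      ByteBuffer buffer = ((ByteBuffer) value).duplicate();
      byte[] bytes = new byte[buffer.remaining()];
      buffer.get(bytes);
      return Base64.getEncoder().encodeToString(bytes);
    }
    if (value instanceof List) {
      List<Object> out = new ArrayList<>();
      for (Object item : (List<?>) value) {
        out.add(toEs(item)); // ARRAY: convert each element recursively
      }
      return out;
    }
    if (value instanceof Map) {
      Map<String, Object> out = new LinkedHashMap<>();
      for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
        out.put(entry.getKey().toString(), toEs(entry.getValue())); // MAP: string keys
      }
      return out;
    }
    // BOOLEAN, INT, LONG, FLOAT, DOUBLE, ENUM and STRING are all written as strings.
    return value.toString();
  }
}
```

For example, toEs(ByteBuffer.wrap(new byte[] {1, 2, 3})) yields "AQID", and toEs(42) yields "42".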
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java
new file mode 100644
index 0000000..01466ee
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.store;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Metadata of an ElasticsearchStore collection (index), as returned by
+ * ElasticsearchStoreMetadataAnalyzer.
+ */
+public class ElasticsearchStoreCollectionMetadata {
+
+ /**
+ * Document keys (top-level field names) defined in the collection's mapping.
+ */
+ private List<String> documentKeys = new ArrayList<>();
+
+ /**
+ * Field types of the document keys, in the same order as {@link #documentKeys}.
+ */
+ private List<String> documentTypes = new ArrayList<>();
+
+ public List<String> getDocumentKeys() {
+ return documentKeys;
+ }
+
+ public void setDocumentKeys(List<String> documentKeys) {
+ this.documentKeys = documentKeys;
+ }
+
+ public List<String> getDocumentTypes() {
+ return documentTypes;
+ }
+
+ public void setDocumentTypes(List<String> documentTypes) {
+ this.documentTypes = documentTypes;
+ }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java
new file mode 100644
index 0000000..6806aa3
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.store;
+
+import org.apache.gora.elasticsearch.utils.ElasticsearchParameters;
+import org.apache.gora.store.impl.DataStoreMetadataAnalyzer;
+import org.apache.gora.util.GoraException;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.client.indices.GetIndexResponse;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
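+/**
+ * Metadata analyzer for the Elasticsearch datastore. It lists the indices of the cluster and,
+ * for a given index, reports the top-level document keys and their types from the index mapping.
+ */
+public class ElasticsearchStoreMetadataAnalyzer extends DataStoreMetadataAnalyzer {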
+
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStoreMetadataAnalyzer.class);
+
+ private RestHighLevelClient elasticsearchClient;
+
+ @Override
+ public void initialize() throws GoraException {
+ ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf());
+ elasticsearchClient = ElasticsearchStore.createClient(parameters);
+ }
+
+ @Override
+ public String getType() {
+ return "ELASTICSEARCH";
+ }
+
+ @Override
+ public List<String> getTablesNames() throws GoraException {
+ GetIndexRequest request = new GetIndexRequest("*");
+ GetIndexResponse response;
+ try {
+ response = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT);
+ } catch (IOException ex) {
+ throw new GoraException(ex);
+ }
+ if (response == null) {
+ LOG.error("Could not find indices.");
+ throw new GoraException("Could not find indices.");
+ }
+ return Arrays.asList(response.getIndices());
+ }
+
+ @Override
+ public ElasticsearchStoreCollectionMetadata getTableInfo(String tableName) throws GoraException {
+ GetIndexRequest request = new GetIndexRequest(tableName);
+ GetIndexResponse getIndexResponse;
+ try {
+ getIndexResponse = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT);
+ } catch (IOException ex) {
+ throw new GoraException(ex);
+ }
+ MappingMetadata indexMappings = getIndexResponse.getMappings().get(tableName);
+ if (indexMappings == null) {
+ throw new GoraException("Could not find mappings for index " + tableName + ".");
+ }
+ Map<String, Object> indexKeysAndTypes = (Map<String, Object>) indexMappings.getSourceAsMap().get("properties");
+
+ List<String> documentTypes = new ArrayList<>();
+ List<String> documentKeys = new ArrayList<>();
+
+ // An index without explicit field mappings has no "properties" entry.
+ if (indexKeysAndTypes != null) {
+ for (Map.Entry<String, Object> entry : indexKeysAndTypes.entrySet()) {
+ Map<String, Object> subEntry = (Map<String, Object>) entry.getValue();
+ documentTypes.add((String) subEntry.get("type"));
+ documentKeys.add(entry.getKey());
+ }
+ }
+
+ ElasticsearchStoreCollectionMetadata collectionMetadata = new ElasticsearchStoreCollectionMetadata();
+ collectionMetadata.setDocumentKeys(documentKeys);
+ collectionMetadata.setDocumentTypes(documentTypes);
+ return collectionMetadata;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (elasticsearchClient != null) {
+ this.elasticsearchClient.close();
+ }
+ }
+}
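getTableInfo reads the "properties" object of an index mapping and records each top-level field name together with its type. The sketch below shows that extraction over a plain nested Map, which stands in for MappingMetadata.getSourceAsMap(). MappingInfoSketch is a hypothetical name. Two assumptions go beyond the code above:

- An index without a "properties" object yields empty lists.
- A field without an explicit "type" is reported as "object". Such a field is an object field, whose mapping carries nested "properties" instead of a type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the key/type extraction in getTableInfo. */
final class MappingInfoSketch {
  final List<String> documentKeys = new ArrayList<>();
  final List<String> documentTypes = new ArrayList<>();

  /** Reads top-level field names and types from a mapping source shaped like {"properties": {...}}. */
  @SuppressWarnings("unchecked")
  static MappingInfoSketch fromMappingSource(Map<String, Object> mappingSource) {
    MappingInfoSketch info = new MappingInfoSketch();
    Object properties = mappingSource.get("properties");
    if (!(properties instanceof Map)) {
      return info; // no explicit field mappings
    }
    for (Map.Entry<String, Object> entry : ((Map<String, Object>) properties).entrySet()) {
      Map<String, Object> fieldMapping = (Map<String, Object>) entry.getValue();
      Object type = fieldMapping.get("type");
      info.documentKeys.add(entry.getKey());
      // Object fields omit "type" and carry nested "properties" instead (assumed default: "object").
      info.documentTypes.add(type != null ? type.toString() : "object");
    }
    return info;
  }
}
```

Keeping keys and types in two parallel lists mirrors ElasticsearchStoreCollectionMetadata, where index i of documentTypes describes index i of documentKeys.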
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/package-info.java
new file mode 100644
index 0000000..66f575f
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains the core classes of the Elasticsearch datastore.
+ */
+package org.apache.gora.elasticsearch.store;
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java
new file mode 100644
index 0000000..dbbd45b
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.utils;
+
+/**
+ * Authentication type to connect to the Elasticsearch server.
+ */
+public enum AuthenticationType {
+ /**
+ * Basic authentication requires a username and password.
+ */
+ BASIC,
+ /**
+ * Token authentication requires an Elasticsearch access token.
+ */
+ TOKEN,
+ /**
+ * API key authentication requires an Elasticsearch API key ID and API key secret.
+ */
+ APIKEY
+}
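For reference, Elasticsearch's HTTP API accepts all three schemes through the Authorization header:

- Basic: base64(username:password).
- Bearer: an access token.
- ApiKey: base64(id:api_key).

The sketch below builds those header values. It is not taken from ElasticsearchStore.createClient, whose wiring is not shown in this section. AuthHeaderSketch, authorizationHeader and the nested Auth enum are hypothetical names. With the low-level REST client, basic credentials are more commonly supplied through a CredentialsProvider, so read this as an illustration of the wire format only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical illustration of the Authorization header for each authentication type. */
final class AuthHeaderSketch {
  /** Mirrors the constants of AuthenticationType so the sketch compiles on its own. */
  enum Auth { BASIC, TOKEN, APIKEY }

  private AuthHeaderSketch() {}

  static String authorizationHeader(Auth type, String first, String second) {
    switch (type) {
      case BASIC:
        // first = username, second = password
        return "Basic " + base64(first + ":" + second);
      case TOKEN:
        // first = access token; second is unused
        return "Bearer " + first;
      case APIKEY:
        // first = API key ID, second = API key secret
        return "ApiKey " + base64(first + ":" + second);
      default:
        throw new IllegalArgumentException("Unsupported authentication type: " + type);
    }
  }

  private static String base64(String value) {
    return Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8));
  }
}
```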
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java
new file mode 100644
index 0000000..fa9bdc2
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.utils;
+
+/**
+ * Constants file for Elasticsearch.
+ */
+public class ElasticsearchConstants {
+ /**
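+ * Property indicating whether gora.properties overrides the Hadoop configuration.
+ * When false (the default), the host and port are read from the Hadoop configuration;
+ * when true, they are read from gora.properties.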
+ */
+ public static final String PROP_OVERRIDING = "gora.elasticsearch.override.hadoop.configuration";
+
+ /**
+ * Default configurations for Elasticsearch.
+ */
+ public static final String DEFAULT_HOST = "localhost";
+ public static final int DEFAULT_PORT = 9200;
+
+ /**
+ * List of keys used in the configuration file of Elasticsearch.
+ */
+ public static final String PROP_HOST = "gora.datastore.elasticsearch.host";
+ public static final String PROP_PORT = "gora.datastore.elasticsearch.port";
+ public static final String PROP_SCHEME = "gora.datastore.elasticsearch.scheme";
+ public static final String PROP_AUTHENTICATIONTYPE = "gora.datastore.elasticsearch.authenticationType";
+ public static final String PROP_USERNAME = "gora.datastore.elasticsearch.username";
+ public static final String PROP_PASSWORD = "gora.datastore.elasticsearch.password";
+ public static final String PROP_AUTHORIZATIONTOKEN = "gora.datastore.elasticsearch.authorizationToken";
+ public static final String PROP_APIKEYID = "gora.datastore.elasticsearch.apiKeyId";
+ public static final String PROP_APIKEYSECRET = "gora.datastore.elasticsearch.apiKeySecret";
+ public static final String PROP_CONNECTTIMEOUT = "gora.datastore.elasticsearch.connectTimeout";
+ public static final String PROP_SOCKETTIMEOUT = "gora.datastore.elasticsearch.socketTimeout";
+ public static final String PROP_IOTHREADCOUNT = "gora.datastore.elasticsearch.ioThreadCount";
+}
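A gora.properties fragment using the keys above might look like the one embedded below. The values are placeholders. GoraPropertiesSketch is a hypothetical helper, and its local AuthenticationType enum only mirrors the real one so the sketch compiles on its own. The authentication type is looked up null-safely, because calling Enum.valueOf with a missing (null) property throws a NullPointerException.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical example of reading the Elasticsearch keys from a gora.properties fragment. */
final class GoraPropertiesSketch {
  enum AuthenticationType { BASIC, TOKEN, APIKEY }

  // Placeholder values for illustration only.
  static final String SAMPLE =
      "gora.datastore.elasticsearch.host=localhost\n"
          + "gora.datastore.elasticsearch.port=9200\n"
          + "gora.datastore.elasticsearch.scheme=http\n"
          + "gora.datastore.elasticsearch.authenticationType=BASIC\n"
          + "gora.datastore.elasticsearch.username=elastic\n"
          + "gora.datastore.elasticsearch.password=changeme\n"
          + "gora.datastore.elasticsearch.connectTimeout=5000\n"
          + "gora.datastore.elasticsearch.socketTimeout=60000\n";

  private GoraPropertiesSketch() {}

  static Properties parse(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return props;
  }

  /** Returns null for an absent key instead of letting valueOf(null) throw. */
  static AuthenticationType authenticationType(Properties props) {
    String raw = props.getProperty("gora.datastore.elasticsearch.authenticationType");
    return raw == null ? null : AuthenticationType.valueOf(raw.trim());
  }
}
```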
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java
new file mode 100644
index 0000000..4b0cef4
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.utils;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Properties;
+
+/**
+ * Parameters definitions for Elasticsearch.
+ */
+public class ElasticsearchParameters {
+
+ /**
+ * Elasticsearch server host.
+ */
+ private String host;
+
+ /**
+ * Elasticsearch server port.
+ */
+ private int port;
+
+ /**
+ * Elasticsearch server scheme.
+ * Optional. If not provided, defaults to http.
+ */
+ private String scheme;
+
+ /**
+ * Authentication type to connect to the server.
+ * Can be BASIC, TOKEN or APIKEY.
+ */
+ private AuthenticationType authenticationType;
+
+ /**
+ * Username to use for server authentication.
+ * Required for BASIC authentication to connect to the server.
+ */
+ private String username;
+
+ /**
+ * Password to use for server authentication.
+ * Required for BASIC authentication to connect to the server.
+ */
+ private String password;
+
+ /**
+ * Authorization token to use for server authentication.
+ * Required for TOKEN authentication to connect to the server.
+ */
+ private String authorizationToken;
+
+ /**
+ * API Key ID to use for server authentication.
+ * Required for APIKEY authentication to connect to the server.
+ */
+ private String apiKeyId;
+
+ /**
+ * API Key Secret to use for server authentication.
+ * Required for APIKEY authentication to connect to the server.
+ */
+ private String apiKeySecret;
+
+ /**
+ * Timeout in milliseconds used for establishing the connection to the server.
+ * Optional. If not provided, defaults to 5000 ms.
+ */
+ private int connectTimeout;
+
+ /**
+ * Timeout in milliseconds to wait for data after the connection to the server is established.
+ * Optional. If not provided, defaults to 60000 ms.
+ */
+ private int socketTimeout;
+
+ /**
+ * Number of worker threads used by the connection manager.
+ * Optional. If not provided, defaults to 1.
+ */
+ private int ioThreadCount;
+
+ public ElasticsearchParameters(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public String getScheme() {
+ return scheme;
+ }
+
+ public void setScheme(String scheme) {
+ this.scheme = scheme;
+ }
+
+ public AuthenticationType getAuthenticationType() {
+ return authenticationType;
+ }
+
+ public void setAuthenticationType(AuthenticationType authenticationType) {
+ this.authenticationType = authenticationType;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getAuthorizationToken() {
+ return authorizationToken;
+ }
+
+ public void setAuthorizationToken(String authorizationToken) {
+ this.authorizationToken = authorizationToken;
+ }
+
+ public String getApiKeyId() {
+ return apiKeyId;
+ }
+
+ public void setApiKeyId(String apiKeyId) {
+ this.apiKeyId = apiKeyId;
+ }
+
+ public String getApiKeySecret() {
+ return apiKeySecret;
+ }
+
+ public void setApiKeySecret(String apiKeySecret) {
+ this.apiKeySecret = apiKeySecret;
+ }
+
+ public int getConnectTimeout() {
+ return connectTimeout;
+ }
+
+ public void setConnectTimeout(int connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ }
+
+ public int getSocketTimeout() {
+ return socketTimeout;
+ }
+
+ public void setSocketTimeout(int socketTimeout) {
+ this.socketTimeout = socketTimeout;
+ }
+
+ public int getIoThreadCount() {
+ return ioThreadCount;
+ }
+
+ public void setIoThreadCount(int ioThreadCount) {
+ this.ioThreadCount = ioThreadCount;
+ }
+
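+ /**
+ * Reads Elasticsearch parameters from the Gora properties. The host and port are taken
+ * from the Hadoop configuration unless {@link ElasticsearchConstants#PROP_OVERRIDING} is set to true.
+ *
+ * @param properties Gora properties
+ * @param conf Hadoop configuration
+ * @return Elasticsearch parameters instance
+ */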
+ public static ElasticsearchParameters load(Properties properties, Configuration conf) {
+ ElasticsearchParameters elasticsearchParameters;
+
+ if (!Boolean.parseBoolean(properties.getProperty(ElasticsearchConstants.PROP_OVERRIDING))) {
+ elasticsearchParameters = new ElasticsearchParameters(
+ conf.get(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST),
+ conf.getInt(ElasticsearchConstants.PROP_PORT, ElasticsearchConstants.DEFAULT_PORT));
+ } else {
+ elasticsearchParameters = new ElasticsearchParameters(
+ properties.getProperty(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST),
+ Integer.parseInt(properties.getProperty(ElasticsearchConstants.PROP_PORT,
+ String.valueOf(ElasticsearchConstants.DEFAULT_PORT))));
+ }
+
+ String schemeProperty = properties.getProperty(ElasticsearchConstants.PROP_SCHEME);
+ if (schemeProperty != null) {
+ elasticsearchParameters.setScheme(schemeProperty);
+ }
+
+ String authenticationTypeProperty = properties.getProperty(ElasticsearchConstants.PROP_AUTHENTICATIONTYPE);
+ if (authenticationTypeProperty != null) {
+ // Check for null first: AuthenticationType.valueOf(null) throws a NullPointerException.
+ elasticsearchParameters.setAuthenticationType(AuthenticationType.valueOf(authenticationTypeProperty));
+ }
+
+ String usernameProperty = properties.getProperty(ElasticsearchConstants.PROP_USERNAME);
+ if (usernameProperty != null) {
+ elasticsearchParameters.setUsername(usernameProperty);
+ }
+
+ String passwordProperty = properties.getProperty(ElasticsearchConstants.PROP_PASSWORD);
+ if (passwordProperty != null) {
+ elasticsearchParameters.setPassword(passwordProperty);
+ }
+
+ String authorizationTokenProperty = properties.getProperty(ElasticsearchConstants.PROP_AUTHORIZATIONTOKEN);
+ if (authorizationTokenProperty != null) {
+ elasticsearchParameters.setAuthorizationToken(authorizationTokenProperty);
+ }
+
+ String apiKeyIdProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYID);
+ if (apiKeyIdProperty != null) {
+ elasticsearchParameters.setApiKeyId(apiKeyIdProperty);
+ }
+
+ String apiKeySecretProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYSECRET);
+ if (apiKeySecretProperty != null) {
+ elasticsearchParameters.setApiKeySecret(apiKeySecretProperty);
+ }
+
+ String connectTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_CONNECTTIMEOUT);
+ if (connectTimeoutProperty != null) {
+ elasticsearchParameters.setConnectTimeout(Integer.parseInt(connectTimeoutProperty));
+ }
+
+ String socketTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_SOCKETTIMEOUT);
+ if (socketTimeoutProperty != null) {
+ elasticsearchParameters.setSocketTimeout(Integer.parseInt(socketTimeoutProperty));
+ }
+
+ String ioThreadCountProperty = properties.getProperty(ElasticsearchConstants.PROP_IOTHREADCOUNT);
+ if (ioThreadCountProperty != null) {
+ elasticsearchParameters.setIoThreadCount(Integer.parseInt(ioThreadCountProperty));
+ }
+
+ return elasticsearchParameters;
+ }
+}
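The host and port precedence in load() is easy to misread, so here is a condensed sketch. It is hypothetical: ParametersLoadSketch is an illustrative name, a Map<String, String> stands in for Hadoop's Configuration, and only the host and port are resolved. The precedence works as follows:

- When gora.elasticsearch.override.hadoop.configuration is absent or false, the host and port come from the Hadoop configuration.
- When it is true, they come from gora.properties.
- In either case, missing values fall back to localhost and 9200.

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical condensation of the host/port precedence in ElasticsearchParameters.load. */
final class ParametersLoadSketch {
  static final String PROP_OVERRIDING = "gora.elasticsearch.override.hadoop.configuration";
  static final String PROP_HOST = "gora.datastore.elasticsearch.host";
  static final String PROP_PORT = "gora.datastore.elasticsearch.port";
  static final String DEFAULT_HOST = "localhost";
  static final int DEFAULT_PORT = 9200;

  private ParametersLoadSketch() {}

  /** Returns "host:port"; hadoopConf is a Map stand-in for org.apache.hadoop.conf.Configuration. */
  static String resolveHostAndPort(Properties goraProperties, Map<String, String> hadoopConf) {
    // parseBoolean(null) is false, so an absent flag means "use the Hadoop configuration".
    boolean override = Boolean.parseBoolean(goraProperties.getProperty(PROP_OVERRIDING));
    String host;
    int port;
    if (!override) {
      host = hadoopConf.getOrDefault(PROP_HOST, DEFAULT_HOST);
      port = Integer.parseInt(hadoopConf.getOrDefault(PROP_PORT, String.valueOf(DEFAULT_PORT)));
    } else {
      host = goraProperties.getProperty(PROP_HOST, DEFAULT_HOST);
      port = Integer.parseInt(goraProperties.getProperty(PROP_PORT, String.valueOf(DEFAULT_PORT)));
    }
    return host + ":" + port;
  }
}
```

This is also why the test driver writes the container's mapped host and port into conf rather than into gora.properties: with the override flag unset, the Hadoop configuration wins.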
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/package-info.java
new file mode 100644
index 0000000..49fcb93
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains utility classes.
+ */
+package org.apache.gora.elasticsearch.utils;
diff --git a/gora-elasticsearch/src/main/resources/gora-elasticsearch.xsd b/gora-elasticsearch/src/main/resources/gora-elasticsearch.xsd
new file mode 100644
index 0000000..0c7c011
--- /dev/null
+++ b/gora-elasticsearch/src/main/resources/gora-elasticsearch.xsd
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
+ <xs:element name="gora-otd" type="elasticsearch-mapping"/>
+
+ <xs:complexType name="elasticsearch-mapping">
+ <xs:sequence>
+ <xs:element name="class" type="class-mapping" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="class-mapping">
+ <xs:sequence>
+ <xs:element name="field" type="field-mapping" maxOccurs="unbounded"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="nameClass-types" use="required"/>
+ <xs:attribute name="keyClass" type="keyClass-types" use="required"/>
+ <xs:attribute name="index" type="xs:string" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="field-mapping">
+ <xs:attribute name="name" type="fieldName-types" use="required"/>
+ <xs:attribute name="docfield" type="xs:string" use="required"/>
+ <xs:attribute name="type" type="xs:string" use="required"/>
+ <xs:attribute name="scalingFactor" type="xs:string" use="optional"/>
+ </xs:complexType>
+
+ <xs:simpleType name="keyClass-types">
+ <xs:restriction base="xs:string">
+ <xs:enumeration value="java.lang.String"/>
+ </xs:restriction>
+ </xs:simpleType>
+
+ <xs:simpleType name="nameClass-types">
+ <xs:restriction base="xs:string">
+ <xs:pattern value="([\p{L}_$][\p{L}\p{N}_$]*\.)*[\p{L}_$][\p{L}\p{N}_$]*"/>
+ </xs:restriction>
+ </xs:simpleType>
+
+ <xs:simpleType name="fieldName-types">
+ <xs:restriction base="xs:string">
+ <xs:pattern value="[a-zA-Z][a-zA-Z0-9]*"/>
+ </xs:restriction>
+ </xs:simpleType>
+
+</xs:schema>
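The schema can be exercised with the JDK's javax.xml.validation API. The sketch below embeds an abridged copy of gora-elasticsearch.xsd, with the class-name pattern relaxed to xs:string to keep the string literal short, and checks mapping documents against it. MappingXsdSketch is a hypothetical name. The schema enforces two rules worth noting:

- Field names must match [a-zA-Z][a-zA-Z0-9]*, so a name such as first_name is rejected.
- keyClass must be java.lang.String.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.xml.sax.SAXException;

/** Hypothetical validator for Elasticsearch mapping files against an abridged copy of the XSD. */
final class MappingXsdSketch {
  static final String XSD =
      "<xs:schema xmlns:xs=\"http://www.w3.org/2001/XMLSchema\">"
          + "<xs:element name=\"gora-otd\" type=\"elasticsearch-mapping\"/>"
          + "<xs:complexType name=\"elasticsearch-mapping\"><xs:sequence>"
          + "<xs:element name=\"class\" type=\"class-mapping\" maxOccurs=\"unbounded\"/>"
          + "</xs:sequence></xs:complexType>"
          + "<xs:complexType name=\"class-mapping\"><xs:sequence>"
          + "<xs:element name=\"field\" type=\"field-mapping\" maxOccurs=\"unbounded\"/>"
          + "</xs:sequence>"
          + "<xs:attribute name=\"name\" type=\"xs:string\" use=\"required\"/>"
          + "<xs:attribute name=\"keyClass\" type=\"keyClass-types\" use=\"required\"/>"
          + "<xs:attribute name=\"index\" type=\"xs:string\" use=\"required\"/>"
          + "</xs:complexType>"
          + "<xs:complexType name=\"field-mapping\">"
          + "<xs:attribute name=\"name\" type=\"fieldName-types\" use=\"required\"/>"
          + "<xs:attribute name=\"docfield\" type=\"xs:string\" use=\"required\"/>"
          + "<xs:attribute name=\"type\" type=\"xs:string\" use=\"required\"/>"
          + "<xs:attribute name=\"scalingFactor\" type=\"xs:string\" use=\"optional\"/>"
          + "</xs:complexType>"
          + "<xs:simpleType name=\"keyClass-types\"><xs:restriction base=\"xs:string\">"
          + "<xs:enumeration value=\"java.lang.String\"/></xs:restriction></xs:simpleType>"
          + "<xs:simpleType name=\"fieldName-types\"><xs:restriction base=\"xs:string\">"
          + "<xs:pattern value=\"[a-zA-Z][a-zA-Z0-9]*\"/></xs:restriction></xs:simpleType>"
          + "</xs:schema>";

  private static final Schema SCHEMA = compile();

  private MappingXsdSketch() {}

  private static Schema compile() {
    try {
      SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
      return factory.newSchema(new StreamSource(new StringReader(XSD)));
    } catch (SAXException e) {
      throw new IllegalStateException("Invalid embedded XSD", e);
    }
  }

  /** Returns true when the mapping document conforms to the abridged schema. */
  static boolean isValid(String mappingXml) {
    try {
      SCHEMA.newValidator().validate(new StreamSource(new StringReader(mappingXml)));
      return true;
    } catch (SAXException e) {
      return false; // the default error handler rethrows validation errors
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

For example, consider `<gora-otd><class name="org.example.Person" keyClass="java.lang.String" index="people"><field name="firstName" docfield="first_name" type="text"/></class></gora-otd>`, where the class org.example.Person and the index people are hypothetical. isValid returns true for this mapping and false once the field name is changed to first_name. Only the Gora field name is restricted; docfield may contain underscores.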
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java
new file mode 100644
index 0000000..ef17dbc
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch;
+
+import org.apache.gora.GoraTestDriver;
+import org.apache.gora.elasticsearch.store.ElasticsearchStore;
+import org.apache.gora.elasticsearch.utils.ElasticsearchConstants;
+import org.apache.gora.store.DataStoreFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.util.Properties;
+
+/**
+ * Helper class for third-party tests using the gora-elasticsearch backend.
+ *
+ * @see GoraTestDriver
+ */
+public class GoraElasticsearchTestDriver extends GoraTestDriver {
+
+ private static final String DOCKER_IMAGE = "docker.elastic.co/elasticsearch/elasticsearch:7.10.1";
+ private ElasticsearchContainer elasticsearchContainer;
+
+ /**
+ * Constructor for this class.
+ */
+ public GoraElasticsearchTestDriver() {
+ super(ElasticsearchStore.class);
+ Properties properties = DataStoreFactory.createProps();
+ elasticsearchContainer = new ElasticsearchContainer(DOCKER_IMAGE)
+ .withEnv("ELASTIC_PASSWORD", properties.getProperty(ElasticsearchConstants.PROP_PASSWORD))
+ .withEnv("xpack.security.enabled", "true");
+ }
+
+ /**
+ * Start the Elasticsearch container and point the configuration at the host port mapped to the default Elasticsearch port.
+ */
+ @Override
+ public void setUpClass() throws Exception {
+ elasticsearchContainer.start();
+ log.info("Setting up Elasticsearch test driver");
+
+ int port = elasticsearchContainer.getMappedPort(ElasticsearchConstants.DEFAULT_PORT);
+ String host = elasticsearchContainer.getContainerIpAddress();
+ conf.set(ElasticsearchConstants.PROP_PORT, String.valueOf(port));
+ conf.set(ElasticsearchConstants.PROP_HOST, host);
+ }
+
+ /**
+ * Tear the server down.
+ */
+ @Override
+ public void tearDownClass() throws Exception {
+ elasticsearchContainer.close();
+ log.info("Tearing down Elasticsearch test driver");
+ }
+}
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java
new file mode 100644
index 0000000..d5aa9e6
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.mapreduce;
+
+import org.apache.gora.elasticsearch.GoraElasticsearchTestDriver;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.mapreduce.DataStoreMapReduceTestBase;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+
+/**
+ * Executes MapReduce job tests against the Elasticsearch datastore.
+ */
+public class ElasticsearchStoreMapReduceTest extends DataStoreMapReduceTestBase {
+
+ private GoraElasticsearchTestDriver driver;
+
+ public ElasticsearchStoreMapReduceTest() throws IOException {
+ super();
+ driver = new GoraElasticsearchTestDriver();
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ driver.setUpClass();
+ super.setUp();
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+ driver.tearDownClass();
+ }
+
+ @Override
+ protected DataStore<String, WebPage> createWebPageDataStore() throws IOException {
+ return DataStoreFactory.getDataStore(String.class, WebPage.class, driver.getConfiguration());
+ }
+}
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/package-info.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/package-info.java
new file mode 100644
index 0000000..781ce61
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains MapReduce tests.
+ */
+package org.apache.gora.elasticsearch.mapreduce;
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/package-info.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/package-info.java
new file mode 100644
index 0000000..406f829
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Elasticsearch datastore test utilities.
+ */
+package org.apache.gora.elasticsearch;
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java
new file mode 100644
index 0000000..7c95593
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.store;
+
+import org.apache.gora.elasticsearch.GoraElasticsearchTestDriver;
+import org.apache.gora.elasticsearch.mapping.ElasticsearchMapping;
+import org.apache.gora.elasticsearch.mapping.Field;
+import org.apache.gora.elasticsearch.utils.AuthenticationType;
+import org.apache.gora.elasticsearch.utils.ElasticsearchParameters;
+import org.apache.gora.examples.generated.EmployeeInt;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.store.DataStoreMetadataFactory;
+import org.apache.gora.store.DataStoreTestBase;
+import org.apache.gora.store.impl.DataStoreMetadataAnalyzer;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Test case for ElasticsearchStore.
+ */
+public class TestElasticsearchStore extends DataStoreTestBase {
+
+ static {
+ setTestDriver(new GoraElasticsearchTestDriver());
+ }
+
+ @Test
+ public void testInitialize() throws GoraException {
+ log.info("test method: testInitialize");
+
+ ElasticsearchMapping mapping = ((ElasticsearchStore) employeeStore).getMapping();
+
+ Map<String, Field> fields = new HashMap<String, Field>() {{
+ put("name", new Field("name", new Field.FieldType(Field.DataType.TEXT)));
+ put("dateOfBirth", new Field("dateOfBirth", new Field.FieldType(Field.DataType.LONG)));
+ put("ssn", new Field("ssn", new Field.FieldType(Field.DataType.TEXT)));
+ put("value", new Field("value", new Field.FieldType(Field.DataType.TEXT)));
+ put("salary", new Field("salary", new Field.FieldType(Field.DataType.INTEGER)));
+ put("boss", new Field("boss", new Field.FieldType(Field.DataType.OBJECT)));
+ put("webpage", new Field("webpage", new Field.FieldType(Field.DataType.OBJECT)));
+ }};
+
+ Assert.assertEquals("frontier", employeeStore.getSchemaName());
+ Assert.assertEquals("frontier", mapping.getIndexName());
+ Assert.assertEquals(fields, mapping.getFields());
+ }
+
+ @Test
+ public void testLoadElasticsearchParameters() throws IOException {
+ log.info("test method: testLoadElasticsearchParameters");
+
+ Properties properties = DataStoreFactory.createProps();
+
+ ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, testDriver.getConfiguration());
+
+ Assert.assertEquals("localhost", parameters.getHost());
+ Assert.assertEquals(AuthenticationType.BASIC, parameters.getAuthenticationType());
+ Assert.assertEquals("elastic", parameters.getUsername());
+ Assert.assertEquals("password", parameters.getPassword());
+ }
+
+ @Test(expected = GoraException.class)
+ public void testInvalidXmlFile() throws Exception {
+ log.info("test method: testInvalidXmlFile");
+
+ Properties properties = DataStoreFactory.createProps();
+ properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml");
+ properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "true");
+ testDriver.createDataStore(String.class, EmployeeInt.class, properties);
+ }
+
+ @Test
+ public void testXsdValidationParameter() throws GoraException {
+ log.info("test method: testXsdValidationParameter");
+
+ Properties properties = DataStoreFactory.createProps();
+ properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml");
+ properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "false");
+ testDriver.createDataStore(String.class, EmployeeInt.class, properties);
+ }
+
+ @Test
+ public void testGetType() throws GoraException, ClassNotFoundException {
+ Configuration conf = testDriver.getConfiguration();
+ DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
+
+ String actualType = storeMetadataAnalyzer.getType();
+ String expectedType = "ELASTICSEARCH";
+ Assert.assertEquals(expectedType, actualType);
+ }
+
+ @Test
+ public void testGetTablesNames() throws GoraException, ClassNotFoundException {
+ Configuration conf = testDriver.getConfiguration();
+ DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
+
+ List<String> actualTablesNames = new ArrayList<>(storeMetadataAnalyzer.getTablesNames());
+ List<String> expectedTablesNames = new ArrayList<String>() {
+ {
+ add("frontier");
+ add("webpage");
+ }
+ };
+ Assert.assertEquals(expectedTablesNames, actualTablesNames);
+ }
+
+ @Test
+ public void testGetTableInfo() throws GoraException, ClassNotFoundException {
+ Configuration conf = testDriver.getConfiguration();
+ DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
+
+ ElasticsearchStoreCollectionMetadata actualCollectionMetadata =
+ (ElasticsearchStoreCollectionMetadata) storeMetadataAnalyzer.getTableInfo("frontier");
+
+ List<String> expectedDocumentKeys = new ArrayList<String>() {
+ {
+ add("name");
+ add("dateOfBirth");
+ add("ssn");
+ add("value");
+ add("salary");
+ add("boss");
+ add("webpage");
+ add("gora_id");
+ }
+ };
+
+ List<String> expectedDocumentTypes = new ArrayList<String>() {
+ {
+ add("text");
+ add("long");
+ add("text");
+ add("text");
+ add("integer");
+ add("object");
+ add("object");
+ add("keyword");
+ }
+ };
+
+ Assert.assertEquals(expectedDocumentKeys.size(), actualCollectionMetadata.getDocumentKeys().size());
+ Assert.assertTrue(expectedDocumentKeys.containsAll(actualCollectionMetadata.getDocumentKeys()));
+
+ Assert.assertEquals(expectedDocumentTypes.size(), actualCollectionMetadata.getDocumentTypes().size());
+ Assert.assertTrue(expectedDocumentTypes.containsAll(actualCollectionMetadata.getDocumentTypes()));
+ }
+
+ @Ignore("Elasticsearch datastore doesn't support three-type union fields yet")
+ @Override
+ public void testGet3UnionField() {
+ }
+}
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/package-info.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/package-info.java
new file mode 100644
index 0000000..cd77199
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Elasticsearch datastore tests.
+ */
+package org.apache.gora.elasticsearch.store;
diff --git a/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping-invalid.xml b/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping-invalid.xml
new file mode 100644
index 0000000..c1a8ccd
--- /dev/null
+++ b/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping-invalid.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<gora-otd xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:noNamespaceSchemaLocation="gora-elasticsearch.xsd">
+
+ <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" index="frontier">
+ <!--
+ The type attribute is intentionally omitted from the field below to test XSD validation.
+ <field name="name" docfield="name" type="text"/>
+ -->
+ <field name="name" docfield="name"/>
+ </class>
+
+</gora-otd>
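`testInvalidXmlFile` expects a `GoraException` when `gora-elasticsearch-mapping-invalid.xml` is loaded with XSD validation enabled, because its `<field>` element omits the required `type` attribute. The sketch below reproduces that failure with the JDK's `javax.xml.validation` API. It assumes a heavily reduced, hypothetical stand-in for `gora-elasticsearch.xsd` (only `gora-otd`, `class` and `field` with their required attributes), not the real schema file, and it does not show how `ElasticsearchStore` wires validation internally.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import org.xml.sax.SAXException;

class MappingXsdCheck {

  // Hypothetical, heavily reduced stand-in for gora-elasticsearch.xsd: it only
  // declares gora-otd > class > field and their required attributes.
  static final String XSD =
      "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'>"
          + "<xs:element name='gora-otd'><xs:complexType><xs:sequence>"
          + "<xs:element name='class' maxOccurs='unbounded'><xs:complexType>"
          + "<xs:sequence><xs:element name='field' maxOccurs='unbounded'><xs:complexType>"
          + "<xs:attribute name='name' type='xs:string' use='required'/>"
          + "<xs:attribute name='docfield' type='xs:string' use='required'/>"
          + "<xs:attribute name='type' type='xs:string' use='required'/>"
          + "</xs:complexType></xs:element></xs:sequence>"
          + "<xs:attribute name='name' type='xs:string' use='required'/>"
          + "<xs:attribute name='keyClass' type='xs:string' use='required'/>"
          + "<xs:attribute name='index' type='xs:string' use='required'/>"
          + "</xs:complexType></xs:element>"
          + "</xs:sequence></xs:complexType></xs:element>"
          + "</xs:schema>";

  // Same shape as the test mapping files, minus the xsi:* attributes.
  static final String VALID =
      "<gora-otd><class name='org.apache.gora.examples.generated.Employee'"
          + " keyClass='java.lang.String' index='frontier'>"
          + "<field name='name' docfield='name' type='text'/></class></gora-otd>";

  // Like gora-elasticsearch-mapping-invalid.xml: the field has no type attribute.
  static final String INVALID =
      "<gora-otd><class name='org.apache.gora.examples.generated.Employee'"
          + " keyClass='java.lang.String' index='frontier'>"
          + "<field name='name' docfield='name'/></class></gora-otd>";

  private static final Schema SCHEMA = compile(XSD);

  private static Schema compile(String xsd) {
    try {
      SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
      return factory.newSchema(new StreamSource(new StringReader(xsd)));
    } catch (SAXException e) {
      throw new IllegalStateException("The embedded XSD is malformed", e);
    }
  }

  /** Returns true if the document validates, false on any validation error. */
  static boolean isValid(String xml) {
    try {
      SCHEMA.newValidator().validate(new StreamSource(new StringReader(xml)));
      return true;
    } catch (SAXException e) {
      // With no error handler set, the validator throws on the first error,
      // such as a missing required attribute.
      return false;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("valid mapping:   " + isValid(VALID));
    System.out.println("invalid mapping: " + isValid(INVALID));
  }
}
```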
diff --git a/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping.xml b/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping.xml
new file mode 100644
index 0000000..13a8d08
--- /dev/null
+++ b/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<gora-otd xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:noNamespaceSchemaLocation="gora-elasticsearch.xsd">
+
+ <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" index="frontier">
+ <field name="name" docfield="name" type="text"/>
+ <field name="dateOfBirth" docfield="dateOfBirth" type="long"/>
+ <field name="ssn" docfield="ssn" type="text"/>
+ <field name="value" docfield="value" type="text"/>
+ <field name="salary" docfield="salary" type="integer"/>
+ <field name="boss" docfield="boss" type="object"/>
+ <field name="webpage" docfield="webpage" type="object"/>
+ </class>
+
+ <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" index="webpage">
+ <field name="url" docfield="url" type="text"/>
+ <field name="content" docfield="content" type="binary"/>
+ <field name="parsedContent" docfield="pContent" type="text"/>
+ <field name="outlinks" docfield="links.out" type="object"/>
+ <field name="headers" docfield="headers" type="object"/>
+ <field name="metadata" docfield="metadata" type="object"/>
+ <field name="byteData" docfield="byteData" type="object"/>
+ <field name="stringData" docfield="stringData" type="object"/>
+ </class>
+
+ <class name="org.apache.gora.examples.generated.TokenDatum" keyClass="java.lang.String" index="TokenDatum">
+ <field name="count" docfield="count" type="integer"/>
+ </class>
+
+</gora-otd>
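In the mapping above, `name` is the field of the persistent class and `docfield` is the field name in the Elasticsearch document, so `parsedContent` is stored as `pContent` and `outlinks` as `links.out`. Elasticsearch treats dots in field names as object paths, so `links.out` corresponds to an `out` field nested under a `links` object. The sketch below shows that expansion into a nested `properties` structure using plain maps. The class name is hypothetical, and whether `ElasticsearchStore` builds its index mapping this way is an assumption not confirmed by this diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DocfieldPaths {

  /**
   * Expands docfield -> Elasticsearch type pairs into a nested "properties"
   * structure, treating each dot in a docfield as an object boundary.
   */
  @SuppressWarnings("unchecked")
  static Map<String, Object> toProperties(Map<String, String> docfieldTypes) {
    Map<String, Object> root = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : docfieldTypes.entrySet()) {
      String[] path = entry.getKey().split("\\.");
      Map<String, Object> level = root;
      // Walk (or create) one object level per path segment except the last.
      for (int i = 0; i < path.length - 1; i++) {
        Map<String, Object> object = (Map<String, Object>)
            level.computeIfAbsent(path[i], k -> new LinkedHashMap<String, Object>());
        level = (Map<String, Object>)
            object.computeIfAbsent("properties", k -> new LinkedHashMap<String, Object>());
      }
      // The last segment is the leaf field carrying the mapped type.
      Map<String, Object> leaf = new LinkedHashMap<>();
      leaf.put("type", entry.getValue());
      level.put(path[path.length - 1], leaf);
    }
    return root;
  }

  public static void main(String[] args) {
    Map<String, String> webPage = new LinkedHashMap<>();
    webPage.put("url", "text");
    webPage.put("pContent", "text");
    webPage.put("links.out", "object");
    // Prints {url={type=text}, pContent={type=text}, links={properties={out={type=object}}}}
    System.out.println(toProperties(webPage));
  }
}
```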
diff --git a/gora-elasticsearch/src/test/resources/gora.properties b/gora-elasticsearch/src/test/resources/gora.properties
new file mode 100644
index 0000000..99842a9
--- /dev/null
+++ b/gora-elasticsearch/src/test/resources/gora.properties
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+gora.datastore.default=org.apache.gora.elasticsearch.store.ElasticsearchStore
+gora.elasticsearch.override.hadoop.configuration=false
+gora.datastore.elasticsearch.host=localhost
+gora.datastore.elasticsearch.port=9200
+#gora.datastore.elasticsearch.scheme=
+
+#Basic authentication requires a username and password.
+gora.datastore.elasticsearch.authenticationType=BASIC
+#Default username to connect to the Elasticsearch Docker container.
+gora.datastore.elasticsearch.username=elastic
+gora.datastore.elasticsearch.password=password
+
+#Authentication with an Elasticsearch access token.
+#gora.datastore.elasticsearch.authenticationType=TOKEN
+#gora.datastore.elasticsearch.authorizationToken=
+
+#Authentication with an Elasticsearch API key requires an API key ID and an API key secret.
+#gora.datastore.elasticsearch.authenticationType=APIKEY
+#gora.datastore.elasticsearch.apiKeyId=
+#gora.datastore.elasticsearch.apiKeySecret=
+
+#Additional configuration parameters for timeouts and threads.
+#gora.datastore.elasticsearch.connectTimeout=
+#gora.datastore.elasticsearch.socketTimeout=
+#gora.datastore.elasticsearch.ioThreadCount=
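The commented properties above select one of three authentication schemes. As a sketch of what each one ultimately sends to the cluster, the hypothetical helper below maps them to the HTTP `Authorization` header value that the Elasticsearch REST API accepts: `Basic` with base64 of `username:password`, `Bearer` for access tokens, and `ApiKey` with base64 of `id:api_key`. The class name and the choice to build the header by hand are assumptions. The real `ElasticsearchParameters`/`ElasticsearchStore` code may configure the REST client differently, for example through a credentials provider for BASIC.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Properties;

class AuthHeaderSketch {

  // Property prefix used by the gora.properties entries above.
  static final String PREFIX = "gora.datastore.elasticsearch.";

  /** Builds an HTTP Authorization header value for the configured scheme. */
  static String authorizationHeader(Properties props) {
    String type = require(props, "authenticationType");
    switch (type) {
      case "BASIC":
        // RFC 7617 Basic: base64("username:password").
        return "Basic " + base64(require(props, "username") + ":" + require(props, "password"));
      case "TOKEN":
        // Elasticsearch access tokens are sent as bearer tokens.
        return "Bearer " + require(props, "authorizationToken");
      case "APIKEY":
        // Elasticsearch API keys are sent as base64("id:api_key").
        return "ApiKey " + base64(require(props, "apiKeyId") + ":" + require(props, "apiKeySecret"));
      default:
        throw new IllegalArgumentException("Unknown authenticationType: " + type);
    }
  }

  // Fails fast when a property required by the chosen scheme is missing or empty.
  private static String require(Properties props, String key) {
    String value = props.getProperty(PREFIX + key);
    if (value == null || value.isEmpty()) {
      throw new IllegalArgumentException("Missing property " + PREFIX + key);
    }
    return value;
  }

  private static String base64(String s) {
    return Base64.getEncoder().encodeToString(s.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(PREFIX + "authenticationType", "BASIC");
    props.setProperty(PREFIX + "username", "elastic");
    props.setProperty(PREFIX + "password", "password");
    System.out.println(authorizationHeader(props)); // Basic ZWxhc3RpYzpwYXNzd29yZA==
  }
}
```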
diff --git a/pom.xml b/pom.xml
index eeb8182..b3b27f9 100755
--- a/pom.xml
+++ b/pom.xml
@@ -816,6 +816,7 @@
<module>gora-redis</module>
<module>gora-jet</module>
<module>gora-rethinkdb</module>
+ <module>gora-elasticsearch</module>
<module>gora-tutorial</module>
<module>gora-benchmark</module>
<module>sources-dist</module>
@@ -855,6 +856,8 @@
<restlet.version>2.4.0</restlet.version>
<!-- Flink Dependencies -->
<flink.version>1.6.2</flink.version>
+ <!-- Elasticsearch Dependencies -->
+ <elasticsearch.version>7.10.1</elasticsearch.version>
<spark.version>2.2.1</spark.version>
<aerospike.version>5.0.6</aerospike.version>
@@ -1201,6 +1204,13 @@
<version>${redisson.version}</version>
</dependency>
+ <!-- Elasticsearch dependencies -->
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>elasticsearch-rest-high-level-client</artifactId>
+ <version>${elasticsearch.version}</version>
+ </dependency>
+
<!--Hadoop dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>