You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gora.apache.org by dj...@apache.org on 2021/08/11 18:55:41 UTC
[gora] branch master updated: GORA-664 Fix testcases / formatting
(#247)
This is an automated email from the ASF dual-hosted git repository.
djkevincr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gora.git
The following commit(s) were added to refs/heads/master by this push:
new ffc3a13 GORA-664 Fix testcases / formatting (#247)
ffc3a13 is described below
commit ffc3a13b778dce17db823dbcda8f5ab44640ab85
Author: Kevin Ratnasekera <dj...@yahoo.com>
AuthorDate: Thu Aug 12 00:25:02 2021 +0530
GORA-664 Fix testcases / formatting (#247)
---
.../mapping/ElasticsearchMapping.java | 82 +-
.../mapping/ElasticsearchMappingBuilder.java | 314 ++--
.../apache/gora/elasticsearch/mapping/Field.java | 280 ++--
.../elasticsearch/query/ElasticsearchQuery.java | 22 +-
.../elasticsearch/query/ElasticsearchResult.java | 64 +-
.../elasticsearch/store/ElasticsearchStore.java | 1494 ++++++++++----------
.../ElasticsearchStoreCollectionMetadata.java | 50 +-
.../store/ElasticsearchStoreMetadataAnalyzer.java | 106 +-
.../elasticsearch/utils/AuthenticationType.java | 24 +-
.../utils/ElasticsearchConstants.java | 48 +-
.../utils/ElasticsearchParameters.java | 464 +++---
.../elasticsearch/GoraElasticsearchTestDriver.java | 64 +-
.../mapreduce/ElasticsearchStoreMapReduceTest.java | 58 +-
.../store/TestElasticsearchStore.java | 265 ++--
14 files changed, 1676 insertions(+), 1659 deletions(-)
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java
index d31e64f..9c6704f 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java
@@ -24,50 +24,50 @@ import java.util.Map;
*/
public class ElasticsearchMapping {
- private String indexName;
- private Map<String, Field> fields;
+ private String indexName;
+ private Map<String, Field> fields;
- /**
- * Empty constructor for the ElasticsearchMapping class.
- */
- public ElasticsearchMapping() {
- fields = new HashMap<>();
- }
+ /**
+ * Empty constructor for the ElasticsearchMapping class.
+ */
+ public ElasticsearchMapping() {
+ fields = new HashMap<>();
+ }
- /**
- * Returns the name of Elasticsearch index linked to the mapping.
- *
- * @return Index's name
- */
- public String getIndexName() {
- return indexName;
- }
+ /**
+ * Returns the name of Elasticsearch index linked to the mapping.
+ *
+ * @return Index's name
+ */
+ public String getIndexName() {
+ return indexName;
+ }
- /**
- * Sets the index name of the Elasticsearch mapping.
- *
- * @param indexName Index's name
- */
- public void setIndexName(String indexName) {
- this.indexName = indexName;
- }
+ /**
+ * Sets the index name of the Elasticsearch mapping.
+ *
+ * @param indexName Index's name
+ */
+ public void setIndexName(String indexName) {
+ this.indexName = indexName;
+ }
- /**
- * Returns a map with all mapped fields.
- *
- * @return Map containing mapped fields
- */
- public Map<String, Field> getFields() {
- return fields;
- }
+ /**
+ * Returns a map with all mapped fields.
+ *
+ * @return Map containing mapped fields
+ */
+ public Map<String, Field> getFields() {
+ return fields;
+ }
- /**
- * Add a new field to the mapped fields.
- *
- * @param classFieldName Field name in the persisted class
- * @param field Mapped field from Elasticsearch index
- */
- public void addField(String classFieldName, Field field) {
- fields.put(classFieldName, field);
- }
+ /**
+ * Add a new field to the mapped fields.
+ *
+ * @param classFieldName Field name in the persisted class
+ * @param field Mapped field from Elasticsearch index
+ */
+ public void addField(String classFieldName, Field field) {
+ fields.put(classFieldName, field);
+ }
}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java
index 30dba7b..31ad474 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java
@@ -45,166 +45,166 @@ import java.util.Locale;
*/
public class ElasticsearchMappingBuilder<K, T extends PersistentBase> {
- private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchMappingBuilder.class);
-
- /**
- * XSD validation file for the XML mapping.
- */
- private static final String XSD_MAPPING_FILE = "gora-elasticsearch.xsd";
-
- // Index description
- static final String ATT_NAME = "name";
-
- static final String ATT_TYPE = "type";
-
- // Class description
- static final String TAG_CLASS = "class";
-
- static final String ATT_KEYCLASS = "keyClass";
-
- static final String ATT_INDEX = "index";
-
- static final String TAG_FIELD = "field";
-
- static final String ATT_DOCFIELD = "docfield";
-
- static final String ATT_SCALINGFACTOR = "scalingFactor";
-
- /**
- * Mapping instance being built.
- */
- private ElasticsearchMapping elasticsearchMapping;
-
- private final ElasticsearchStore<K, T> dataStore;
-
- /**
- * Constructor for ElasticsearchMappingBuilder.
- *
- * @param store ElasticsearchStore instance
- */
- public ElasticsearchMappingBuilder(final ElasticsearchStore<K, T> store) {
- this.elasticsearchMapping = new ElasticsearchMapping();
- this.dataStore = store;
- }
-
- /**
- * Returns the Elasticsearch Mapping being built.
- *
- * @return Elasticsearch Mapping instance
- */
- public ElasticsearchMapping getElasticsearchMapping() {
- return elasticsearchMapping;
- }
-
- /**
- * Sets the Elasticsearch Mapping.
- *
- * @param elasticsearchMapping Elasticsearch Mapping instance
- */
- public void setElasticsearchMapping(ElasticsearchMapping elasticsearchMapping) {
- this.elasticsearchMapping = elasticsearchMapping;
- }
-
- /**
- * Reads Elasticsearch mappings from file.
- *
- * @param inputStream Mapping input stream
- * @param xsdValidation Parameter for enabling XSD validation
- */
- public void readMappingFile(InputStream inputStream, boolean xsdValidation) {
- try {
- SAXBuilder saxBuilder = new SAXBuilder();
- if (inputStream == null) {
- LOG.error("The mapping input stream is null!");
- throw new GoraException("The mapping input stream is null!");
- }
-
- // Convert input stream to a string to use it a few times
- String mappingStream = IOUtils.toString(inputStream, Charset.defaultCharset());
-
- // XSD validation for XML file
- if (xsdValidation) {
- Source xmlSource = new StreamSource(IOUtils.toInputStream(mappingStream, Charset.defaultCharset()));
- Schema schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
- .newSchema(new StreamSource(getClass().getClassLoader().getResourceAsStream(XSD_MAPPING_FILE)));
- schema.newValidator().validate(xmlSource);
- LOG.info("Mapping file is valid.");
- }
-
- Document document = saxBuilder.build(IOUtils.toInputStream(mappingStream, Charset.defaultCharset()));
- if (document == null) {
- LOG.error("The mapping document is null!");
- throw new GoraException("The mapping document is null!");
- }
-
- Element root = document.getRootElement();
- // Extract class descriptions
- @SuppressWarnings("unchecked")
- List<Element> classElements = root.getChildren(TAG_CLASS);
- for (Element classElement : classElements) {
- final Class<T> persistentClass = dataStore.getPersistentClass();
- final Class<K> keyClass = dataStore.getKeyClass();
- if (haveKeyClass(keyClass, classElement)
- && havePersistentClass(persistentClass, classElement)) {
- loadPersistentClass(classElement, persistentClass);
- break;
- }
- }
- } catch (IOException | JDOMException | ConfigurationException | SAXException ex) {
- throw new RuntimeException(ex);
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchMappingBuilder.class);
+
+ /**
+ * XSD validation file for the XML mapping.
+ */
+ private static final String XSD_MAPPING_FILE = "gora-elasticsearch.xsd";
+
+ // Index description
+ static final String ATT_NAME = "name";
+
+ static final String ATT_TYPE = "type";
+
+ // Class description
+ static final String TAG_CLASS = "class";
+
+ static final String ATT_KEYCLASS = "keyClass";
+
+ static final String ATT_INDEX = "index";
+
+ static final String TAG_FIELD = "field";
+
+ static final String ATT_DOCFIELD = "docfield";
+
+ static final String ATT_SCALINGFACTOR = "scalingFactor";
+
+ /**
+ * Mapping instance being built.
+ */
+ private ElasticsearchMapping elasticsearchMapping;
+
+ private final ElasticsearchStore<K, T> dataStore;
+
+ /**
+ * Constructor for ElasticsearchMappingBuilder.
+ *
+ * @param store ElasticsearchStore instance
+ */
+ public ElasticsearchMappingBuilder(final ElasticsearchStore<K, T> store) {
+ this.elasticsearchMapping = new ElasticsearchMapping();
+ this.dataStore = store;
+ }
+
+ /**
+ * Returns the Elasticsearch Mapping being built.
+ *
+ * @return Elasticsearch Mapping instance
+ */
+ public ElasticsearchMapping getElasticsearchMapping() {
+ return elasticsearchMapping;
+ }
+
+ /**
+ * Sets the Elasticsearch Mapping.
+ *
+ * @param elasticsearchMapping Elasticsearch Mapping instance
+ */
+ public void setElasticsearchMapping(ElasticsearchMapping elasticsearchMapping) {
+ this.elasticsearchMapping = elasticsearchMapping;
+ }
+
+ /**
+ * Reads Elasticsearch mappings from file.
+ *
+ * @param inputStream Mapping input stream
+ * @param xsdValidation Parameter for enabling XSD validation
+ */
+ public void readMappingFile(InputStream inputStream, boolean xsdValidation) {
+ try {
+ SAXBuilder saxBuilder = new SAXBuilder();
+ if (inputStream == null) {
+ LOG.error("The mapping input stream is null!");
+ throw new GoraException("The mapping input stream is null!");
+ }
+
+ // Convert input stream to a string to use it a few times
+ String mappingStream = IOUtils.toString(inputStream, Charset.defaultCharset());
+
+ // XSD validation for XML file
+ if (xsdValidation) {
+ Source xmlSource = new StreamSource(IOUtils.toInputStream(mappingStream, Charset.defaultCharset()));
+ Schema schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
+ .newSchema(new StreamSource(getClass().getClassLoader().getResourceAsStream(XSD_MAPPING_FILE)));
+ schema.newValidator().validate(xmlSource);
+ LOG.info("Mapping file is valid.");
+ }
+
+ Document document = saxBuilder.build(IOUtils.toInputStream(mappingStream, Charset.defaultCharset()));
+ if (document == null) {
+ LOG.error("The mapping document is null!");
+ throw new GoraException("The mapping document is null!");
+ }
+
+ Element root = document.getRootElement();
+ // Extract class descriptions
+ @SuppressWarnings("unchecked")
+ List<Element> classElements = root.getChildren(TAG_CLASS);
+ for (Element classElement : classElements) {
+ final Class<T> persistentClass = dataStore.getPersistentClass();
+ final Class<K> keyClass = dataStore.getKeyClass();
+ if (haveKeyClass(keyClass, classElement)
+ && havePersistentClass(persistentClass, classElement)) {
+ loadPersistentClass(classElement, persistentClass);
+ break;
}
- LOG.info("Gora Elasticsearch mapping file was read successfully.");
- }
-
- private boolean haveKeyClass(final Class<K> keyClass,
- final Element classElement) {
- return classElement.getAttributeValue(ATT_KEYCLASS).equals(
- keyClass.getName());
- }
-
- private boolean havePersistentClass(final Class<T> persistentClass,
- final Element classElement) {
- return classElement.getAttributeValue(ATT_NAME).equals(
- persistentClass.getName());
+ }
+ } catch (IOException | JDOMException | ConfigurationException | SAXException ex) {
+ throw new RuntimeException(ex);
}
-
- /**
- * Handle the XML parsing of the class definition.
- *
- * @param classElement the XML node containing the class definition
- */
- protected void loadPersistentClass(Element classElement,
- Class<T> pPersistentClass) {
-
- String indexNameFromMapping = classElement.getAttributeValue(ATT_INDEX);
- String indexName = dataStore.getSchemaName(indexNameFromMapping, pPersistentClass);
-
+ LOG.info("Gora Elasticsearch mapping file was read successfully.");
+ }
+
+ private boolean haveKeyClass(final Class<K> keyClass,
+ final Element classElement) {
+ return classElement.getAttributeValue(ATT_KEYCLASS).equals(
+ keyClass.getName());
+ }
+
+ private boolean havePersistentClass(final Class<T> persistentClass,
+ final Element classElement) {
+ return classElement.getAttributeValue(ATT_NAME).equals(
+ persistentClass.getName());
+ }
+
+ /**
+ * Handle the XML parsing of the class definition.
+ *
+ * @param classElement the XML node containing the class definition
+ */
+ protected void loadPersistentClass(Element classElement,
+ Class<T> pPersistentClass) {
+
+ String indexNameFromMapping = classElement.getAttributeValue(ATT_INDEX);
+ String indexName = dataStore.getSchemaName(indexNameFromMapping, pPersistentClass);
+
+ elasticsearchMapping.setIndexName(indexName);
+ // indexNameFromMapping could be null here
+ if (!indexName.equals(indexNameFromMapping)) {
+ ElasticsearchStore.LOG
+ .info("Keyclass and nameclass match, but mismatching index names. "
+ + "Mappingfile schema is '{}' vs actual schema '{}', assuming they are the same.",
+ indexNameFromMapping, indexName);
+ if (indexNameFromMapping != null) {
elasticsearchMapping.setIndexName(indexName);
- // docNameFromMapping could be null here
- if (!indexName.equals(indexNameFromMapping)) {
- ElasticsearchStore.LOG
- .info("Keyclass and nameclass match, but mismatching index names. "
- + "Mappingfile schema is '{}' vs actual schema '{}', assuming they are the same.",
- indexNameFromMapping, indexName);
- if (indexNameFromMapping != null) {
- elasticsearchMapping.setIndexName(indexName);
- }
- }
+ }
+ }
- // Process fields declaration
- @SuppressWarnings("unchecked")
- List<Element> fields = classElement.getChildren(TAG_FIELD);
- for (Element fieldElement : fields) {
- String fieldTypeName = fieldElement.getAttributeValue(ATT_TYPE).toUpperCase(Locale.getDefault());
- Field.FieldType fieldType = new Field.FieldType(Field.DataType.valueOf(fieldTypeName));
- Field field;
- if (fieldType.getType() == Field.DataType.SCALED_FLOAT) {
- int scalingFactor = Integer.parseInt(fieldElement.getAttributeValue(ATT_SCALINGFACTOR));
- field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), new Field.FieldType(scalingFactor));
- } else {
- field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), fieldType);
- }
- elasticsearchMapping.addField(fieldElement.getAttributeValue(ATT_NAME), field);
- }
+ // Process fields declaration
+ @SuppressWarnings("unchecked")
+ List<Element> fields = classElement.getChildren(TAG_FIELD);
+ for (Element fieldElement : fields) {
+ String fieldTypeName = fieldElement.getAttributeValue(ATT_TYPE).toUpperCase(Locale.getDefault());
+ Field.FieldType fieldType = new Field.FieldType(Field.DataType.valueOf(fieldTypeName));
+ Field field;
+ if (fieldType.getType() == Field.DataType.SCALED_FLOAT) {
+ int scalingFactor = Integer.parseInt(fieldElement.getAttributeValue(ATT_SCALINGFACTOR));
+ field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), new Field.FieldType(scalingFactor));
+ } else {
+ field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), fieldType);
+ }
+ elasticsearchMapping.addField(fieldElement.getAttributeValue(ATT_NAME), field);
}
+ }
}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java
index 73016cb..e5021fb 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java
@@ -24,177 +24,175 @@ import java.util.StringJoiner;
*/
public class Field {
- private String name;
- private FieldType dataType;
+ private String name;
+ private FieldType dataType;
+
+ /**
+ * Constructor for Field.
+ *
+ * @param name Field's name
+ * @param dataType Field's data type
+ */
+ public Field(String name, FieldType dataType) {
+ this.name = name;
+ this.dataType = dataType;
+ }
+
+ /**
+ * Returns the field's name.
+ *
+ * @return Field's name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Sets the field's name.
+ *
+ * @param name Field's name
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Returns the field's data-type.
+ *
+ * @return Field's data-type
+ */
+ public FieldType getDataType() {
+ return dataType;
+ }
+
+ /**
+ * Sets the field's data-type.
+ *
+ * @param dataType Field's data-type
+ */
+ public void setDataType(FieldType dataType) {
+ this.dataType = dataType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Field field = (Field) o;
+ return Objects.equals(name, field.name) && Objects.equals(dataType, field.dataType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, dataType);
+ }
+
+ @Override
+ public String toString() {
+ return new StringJoiner(", ", Field.class.getSimpleName() + "[", "]")
+ .add("name='" + name + "'")
+ .add("dataType=" + dataType)
+ .toString();
+ }
+
+ /**
+ * Elasticsearch supported data-type enumeration. For a more detailed list of data
+ * types supported by Elasticsearch refer to
+ * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
+ */
+ public enum DataType {
+ BINARY,
+ BOOLEAN,
+ KEYWORD,
+ CONSTANT_KEYWORD,
+ WILDCARD,
+ LONG,
+ INTEGER,
+ SHORT,
+ BYTE,
+ DOUBLE,
+ FLOAT,
+ HALF_FLOAT,
+ SCALED_FLOAT,
+ OBJECT,
+ FLATTENED,
+ NESTED,
+ TEXT,
+ COMPLETION,
+ SEARCH_AS_YOU_TYPE,
+ TOKEN_COUNT
+ }
+
+ public static class FieldType {
+
+ private DataType type;
+
+ // Parameter for scaled_float type.
+ private int scalingFactor;
/**
- * Constructor for Field.
+ * Constructor for FieldType.
*
- * @param name Field's name
- * @param dataType Field's data type
+ * @param type Elasticsearch data type
*/
- public Field(String name, FieldType dataType) {
- this.name = name;
- this.dataType = dataType;
+ public FieldType(DataType type) {
+ this.type = type;
}
/**
- * Returns the field's name.
+ * Constructor for FieldType. Implicitly uses the scaled_float Elasticsearch data type
+ * with scaling factor parameter.
*
- * @return Field's name
+ * @param scalingFactor scaled_float field's scaling factor
*/
- public String getName() {
- return name;
+ public FieldType(int scalingFactor) {
+ this.type = DataType.SCALED_FLOAT;
+ this.scalingFactor = scalingFactor;
}
/**
- * Sets the field's name.
- *
- * @param name Field's name
+ * @return Elasticsearch data type
*/
- public void setName(String name) {
- this.name = name;
+ public DataType getType() {
+ return type;
}
/**
- * Returns the field's data-type.
+ * @param type Elasticsearch data type
+ */
+ public void setType(DataType type) {
+ this.type = type;
+ }
+
+ /**
+ * Returns the scaling factor of scaled_float type.
*
- * @return Field's data-type
+ * @return scaled_float field's scaling factor
*/
- public FieldType getDataType() {
- return dataType;
+ public int getScalingFactor() {
+ return scalingFactor;
}
/**
- * Sets the field's data-type.
+ * Sets the scaling factor of scaled_float type.
*
- * @param dataType Field's data-type
+ * @param scalingFactor scaled_float field's scaling factor
*/
- public void setDataType(FieldType dataType) {
- this.dataType = dataType;
+ public void setScalingFactor(int scalingFactor) {
+ this.scalingFactor = scalingFactor;
}
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- Field field = (Field) o;
- return Objects.equals(name, field.name) && Objects.equals(dataType, field.dataType);
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ FieldType fieldType = (FieldType) o;
+ return scalingFactor == fieldType.scalingFactor && type == fieldType.type;
}
@Override
public int hashCode() {
- return Objects.hash(name, dataType);
- }
-
- @Override
- public String toString() {
- return new StringJoiner(", ", Field.class.getSimpleName() + "[", "]")
- .add("name='" + name + "'")
- .add("dataType=" + dataType)
- .toString();
- }
-
- /**
- * Elasticsearch supported data-type enumeration. For a more detailed list of data
- * types supported by Elasticsearch refer to
- * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
- */
- public enum DataType {
- BINARY,
- BOOLEAN,
- KEYWORD,
- CONSTANT_KEYWORD,
- WILDCARD,
- LONG,
- INTEGER,
- SHORT,
- BYTE,
- DOUBLE,
- FLOAT,
- HALF_FLOAT,
- SCALED_FLOAT,
- OBJECT,
- FLATTENED,
- NESTED,
- TEXT,
- COMPLETION,
- SEARCH_AS_YOU_TYPE,
- TOKEN_COUNT
- }
-
- public static class FieldType {
-
- private DataType type;
-
- // Parameter for scaled_float type.
- private int scalingFactor;
-
- /**
- * Constructor for FieldType.
- *
- * @param type Elasticsearch data type
- */
- public FieldType(DataType type) {
- this.type = type;
- }
-
- /**
- * Constructor for FieldType Implicitly uses scaled_float Elasticsearch data type
- * with scaling factor parameter.
- *
- * @param scalingFactor scaled_float field's scaling factor
- */
- public FieldType(int scalingFactor) {
- this.type = DataType.SCALED_FLOAT;
- this.scalingFactor = scalingFactor;
- }
-
- /**
- *
- * @return Elasticsearch data type
- */
- public DataType getType() {
- return type;
- }
-
- /**
- *
- * @param type Elasticsearch data type
- */
- public void setType(DataType type) {
- this.type = type;
- }
-
- /**
- * Returns the scaling factor of scaled_float type.
- *
- * @return scaled_float field's scaling factor
- */
- public int getScalingFactor() {
- return scalingFactor;
- }
-
- /**
- * Sets the scaling factor of scaled_float type.
- *
- * @param scalingFactor scaled_float field's scaling factor
- */
- public void setScalingFactor(int scalingFactor) {
- this.scalingFactor = scalingFactor;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- FieldType fieldType = (FieldType) o;
- return scalingFactor == fieldType.scalingFactor && type == fieldType.type;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(type, scalingFactor);
- }
+ return Objects.hash(type, scalingFactor);
}
+ }
}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java
index f491006..214380e 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java
@@ -26,16 +26,16 @@ import org.apache.gora.store.DataStore;
*/
public class ElasticsearchQuery<K, T extends PersistentBase> extends QueryBase<K, T> {
- /**
- * Constructor for the query.
- *
- * @param dataStore data store used
- */
- public ElasticsearchQuery(DataStore<K, T> dataStore) {
- super(dataStore);
- }
+ /**
+ * Constructor for the query.
+ *
+ * @param dataStore data store used
+ */
+ public ElasticsearchQuery(DataStore<K, T> dataStore) {
+ super(dataStore);
+ }
- public ElasticsearchQuery() {
- super(null);
- }
+ public ElasticsearchQuery() {
+ super(null);
+ }
}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java
index e7f8180..01d5d8e 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java
@@ -29,44 +29,44 @@ import java.util.List;
*/
public class ElasticsearchResult<K, T extends PersistentBase> extends ResultBase<K, T> {
- /**
- * List of resulting persistent objects.
- */
- private List<T> persistentObjects;
+ /**
+ * List of resulting persistent objects.
+ */
+ private List<T> persistentObjects;
- /**
- * List of resulting objects keys.
- */
- private List<K> persistentKeys;
+ /**
+ * List of the resulting objects' keys.
+ */
+ private List<K> persistentKeys;
- public ElasticsearchResult(DataStore<K, T> dataStore, Query<K, T> query, List<K> persistentKeys, List<T> persistentObjects) {
- super(dataStore, query);
- this.persistentKeys = persistentKeys;
- this.persistentObjects = persistentObjects;
- }
-
- @Override
- public float getProgress() {
- if (persistentObjects.size() == 0) {
- return 1;
- }
+ public ElasticsearchResult(DataStore<K, T> dataStore, Query<K, T> query, List<K> persistentKeys, List<T> persistentObjects) {
+ super(dataStore, query);
+ this.persistentKeys = persistentKeys;
+ this.persistentObjects = persistentObjects;
+ }
- return offset / (float) persistentObjects.size();
+ @Override
+ public float getProgress() {
+ if (persistentObjects.size() == 0) {
+ return 1;
}
- @Override
- public int size() {
- return persistentObjects.size();
- }
+ return offset / (float) persistentObjects.size();
+ }
- @Override
- protected boolean nextInner() {
- if ((int) offset == persistentObjects.size()) {
- return false;
- }
+ @Override
+ public int size() {
+ return persistentObjects.size();
+ }
- persistent = persistentObjects.get((int) offset);
- key = persistentKeys.get((int) offset);
- return persistent != null;
+ @Override
+ protected boolean nextInner() {
+ if ((int) offset == persistentObjects.size()) {
+ return false;
}
+
+ persistent = persistentObjects.get((int) offset);
+ key = persistentKeys.get((int) offset);
+ return persistent != null;
+ }
}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java
index a82de7f..610f0fd 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java
@@ -82,7 +82,15 @@ import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
/**
* Implementation of a Apache Elasticsearch data store to be used by Apache Gora.
@@ -92,773 +100,773 @@ import java.util.*;
*/
public class ElasticsearchStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
- public static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStore.class);
- private static final String DEFAULT_MAPPING_FILE = "gora-elasticsearch-mapping.xml";
- public static final String PARSE_MAPPING_FILE_KEY = "gora.elasticsearch.mapping.file";
- private static final String XML_MAPPING_DEFINITION = "gora.mapping";
- public static final String XSD_VALIDATION = "gora.xsd_validation";
-
- /**
- * Elasticsearch client
- */
- private RestHighLevelClient client;
-
- /**
- * Mapping definition for Elasticsearch
- */
- private ElasticsearchMapping elasticsearchMapping;
-
- @Override
- public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException {
- try {
- LOG.debug("Initializing Elasticsearch store");
- ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf());
- super.initialize(keyClass, persistentClass, properties);
- ElasticsearchMappingBuilder<K, T> builder = new ElasticsearchMappingBuilder<>(this);
- InputStream mappingStream;
- if (properties.containsKey(XML_MAPPING_DEFINITION)) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} = {}", XML_MAPPING_DEFINITION, properties.getProperty(XML_MAPPING_DEFINITION));
- }
- mappingStream = org.apache.commons.io.IOUtils.toInputStream(properties.getProperty(XML_MAPPING_DEFINITION), (Charset) null);
- } else {
- mappingStream = getClass().getClassLoader().getResourceAsStream(properties.getProperty(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
- }
- String xsdValidation = properties.getProperty(XSD_VALIDATION, "false");
- builder.readMappingFile(mappingStream, Boolean.parseBoolean(xsdValidation));
- elasticsearchMapping = builder.getElasticsearchMapping();
- client = createClient(parameters);
- LOG.info("Elasticsearch store was successfully initialized.");
- } catch (Exception ex) {
- LOG.error("Error while initializing Elasticsearch store", ex);
- throw new GoraException(ex);
- }
+ public static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStore.class);
+ private static final String DEFAULT_MAPPING_FILE = "gora-elasticsearch-mapping.xml";
+ public static final String PARSE_MAPPING_FILE_KEY = "gora.elasticsearch.mapping.file";
+ private static final String XML_MAPPING_DEFINITION = "gora.mapping";
+ public static final String XSD_VALIDATION = "gora.xsd_validation";
+
+ /**
+ * Elasticsearch client
+ */
+ private RestHighLevelClient client;
+
+ /**
+ * Mapping definition for Elasticsearch
+ */
+ private ElasticsearchMapping elasticsearchMapping;
+
+ @Override
+ public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException {
+ try {
+ LOG.debug("Initializing Elasticsearch store");
+ ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf());
+ super.initialize(keyClass, persistentClass, properties);
+ ElasticsearchMappingBuilder<K, T> builder = new ElasticsearchMappingBuilder<>(this);
+ InputStream mappingStream;
+ if (properties.containsKey(XML_MAPPING_DEFINITION)) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} = {}", XML_MAPPING_DEFINITION, properties.getProperty(XML_MAPPING_DEFINITION));
+ }
+ mappingStream = org.apache.commons.io.IOUtils.toInputStream(properties.getProperty(XML_MAPPING_DEFINITION), (Charset) null);
+ } else {
+ mappingStream = getClass().getClassLoader().getResourceAsStream(properties.getProperty(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
+ }
+ String xsdValidation = properties.getProperty(XSD_VALIDATION, "false");
+ builder.readMappingFile(mappingStream, Boolean.parseBoolean(xsdValidation));
+ elasticsearchMapping = builder.getElasticsearchMapping();
+ client = createClient(parameters);
+ LOG.info("Elasticsearch store was successfully initialized.");
+ } catch (Exception ex) {
+ LOG.error("Error while initializing Elasticsearch store", ex);
+ throw new GoraException(ex);
}
-
- public static RestHighLevelClient createClient(ElasticsearchParameters parameters) {
- RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(parameters.getHost(), parameters.getPort()));
-
- // Choosing the authentication method.
- switch (parameters.getAuthenticationType()) {
- case BASIC:
- if (parameters.getUsername() != null && parameters.getPassword() != null) {
- final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
- credentialsProvider.setCredentials(AuthScope.ANY,
- new UsernamePasswordCredentials(parameters.getUsername(), parameters.getPassword()));
- clientBuilder.setHttpClientConfigCallback(
- httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
- } else {
- throw new IllegalArgumentException("Missing username or password for BASIC authentication.");
- }
- break;
- case TOKEN:
- if (parameters.getAuthorizationToken() != null) {
- Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization",
- parameters.getAuthorizationToken())};
- clientBuilder.setDefaultHeaders(defaultHeaders);
- } else {
- throw new IllegalArgumentException("Missing authorization token for TOKEN authentication.");
- }
- break;
- case APIKEY:
- if (parameters.getApiKeyId() != null && parameters.getApiKeySecret() != null) {
- String apiKeyAuth = Base64.getEncoder()
- .encodeToString((parameters.getApiKeyId() + ":" + parameters.getApiKeySecret())
- .getBytes(StandardCharsets.UTF_8));
- Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization", "ApiKey " + apiKeyAuth)};
- clientBuilder.setDefaultHeaders(defaultHeaders);
- } else {
- throw new IllegalArgumentException("Missing API Key ID or API Key Secret for APIKEY authentication.");
- }
- break;
- }
-
- if (parameters.getConnectTimeout() != 0) {
- clientBuilder.setRequestConfigCallback(requestConfigBuilder ->
- requestConfigBuilder.setConnectTimeout(parameters.getConnectTimeout()));
- }
-
- if (parameters.getSocketTimeout() != 0) {
- clientBuilder.setRequestConfigCallback(requestConfigBuilder ->
- requestConfigBuilder.setSocketTimeout(parameters.getSocketTimeout()));
- }
-
- if (parameters.getIoThreadCount() != 0) {
- clientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
- httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom()
- .setIoThreadCount(parameters.getIoThreadCount()).build()));
+ }
+
+ public static RestHighLevelClient createClient(ElasticsearchParameters parameters) {
+ RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(parameters.getHost(), parameters.getPort()));
+
+ // Choosing the authentication method.
+ switch (parameters.getAuthenticationType()) {
+ case BASIC:
+ if (parameters.getUsername() != null && parameters.getPassword() != null) {
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY,
+ new UsernamePasswordCredentials(parameters.getUsername(), parameters.getPassword()));
+ clientBuilder.setHttpClientConfigCallback(
+ httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
+ } else {
+ throw new IllegalArgumentException("Missing username or password for BASIC authentication.");
+ }
+ break;
+ case TOKEN:
+ if (parameters.getAuthorizationToken() != null) {
+ Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization",
+ parameters.getAuthorizationToken())};
+ clientBuilder.setDefaultHeaders(defaultHeaders);
+ } else {
+ throw new IllegalArgumentException("Missing authorization token for TOKEN authentication.");
+ }
+ break;
+ case APIKEY:
+ if (parameters.getApiKeyId() != null && parameters.getApiKeySecret() != null) {
+ String apiKeyAuth = Base64.getEncoder()
+ .encodeToString((parameters.getApiKeyId() + ":" + parameters.getApiKeySecret())
+ .getBytes(StandardCharsets.UTF_8));
+ Header[] defaultHeaders = new Header[]{new BasicHeader("Authorization", "ApiKey " + apiKeyAuth)};
+ clientBuilder.setDefaultHeaders(defaultHeaders);
+ } else {
+ throw new IllegalArgumentException("Missing API Key ID or API Key Secret for APIKEY authentication.");
}
- return new RestHighLevelClient(clientBuilder);
+ break;
}
- public ElasticsearchMapping getMapping() {
- return elasticsearchMapping;
+ if (parameters.getConnectTimeout() != 0) {
+ clientBuilder.setRequestConfigCallback(requestConfigBuilder ->
+ requestConfigBuilder.setConnectTimeout(parameters.getConnectTimeout()));
}
- @Override
- public String getSchemaName() {
- return elasticsearchMapping.getIndexName();
+ if (parameters.getSocketTimeout() != 0) {
+ clientBuilder.setRequestConfigCallback(requestConfigBuilder ->
+ requestConfigBuilder.setSocketTimeout(parameters.getSocketTimeout()));
}
- @Override
- public String getSchemaName(final String mappingSchemaName, final Class<?> persistentClass) {
- return super.getSchemaName(mappingSchemaName, persistentClass);
+ if (parameters.getIoThreadCount() != 0) {
+ clientBuilder.setHttpClientConfigCallback(httpClientBuilder ->
+ httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom()
+ .setIoThreadCount(parameters.getIoThreadCount()).build()));
}
-
- @Override
- public void createSchema() throws GoraException {
- CreateIndexRequest request = new CreateIndexRequest(elasticsearchMapping.getIndexName());
- Map<String, Object> properties = new HashMap<>();
- for (Map.Entry<String, Field> entry : elasticsearchMapping.getFields().entrySet()) {
- Map<String, Object> fieldType = new HashMap<>();
- fieldType.put("type", entry.getValue().getDataType().getType().name().toLowerCase(Locale.ROOT));
- if (entry.getValue().getDataType().getType() == Field.DataType.SCALED_FLOAT) {
- fieldType.put("scaling_factor", entry.getValue().getDataType().getScalingFactor());
- }
- properties.put(entry.getKey(), fieldType);
- }
- // Special field for range query
- properties.put("gora_id", new HashMap<String, Object>() {{
- put("type", "keyword");
- }});
- Map<String, Object> mapping = new HashMap<>();
- mapping.put("properties", properties);
- request.mapping(mapping);
- try {
- if (!client.indices().exists(
- new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) {
- client.indices().create(request, RequestOptions.DEFAULT);
- }
- } catch (IOException ex) {
- throw new GoraException(ex);
- }
+ return new RestHighLevelClient(clientBuilder);
+ }
+
+ public ElasticsearchMapping getMapping() {
+ return elasticsearchMapping;
+ }
+
+ @Override
+ public String getSchemaName() {
+ return elasticsearchMapping.getIndexName();
+ }
+
+ @Override
+ public String getSchemaName(final String mappingSchemaName, final Class<?> persistentClass) {
+ return super.getSchemaName(mappingSchemaName, persistentClass);
+ }
+
+ @Override
+ public void createSchema() throws GoraException {
+ CreateIndexRequest request = new CreateIndexRequest(elasticsearchMapping.getIndexName());
+ Map<String, Object> properties = new HashMap<>();
+ for (Map.Entry<String, Field> entry : elasticsearchMapping.getFields().entrySet()) {
+ Map<String, Object> fieldType = new HashMap<>();
+ fieldType.put("type", entry.getValue().getDataType().getType().name().toLowerCase(Locale.ROOT));
+ if (entry.getValue().getDataType().getType() == Field.DataType.SCALED_FLOAT) {
+ fieldType.put("scaling_factor", entry.getValue().getDataType().getScalingFactor());
+ }
+ properties.put(entry.getKey(), fieldType);
}
-
- @Override
- public void deleteSchema() throws GoraException {
- DeleteIndexRequest request = new DeleteIndexRequest(elasticsearchMapping.getIndexName());
- try {
- if (client.indices().exists(
- new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) {
- client.indices().delete(request, RequestOptions.DEFAULT);
- }
- } catch (IOException ex) {
- throw new GoraException(ex);
- }
+ // Special field for range query
+ properties.put("gora_id", new HashMap<String, Object>() {{
+ put("type", "keyword");
+ }});
+ Map<String, Object> mapping = new HashMap<>();
+ mapping.put("properties", properties);
+ request.mapping(mapping);
+ try {
+ if (!client.indices().exists(
+ new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) {
+ client.indices().create(request, RequestOptions.DEFAULT);
+ }
+ } catch (IOException ex) {
+ throw new GoraException(ex);
}
-
- @Override
- public boolean schemaExists() throws GoraException {
- try {
- return client.indices().exists(
- new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
- } catch (IOException ex) {
- throw new GoraException(ex);
- }
+ }
+
+ @Override
+ public void deleteSchema() throws GoraException {
+ DeleteIndexRequest request = new DeleteIndexRequest(elasticsearchMapping.getIndexName());
+ try {
+ if (client.indices().exists(
+ new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) {
+ client.indices().delete(request, RequestOptions.DEFAULT);
+ }
+ } catch (IOException ex) {
+ throw new GoraException(ex);
}
-
- @Override
- public boolean exists(K key) throws GoraException {
- GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key);
- getRequest.fetchSourceContext(new FetchSourceContext(false)).storedFields("_none_");
- try {
- return client.exists(getRequest, RequestOptions.DEFAULT);
- } catch (IOException ex) {
- throw new GoraException(ex);
- }
+ }
+
+ @Override
+ public boolean schemaExists() throws GoraException {
+ try {
+ return client.indices().exists(
+ new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
+ } catch (IOException ex) {
+ throw new GoraException(ex);
}
-
- @Override
- public T get(K key, String[] fields) throws GoraException {
- String[] requestedFields = getFieldsToQuery(fields);
- List<String> documentFields = new ArrayList<>();
- for (String requestedField : requestedFields) {
- documentFields.add(elasticsearchMapping.getFields().get(requestedField).getName());
- }
- try {
- // Prepare the Elasticsearch request
- GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key);
- GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
- if (getResponse.isExists()) {
- Map<String, Object> sourceMap = getResponse.getSourceAsMap();
-
- // Map of field's name and its value from the Document
- Map<String, Object> fieldsAndValues = new HashMap<>();
- for (String field : documentFields) {
- fieldsAndValues.put(field, sourceMap.get(field));
- }
-
- // Build the corresponding persistent
- return newInstance(fieldsAndValues, requestedFields);
- } else {
- return null;
- }
- } catch (IOException ex) {
- throw new GoraException(ex);
- }
+ }
+
+ @Override
+ public boolean exists(K key) throws GoraException {
+ GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key);
+ getRequest.fetchSourceContext(new FetchSourceContext(false)).storedFields("_none_");
+ try {
+ return client.exists(getRequest, RequestOptions.DEFAULT);
+ } catch (IOException ex) {
+ throw new GoraException(ex);
}
-
- @Override
- public void put(K key, T obj) throws GoraException {
- if (obj.isDirty()) {
- Schema schemaObj = obj.getSchema();
- List<Schema.Field> fields = schemaObj.getFields();
- Map<String, Object> jsonMap = new HashMap<>();
- for (Schema.Field field : fields) {
- Field mappedField = elasticsearchMapping.getFields().get(field.name());
- if (mappedField != null) {
- Object fieldValue = obj.get(field.pos());
- if (fieldValue != null) {
- Schema fieldSchema = field.schema();
- Object serializedObj = serializeFieldValue(fieldSchema, fieldValue);
- jsonMap.put(mappedField.getName(), serializedObj);
- }
- }
- }
- // Special field for range query
- jsonMap.put("gora_id", key);
- // Prepare the Elasticsearch request
- IndexRequest request = new IndexRequest(elasticsearchMapping.getIndexName()).id((String) key).source(jsonMap);
- try {
- client.index(request, RequestOptions.DEFAULT);
- } catch (IOException ex) {
- throw new GoraException(ex);
- }
- } else {
- LOG.info("Ignored putting object {} in the store as it is neither "
- + "new, neither dirty.", new Object[]{obj});
- }
+ }
+
+ @Override
+ public T get(K key, String[] fields) throws GoraException {
+ String[] requestedFields = getFieldsToQuery(fields);
+ List<String> documentFields = new ArrayList<>();
+ for (String requestedField : requestedFields) {
+ documentFields.add(elasticsearchMapping.getFields().get(requestedField).getName());
}
-
- @Override
- public boolean delete(K key) throws GoraException {
- DeleteRequest request = new DeleteRequest(elasticsearchMapping.getIndexName(), (String) key);
- try {
- DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
- return deleteResponse.getResult() != DocWriteResponse.Result.NOT_FOUND;
- } catch (IOException ex) {
- throw new GoraException(ex);
- }
+ try {
+ // Prepare the Elasticsearch request
+ GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key);
+ GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
+ if (getResponse.isExists()) {
+ Map<String, Object> sourceMap = getResponse.getSourceAsMap();
+
+ // Map of field's name and its value from the Document
+ Map<String, Object> fieldsAndValues = new HashMap<>();
+ for (String field : documentFields) {
+ fieldsAndValues.put(field, sourceMap.get(field));
+ }
+
+ // Build the corresponding persistent
+ return newInstance(fieldsAndValues, requestedFields);
+ } else {
+ return null;
+ }
+ } catch (IOException ex) {
+ throw new GoraException(ex);
}
-
- @Override
- public long deleteByQuery(Query<K, T> query) throws GoraException {
- try {
- BulkByScrollResponse bulkResponse;
- if (query.getFields() != null && query.getFields().length < elasticsearchMapping.getFields().size()) {
- UpdateByQueryRequest updateRequest = new UpdateByQueryRequest(elasticsearchMapping.getIndexName());
- QueryBuilder matchDocumentsWithinRange = QueryBuilders
- .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey());
- updateRequest.setQuery(matchDocumentsWithinRange);
-
- // Create a script for deleting fields
- StringBuilder toDelete = new StringBuilder();
- String[] fieldsToDelete = query.getFields();
- for (String field : fieldsToDelete) {
- String elasticsearchField = elasticsearchMapping.getFields().get(field).getName();
- toDelete.append(String.format("ctx._source.remove('%s');", elasticsearchField));
- }
- //toDelete.deleteCharAt(toDelete.length() - 1);
- updateRequest.setScript(new Script(ScriptType.INLINE, "painless", toDelete.toString(), Collections.emptyMap()));
- bulkResponse = client.updateByQuery(updateRequest, RequestOptions.DEFAULT);
- return bulkResponse.getUpdated();
- } else {
- DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(elasticsearchMapping.getIndexName());
- QueryBuilder matchDocumentsWithinRange = QueryBuilders
- .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey());
- deleteRequest.setQuery(matchDocumentsWithinRange);
- bulkResponse = client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
- return bulkResponse.getDeleted();
- }
- } catch (IOException ex) {
- throw new GoraException(ex);
- }
+ }
+
+ @Override
+ public void put(K key, T obj) throws GoraException {
+ if (obj.isDirty()) {
+ Schema schemaObj = obj.getSchema();
+ List<Schema.Field> fields = schemaObj.getFields();
+ Map<String, Object> jsonMap = new HashMap<>();
+ for (Schema.Field field : fields) {
+ Field mappedField = elasticsearchMapping.getFields().get(field.name());
+ if (mappedField != null) {
+ Object fieldValue = obj.get(field.pos());
+ if (fieldValue != null) {
+ Schema fieldSchema = field.schema();
+ Object serializedObj = serializeFieldValue(fieldSchema, fieldValue);
+ jsonMap.put(mappedField.getName(), serializedObj);
+ }
+ }
+ }
+ // Special field for range query
+ jsonMap.put("gora_id", key);
+ // Prepare the Elasticsearch request
+ IndexRequest request = new IndexRequest(elasticsearchMapping.getIndexName()).id((String) key).source(jsonMap);
+ try {
+ client.index(request, RequestOptions.DEFAULT);
+ } catch (IOException ex) {
+ throw new GoraException(ex);
+ }
+ } else {
+ LOG.info("Ignored putting object {} in the store as it is neither "
+ + "new, neither dirty.", new Object[]{obj});
+ }
+ }
+
+ @Override
+ public boolean delete(K key) throws GoraException {
+ DeleteRequest request = new DeleteRequest(elasticsearchMapping.getIndexName(), (String) key);
+ try {
+ DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
+ return deleteResponse.getResult() != DocWriteResponse.Result.NOT_FOUND;
+ } catch (IOException ex) {
+ throw new GoraException(ex);
}
+ }
+
+ @Override
+ public long deleteByQuery(Query<K, T> query) throws GoraException {
+ try {
+ BulkByScrollResponse bulkResponse;
+ if (query.getFields() != null && query.getFields().length < elasticsearchMapping.getFields().size()) {
+ UpdateByQueryRequest updateRequest = new UpdateByQueryRequest(elasticsearchMapping.getIndexName());
+ QueryBuilder matchDocumentsWithinRange = QueryBuilders
+ .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey());
+ updateRequest.setQuery(matchDocumentsWithinRange);
+
+ // Create a script for deleting fields
+ StringBuilder toDelete = new StringBuilder();
+ String[] fieldsToDelete = query.getFields();
+ for (String field : fieldsToDelete) {
+ String elasticsearchField = elasticsearchMapping.getFields().get(field).getName();
+ toDelete.append(String.format(Locale.getDefault(), "ctx._source.remove('%s');", elasticsearchField));
+ }
+ //toDelete.deleteCharAt(toDelete.length() - 1);
+ updateRequest.setScript(new Script(ScriptType.INLINE, "painless", toDelete.toString(), Collections.emptyMap()));
+ bulkResponse = client.updateByQuery(updateRequest, RequestOptions.DEFAULT);
+ return bulkResponse.getUpdated();
+ } else {
+ DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(elasticsearchMapping.getIndexName());
+ QueryBuilder matchDocumentsWithinRange = QueryBuilders
+ .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey());
+ deleteRequest.setQuery(matchDocumentsWithinRange);
+ bulkResponse = client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
+ return bulkResponse.getDeleted();
+ }
+ } catch (IOException ex) {
+ throw new GoraException(ex);
+ }
+ }
- @Override
- public Result<K, T> execute(Query<K, T> query) throws GoraException {
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ @Override
+ public Result<K, T> execute(Query<K, T> query) throws GoraException {
+ SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- // Set the query result limit
- int size = (int) query.getLimit();
- if (size != -1) {
- searchSourceBuilder.size(size);
- }
- try {
- // Build the actual Elasticsearch range query
- QueryBuilder rangeQueryBuilder = QueryBuilders
- .rangeQuery("gora_id").gte(query.getStartKey()).lte(query.getEndKey());
- searchSourceBuilder.query(rangeQueryBuilder);
- SearchRequest searchRequest = new SearchRequest(elasticsearchMapping.getIndexName());
- searchRequest.source(searchSourceBuilder);
- SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
-
- String[] avroFields = getFieldsToQuery(query.getFields());
-
- SearchHits hits = searchResponse.getHits();
- SearchHit[] searchHits = hits.getHits();
- List<K> hitId = new ArrayList<>();
-
- // Check filter
- Filter<K, T> queryFilter = query.getFilter();
- List<T> filteredObjects = new ArrayList<>();
- for (SearchHit hit : searchHits) {
- Map<String, Object> sourceAsMap = hit.getSourceAsMap();
- if (queryFilter == null || !queryFilter.filter((K) hit.getId(), newInstance(sourceAsMap, avroFields))) {
- filteredObjects.add(newInstance(sourceAsMap, avroFields));
- hitId.add((K) hit.getId());
- }
- }
- return new ElasticsearchResult<>(this, query, hitId, filteredObjects);
- } catch (IOException ex) {
- throw new GoraException(ex);
- }
+ // Set the query result limit
+ int size = (int) query.getLimit();
+ if (size != -1) {
+ searchSourceBuilder.size(size);
}
-
- @Override
- public Query<K, T> newQuery() {
- ElasticsearchQuery<K, T> query = new ElasticsearchQuery<>(this);
- query.setFields(getFieldsToQuery(null));
- return query;
+ try {
+ // Build the actual Elasticsearch range query
+ QueryBuilder rangeQueryBuilder = QueryBuilders
+ .rangeQuery("gora_id").gte(query.getStartKey()).lte(query.getEndKey());
+ searchSourceBuilder.query(rangeQueryBuilder);
+ SearchRequest searchRequest = new SearchRequest(elasticsearchMapping.getIndexName());
+ searchRequest.source(searchSourceBuilder);
+ SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
+
+ String[] avroFields = getFieldsToQuery(query.getFields());
+
+ SearchHits hits = searchResponse.getHits();
+ SearchHit[] searchHits = hits.getHits();
+ List<K> hitId = new ArrayList<>();
+
+ // Check filter
+ Filter<K, T> queryFilter = query.getFilter();
+ List<T> filteredObjects = new ArrayList<>();
+ for (SearchHit hit : searchHits) {
+ Map<String, Object> sourceAsMap = hit.getSourceAsMap();
+ if (queryFilter == null || !queryFilter.filter((K) hit.getId(), newInstance(sourceAsMap, avroFields))) {
+ filteredObjects.add(newInstance(sourceAsMap, avroFields));
+ hitId.add((K) hit.getId());
+ }
+ }
+ return new ElasticsearchResult<>(this, query, hitId, filteredObjects);
+ } catch (IOException ex) {
+ throw new GoraException(ex);
}
-
- @Override
- public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
- List<PartitionQuery<K, T>> partitions = new ArrayList<>();
- PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>(
- query);
- partitionQuery.setConf(getConf());
- partitions.add(partitionQuery);
- return partitions;
+ }
+
+ @Override
+ public Query<K, T> newQuery() {
+ ElasticsearchQuery<K, T> query = new ElasticsearchQuery<>(this);
+ query.setFields(getFieldsToQuery(null));
+ return query;
+ }
+
+ @Override
+ public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
+ List<PartitionQuery<K, T>> partitions = new ArrayList<>();
+ PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>(
+ query);
+ partitionQuery.setConf(getConf());
+ partitions.add(partitionQuery);
+ return partitions;
+ }
+
+ @Override
+ public void flush() throws GoraException {
+ try {
+ client.indices().refresh(new RefreshRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
+ client.indices().flush(new FlushRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
+ } catch (IOException ex) {
+ throw new GoraException(ex);
}
-
- @Override
- public void flush() throws GoraException {
- try {
- client.indices().refresh(new RefreshRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
- client.indices().flush(new FlushRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
- } catch (IOException ex) {
- throw new GoraException(ex);
- }
+ }
+
+ @Override
+ public void close() {
+ try {
+ client.close();
+ LOG.info("Elasticsearch datastore destroyed successfully.");
+ } catch (IOException ex) {
+ LOG.error(ex.getMessage(), ex);
}
-
- @Override
- public void close() {
- try {
- client.close();
- LOG.info("Elasticsearch datastore destroyed successfully.");
- } catch (IOException ex) {
- LOG.error(ex.getMessage(), ex);
- }
+ }
+
+ /**
+ * Build a new instance of the persisted class from the Document retrieved from the database.
+ *
+ * @param fieldsAndValues Map of field's name and its value from the Document
+ * that results from the query to the database
+ * @param requestedFields the list of fields to be mapped to the persistence class instance
+ * @return a persistence class instance which content was deserialized from the Document
+ * @throws IOException
+ */
+ public T newInstance(Map<String, Object> fieldsAndValues, String[] requestedFields) throws IOException {
+ // Create new empty persistent bean instance
+ T persistent = newPersistent();
+
+ requestedFields = getFieldsToQuery(requestedFields);
+ // Populate each field
+ for (String objField : requestedFields) {
+ Schema.Field field = fieldMap.get(objField);
+ Schema fieldSchema = field.schema();
+ String docFieldName = elasticsearchMapping.getFields().get(objField).getName();
+ Object fieldValue = fieldsAndValues.get(docFieldName);
+
+ Object result = deserializeFieldValue(field, fieldSchema, fieldValue);
+ persistent.put(field.pos(), result);
}
-
- /**
- * Build a new instance of the persisted class from the Document retrieved from the database.
- *
- * @param fieldsAndValues Map of field's name and its value from the Document
- * that results from the query to the database
- * @param requestedFields the list of fields to be mapped to the persistence class instance
- * @return a persistence class instance which content was deserialized from the Document
- * @throws IOException
- */
- public T newInstance(Map<String, Object> fieldsAndValues, String[] requestedFields) throws IOException {
- // Create new empty persistent bean instance
- T persistent = newPersistent();
-
- requestedFields = getFieldsToQuery(requestedFields);
- // Populate each field
- for (String objField : requestedFields) {
- Schema.Field field = fieldMap.get(objField);
- Schema fieldSchema = field.schema();
- String docFieldName = elasticsearchMapping.getFields().get(objField).getName();
- Object fieldValue = fieldsAndValues.get(docFieldName);
-
- Object result = deserializeFieldValue(field, fieldSchema, fieldValue);
- persistent.put(field.pos(), result);
- }
- persistent.clearDirty();
- return persistent;
- }
-
- /**
- * Deserialize an Elasticsearch object to a persistent Avro object.
- *
- * @param avroField persistent Avro class field to which the value will be deserialized
- * @param avroFieldSchema schema for the persistent Avro class field
- * @param elasticsearchValue Elasticsearch field value to be deserialized
- * @return deserialized Avro object from the Elasticsearch object
- * @throws GoraException when the given Elasticsearch value cannot be deserialized
- */
- private Object deserializeFieldValue(Schema.Field avroField, Schema avroFieldSchema,
- Object elasticsearchValue) throws GoraException {
- Object fieldValue;
- switch (avroFieldSchema.getType()) {
- case MAP:
- fieldValue = fromElasticsearchMap(avroField, avroFieldSchema.getValueType(), (Map<String, Object>) elasticsearchValue);
- break;
- case RECORD:
- fieldValue = fromElasticsearchRecord(avroFieldSchema, (Map<String, Object>) elasticsearchValue);
- break;
- case ARRAY:
- fieldValue = fromElasticsearchList(avroField, avroFieldSchema.getElementType(), elasticsearchValue);
- break;
- case BOOLEAN:
- fieldValue = Boolean.parseBoolean(elasticsearchValue.toString());
- break;
- case BYTES:
- fieldValue = ByteBuffer.wrap(Base64.getDecoder().decode(elasticsearchValue.toString()));
- break;
- case FIXED:
- case NULL:
- fieldValue = null;
- break;
- case UNION:
- fieldValue = fromElasticsearchUnion(avroField, avroFieldSchema, elasticsearchValue);
- break;
- case DOUBLE:
- fieldValue = Double.parseDouble(elasticsearchValue.toString());
- break;
- case ENUM:
- fieldValue = AvroUtils.getEnumValue(avroFieldSchema, elasticsearchValue.toString());
- break;
- case FLOAT:
- fieldValue = Float.parseFloat(elasticsearchValue.toString());
- break;
- case INT:
- fieldValue = Integer.parseInt(elasticsearchValue.toString());
- break;
- case LONG:
- fieldValue = Long.parseLong(elasticsearchValue.toString());
- break;
- case STRING:
- fieldValue = new Utf8(elasticsearchValue.toString());
- break;
- default:
- fieldValue = elasticsearchValue;
- }
- return fieldValue;
- }
-
- /**
- * Deserialize an Elasticsearch List to an Avro List as used in Gora generated classes
- * that can safely be written into Avro persistent object.
- *
- * @param avroField persistent Avro class field to which the value will be deserialized
- * @param avroFieldSchema schema for the persistent Avro class field
- * @param elasticsearchValue Elasticsearch field value to be deserialized
- * @return deserialized Avro List from the given Elasticsearch value
- * @throws GoraException when one of the underlying values cannot be deserialized
- */
- private Object fromElasticsearchList(Schema.Field avroField, Schema avroFieldSchema,
- Object elasticsearchValue) throws GoraException {
- List<Object> list = new ArrayList<>();
- if (elasticsearchValue != null) {
- for (Object item : (List<Object>) elasticsearchValue) {
- Object result = deserializeFieldValue(avroField, avroFieldSchema, item);
- list.add(result);
- }
- }
- return new DirtyListWrapper<>(list);
- }
-
- /**
- * Deserialize an Elasticsearch Map to an Avro Map as used in Gora generated classes
- * that can safely be written into Avro persistent object.
- *
- * @param avroField persistent Avro class field to which the value will be deserialized
- * @param avroFieldSchema schema for the persistent Avro class field
- * @param elasticsearchMap Elasticsearch Map value to be deserialized
- * @return deserialized Avro Map from the given Elasticsearch Map value
- * @throws GoraException when one of the underlying values cannot be deserialized
- */
- private Object fromElasticsearchMap(Schema.Field avroField, Schema avroFieldSchema,
- Map<String, Object> elasticsearchMap) throws GoraException {
- Map<Utf8, Object> deserializedMap = new HashMap<>();
- if (elasticsearchMap != null) {
- for (Map.Entry<String, Object> entry : elasticsearchMap.entrySet()) {
- String mapKey = entry.getKey();
- Object mapValue = deserializeFieldValue(avroField, avroFieldSchema, entry.getValue());
- deserializedMap.put(new Utf8(mapKey), mapValue);
- }
- }
- return new DirtyMapWrapper<>(deserializedMap);
- }
-
- /**
- * Deserialize an Elasticsearch Record to an Avro Object as used in Gora generated classes
- * that can safely be written into Avro persistent object.
- *
- * @param avroFieldSchema schema for the persistent Avro class field
- * @param elasticsearchRecord Elasticsearch Record value to be deserialized
- * @return deserialized Avro Object from the given Elasticsearch Record value
- * @throws GoraException when one of the underlying values cannot be deserialized
- */
- private Object fromElasticsearchRecord(Schema avroFieldSchema,
- Map<String, Object> elasticsearchRecord) throws GoraException {
- Class<?> clazz;
- try {
- clazz = ClassLoadingUtils.loadClass(avroFieldSchema.getFullName());
- } catch (ClassNotFoundException ex) {
- throw new GoraException(ex);
- }
- PersistentBase record = (PersistentBase) new BeanFactoryImpl(keyClass, clazz).newPersistent();
- for (Schema.Field recField : avroFieldSchema.getFields()) {
- Schema innerSchema = recField.schema();
- Field innerDocField = elasticsearchMapping.getFields().getOrDefault(recField.name(), new Field(recField.name(), null));
- record.put(recField.pos(), deserializeFieldValue(recField, innerSchema, elasticsearchRecord.get(innerDocField.getName())));
- }
- return record;
- }
-
- /**
- * Deserialize an Elasticsearch Union to an Avro Object as used in Gora generated classes
- * that can safely be written into Avro persistent object.
- *
- * @param avroField persistent Avro class field to which the value will be deserialized
- * @param avroFieldSchema schema for the persistent Avro class field
- * @param elasticsearchUnion Elasticsearch Union value to be deserialized
- * @return deserialized Avro Object from the given Elasticsearch Union value
- * @throws GoraException when one of the underlying values cannot be deserialized
- */
- private Object fromElasticsearchUnion(Schema.Field avroField, Schema avroFieldSchema, Object elasticsearchUnion) throws GoraException {
- Object deserializedUnion;
- Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType();
- Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType();
- if (avroFieldSchema.getTypes().size() == 2 &&
- (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) &&
- !type0.equals(type1)) {
- int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema);
- Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
- deserializedUnion = deserializeFieldValue(avroField, unionSchema, elasticsearchUnion);
- } else if (avroFieldSchema.getTypes().size() == 3) {
- Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType();
- if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) &&
- (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) {
- if (elasticsearchUnion == null) {
- deserializedUnion = null;
- } else if (elasticsearchUnion instanceof String) {
- throw new GoraException("Elasticsearch supports Union data type only represented as Record or Null.");
- } else {
- int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema);
- Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
- deserializedUnion = fromElasticsearchRecord(unionSchema, (Map<String, Object>) elasticsearchUnion);
- }
- } else {
- throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
- }
+ persistent.clearDirty();
+ return persistent;
+ }
+
+  /**
+   * Deserialize an Elasticsearch value into the Avro representation used by Gora generated classes.
+   *
+   * <p>MAP, RECORD, ARRAY and UNION schemas are delegated to the matching
+   * {@code fromElasticsearch*} helper. The remaining types are rebuilt from the value's
+   * {@code toString()} form (BYTES is decoded as Base64 text). FIXED and NULL schemas always
+   * produce {@code null}.
+   *
+   * <p>A {@code null} input for a RECORD or scalar schema yields {@code null} instead of a
+   * {@link NullPointerException}, e.g. for a member that is missing from a nested document or
+   * a null array element. MAP and ARRAY still produce empty wrappers, and UNION is still
+   * resolved by {@code fromElasticsearchUnion}.
+   *
+   * @param avroField          persistent Avro class field to which the value will be deserialized
+   * @param avroFieldSchema    schema for the persistent Avro class field
+   * @param elasticsearchValue Elasticsearch field value to be deserialized, may be {@code null}
+   * @return deserialized Avro object from the Elasticsearch object, possibly {@code null}
+   * @throws GoraException when the given Elasticsearch value cannot be deserialized
+   */
+  private Object deserializeFieldValue(Schema.Field avroField, Schema avroFieldSchema,
+                                       Object elasticsearchValue) throws GoraException {
+    Schema.Type fieldType = avroFieldSchema.getType();
+    // The MAP/ARRAY helpers and the UNION resolver deal with null themselves; every other
+    // branch below dereferences the value, so short-circuit here instead of throwing an NPE.
+    if (elasticsearchValue == null
+        && fieldType != Schema.Type.MAP
+        && fieldType != Schema.Type.ARRAY
+        && fieldType != Schema.Type.UNION) {
+      return null;
+    }
+    Object fieldValue;
+    switch (fieldType) {
+      case MAP:
+        fieldValue = fromElasticsearchMap(avroField, avroFieldSchema.getValueType(), (Map<String, Object>) elasticsearchValue);
+        break;
+      case RECORD:
+        fieldValue = fromElasticsearchRecord(avroFieldSchema, (Map<String, Object>) elasticsearchValue);
+        break;
+      case ARRAY:
+        fieldValue = fromElasticsearchList(avroField, avroFieldSchema.getElementType(), elasticsearchValue);
+        break;
+      case BOOLEAN:
+        fieldValue = Boolean.parseBoolean(elasticsearchValue.toString());
+        break;
+      case BYTES:
+        // Counterpart of serializeFieldValue, which stores BYTES as Base64 text.
+        fieldValue = ByteBuffer.wrap(Base64.getDecoder().decode(elasticsearchValue.toString()));
+        break;
+      case FIXED:
+      case NULL:
+        fieldValue = null;
+        break;
+      case UNION:
+        fieldValue = fromElasticsearchUnion(avroField, avroFieldSchema, elasticsearchValue);
+        break;
+      case DOUBLE:
+        fieldValue = Double.parseDouble(elasticsearchValue.toString());
+        break;
+      case ENUM:
+        fieldValue = AvroUtils.getEnumValue(avroFieldSchema, elasticsearchValue.toString());
+        break;
+      case FLOAT:
+        fieldValue = Float.parseFloat(elasticsearchValue.toString());
+        break;
+      case INT:
+        fieldValue = Integer.parseInt(elasticsearchValue.toString());
+        break;
+      case LONG:
+        fieldValue = Long.parseLong(elasticsearchValue.toString());
+        break;
+      case STRING:
+        fieldValue = new Utf8(elasticsearchValue.toString());
+        break;
+      default:
+        fieldValue = elasticsearchValue;
+    }
+    return fieldValue;
+  }
+
+  /**
+   * Convert an Elasticsearch array into the {@code DirtyListWrapper} form used by Gora
+   * generated classes, deserializing every element with the supplied element schema.
+   *
+   * @param avroField          persistent Avro class field that owns the array
+   * @param avroFieldSchema    schema applied to each element of the array
+   * @param elasticsearchValue Elasticsearch array value; {@code null} yields an empty list
+   * @return a {@code DirtyListWrapper} around the deserialized elements
+   * @throws GoraException when one of the underlying values cannot be deserialized
+   */
+  private Object fromElasticsearchList(Schema.Field avroField, Schema avroFieldSchema,
+                                       Object elasticsearchValue) throws GoraException {
+    List<Object> deserializedItems = new ArrayList<>();
+    if (elasticsearchValue == null) {
+      // Nothing stored for this field: hand back an empty wrapped list.
+      return new DirtyListWrapper<>(deserializedItems);
+    }
+    List<Object> rawItems = (List<Object>) elasticsearchValue;
+    for (Object rawItem : rawItems) {
+      deserializedItems.add(deserializeFieldValue(avroField, avroFieldSchema, rawItem));
+    }
+    return new DirtyListWrapper<>(deserializedItems);
+  }
+
+  /**
+   * Convert an Elasticsearch object into the {@code DirtyMapWrapper} form used by Gora
+   * generated classes. Keys become {@code Utf8} instances and every value is deserialized
+   * with the supplied value schema.
+   *
+   * @param avroField        persistent Avro class field that owns the map
+   * @param avroFieldSchema  schema applied to each value of the map
+   * @param elasticsearchMap Elasticsearch Map value; {@code null} yields an empty map
+   * @return a {@code DirtyMapWrapper} around the deserialized entries
+   * @throws GoraException when one of the underlying values cannot be deserialized
+   */
+  private Object fromElasticsearchMap(Schema.Field avroField, Schema avroFieldSchema,
+                                      Map<String, Object> elasticsearchMap) throws GoraException {
+    Map<Utf8, Object> avroEntries = new HashMap<>();
+    if (elasticsearchMap == null) {
+      // Nothing stored for this field: hand back an empty wrapped map.
+      return new DirtyMapWrapper<>(avroEntries);
+    }
+    for (Map.Entry<String, Object> rawEntry : elasticsearchMap.entrySet()) {
+      Object avroValue = deserializeFieldValue(avroField, avroFieldSchema, rawEntry.getValue());
+      avroEntries.put(new Utf8(rawEntry.getKey()), avroValue);
+    }
+    return new DirtyMapWrapper<>(avroEntries);
+  }
+
+  /**
+   * Rebuild a nested Avro record from an Elasticsearch object.
+   *
+   * <p>The record class is loaded by the schema's full name and instantiated through a
+   * {@code BeanFactoryImpl}. Each schema field is then read from the Elasticsearch object
+   * under its mapped document field name, falling back to the Avro field name when the
+   * mapping has no entry for it.
+   *
+   * @param avroFieldSchema     record schema describing the members to populate
+   * @param elasticsearchRecord Elasticsearch object holding the member values
+   * @return the populated persistent record
+   * @throws GoraException when the record class cannot be loaded or a member cannot be deserialized
+   */
+  private Object fromElasticsearchRecord(Schema avroFieldSchema,
+                                         Map<String, Object> elasticsearchRecord) throws GoraException {
+    Class<?> recordClass;
+    try {
+      recordClass = ClassLoadingUtils.loadClass(avroFieldSchema.getFullName());
+    } catch (ClassNotFoundException ex) {
+      throw new GoraException(ex);
+    }
+    PersistentBase persistentRecord =
+        (PersistentBase) new BeanFactoryImpl(keyClass, recordClass).newPersistent();
+    for (Schema.Field memberField : avroFieldSchema.getFields()) {
+      String memberName = memberField.name();
+      // Unmapped members are looked up under their Avro name.
+      Field mappedField = elasticsearchMapping.getFields()
+          .getOrDefault(memberName, new Field(memberName, null));
+      Object rawValue = elasticsearchRecord.get(mappedField.getName());
+      Object memberValue = deserializeFieldValue(memberField, memberField.schema(), rawValue);
+      persistentRecord.put(memberField.pos(), memberValue);
+    }
+    return persistentRecord;
+  }
+
+ /**
+ * Deserialize an Elasticsearch Union to an Avro Object as used in Gora generated classes
+ * that can safely be written into Avro persistent object.
+ *
+ * @param avroField persistent Avro class field to which the value will be deserialized
+ * @param avroFieldSchema schema for the persistent Avro class field
+ * @param elasticsearchUnion Elasticsearch Union value to be deserialized
+ * @return deserialized Avro Object from the given Elasticsearch Union value
+ * @throws GoraException when one of the underlying values cannot be deserialized
+ */
+ private Object fromElasticsearchUnion(Schema.Field avroField, Schema avroFieldSchema, Object elasticsearchUnion) throws GoraException {
+ Object deserializedUnion;
+ Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType();
+ Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType();
+ if (avroFieldSchema.getTypes().size() == 2 &&
+ (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) &&
+ !type0.equals(type1)) {
+ int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema);
+ Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
+ deserializedUnion = deserializeFieldValue(avroField, unionSchema, elasticsearchUnion);
+ } else if (avroFieldSchema.getTypes().size() == 3) {
+ Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType();
+ if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) &&
+ (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) {
+ if (elasticsearchUnion == null) {
+ deserializedUnion = null;
+ } else if (elasticsearchUnion instanceof String) {
+ throw new GoraException("Elasticsearch supports Union data type only represented as Record or Null.");
} else {
- throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
- }
- return deserializedUnion;
- }
-
- /**
- * Serialize a persistent Avro object as used in Gora generated classes to
- * an object that can be written into Elasticsearch.
- *
- * @param avroFieldSchema schema for the persistent Avro class field
- * @param avroFieldValue persistent Avro field value to be serialized
- * @return serialized field value
- * @throws GoraException when the given Avro object cannot be serialized
- */
- private Object serializeFieldValue(Schema avroFieldSchema, Object avroFieldValue) throws GoraException {
- Object output = avroFieldValue;
- switch (avroFieldSchema.getType()) {
- case ARRAY:
- output = arrayToElasticsearch((List<?>) avroFieldValue, avroFieldSchema.getElementType());
- break;
- case MAP:
- output = mapToElasticsearch((Map<CharSequence, ?>) avroFieldValue, avroFieldSchema.getValueType());
- break;
- case RECORD:
- output = recordToElasticsearch(avroFieldValue, avroFieldSchema);
- break;
- case BYTES:
- output = Base64.getEncoder().encodeToString(((ByteBuffer) avroFieldValue).array());
- break;
- case UNION:
- output = unionToElasticsearch(avroFieldValue, avroFieldSchema);
- break;
- case BOOLEAN:
- case DOUBLE:
- case ENUM:
- case FLOAT:
- case INT:
- case LONG:
- case STRING:
- output = avroFieldValue.toString();
- break;
- case FIXED:
- break;
- case NULL:
- output = null;
- break;
- }
- return output;
- }
-
- /**
- * Serialize a Java collection of persistent Avro objects as used in Gora generated classes to a
- * List that can safely be written into Elasticsearch.
- *
- * @param collection the collection to be serialized
- * @param avroFieldSchema field schema for the underlying type
- * @return a List version of the collection that can be safely written into Elasticsearch
- * @throws GoraException when one of the underlying values cannot be serialized
- */
- private List<Object> arrayToElasticsearch(Collection<?> collection, Schema avroFieldSchema) throws GoraException {
- List<Object> list = new ArrayList<>();
- for (Object item : collection) {
- Object result = serializeFieldValue(avroFieldSchema, item);
- list.add(result);
- }
- return list;
- }
-
- /**
- * Serialize a Java map of persistent Avro objects as used in Gora generated classes to a
- * map that can safely be written into Elasticsearch.
- *
- * @param map the map to be serialized
- * @param avroFieldSchema field schema for the underlying type
- * @return a Map version of the Java map that can be safely written into Elasticsearch
- * @throws GoraException when one of the underlying values cannot be serialized
- */
- private Map<CharSequence, ?> mapToElasticsearch(Map<CharSequence, ?> map, Schema avroFieldSchema) throws GoraException {
- Map<CharSequence, Object> serializedMap = new HashMap<>();
- for (Map.Entry<CharSequence, ?> entry : map.entrySet()) {
- String mapKey = entry.getKey().toString();
- Object mapValue = entry.getValue();
- Object result = serializeFieldValue(avroFieldSchema, mapValue);
- serializedMap.put(mapKey, result);
- }
- return serializedMap;
- }
-
- /**
- * Serialize a Java object of persistent Avro objects as used in Gora generated classes to a
- * record that can safely be written into Elasticsearch.
- *
- * @param record the object to be serialized
- * @param avroFieldSchema field schema for the underlying type
- * @return a record version of the Java object that can be safely written into Elasticsearch
- * @throws GoraException when one of the underlying values cannot be serialized
- */
- private Map<CharSequence, Object> recordToElasticsearch(Object record, Schema avroFieldSchema) throws GoraException {
- Map<CharSequence, Object> serializedRecord = new HashMap<>();
- for (Schema.Field member : avroFieldSchema.getFields()) {
- Object innerValue = ((PersistentBase) record).get(member.pos());
- serializedRecord.put(member.name(), serializeFieldValue(member.schema(), innerValue));
- }
- return serializedRecord;
- }
-
- /**
- * Serialize a Java object of persistent Avro objects as used in Gora generated classes to a
- * object that can safely be written into Elasticsearch.
- *
- * @param union the object to be serialized
- * @param avroFieldSchema field schema for the underlying type
- * @return a object version of the Java object that can be safely written into Elasticsearch
- * @throws GoraException when one of the underlying values cannot be serialized
- */
- private Object unionToElasticsearch(Object union, Schema avroFieldSchema) throws GoraException {
- Object serializedUnion;
- Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType();
- Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType();
- if (avroFieldSchema.getTypes().size() == 2 &&
- (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) &&
- !type0.equals(type1)) {
- int schemaPos = getUnionSchema(union, avroFieldSchema);
- Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
- serializedUnion = serializeFieldValue(unionSchema, union);
- } else if (avroFieldSchema.getTypes().size() == 3) {
- Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType();
- if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) &&
- (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) {
- if (union == null) {
- serializedUnion = null;
- } else if (union instanceof String) {
- throw new GoraException("Elasticsearch does not support foreign key IDs in Union data type.");
- } else {
- int schemaPos = getUnionSchema(union, avroFieldSchema);
- Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
- serializedUnion = recordToElasticsearch(union, unionSchema);
- }
- } else {
- throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
- }
+ int schemaPos = getUnionSchema(elasticsearchUnion, avroFieldSchema);
+ Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
+ deserializedUnion = fromElasticsearchRecord(unionSchema, (Map<String, Object>) elasticsearchUnion);
+ }
+ } else {
+ throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+ }
+ } else {
+ throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+ }
+ return deserializedUnion;
+ }
+
+  /**
+   * Serialize a persistent Avro value as used in Gora generated classes to
+   * an object that can be written into Elasticsearch.
+   *
+   * <p>ARRAY, MAP, RECORD and UNION schemas are delegated to the matching
+   * {@code *ToElasticsearch} helper. BYTES is written as Base64 text of the buffer's remaining
+   * bytes. BOOLEAN, DOUBLE, ENUM, FLOAT, INT, LONG and STRING are written as their
+   * {@code toString()} form. FIXED values are passed through unchanged and NULL schemas
+   * produce {@code null}.
+   *
+   * <p>A {@code null} value for any non-UNION schema is serialized as {@code null} instead of
+   * raising a {@link NullPointerException}; UNION values are still resolved by
+   * {@code unionToElasticsearch}.
+   *
+   * @param avroFieldSchema schema for the persistent Avro class field
+   * @param avroFieldValue  persistent Avro field value to be serialized, may be {@code null}
+   * @return serialized field value, possibly {@code null}
+   * @throws GoraException when the given Avro object cannot be serialized
+   */
+  private Object serializeFieldValue(Schema avroFieldSchema, Object avroFieldValue) throws GoraException {
+    Schema.Type fieldType = avroFieldSchema.getType();
+    // Every non-UNION branch dereferences the value; UNION keeps its own null handling.
+    if (avroFieldValue == null && fieldType != Schema.Type.UNION) {
+      return null;
+    }
+    Object output = avroFieldValue;
+    switch (fieldType) {
+      case ARRAY:
+        output = arrayToElasticsearch((List<?>) avroFieldValue, avroFieldSchema.getElementType());
+        break;
+      case MAP:
+        output = mapToElasticsearch((Map<CharSequence, ?>) avroFieldValue, avroFieldSchema.getValueType());
+        break;
+      case RECORD:
+        output = recordToElasticsearch(avroFieldValue, avroFieldSchema);
+        break;
+      case BYTES: {
+        // ByteBuffer.array() would expose the whole backing array (ignoring position, limit
+        // and array offset) and fails for read-only or direct buffers. Copy the remaining
+        // window from a duplicate so the caller's buffer position is left untouched.
+        ByteBuffer window = ((ByteBuffer) avroFieldValue).duplicate();
+        byte[] rawBytes = new byte[window.remaining()];
+        window.get(rawBytes);
+        output = Base64.getEncoder().encodeToString(rawBytes);
+        break;
+      }
+      case UNION:
+        output = unionToElasticsearch(avroFieldValue, avroFieldSchema);
+        break;
+      case BOOLEAN:
+      case DOUBLE:
+      case ENUM:
+      case FLOAT:
+      case INT:
+      case LONG:
+      case STRING:
+        output = avroFieldValue.toString();
+        break;
+      case FIXED:
+        // Passed through as-is.
+        break;
+      case NULL:
+        output = null;
+        break;
+    }
+    return output;
+  }
+
+  /**
+   * Serialize each element of an Avro collection with the given element schema and gather
+   * the results, in iteration order, into a list that can be written into Elasticsearch.
+   *
+   * @param collection      the collection to be serialized
+   * @param avroFieldSchema schema applied to each element
+   * @return a new list holding the serialized elements
+   * @throws GoraException when one of the underlying values cannot be serialized
+   */
+  private List<Object> arrayToElasticsearch(Collection<?> collection, Schema avroFieldSchema) throws GoraException {
+    List<Object> serializedItems = new ArrayList<>();
+    for (Object element : collection) {
+      serializedItems.add(serializeFieldValue(avroFieldSchema, element));
+    }
+    return serializedItems;
+  }
+
+  /**
+   * Serialize an Avro map into a map that can be written into Elasticsearch. Keys are
+   * converted with {@code toString()} and values are serialized with the given value schema.
+   *
+   * @param map             the map to be serialized
+   * @param avroFieldSchema schema applied to each value
+   * @return a new map holding the string keys and serialized values
+   * @throws GoraException when one of the underlying values cannot be serialized
+   */
+  private Map<CharSequence, ?> mapToElasticsearch(Map<CharSequence, ?> map, Schema avroFieldSchema) throws GoraException {
+    Map<CharSequence, Object> document = new HashMap<>();
+    for (Map.Entry<CharSequence, ?> avroEntry : map.entrySet()) {
+      String documentKey = avroEntry.getKey().toString();
+      document.put(documentKey, serializeFieldValue(avroFieldSchema, avroEntry.getValue()));
+    }
+    return document;
+  }
+
+  /**
+   * Serialize a nested persistent record into a map that can be written into Elasticsearch.
+   * Each schema field is read from the record by position and stored under the Avro field name.
+   *
+   * @param record          the record to be serialized; cast to {@code PersistentBase}
+   * @param avroFieldSchema record schema listing the members to serialize
+   * @return a new map from Avro field name to serialized member value
+   * @throws GoraException when one of the underlying values cannot be serialized
+   */
+  private Map<CharSequence, Object> recordToElasticsearch(Object record, Schema avroFieldSchema) throws GoraException {
+    Map<CharSequence, Object> document = new HashMap<>();
+    for (Schema.Field recordField : avroFieldSchema.getFields()) {
+      Object memberValue = ((PersistentBase) record).get(recordField.pos());
+      Object serializedMember = serializeFieldValue(recordField.schema(), memberValue);
+      document.put(recordField.name(), serializedMember);
+    }
+    return document;
+  }
+
+ /**
+ * Serialize a Java object of persistent Avro objects as used in Gora generated classes to an
+ * object that can safely be written into Elasticsearch.
+ *
+ * @param union the object to be serialized
+ * @param avroFieldSchema field schema for the underlying type
+ * @return an object version of the Java object that can be safely written into Elasticsearch
+ * @throws GoraException when one of the underlying values cannot be serialized
+ */
+ private Object unionToElasticsearch(Object union, Schema avroFieldSchema) throws GoraException {
+ Object serializedUnion;
+ Schema.Type type0 = avroFieldSchema.getTypes().get(0).getType();
+ Schema.Type type1 = avroFieldSchema.getTypes().get(1).getType();
+ if (avroFieldSchema.getTypes().size() == 2 &&
+ (type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) &&
+ !type0.equals(type1)) {
+ int schemaPos = getUnionSchema(union, avroFieldSchema);
+ Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
+ serializedUnion = serializeFieldValue(unionSchema, union);
+ } else if (avroFieldSchema.getTypes().size() == 3) {
+ Schema.Type type2 = avroFieldSchema.getTypes().get(2).getType();
+ if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL) || type2.equals(Schema.Type.NULL)) &&
+ (type0.equals(Schema.Type.STRING) || type1.equals(Schema.Type.STRING) || type2.equals(Schema.Type.STRING))) {
+ if (union == null) {
+ serializedUnion = null;
+ } else if (union instanceof String) {
+ throw new GoraException("Elasticsearch does not support foreign key IDs in Union data type.");
} else {
- throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
- }
- return serializedUnion;
- }
-
- /**
- * Method to retrieve the corresponding schema type index of a particular
- * object having UNION schema. As UNION type can have one or more types and at
- * a given instance, it holds an object of only one type of the defined types,
- * this method is used to figure out the corresponding instance's schema type
- * index.
- *
- * @param instanceValue value that the object holds
- * @param unionSchema union schema containing all of the data types
- * @return the unionSchemaPosition corresponding schema position
- */
- private int getUnionSchema(Object instanceValue, Schema unionSchema) {
- int unionSchemaPos = 0;
- for (Schema currentSchema : unionSchema.getTypes()) {
- Schema.Type schemaType = currentSchema.getType();
- if (instanceValue instanceof CharSequence && schemaType.equals(Schema.Type.STRING)) {
- return unionSchemaPos;
- }
- if (instanceValue instanceof ByteBuffer && schemaType.equals(Schema.Type.BYTES)) {
- return unionSchemaPos;
- }
- if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.BYTES)) {
- return unionSchemaPos;
- }
- if (instanceValue instanceof String && schemaType.equals(Schema.Type.BYTES)) {
- return unionSchemaPos;
- }
- if (instanceValue instanceof Integer && schemaType.equals(Schema.Type.INT)) {
- return unionSchemaPos;
- }
- if (instanceValue instanceof Long && schemaType.equals(Schema.Type.LONG)) {
- return unionSchemaPos;
- }
- if (instanceValue instanceof Double && schemaType.equals(Schema.Type.DOUBLE)) {
- return unionSchemaPos;
- }
- if (instanceValue instanceof Float && schemaType.equals(Schema.Type.FLOAT)) {
- return unionSchemaPos;
- }
- if (instanceValue instanceof Boolean && schemaType.equals(Schema.Type.BOOLEAN)) {
- return unionSchemaPos;
- }
- if (instanceValue instanceof Map && schemaType.equals(Schema.Type.MAP)) {
- return unionSchemaPos;
- }
- if (instanceValue instanceof List && schemaType.equals(Schema.Type.ARRAY)) {
- return unionSchemaPos;
- }
- if (instanceValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD)) {
- return unionSchemaPos;
- }
- if (instanceValue instanceof Map && schemaType.equals(Schema.Type.RECORD)) {
- return unionSchemaPos;
- }
- if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.MAP)) {
- return unionSchemaPos;
- }
- if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.RECORD)) {
- return unionSchemaPos;
- }
- if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.ARRAY)) {
- return unionSchemaPos;
- }
- unionSchemaPos++;
- }
- return 0;
+ int schemaPos = getUnionSchema(union, avroFieldSchema);
+ Schema unionSchema = avroFieldSchema.getTypes().get(schemaPos);
+ serializedUnion = recordToElasticsearch(union, unionSchema);
+ }
+ } else {
+ throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+ }
+ } else {
+ throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+ }
+ return serializedUnion;
+ }
+
+ /**
+ * Find the index of the UNION branch that matches the runtime type of a value.
+ * A UNION schema lists several branch schemas, while a concrete value holds only
+ * one of them; the branches are scanned in declaration order and the index of the
+ * first compatible branch is returned.
+ *
+ * Besides the natural Java-to-Avro pairings, a String is accepted for BYTES, a Map is
+ * accepted for both MAP and RECORD, and a byte[] is accepted for BYTES, MAP, RECORD and
+ * ARRAY. When the union declares several compatible branches, the one declared first wins.
+ *
+ * @param instanceValue value that the object holds
+ * @param unionSchema union schema containing all of the data types
+ * @return index of the first compatible branch, or 0 when no branch matches
+ * (a null value never matches, because every check is an instanceof test)
+ */
+ private int getUnionSchema(Object instanceValue, Schema unionSchema) {
+ int unionSchemaPos = 0;
+ for (Schema currentSchema : unionSchema.getTypes()) {
+ Schema.Type schemaType = currentSchema.getType();
+ if (instanceValue instanceof CharSequence && schemaType.equals(Schema.Type.STRING)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof ByteBuffer && schemaType.equals(Schema.Type.BYTES)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.BYTES)) {
+ return unionSchemaPos;
+ }
+ // NOTE(review): presumably covers BYTES read back as Base64 text - confirm against callers.
+ if (instanceValue instanceof String && schemaType.equals(Schema.Type.BYTES)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof Integer && schemaType.equals(Schema.Type.INT)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof Long && schemaType.equals(Schema.Type.LONG)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof Double && schemaType.equals(Schema.Type.DOUBLE)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof Float && schemaType.equals(Schema.Type.FLOAT)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof Boolean && schemaType.equals(Schema.Type.BOOLEAN)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof Map && schemaType.equals(Schema.Type.MAP)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof List && schemaType.equals(Schema.Type.ARRAY)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD)) {
+ return unionSchemaPos;
+ }
+ // A Map also matches RECORD, so the declaration order of MAP and RECORD decides the branch.
+ if (instanceValue instanceof Map && schemaType.equals(Schema.Type.RECORD)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.MAP)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.RECORD)) {
+ return unionSchemaPos;
+ }
+ if (instanceValue instanceof byte[] && schemaType.equals(Schema.Type.ARRAY)) {
+ return unionSchemaPos;
+ }
+ unionSchemaPos++;
}
+ // No branch matched: fall back to the first declared branch.
+ return 0;
+ }
}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java
index 01466ee..c253e87 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java
@@ -25,29 +25,29 @@ import java.util.List;
*/
public class ElasticsearchStoreCollectionMetadata {
- /**
- * Collection document keys present in a given collection at ElasticsearchStore.
- */
- private List<String> documentKeys = new ArrayList<>();
-
- /**
- * Collection document types present in a given collection at ElasticsearchStore.
- */
- private List<String> documentTypes = new ArrayList<>();
-
- public List<String> getDocumentKeys() {
- return documentKeys;
- }
-
- public void setDocumentKeys(List<String> documentKeys) {
- this.documentKeys = documentKeys;
- }
-
- public List<String> getDocumentTypes() {
- return documentTypes;
- }
-
- public void setDocumentTypes(List<String> documentTypes) {
- this.documentTypes = documentTypes;
- }
+ /**
+ * Collection document keys present in a given collection at ElasticsearchStore.
+ * Starts out as an empty list.
+ */
+ private List<String> documentKeys = new ArrayList<>();
+
+ /**
+ * Collection document types present in a given collection at ElasticsearchStore.
+ * Starts out as an empty list.
+ */
+ private List<String> documentTypes = new ArrayList<>();
+
+ /**
+ * @return the document keys list held by this instance (the list itself, not a copy)
+ */
+ public List<String> getDocumentKeys() {
+ return documentKeys;
+ }
+
+ /**
+ * @param documentKeys list that replaces the currently held document keys; stored as given
+ */
+ public void setDocumentKeys(List<String> documentKeys) {
+ this.documentKeys = documentKeys;
+ }
+
+ /**
+ * @return the document types list held by this instance (the list itself, not a copy)
+ */
+ public List<String> getDocumentTypes() {
+ return documentTypes;
+ }
+
+ /**
+ * @param documentTypes list that replaces the currently held document types; stored as given
+ */
+ public void setDocumentTypes(List<String> documentTypes) {
+ this.documentTypes = documentTypes;
+ }
}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java
index 6806aa3..465c268 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java
@@ -35,68 +35,68 @@ import java.util.Map;
public class ElasticsearchStoreMetadataAnalyzer extends DataStoreMetadataAnalyzer {
- private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStoreMetadataAnalyzer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStoreMetadataAnalyzer.class);
- private RestHighLevelClient elasticsearchClient;
+ private RestHighLevelClient elasticsearchClient;
- @Override
- public void initialize() throws GoraException {
- ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf());
- elasticsearchClient = ElasticsearchStore.createClient(parameters);
- }
+ @Override
+ public void initialize() throws GoraException {
+ ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf());
+ elasticsearchClient = ElasticsearchStore.createClient(parameters);
+ }
- @Override
- public String getType() {
- return "ELASTICSEARCH";
- }
+ @Override
+ public String getType() {
+ return "ELASTICSEARCH";
+ }
- @Override
- public List<String> getTablesNames() throws GoraException {
- GetIndexRequest request = new GetIndexRequest("*");
- GetIndexResponse response;
- try {
- response = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT);
- } catch (IOException ex) {
- throw new GoraException(ex);
- }
- if (response == null) {
- LOG.error("Could not find indices.");
- throw new GoraException("Could not find indices.");
- }
- return Arrays.asList(response.getIndices());
+ @Override
+ public List<String> getTablesNames() throws GoraException {
+ GetIndexRequest request = new GetIndexRequest("*");
+ GetIndexResponse response;
+ try {
+ response = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT);
+ } catch (IOException ex) {
+ throw new GoraException(ex);
}
+ if (response == null) {
+ LOG.error("Could not find indices.");
+ throw new GoraException("Could not find indices.");
+ }
+ return Arrays.asList(response.getIndices());
+ }
- @Override
- public ElasticsearchStoreCollectionMetadata getTableInfo(String tableName) throws GoraException {
- GetIndexRequest request = new GetIndexRequest(tableName);
- GetIndexResponse getIndexResponse;
- try {
- getIndexResponse = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT);
- } catch (IOException ex) {
- throw new GoraException(ex);
- }
- MappingMetadata indexMappings = getIndexResponse.getMappings().get(tableName);
- Map<String, Object> indexKeysAndTypes = (Map<String, Object>) indexMappings.getSourceAsMap().get("properties");
-
- List<String> documentTypes = new ArrayList<>();
- List<String> documentKeys = new ArrayList<>();
+ @Override
+ public ElasticsearchStoreCollectionMetadata getTableInfo(String tableName) throws GoraException {
+ GetIndexRequest request = new GetIndexRequest(tableName);
+ GetIndexResponse getIndexResponse;
+ try {
+ getIndexResponse = elasticsearchClient.indices().get(request, RequestOptions.DEFAULT);
+ } catch (IOException ex) {
+ throw new GoraException(ex);
+ }
+ MappingMetadata indexMappings = getIndexResponse.getMappings().get(tableName);
+ Map<String, Object> indexKeysAndTypes = (Map<String, Object>) indexMappings.getSourceAsMap().get("properties");
- for (Map.Entry<String, Object> entry : indexKeysAndTypes.entrySet()) {
- Map<String, Object> subEntry = (Map<String, Object>) entry.getValue();
- documentTypes.add((String) subEntry.get("type"));
- documentKeys.add(entry.getKey());
- }
+ List<String> documentTypes = new ArrayList<>();
+ List<String> documentKeys = new ArrayList<>();
- ElasticsearchStoreCollectionMetadata collectionMetadata = new ElasticsearchStoreCollectionMetadata();
- collectionMetadata.setDocumentKeys(documentKeys);
- collectionMetadata.setDocumentTypes(documentTypes);
- return collectionMetadata;
+ for (Map.Entry<String, Object> entry : indexKeysAndTypes.entrySet()) {
+ Map<String, Object> subEntry = (Map<String, Object>) entry.getValue();
+ documentTypes.add((String) subEntry.get("type"));
+ documentKeys.add(entry.getKey());
}
- @Override
- public void close() throws IOException {
- if (elasticsearchClient != null) {
- this.elasticsearchClient.close();
- }
+ ElasticsearchStoreCollectionMetadata collectionMetadata = new ElasticsearchStoreCollectionMetadata();
+ collectionMetadata.setDocumentKeys(documentKeys);
+ collectionMetadata.setDocumentTypes(documentTypes);
+ return collectionMetadata;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (elasticsearchClient != null) {
+ this.elasticsearchClient.close();
}
+ }
}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java
index dbbd45b..236b306 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java
@@ -20,16 +20,16 @@ package org.apache.gora.elasticsearch.utils;
* Authentication type to connect to the Elasticsearch server.
*/
public enum AuthenticationType {
- /**
- * Basic authentication requires to provide a username and password.
- */
- BASIC,
- /**
- * Token authentication requires to provide an Elasticsearch access token.
- */
- TOKEN,
- /**
- * API Key authentication requires to provide an Elasticsearch API Key ID and Elasticsearch API Key Secret.
- */
- APIKEY
+ /**
+ * Basic authentication requires a username and password.
+ */
+ BASIC,
+ /**
+ * Token authentication requires an Elasticsearch access token.
+ */
+ TOKEN,
+ /**
+ * API Key authentication requires an Elasticsearch API Key ID and an Elasticsearch API Key Secret.
+ */
+ APIKEY
}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java
index fa9bdc2..4f0ca4a 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java
@@ -20,30 +20,30 @@ package org.apache.gora.elasticsearch.utils;
* Constants file for Elasticsearch.
*/
public class ElasticsearchConstants {
- /**
- * Property indicating if the hadoop configuration has priority or not.
- */
- public static final String PROP_OVERRIDING = "gora.elasticsearch.override.hadoop.configuration";
+ /**
+ * Property indicating whether the Gora properties override the Hadoop configuration (for host and port).
+ */
+ public static final String PROP_OVERRIDING = "gora.elasticsearch.override.hadoop.configuration";
- /**
- * Default configurations for Elasticsearch.
- */
- public static final String DEFAULT_HOST = "localhost";
- public static final int DEFAULT_PORT = 9200;
+ /**
+ * Default configurations for Elasticsearch.
+ */
+ public static final String DEFAULT_HOST = "localhost";
+ public static final int DEFAULT_PORT = 9200;
- /**
- * List of keys used in the configuration file of Elasticsearch.
- */
- public static final String PROP_HOST = "gora.datastore.elasticsearch.host";
- public static final String PROP_PORT = "gora.datastore.elasticsearch.port";
- public static final String PROP_SCHEME = "gora.datastore.elasticsearch.scheme";
- public static final String PROP_AUTHENTICATIONTYPE = "gora.datastore.elasticsearch.authenticationType";
- public static final String PROP_USERNAME = "gora.datastore.elasticsearch.username";
- public static final String PROP_PASSWORD = "gora.datastore.elasticsearch.password";
- public static final String PROP_AUTHORIZATIONTOKEN = "gora.datastore.elasticsearch.authorizationToken";
- public static final String PROP_APIKEYID = "gora.datastore.elasticsearch.apiKeyId";
- public static final String PROP_APIKEYSECRET = "gora.datastore.elasticsearch.apiKeySecret";
- public static final String PROP_CONNECTTIMEOUT = "gora.datastore.elasticsearch.connectTimeout";
- public static final String PROP_SOCKETTIMEOUT = "gora.datastore.elasticsearch.socketTimeout";
- public static final String PROP_IOTHREADCOUNT = "gora.datastore.elasticsearch.ioThreadCount";
+ /**
+ * List of keys used in the configuration file of Elasticsearch.
+ */
+ public static final String PROP_HOST = "gora.datastore.elasticsearch.host";
+ public static final String PROP_PORT = "gora.datastore.elasticsearch.port";
+ public static final String PROP_SCHEME = "gora.datastore.elasticsearch.scheme";
+ public static final String PROP_AUTHENTICATIONTYPE = "gora.datastore.elasticsearch.authenticationType";
+ public static final String PROP_USERNAME = "gora.datastore.elasticsearch.username";
+ public static final String PROP_PASSWORD = "gora.datastore.elasticsearch.password";
+ public static final String PROP_AUTHORIZATIONTOKEN = "gora.datastore.elasticsearch.authorizationToken";
+ public static final String PROP_APIKEYID = "gora.datastore.elasticsearch.apiKeyId";
+ public static final String PROP_APIKEYSECRET = "gora.datastore.elasticsearch.apiKeySecret";
+ public static final String PROP_CONNECTTIMEOUT = "gora.datastore.elasticsearch.connectTimeout";
+ public static final String PROP_SOCKETTIMEOUT = "gora.datastore.elasticsearch.socketTimeout";
+ public static final String PROP_IOTHREADCOUNT = "gora.datastore.elasticsearch.ioThreadCount";
}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java
index 4b0cef4..187b02a 100644
--- a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java
@@ -1,14 +1,13 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -26,249 +25,248 @@ import java.util.Properties;
*/
public class ElasticsearchParameters {
- /**
- * Elasticsearch server host.
- */
- private String host;
-
- /**
- * Elasticsearch server port.
- */
- private int port;
-
- /**
- * Elasticsearch server scheme.
- * Optional. If not provided, defaults to http.
- */
- private String scheme;
-
- /**
- * Authentication type to connect to the server.
- * Can be BASIC, TOKEN or APIKEY.
- */
- private AuthenticationType authenticationType;
-
- /**
- * Username to use for server authentication.
- * Required for BASIC authentication to connect to the server.
- */
- private String username;
-
- /**
- * Password to use for server authentication.
- * Required for BASIC authentication to connect to the server.
- */
- private String password;
-
- /**
- * Authorization token to use for server authentication.
- * Required for TOKEN authentication to connect to the server.
- */
- private String authorizationToken;
-
- /**
- * API Key ID to use for server authentication.
- * Required for APIKEY authentication to connect to the server.
- */
- private String apiKeyId;
-
- /**
- * API Key Secret to use for server authentication.
- * Required for APIKEY authentication to connect to the server.
- */
- private String apiKeySecret;
-
- /**
- * Timeout in milliseconds used for establishing the connection to the server.
- * Optional. If not provided, defaults to 5000s.
- *
- */
- private int connectTimeout;
-
- /**
- * Timeout in milliseconds used for waiting for data – after establishing the connection to the server.
- * Optional. If not provided, defaults to 60000s.
- */
- private int socketTimeout;
-
- /**
- * Number of worker threads used by the connection manager.
- * Optional. If not provided, defaults to 1.
- */
- private int ioThreadCount;
-
- public ElasticsearchParameters(String host, int port) {
- this.host = host;
- this.port = port;
+ /**
+ * Elasticsearch server host.
+ */
+ private String host;
+
+ /**
+ * Elasticsearch server port.
+ */
+ private int port;
+
+ /**
+ * Elasticsearch server scheme.
+ * Optional. If not provided, defaults to http.
+ */
+ private String scheme;
+
+ /**
+ * Authentication type to connect to the server.
+ * Can be BASIC, TOKEN or APIKEY.
+ */
+ private AuthenticationType authenticationType;
+
+ /**
+ * Username to use for server authentication.
+ * Required for BASIC authentication to connect to the server.
+ */
+ private String username;
+
+ /**
+ * Password to use for server authentication.
+ * Required for BASIC authentication to connect to the server.
+ */
+ private String password;
+
+ /**
+ * Authorization token to use for server authentication.
+ * Required for TOKEN authentication to connect to the server.
+ */
+ private String authorizationToken;
+
+ /**
+ * API Key ID to use for server authentication.
+ * Required for APIKEY authentication to connect to the server.
+ */
+ private String apiKeyId;
+
+ /**
+ * API Key Secret to use for server authentication.
+ * Required for APIKEY authentication to connect to the server.
+ */
+ private String apiKeySecret;
+
+ /**
+ * Timeout in milliseconds used for establishing the connection to the server.
+ * Optional. If not provided, defaults to 5000 ms.
+ */
+ private int connectTimeout;
+
+ /**
+ * Timeout in milliseconds used for waiting for data – after establishing the connection to the server.
+ * Optional. If not provided, defaults to 60000 ms.
+ */
+ private int socketTimeout;
+
+ /**
+ * Number of worker threads used by the connection manager.
+ * Optional. If not provided, defaults to 1.
+ */
+ private int ioThreadCount;
+
+ public ElasticsearchParameters(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public void setHost(String host) {
+ this.host = host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public String getScheme() {
+ return scheme;
+ }
+
+ public void setScheme(String scheme) {
+ this.scheme = scheme;
+ }
+
+ public AuthenticationType getAuthenticationType() {
+ return authenticationType;
+ }
+
+ public void setAuthenticationType(AuthenticationType authenticationType) {
+ this.authenticationType = authenticationType;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public String getAuthorizationToken() {
+ return authorizationToken;
+ }
+
+ public void setAuthorizationToken(String authorizationToken) {
+ this.authorizationToken = authorizationToken;
+ }
+
+ public String getApiKeyId() {
+ return apiKeyId;
+ }
+
+ public void setApiKeyId(String apiKeyId) {
+ this.apiKeyId = apiKeyId;
+ }
+
+ public String getApiKeySecret() {
+ return apiKeySecret;
+ }
+
+ public void setApiKeySecret(String apiKeySecret) {
+ this.apiKeySecret = apiKeySecret;
+ }
+
+ public int getConnectTimeout() {
+ return connectTimeout;
+ }
+
+ public void setConnectTimeout(int connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ }
+
+ public int getSocketTimeout() {
+ return socketTimeout;
+ }
+
+ public void setSocketTimeout(int socketTimeout) {
+ this.socketTimeout = socketTimeout;
+ }
+
+ public int getIoThreadCount() {
+ return ioThreadCount;
+ }
+
+ public void setIoThreadCount(int ioThreadCount) {
+ this.ioThreadCount = ioThreadCount;
+ }
+
+ /**
+ * Reads Elasticsearch parameters from a properties list; unless overriding is enabled, host and port are taken from conf.
+ *
+ * @param properties Properties list
+ * @return Elasticsearch parameters instance
+ */
+ public static ElasticsearchParameters load(Properties properties, Configuration conf) {
+ ElasticsearchParameters elasticsearchParameters;
+
+ if (!Boolean.parseBoolean(properties.getProperty(ElasticsearchConstants.PROP_OVERRIDING))) {
+ elasticsearchParameters = new ElasticsearchParameters(
+ conf.get(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST),
+ conf.getInt(ElasticsearchConstants.PROP_PORT, ElasticsearchConstants.DEFAULT_PORT));
+ } else {
+ elasticsearchParameters = new ElasticsearchParameters(
+ properties.getProperty(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST),
+ Integer.parseInt(properties.getProperty(ElasticsearchConstants.PROP_PORT,
+ String.valueOf(ElasticsearchConstants.DEFAULT_PORT))));
}
- public String getHost() {
- return host;
+ String schemeProperty = properties.getProperty(ElasticsearchConstants.PROP_SCHEME);
+ if (schemeProperty != null) {
+ elasticsearchParameters.setScheme(schemeProperty);
}
- public void setHost(String host) {
- this.host = host;
+ AuthenticationType authenticationTypeProperty =
+ AuthenticationType.valueOf(properties.getProperty(ElasticsearchConstants.PROP_AUTHENTICATIONTYPE));
+ if (authenticationTypeProperty != null) {
+ elasticsearchParameters.setAuthenticationType(authenticationTypeProperty);
}
- public int getPort() {
- return port;
+ String usernameProperty = properties.getProperty(ElasticsearchConstants.PROP_USERNAME);
+ if (usernameProperty != null) {
+ elasticsearchParameters.setUsername(usernameProperty);
}
- public void setPort(int port) {
- this.port = port;
+ String passwordProperty = properties.getProperty(ElasticsearchConstants.PROP_PASSWORD);
+ if (passwordProperty != null) {
+ elasticsearchParameters.setPassword(passwordProperty);
}
- public String getScheme() {
- return scheme;
+ String authorizationTokenProperty = properties.getProperty(ElasticsearchConstants.PROP_AUTHORIZATIONTOKEN);
+ if (authorizationTokenProperty != null) {
+ elasticsearchParameters.setAuthorizationToken(authorizationTokenProperty);
}
- public void setScheme(String scheme) {
- this.scheme = scheme;
+ String apiKeyIdProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYID);
+ if (apiKeyIdProperty != null) {
+ elasticsearchParameters.setApiKeyId(apiKeyIdProperty);
}
- public AuthenticationType getAuthenticationType() {
- return authenticationType;
+ String apiKeySecretProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYSECRET);
+ if (apiKeySecretProperty != null) {
+ elasticsearchParameters.setApiKeySecret(apiKeySecretProperty);
}
- public void setAuthenticationType(AuthenticationType authenticationType) {
- this.authenticationType = authenticationType;
+ String connectTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_CONNECTTIMEOUT);
+ if (connectTimeoutProperty != null) {
+ elasticsearchParameters.setConnectTimeout(Integer.parseInt(connectTimeoutProperty));
}
- public String getUsername() {
- return username;
+ String socketTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_SOCKETTIMEOUT);
+ if (socketTimeoutProperty != null) {
+ elasticsearchParameters.setSocketTimeout(Integer.parseInt(socketTimeoutProperty));
}
- public void setUsername(String username) {
- this.username = username;
+ String ioThreadCountProperty = properties.getProperty(ElasticsearchConstants.PROP_IOTHREADCOUNT);
+ if (ioThreadCountProperty != null) {
+ elasticsearchParameters.setIoThreadCount(Integer.parseInt(ioThreadCountProperty));
}
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public String getAuthorizationToken() {
- return authorizationToken;
- }
-
- public void setAuthorizationToken(String authorizationToken) {
- this.authorizationToken = authorizationToken;
- }
-
- public String getApiKeyId() {
- return apiKeyId;
- }
-
- public void setApiKeyId(String apiKeyId) {
- this.apiKeyId = apiKeyId;
- }
-
- public String getApiKeySecret() {
- return apiKeySecret;
- }
-
- public void setApiKeySecret(String apiKeySecret) {
- this.apiKeySecret = apiKeySecret;
- }
-
- public int getConnectTimeout() {
- return connectTimeout;
- }
-
- public void setConnectTimeout(int connectTimeout) {
- this.connectTimeout = connectTimeout;
- }
-
- public int getSocketTimeout() {
- return socketTimeout;
- }
-
- public void setSocketTimeout(int socketTimeout) {
- this.socketTimeout = socketTimeout;
- }
-
- public int getIoThreadCount() {
- return ioThreadCount;
- }
-
- public void setIoThreadCount(int ioThreadCount) {
- this.ioThreadCount = ioThreadCount;
- }
-
- /**
- * Reads Elasticsearch parameters from a properties list.
- *
- * @param properties Properties list
- * @return Elasticsearch parameters instance
- */
- public static ElasticsearchParameters load(Properties properties, Configuration conf) {
- ElasticsearchParameters elasticsearchParameters;
-
- if (!Boolean.parseBoolean(properties.getProperty(ElasticsearchConstants.PROP_OVERRIDING))) {
- elasticsearchParameters = new ElasticsearchParameters(
- conf.get(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST),
- conf.getInt(ElasticsearchConstants.PROP_PORT, ElasticsearchConstants.DEFAULT_PORT));
- } else {
- elasticsearchParameters = new ElasticsearchParameters(
- properties.getProperty(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST),
- Integer.parseInt(properties.getProperty(ElasticsearchConstants.PROP_PORT,
- String.valueOf(ElasticsearchConstants.DEFAULT_PORT))));
- }
-
- String schemeProperty = properties.getProperty(ElasticsearchConstants.PROP_SCHEME);
- if (schemeProperty != null) {
- elasticsearchParameters.setScheme(schemeProperty);
- }
-
- AuthenticationType authenticationTypeProperty =
- AuthenticationType.valueOf(properties.getProperty(ElasticsearchConstants.PROP_AUTHENTICATIONTYPE));
- if (authenticationTypeProperty != null) {
- elasticsearchParameters.setAuthenticationType(authenticationTypeProperty);
- }
-
- String usernameProperty = properties.getProperty(ElasticsearchConstants.PROP_USERNAME);
- if (usernameProperty != null) {
- elasticsearchParameters.setUsername(usernameProperty);
- }
-
- String passwordProperty = properties.getProperty(ElasticsearchConstants.PROP_PASSWORD);
- if (passwordProperty != null) {
- elasticsearchParameters.setPassword(passwordProperty);
- }
-
- String authorizationTokenProperty = properties.getProperty(ElasticsearchConstants.PROP_AUTHORIZATIONTOKEN);
- if (authorizationTokenProperty != null) {
- elasticsearchParameters.setAuthorizationToken(authorizationTokenProperty);
- }
-
- String apiKeyIdProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYID);
- if (apiKeyIdProperty != null) {
- elasticsearchParameters.setApiKeyId(apiKeyIdProperty);
- }
-
- String apiKeySecretProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYSECRET);
- if (apiKeySecretProperty != null) {
- elasticsearchParameters.setApiKeySecret(apiKeySecretProperty);
- }
-
- String connectTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_CONNECTTIMEOUT);
- if (connectTimeoutProperty != null) {
- elasticsearchParameters.setConnectTimeout(Integer.parseInt(connectTimeoutProperty));
- }
-
- String socketTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_SOCKETTIMEOUT);
- if (socketTimeoutProperty != null) {
- elasticsearchParameters.setSocketTimeout(Integer.parseInt(socketTimeoutProperty));
- }
-
- String ioThreadCountProperty = properties.getProperty(ElasticsearchConstants.PROP_IOTHREADCOUNT);
- if (ioThreadCountProperty != null) {
- elasticsearchParameters.setIoThreadCount(Integer.parseInt(ioThreadCountProperty));
- }
-
- return elasticsearchParameters;
- }
+ return elasticsearchParameters;
+ }
}
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java
index ef17dbc..6d4d4c3 100644
--- a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java
@@ -32,40 +32,40 @@ import java.util.Properties;
*/
public class GoraElasticsearchTestDriver extends GoraTestDriver {
- private static final String DOCKER_IMAGE = "docker.elastic.co/elasticsearch/elasticsearch:7.10.1";
- private ElasticsearchContainer elasticsearchContainer;
+ private static final String DOCKER_IMAGE = "docker.elastic.co/elasticsearch/elasticsearch:7.10.1";
+ private ElasticsearchContainer elasticsearchContainer;
- /**
- * Constructor for this class.
- */
- public GoraElasticsearchTestDriver() {
- super(ElasticsearchStore.class);
- Properties properties = DataStoreFactory.createProps();
- elasticsearchContainer = new ElasticsearchContainer(DOCKER_IMAGE)
- .withEnv("ELASTIC_PASSWORD", properties.getProperty(ElasticsearchConstants.PROP_PASSWORD))
- .withEnv("xpack.security.enabled", "true");
- }
+ /**
+ * Constructor for this class.
+ */
+ public GoraElasticsearchTestDriver() {
+ super(ElasticsearchStore.class);
+ Properties properties = DataStoreFactory.createProps();
+ elasticsearchContainer = new ElasticsearchContainer(DOCKER_IMAGE)
+ .withEnv("ELASTIC_PASSWORD", properties.getProperty(ElasticsearchConstants.PROP_PASSWORD))
+ .withEnv("xpack.security.enabled", "true");
+ }
- /**
- * Initiate the Elasticsearch server on the default port.
- */
- @Override
- public void setUpClass() throws Exception {
- elasticsearchContainer.start();
- log.info("Setting up Elasticsearch test driver");
+ /**
+ * Starts the Elasticsearch container and stores its mapped host and port in the configuration.
+ */
+ @Override
+ public void setUpClass() throws Exception {
+ elasticsearchContainer.start();
+ log.info("Setting up Elasticsearch test driver");
- int port = elasticsearchContainer.getMappedPort(ElasticsearchConstants.DEFAULT_PORT);
- String host = elasticsearchContainer.getContainerIpAddress();
- conf.set(ElasticsearchConstants.PROP_PORT, String.valueOf(port));
- conf.set(ElasticsearchConstants.PROP_HOST, host);
- }
+ int port = elasticsearchContainer.getMappedPort(ElasticsearchConstants.DEFAULT_PORT);
+ String host = elasticsearchContainer.getContainerIpAddress();
+ conf.set(ElasticsearchConstants.PROP_PORT, String.valueOf(port));
+ conf.set(ElasticsearchConstants.PROP_HOST, host);
+ }
- /**
- * Tear the server down.
- */
- @Override
- public void tearDownClass() throws Exception {
- elasticsearchContainer.close();
- log.info("Tearing down Elasticsearch test driver");
- }
+ /**
+ * Tear the server down.
+ */
+ @Override
+ public void tearDownClass() throws Exception {
+ elasticsearchContainer.close();
+ log.info("Tearing down Elasticsearch test driver");
+ }
}
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java
index d5aa9e6..c05c441 100644
--- a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java
@@ -19,10 +19,12 @@ package org.apache.gora.elasticsearch.mapreduce;
import org.apache.gora.elasticsearch.GoraElasticsearchTestDriver;
import org.apache.gora.examples.generated.WebPage;
import org.apache.gora.mapreduce.DataStoreMapReduceTestBase;
+import org.apache.gora.mapreduce.MapReduceTestUtils;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.junit.After;
import org.junit.Before;
+import org.junit.Test;
import java.io.IOException;
@@ -31,29 +33,35 @@ import java.io.IOException;
*/
public class ElasticsearchStoreMapReduceTest extends DataStoreMapReduceTestBase {
- private GoraElasticsearchTestDriver driver;
-
- public ElasticsearchStoreMapReduceTest() throws IOException {
- super();
- driver = new GoraElasticsearchTestDriver();
- }
-
- @Override
- @Before
- public void setUp() throws Exception {
- driver.setUpClass();
- super.setUp();
- }
-
- @Override
- @After
- public void tearDown() throws Exception {
- super.tearDown();
- driver.tearDownClass();
- }
-
- @Override
- protected DataStore<String, WebPage> createWebPageDataStore() throws IOException {
- return DataStoreFactory.getDataStore(String.class, WebPage.class, driver.getConfiguration());
- }
+ private GoraElasticsearchTestDriver driver;
+
+ public ElasticsearchStoreMapReduceTest() throws IOException {
+ super();
+ driver = new GoraElasticsearchTestDriver();
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ driver.setUpClass();
+ super.setUp();
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ super.tearDown();
+ driver.tearDownClass();
+ }
+
+ @Override
+ protected DataStore<String, WebPage> createWebPageDataStore() throws IOException {
+ return DataStoreFactory.getDataStore(String.class, WebPage.class, driver.getConfiguration());
+ }
+
+ @Test
+ @Override
+ public void testCountQuery() throws Exception {
+ MapReduceTestUtils.testCountQuery(createWebPageDataStore(), driver.getConfiguration());
+ }
}
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java
index 7c95593..fb7fd0d 100644
--- a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java
@@ -34,140 +34,145 @@ import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
/**
* Test case for ElasticsearchStore.
*/
public class TestElasticsearchStore extends DataStoreTestBase {
- static {
- setTestDriver(new GoraElasticsearchTestDriver());
- }
-
- @Test
- public void testInitialize() throws GoraException {
- log.info("test method: testInitialize");
-
- ElasticsearchMapping mapping = ((ElasticsearchStore) employeeStore).getMapping();
-
- Map<String, Field> fields = new HashMap<String, Field>() {{
- put("name", new Field("name", new Field.FieldType(Field.DataType.TEXT)));
- put("dateOfBirth", new Field("dateOfBirth", new Field.FieldType(Field.DataType.LONG)));
- put("ssn", new Field("ssn", new Field.FieldType(Field.DataType.TEXT)));
- put("value", new Field("value", new Field.FieldType(Field.DataType.TEXT)));
- put("salary", new Field("salary", new Field.FieldType(Field.DataType.INTEGER)));
- put("boss", new Field("boss", new Field.FieldType(Field.DataType.OBJECT)));
- put("webpage", new Field("webpage", new Field.FieldType(Field.DataType.OBJECT)));
- }};
-
- Assert.assertEquals("frontier", employeeStore.getSchemaName());
- Assert.assertEquals("frontier", mapping.getIndexName());
- Assert.assertEquals(fields, mapping.getFields());
- }
-
- @Test
- public void testLoadElasticsearchParameters() throws IOException {
- log.info("test method: testLoadElasticsearchParameters");
-
- Properties properties = DataStoreFactory.createProps();
-
- ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, testDriver.getConfiguration());
-
- Assert.assertEquals("localhost", parameters.getHost());
- Assert.assertEquals(AuthenticationType.BASIC, parameters.getAuthenticationType());
- Assert.assertEquals("elastic", parameters.getUsername());
- Assert.assertEquals("password", parameters.getPassword());
- }
-
- @Test(expected = GoraException.class)
- public void testInvalidXmlFile() throws Exception {
- log.info("test method: testInvalidXmlFile");
-
- Properties properties = DataStoreFactory.createProps();
- properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml");
- properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "true");
- testDriver.createDataStore(String.class, EmployeeInt.class, properties);
- }
-
- @Test
- public void testXsdValidationParameter() throws GoraException {
- log.info("test method: testXsdValidationParameter");
-
- Properties properties = DataStoreFactory.createProps();
- properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml");
- properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "false");
- testDriver.createDataStore(String.class, EmployeeInt.class, properties);
- }
-
- @Test
- public void testGetType() throws GoraException, ClassNotFoundException {
- Configuration conf = testDriver.getConfiguration();
- DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
-
- String actualType = storeMetadataAnalyzer.getType();
- String expectedType = "ELASTICSEARCH";
- Assert.assertEquals(expectedType, actualType);
- }
-
- @Test
- public void testGetTablesNames() throws GoraException, ClassNotFoundException {
- Configuration conf = testDriver.getConfiguration();
- DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
-
- List<String> actualTablesNames = new ArrayList<>(storeMetadataAnalyzer.getTablesNames());
- List<String> expectedTablesNames = new ArrayList<String>() {
- {
- add("frontier");
- add("webpage");
- }
- };
- Assert.assertEquals(expectedTablesNames, actualTablesNames);
- }
-
- @Test
- public void testGetTableInfo() throws GoraException, ClassNotFoundException {
- Configuration conf = testDriver.getConfiguration();
- DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
-
- ElasticsearchStoreCollectionMetadata actualCollectionMetadata =
- (ElasticsearchStoreCollectionMetadata) storeMetadataAnalyzer.getTableInfo("frontier");
-
- List<String> expectedDocumentKeys = new ArrayList<String>() {
- {
- add("name");
- add("dateOfBirth");
- add("ssn");
- add("value");
- add("salary");
- add("boss");
- add("webpage");
- add("gora_id");
- }
- };
-
- List<String> expectedDocumentTypes = new ArrayList<String>() {
- {
- add("text");
- add("long");
- add("text");
- add("text");
- add("integer");
- add("object");
- add("object");
- add("keyword");
- }
- };
-
- Assert.assertEquals(expectedDocumentKeys.size(), actualCollectionMetadata.getDocumentTypes().size());
- Assert.assertTrue(expectedDocumentKeys.containsAll(actualCollectionMetadata.getDocumentKeys()));
-
- Assert.assertEquals(expectedDocumentTypes.size(), actualCollectionMetadata.getDocumentTypes().size());
- Assert.assertTrue(expectedDocumentTypes.containsAll(actualCollectionMetadata.getDocumentTypes()));
- }
-
- @Ignore("Elasticsearch doesn't support 3 types union field yet")
- @Override
- public void testGet3UnionField() {
- }
+ static {
+ setTestDriver(new GoraElasticsearchTestDriver());
+ }
+
+ @Test
+ public void testInitialize() throws GoraException {
+ log.info("test method: testInitialize");
+
+ ElasticsearchMapping mapping = ((ElasticsearchStore) employeeStore).getMapping();
+
+ Map<String, Field> fields = new HashMap<String, Field>() {{
+ put("name", new Field("name", new Field.FieldType(Field.DataType.TEXT)));
+ put("dateOfBirth", new Field("dateOfBirth", new Field.FieldType(Field.DataType.LONG)));
+ put("ssn", new Field("ssn", new Field.FieldType(Field.DataType.TEXT)));
+ put("value", new Field("value", new Field.FieldType(Field.DataType.TEXT)));
+ put("salary", new Field("salary", new Field.FieldType(Field.DataType.INTEGER)));
+ put("boss", new Field("boss", new Field.FieldType(Field.DataType.OBJECT)));
+ put("webpage", new Field("webpage", new Field.FieldType(Field.DataType.OBJECT)));
+ }};
+
+ Assert.assertEquals("frontier", employeeStore.getSchemaName());
+ Assert.assertEquals("frontier", mapping.getIndexName());
+ Assert.assertEquals(fields, mapping.getFields());
+ }
+
+ @Test
+ public void testLoadElasticsearchParameters() throws IOException {
+ log.info("test method: testLoadElasticsearchParameters");
+
+ Properties properties = DataStoreFactory.createProps();
+
+ ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, testDriver.getConfiguration());
+
+ Assert.assertEquals("localhost", parameters.getHost());
+ Assert.assertEquals(AuthenticationType.BASIC, parameters.getAuthenticationType());
+ Assert.assertEquals("elastic", parameters.getUsername());
+ Assert.assertEquals("password", parameters.getPassword());
+ }
+
+ @Test(expected = GoraException.class)
+ public void testInvalidXmlFile() throws Exception {
+ log.info("test method: testInvalidXmlFile");
+
+ Properties properties = DataStoreFactory.createProps();
+ properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml");
+ properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "true");
+ testDriver.createDataStore(String.class, EmployeeInt.class, properties);
+ }
+
+ @Test
+ public void testXsdValidationParameter() throws GoraException {
+ log.info("test method: testXsdValidationParameter");
+
+ Properties properties = DataStoreFactory.createProps();
+ properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml");
+ properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "false");
+ testDriver.createDataStore(String.class, EmployeeInt.class, properties);
+ }
+
+ @Test
+ public void testGetType() throws GoraException, ClassNotFoundException {
+ Configuration conf = testDriver.getConfiguration();
+ DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
+
+ String actualType = storeMetadataAnalyzer.getType();
+ String expectedType = "ELASTICSEARCH";
+ Assert.assertEquals(expectedType, actualType);
+ }
+
+ @Test
+ public void testGetTablesNames() throws GoraException, ClassNotFoundException {
+ Configuration conf = testDriver.getConfiguration();
+ DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
+
+ List<String> actualTablesNames = new ArrayList<>(storeMetadataAnalyzer.getTablesNames());
+ List<String> expectedTablesNames = new ArrayList<String>() {
+ {
+ add("frontier");
+ add("webpage");
+ }
+ };
+ Assert.assertEquals(expectedTablesNames, actualTablesNames);
+ }
+
+ @Test
+ public void testGetTableInfo() throws GoraException, ClassNotFoundException {
+ Configuration conf = testDriver.getConfiguration();
+ DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
+
+ ElasticsearchStoreCollectionMetadata actualCollectionMetadata =
+ (ElasticsearchStoreCollectionMetadata) storeMetadataAnalyzer.getTableInfo("frontier");
+
+ List<String> expectedDocumentKeys = new ArrayList<String>() {
+ {
+ add("name");
+ add("dateOfBirth");
+ add("ssn");
+ add("value");
+ add("salary");
+ add("boss");
+ add("webpage");
+ add("gora_id");
+ }
+ };
+
+ List<String> expectedDocumentTypes = new ArrayList<String>() {
+ {
+ add("text");
+ add("long");
+ add("text");
+ add("text");
+ add("integer");
+ add("object");
+ add("object");
+ add("keyword");
+ }
+ };
+
+ Assert.assertEquals(expectedDocumentKeys.size(), actualCollectionMetadata.getDocumentTypes().size());
+ Assert.assertTrue(expectedDocumentKeys.containsAll(actualCollectionMetadata.getDocumentKeys()));
+
+ Assert.assertEquals(expectedDocumentTypes.size(), actualCollectionMetadata.getDocumentTypes().size());
+ Assert.assertTrue(expectedDocumentTypes.containsAll(actualCollectionMetadata.getDocumentTypes()));
+ }
+
+ @Ignore("Elasticsearch doesn't support 3 types union field yet")
+ @Override
+ public void testGet3UnionField() {
+ }
}