You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metamodel.apache.org by ka...@apache.org on 2018/01/26 04:13:32 UTC
[4/4] metamodel git commit: METAMODEL-1179: Upgraded ElasticSearch
REST module to new client.
METAMODEL-1179: Upgraded ElasticSearch REST module to new client.
Using the official elastic REST high level client for ElasticSearch.
Closes #177
Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/bda8d764
Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/bda8d764
Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/bda8d764
Branch: refs/heads/master
Commit: bda8d764f65acdf3b1f520d21cb51a7396e23c7d
Parents: c57d508
Author: Arjan Seijkens <ar...@gmail.com>
Authored: Thu Jan 25 20:13:08 2018 -0800
Committer: Kasper Sørensen <i....@gmail.com>
Committed: Thu Jan 25 20:13:08 2018 -0800
----------------------------------------------------------------------
.travis.yml | 5 +-
CHANGES.md | 3 +-
.../AbstractElasticSearchDataContext.java | 140 +++++
.../AbstractElasticSearchDataSet.java | 123 ++++
.../common/ElasticSearchDateConverter.java | 17 +-
.../common/ElasticSearchMetaDataParser.java | 79 +++
.../common/ElasticSearchUtils.java | 80 +--
.../common/ElasticSearchUtilsTest.java | 63 ++
elasticsearch/native/pom.xml | 36 ++
.../ElasticSearchCreateTableBuilder.java | 8 +-
.../nativeclient/ElasticSearchDataContext.java | 171 ++----
.../ElasticSearchDataContextFactory.java | 47 +-
.../nativeclient/ElasticSearchDataSet.java | 90 +--
.../ElasticSearchDeleteBuilder.java | 27 +-
.../ElasticSearchDropTableBuilder.java | 103 ----
.../ElasticSearchInsertBuilder.java | 8 +-
.../ElasticSearchMetaDataParser.java | 81 ---
.../ElasticSearchUpdateBuilder.java | 116 ++++
.../ElasticSearchUpdateCallback.java | 10 +-
.../nativeclient/NativeElasticSearchUtils.java | 67 --
.../ElasticSearchDataContextTest.java | 231 +++----
.../ElasticSearchMetaDataParserTest.java | 1 +
.../nativeclient/ElasticSearchUtilsTest.java | 63 --
.../utils/EmbeddedElasticsearchServer.java | 71 ---
elasticsearch/pom.xml | 2 +-
elasticsearch/rest/pom.xml | 114 +++-
.../rest/ElasticSearchRestClient.java | 134 ++++
.../ElasticSearchRestCreateTableBuilder.java | 55 ++
.../rest/ElasticSearchRestDataContext.java | 226 ++-----
.../ElasticSearchRestDataContextFactory.java | 46 +-
.../rest/ElasticSearchRestDataSet.java | 65 ++
.../rest/ElasticSearchRestDeleteBuilder.java | 96 +++
.../rest/ElasticSearchRestInsertBuilder.java | 72 +++
.../rest/ElasticSearchRestUpdateCallback.java | 167 +++++
.../elasticsearch/rest/JestClientExecutor.java | 51 --
.../elasticsearch/rest/JestDeleteScroll.java | 57 --
.../JestElasticSearchCreateTableBuilder.java | 56 --
.../rest/JestElasticSearchDataSet.java | 124 ----
.../rest/JestElasticSearchDeleteBuilder.java | 76 ---
.../rest/JestElasticSearchDropTableBuilder.java | 62 --
.../rest/JestElasticSearchInsertBuilder.java | 74 ---
.../rest/JestElasticSearchMetaDataParser.java | 75 ---
.../rest/JestElasticSearchUpdateCallback.java | 164 -----
.../rest/JestElasticSearchUtils.java | 90 ---
.../ElasticSearchRestDataContexFactoryIT.java | 131 ++++
.../rest/ElasticSearchRestDataContextIT.java | 535 ++++++++++++++++
.../rest/JestElasticSearchDataContextTest.java | 615 -------------------
.../JestElasticSearchMetaDataParserTest.java | 70 ---
.../rest/JestElasticSearchUtilsTest.java | 188 ------
.../rest/utils/EmbeddedElasticsearchServer.java | 72 ---
.../rest/src/test/resources/Dockerfile | 5 +
.../rest/src/test/resources/elasticsearch.yml | 13 +
.../apache/metamodel/DataContextFactory.java | 6 +-
hbase/pom.xml | 4 +
pom.xml | 1 +
55 files changed, 2276 insertions(+), 2810 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 1b1e342..65df979 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -15,7 +15,10 @@ before_install:
services:
- couchdb
- mongodb
-
+ - docker
+
+script: "mvn clean verify -P integration-test"
+
after_success:
- mvn test javadoc:javadoc
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/CHANGES.md
----------------------------------------------------------------------
diff --git a/CHANGES.md b/CHANGES.md
index 5af3fc4..7ab1d22 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,6 @@
-### WIP
+### Apache MetaModel 5.1.0 (WIP)
+ * [METAMODEL-1179] - Refactored ElasticSearch REST module to use new official REST based client from Elastic.
* [METAMODEL-1177] - Made TableType.TABLE the default table type, replacing null values.
### Apache MetaModel 5.0.1
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java
new file mode 100644
index 0000000..c8ffae3
--- /dev/null
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.metamodel.DataContext;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.QueryPostprocessDataContext;
+import org.apache.metamodel.UpdateableDataContext;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.MutableColumn;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.util.SimpleTableDef;
+import org.elasticsearch.common.unit.TimeValue;
+
+public abstract class AbstractElasticSearchDataContext extends QueryPostprocessDataContext implements DataContext,
+ UpdateableDataContext {
+
+ public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60);
+
+ protected final String indexName;
+
+ // Table definitions that are set from the beginning, not supposed to be
+ // changed.
+ protected final List<SimpleTableDef> staticTableDefinitions;
+
+ // Table definitions that are discovered, these can change
+ protected final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
+
+ /**
+ * Constructs a {@link ElasticSearchRestDataContext}. This constructor
+ * accepts a custom array of {@link SimpleTableDef}s which allows the user
+ * to define his own view on the indexes in the engine.
+ *
+ * @param indexName
+ * the name of the ElasticSearch index to represent
+ * @param tableDefinitions
+ * an array of {@link SimpleTableDef}s, which define the table
+ * and column model of the ElasticSearch index.
+ */
+ public AbstractElasticSearchDataContext(String indexName, SimpleTableDef... tableDefinitions) {
+ super(false);
+ if (indexName == null || indexName.trim().length() == 0) {
+ throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName);
+ }
+ this.indexName = indexName;
+ this.staticTableDefinitions = (tableDefinitions == null || tableDefinitions.length == 0 ? Collections
+ .<SimpleTableDef> emptyList() : Arrays.asList(tableDefinitions));
+ }
+
+ /**
+ * Performs an analysis of the available indexes in an ElasticSearch cluster
+ * {@link JestClient} instance and detects the elasticsearch types structure
+ * based on the metadata provided by the ElasticSearch java client.
+ *
+ * @see {@link #detectTable(JsonObject, String)}
+ * @return a mutable schema instance, useful for further fine tuning by the
+ * user.
+ */
+ protected abstract SimpleTableDef[] detectSchema();
+
+ @Override
+ protected Schema getMainSchema() throws MetaModelException {
+ final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
+ for (final SimpleTableDef tableDef : staticTableDefinitions) {
+ addTable(theSchema, tableDef);
+ }
+
+ final SimpleTableDef[] tables = detectSchema();
+ synchronized (this) {
+ dynamicTableDefinitions.clear();
+ dynamicTableDefinitions.addAll(Arrays.asList(tables));
+ for (final SimpleTableDef tableDef : dynamicTableDefinitions) {
+ final List<String> tableNames = theSchema.getTableNames();
+
+ if (!tableNames.contains(tableDef.getName())) {
+ addTable(theSchema, tableDef);
+ }
+ }
+ }
+
+ return theSchema;
+ }
+
+ private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
+ final MutableTable table = tableDef.toTable().setSchema(theSchema);
+ final Column idColumn = table.getColumnByName(ElasticSearchUtils.FIELD_ID);
+ if (idColumn != null && idColumn instanceof MutableColumn) {
+ final MutableColumn mutableColumn = (MutableColumn) idColumn;
+ mutableColumn.setPrimaryKey(true);
+ }
+ theSchema.addTable(table);
+ }
+
+ @Override
+ protected String getMainSchemaName() throws MetaModelException {
+ return indexName;
+ }
+
+ /**
+ * Gets the name of the index that this {@link DataContext} is working on.
+ */
+ public String getIndexName() {
+ return indexName;
+ }
+
+ protected boolean limitMaxRowsIsSet(int maxRows) {
+ return (maxRows != -1);
+ }
+
+ protected static SimpleTableDef[] sortTables(final List<SimpleTableDef> result) {
+ final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]);
+ Arrays.sort(tableDefArray, (o1, o2) -> o1.getName().compareTo(o2.getName()));
+ return tableDefArray;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java
new file mode 100644
index 0000000..fea2190
--- /dev/null
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.metamodel.data.AbstractDataSet;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.SelectItem;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DataSet} implementation for ElasticSearch
+ */
+public abstract class AbstractElasticSearchDataSet extends AbstractDataSet {
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractElasticSearchDataSet.class);
+
+ protected final AtomicBoolean _closed;
+
+ protected SearchResponse _searchResponse;
+ protected SearchHit _currentHit;
+ protected int _hitIndex = 0;
+
+ public AbstractElasticSearchDataSet(final SearchResponse searchResponse, final List<SelectItem> selectItems) {
+ super(selectItems);
+ _searchResponse = searchResponse;
+ _closed = new AtomicBoolean(false);
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ boolean closeNow = _closed.compareAndSet(true, false);
+ if (closeNow) {
+ closeNow();
+ }
+ }
+
+ protected abstract void closeNow();
+
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ if (!_closed.get()) {
+ logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this);
+ close();
+ }
+ }
+
+ @Override
+ public boolean next() {
+ final SearchHit[] hits = _searchResponse.getHits().getHits();
+ if (hits.length == 0) {
+ // break condition for the scroll
+ _currentHit = null;
+ return false;
+ }
+
+ if (_hitIndex < hits.length) {
+ // pick the next hit within this search response
+ _currentHit = hits[_hitIndex];
+ _hitIndex++;
+ return true;
+ }
+
+ final String scrollId = _searchResponse.getScrollId();
+ if (scrollId == null) {
+ // this search response is not scrolleable - then it's the end.
+ _currentHit = null;
+ return false;
+ }
+
+ // try to scroll to the next set of hits
+ try {
+ _searchResponse = scrollSearchResponse(scrollId);
+ } catch (IOException e) {
+ logger.warn("Failed to scroll to the next search response set.", e);
+ return false;
+ }
+
+ // start over (recursively)
+ _hitIndex = 0;
+ return next();
+ }
+
+ protected abstract SearchResponse scrollSearchResponse(final String scrollId) throws IOException;
+
+ @Override
+ public Row getRow() {
+ if (_currentHit == null) {
+ return null;
+ }
+
+ final Map<String, Object> source = _currentHit.getSource();
+ final String documentId = _currentHit.getId();
+ return ElasticSearchUtils.createRow(source, documentId, getHeader());
+ }
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
index a6ce656..652fbe6 100644
--- a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
@@ -20,6 +20,7 @@ package org.apache.metamodel.elasticsearch.common;
import org.apache.metamodel.util.TimeComparator;
+import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
@@ -30,12 +31,22 @@ import java.util.Date;
*/
public final class ElasticSearchDateConverter {
+ private static final DateFormat DEFAULT_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
+ private static final DateFormat FALLBACK_DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
+
public static Date tryToConvert(String dateAsString) {
+ if (dateAsString == null) {
+ return null;
+ }
+
try {
- SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
- return dateFormat.parse(dateAsString);
+ return DEFAULT_DATE_FORMAT.parse(dateAsString);
} catch (ParseException e) {
- return TimeComparator.toDate(dateAsString);
+ try {
+ return FALLBACK_DATE_FORMAT.parse(dateAsString);
+ } catch (ParseException e1) {
+ return TimeComparator.toDate(dateAsString);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java
new file mode 100644
index 0000000..32f07ff
--- /dev/null
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.common;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.metamodel.schema.ColumnType;
+
+/**
+ * Parser that transforms the ElasticSearch metadata response (json-like format)
+ * into an ElasticSearchMetaData object.
+ */
+public class ElasticSearchMetaDataParser {
+
+ /**
+ * Parses the ElasticSearch meta data info into an ElasticSearchMetaData
+ * object. This method makes much easier to create the ElasticSearch schema.
+ *
+ * @param metaDataInfo
+ * ElasticSearch mapping metadata in Map format
+ * @return An ElasticSearchMetaData object
+ */
+ public static ElasticSearchMetaData parse(Map<String, ?> metaDataInfo) {
+ final String[] fieldNames = new String[metaDataInfo.size() + 1];
+ final ColumnType[] columnTypes = new ColumnType[metaDataInfo.size() + 1];
+
+ // add the document ID field (fixed)
+ fieldNames[0] = ElasticSearchUtils.FIELD_ID;
+ columnTypes[0] = ColumnType.STRING;
+
+ int i = 1;
+ for (Entry<String, ?> metaDataField : metaDataInfo.entrySet()) {
+ @SuppressWarnings("unchecked")
+ final Map<String, ?> fieldMetadata = (Map<String, ?>) metaDataField.getValue();
+
+ fieldNames[i] = metaDataField.getKey();
+ columnTypes[i] = getColumnTypeFromMetadataField(fieldMetadata);
+ i++;
+
+ }
+ return new ElasticSearchMetaData(fieldNames, columnTypes);
+ }
+
+ private static ColumnType getColumnTypeFromMetadataField(Map<String, ?> fieldMetadata) {
+ final String metaDataFieldType = getMetaDataFieldTypeFromMetaDataField(fieldMetadata);
+
+ if (metaDataFieldType == null) {
+ return ColumnType.STRING;
+ }
+
+ return ElasticSearchUtils.getColumnTypeFromElasticSearchType(metaDataFieldType);
+ }
+
+ private static String getMetaDataFieldTypeFromMetaDataField(Map<String, ?> metaDataField) {
+ final Object type = metaDataField.get("type");
+ if (type == null) {
+ return null;
+ }
+ return type.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
index b298d11..9128182 100644
--- a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
@@ -18,72 +18,43 @@
*/
package org.apache.metamodel.elasticsearch.common;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import org.apache.metamodel.data.DataSetHeader;
+import org.apache.metamodel.data.DefaultRow;
+import org.apache.metamodel.data.Row;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.LogicalOperator;
import org.apache.metamodel.query.OperatorType;
+import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.ColumnType;
import org.apache.metamodel.schema.MutableColumn;
import org.apache.metamodel.schema.MutableTable;
import org.apache.metamodel.util.CollectionUtils;
-import org.elasticsearch.common.base.Strings;
+import org.elasticsearch.common.Strings;
import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.FilterBuilder;
+import org.elasticsearch.index.query.ExistsQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class ElasticSearchUtils {
- private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtils.class);
-
public static final String FIELD_ID = "_id";
public static final String SYSTEM_PROPERTY_STRIP_INVALID_FIELD_CHARS = "metamodel.elasticsearch.strip_invalid_field_chars";
- /**
- * Gets a "filter" query which is both 1.x and 2.x compatible.
- */
- private static QueryBuilder getFilteredQuery(String prefix, String fieldName) {
- // 1.x: itemQueryBuilder = QueryBuilders.filteredQuery(null,
- // FilterBuilders.missingFilter(fieldName));
- // 2.x: itemQueryBuilder =
- // QueryBuilders.boolQuery().must(QueryBuilders.missingQuery(fieldName));
- try {
- try {
- Method method = QueryBuilders.class.getDeclaredMethod(prefix + "Query", String.class);
- method.setAccessible(true);
- return QueryBuilders.boolQuery().must((QueryBuilder) method.invoke(null, fieldName));
- } catch (NoSuchMethodException e) {
- Class<?> clazz = ElasticSearchUtils.class.getClassLoader().loadClass(
- "org.elasticsearch.index.query.FilterBuilders");
- Method filterBuilderMethod = clazz.getDeclaredMethod(prefix + "Filter", String.class);
- filterBuilderMethod.setAccessible(true);
- Method queryBuildersFilteredQueryMethod = QueryBuilders.class.getDeclaredMethod("filteredQuery",
- QueryBuilder.class, FilterBuilder.class);
- return (QueryBuilder) queryBuildersFilteredQueryMethod.invoke(null, null, filterBuilderMethod.invoke(
- null, fieldName));
- }
- } catch (Exception e) {
- logger.error("Failed to resolve/invoke filtering method", e);
- throw new IllegalStateException("Failed to resolve filtering method", e);
- }
- }
-
public static QueryBuilder getMissingQuery(String fieldName) {
- return getFilteredQuery("missing", fieldName);
+ return new BoolQueryBuilder().mustNot(new ExistsQueryBuilder(fieldName));
}
public static QueryBuilder getExistsQuery(String fieldName) {
- return getFilteredQuery("exists", fieldName);
+ return new ExistsQueryBuilder(fieldName);
}
public static Map<String, ?> getMappingSource(final MutableTable table) {
@@ -170,7 +141,7 @@ public class ElasticSearchUtils {
}
if (type.isLiteral()) {
- return "string";
+ return "text";
} else if (type == ColumnType.FLOAT) {
return "float";
} else if (type == ColumnType.DOUBLE || type == ColumnType.NUMERIC || type == ColumnType.NUMBER) {
@@ -294,4 +265,35 @@ public class ElasticSearchUtils {
}
return columnType;
}
+
+ public static Row createRow(final Map<String, Object> sourceMap, final String documentId, final DataSetHeader header) {
+ final Object[] values = new Object[header.size()];
+ for (int i = 0; i < values.length; i++) {
+ final SelectItem selectItem = header.getSelectItem(i);
+ final Column column = selectItem.getColumn();
+
+ assert column != null;
+ assert selectItem.getAggregateFunction() == null;
+ assert selectItem.getScalarFunction() == null;
+
+ if (column.isPrimaryKey()) {
+ values[i] = documentId;
+ } else {
+ Object value = sourceMap.get(column.getName());
+
+ if (column.getType() == ColumnType.DATE) {
+ Date valueToDate = ElasticSearchDateConverter.tryToConvert((String) value);
+ if (valueToDate == null) {
+ values[i] = value;
+ } else {
+ values[i] = valueToDate;
+ }
+ } else {
+ values[i] = value;
+ }
+ }
+ }
+
+ return new DefaultRow(header, values);
+ }
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java b/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java
new file mode 100644
index 0000000..9fb7e03
--- /dev/null
+++ b/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.common;
+
+import junit.framework.TestCase;
+import org.apache.metamodel.data.DataSetHeader;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.data.SimpleDataSetHeader;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.ColumnType;
+import org.apache.metamodel.schema.MutableColumn;
+
+import java.util.*;
+
+public class ElasticSearchUtilsTest extends TestCase {
+
+ public void testAssignDocumentIdForPrimaryKeys() throws Exception {
+ MutableColumn primaryKeyColumn = new MutableColumn("value1", ColumnType.STRING).setPrimaryKey(true);
+ SelectItem primaryKeyItem = new SelectItem(primaryKeyColumn);
+ List<SelectItem> selectItems1 = Collections.singletonList(primaryKeyItem);
+ String documentId = "doc1";
+ DataSetHeader header = new SimpleDataSetHeader(selectItems1);
+ Map<String, Object> values = new HashMap<>();
+ values.put("value1", "theValue");
+ Row row = ElasticSearchUtils.createRow(values, documentId, header);
+ String primaryKeyValue = (String) row.getValue(primaryKeyItem);
+
+ assertEquals(primaryKeyValue, documentId);
+ }
+
+ public void testCreateRowWithParsableDates() throws Exception {
+ SelectItem item1 = new SelectItem(new MutableColumn("value1", ColumnType.STRING));
+ SelectItem item2 = new SelectItem(new MutableColumn("value2", ColumnType.DATE));
+ List<SelectItem> selectItems1 = Arrays.asList(item1, item2);
+ String documentId = "doc1";
+ DataSetHeader header = new SimpleDataSetHeader(selectItems1);
+ Map<String, Object> values = new HashMap<>();
+ values.put("value1", "theValue");
+ values.put("value2", "2013-01-04T15:55:51.217+01:00");
+ Row row = ElasticSearchUtils.createRow(values, documentId, header);
+ Object stringValue = row.getValue(item1);
+ Object dateValue = row.getValue(item2);
+
+ assertTrue(stringValue instanceof String);
+ assertTrue(dateValue instanceof Date);
+ }
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/native/pom.xml b/elasticsearch/native/pom.xml
index 4c1abcf..6adca36 100644
--- a/elasticsearch/native/pom.xml
+++ b/elasticsearch/native/pom.xml
@@ -43,6 +43,17 @@
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>transport</artifactId>
+ <version>${elasticsearch.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<!-- test -->
<dependency>
@@ -53,7 +64,32 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
+ <version>4.12</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch.test</groupId>
+ <artifactId>framework</artifactId>
+ <version>${elasticsearch.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>2.9.1</version>
+ <scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
index f27e8ac..4e5873c 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
@@ -27,6 +27,7 @@ import org.apache.metamodel.schema.MutableSchema;
import org.apache.metamodel.schema.MutableTable;
import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.client.IndicesAdminClient;
@@ -50,13 +51,16 @@ final class ElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder
final IndicesAdminClient indicesAdmin = dataContext.getElasticSearchClient().admin().indices();
final String indexName = dataContext.getIndexName();
- final PutMappingRequestBuilder requestBuilder = new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName)
- .setType(table.getName());
+ final PutMappingRequestBuilder requestBuilder =
+ new PutMappingRequestBuilder(indicesAdmin, PutMappingAction.INSTANCE).setIndices(indexName)
+ .setType(table.getName());
requestBuilder.setSource(source);
final PutMappingResponse result = requestBuilder.execute().actionGet();
logger.debug("PutMapping response: acknowledged={}", result.isAcknowledged());
+ dataContext.getElasticSearchClient().admin().indices().prepareRefresh(indexName).get();
+
final MutableSchema schema = (MutableSchema) getSchema();
schema.addTable(table);
return table;
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
index d2dfe4b..3df0ce1 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
@@ -18,39 +18,30 @@
*/
package org.apache.metamodel.elasticsearch.nativeclient;
-import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.metamodel.DataContext;
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.QueryPostprocessDataContext;
import org.apache.metamodel.UpdateScript;
import org.apache.metamodel.UpdateSummary;
-import org.apache.metamodel.UpdateableDataContext;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.DataSetHeader;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.data.SimpleDataSetHeader;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext;
import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser;
import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.LogicalOperator;
import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.MutableColumn;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.MutableTable;
-import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.util.SimpleTableDef;
-import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
-import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
@@ -59,14 +50,16 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.common.hppc.ObjectLookupContainer;
-import org.elasticsearch.common.hppc.cursors.ObjectCursor;
-import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.carrotsearch.hppc.ObjectLookupContainer;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+
/**
* DataContext implementation for ElasticSearch analytics engine.
*
@@ -84,20 +77,11 @@ import org.slf4j.LoggerFactory;
* This implementation supports either automatic discovery of a schema or manual
* specification of a schema, through the {@link SimpleTableDef} class.
*/
-public class ElasticSearchDataContext extends QueryPostprocessDataContext implements DataContext, UpdateableDataContext {
+public class ElasticSearchDataContext extends AbstractElasticSearchDataContext {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContext.class);
- public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60);
-
private final Client elasticSearchClient;
- private final String indexName;
- // Table definitions that are set from the beginning, not supposed to be
- // changed.
- private final List<SimpleTableDef> staticTableDefinitions;
-
- // Table definitions that are discovered, these can change
- private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
/**
* Constructs a {@link ElasticSearchDataContext}. This constructor accepts a
@@ -113,16 +97,12 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
* and column model of the ElasticSearch index.
*/
public ElasticSearchDataContext(Client client, String indexName, SimpleTableDef... tableDefinitions) {
- super(false);
+ super(indexName, tableDefinitions);
+
if (client == null) {
throw new IllegalArgumentException("ElasticSearch Client cannot be null");
}
- if (indexName == null || indexName.trim().length() == 0) {
- throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName);
- }
this.elasticSearchClient = client;
- this.indexName = indexName;
- this.staticTableDefinitions = Arrays.asList(tableDefinitions);
this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
}
@@ -140,40 +120,14 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
this(client, indexName, new SimpleTableDef[0]);
}
- /**
- * Performs an analysis of the available indexes in an ElasticSearch cluster
- * {@link Client} instance and detects the elasticsearch types structure
- * based on the metadata provided by the ElasticSearch java client.
- *
- * @see {@link #detectTable(ClusterState, String, String)}
- * @return a mutable schema instance, useful for further fine tuning by the
- * user.
- */
- private SimpleTableDef[] detectSchema() {
+ @Override
+ protected SimpleTableDef[] detectSchema() {
logger.info("Detecting schema for index '{}'", indexName);
- final ClusterState cs;
final ClusterStateRequestBuilder clusterStateRequestBuilder = getElasticSearchClient().admin().cluster()
- .prepareState();
-
- // different methods here to set the index name, so we have to use
- // reflection :-/
- try {
- final byte majorVersion = Version.CURRENT.major;
- final Object methodArgument = new String[] { indexName };
- if (majorVersion == 0) {
- final Method method = ClusterStateRequestBuilder.class.getMethod("setFilterIndices", String[].class);
- method.invoke(clusterStateRequestBuilder, methodArgument);
- } else {
- final Method method = ClusterStateRequestBuilder.class.getMethod("setIndices", String[].class);
- method.invoke(clusterStateRequestBuilder, methodArgument);
- }
- } catch (Exception e) {
- logger.error("Failed to set index name on ClusterStateRequestBuilder, version {}", Version.CURRENT, e);
- throw new MetaModelException("Failed to create request for index information needed to detect schema", e);
- }
- cs = clusterStateRequestBuilder.execute().actionGet().getState();
-
+ .prepareState().setIndices(indexName);
+ final ClusterState cs = clusterStateRequestBuilder.execute().actionGet().getState();
+
final List<SimpleTableDef> result = new ArrayList<>();
final IndexMetaData imd = cs.getMetaData().index(indexName);
@@ -183,9 +137,8 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
} else {
final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings();
final ObjectLookupContainer<String> documentTypes = mappings.keys();
-
- for (final Object documentTypeCursor : documentTypes) {
- final String documentType = ((ObjectCursor<?>) documentTypeCursor).value.toString();
+ for (final ObjectCursor<?> documentTypeCursor : documentTypes) {
+ final String documentType = documentTypeCursor.value.toString();
try {
final SimpleTableDef table = detectTable(cs, indexName, documentType);
result.add(table);
@@ -194,15 +147,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
}
}
}
- final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]);
- Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() {
- @Override
- public int compare(SimpleTableDef o1, SimpleTableDef o2) {
- return o1.getName().compareTo(o2.getName());
- }
- });
-
- return tableDefArray;
+ return sortTables(result);
}
/**
@@ -225,7 +170,8 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
// index does not exist
throw new IllegalArgumentException("No such index: " + indexName);
}
- final MappingMetaData mappingMetaData = imd.mapping(documentType);
+ final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings();
+ final MappingMetaData mappingMetaData = mappings.get(documentType);
if (mappingMetaData == null) {
throw new IllegalArgumentException("No such document type in index '" + indexName + "': " + documentType);
}
@@ -244,44 +190,6 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
}
@Override
- protected Schema getMainSchema() throws MetaModelException {
- final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
- for (final SimpleTableDef tableDef : staticTableDefinitions) {
- addTable(theSchema, tableDef);
- }
-
- final SimpleTableDef[] tables = detectSchema();
- synchronized (this) {
- dynamicTableDefinitions.clear();
- dynamicTableDefinitions.addAll(Arrays.asList(tables));
- for (final SimpleTableDef tableDef : dynamicTableDefinitions) {
- final List<String> tableNames = theSchema.getTableNames();
-
- if (!tableNames.contains(tableDef.getName())) {
- addTable(theSchema, tableDef);
- }
- }
- }
-
- return theSchema;
- }
-
- private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
- final MutableTable table = tableDef.toTable().setSchema(theSchema);
- final Column idColumn = table.getColumnByName(ElasticSearchUtils.FIELD_ID);
- if (idColumn != null && idColumn instanceof MutableColumn) {
- final MutableColumn mutableColumn = (MutableColumn) idColumn;
- mutableColumn.setPrimaryKey(true);
- }
- theSchema.addTable(table);
- }
-
- @Override
- protected String getMainSchemaName() throws MetaModelException {
- return indexName;
- }
-
- @Override
protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems,
List<FilterItem> whereItems, int firstRow, int maxRows) {
final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
@@ -290,7 +198,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
// where clause can be pushed down to an ElasticSearch query
final SearchRequestBuilder searchRequest = createSearchRequest(table, firstRow, maxRows, queryBuilder);
final SearchResponse response = searchRequest.execute().actionGet();
- return new ElasticSearchDataSet(elasticSearchClient, response, selectItems, false);
+ return new ElasticSearchDataSet(getElasticSearchClient(), response, selectItems);
}
return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
}
@@ -299,12 +207,13 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
final SearchRequestBuilder searchRequest = createSearchRequest(table, 1, maxRows, null);
final SearchResponse response = searchRequest.execute().actionGet();
- return new ElasticSearchDataSet(elasticSearchClient, response, columns.stream().map(SelectItem::new).collect(Collectors.toList()), false);
+ return new ElasticSearchDataSet(getElasticSearchClient(), response, columns.stream().map(SelectItem::new)
+ .collect(Collectors.toList()));
}
private SearchRequestBuilder createSearchRequest(Table table, int firstRow, int maxRows, QueryBuilder queryBuilder) {
final String documentType = table.getName();
- final SearchRequestBuilder searchRequest = elasticSearchClient.prepareSearch(indexName).setTypes(documentType);
+ final SearchRequestBuilder searchRequest = getElasticSearchClient().prepareSearch(indexName).setTypes(documentType);
if (firstRow > 1) {
final int zeroBasedFrom = firstRow - 1;
searchRequest.setFrom(zeroBasedFrom);
@@ -332,7 +241,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
final String documentType = table.getName();
final String id = keyValue.toString();
- final GetResponse response = elasticSearchClient.prepareGet(indexName, documentType, id).execute().actionGet();
+ final GetResponse response = getElasticSearchClient().prepareGet(indexName, documentType, id).execute().actionGet();
if (!response.isExists()) {
return null;
@@ -343,7 +252,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
final DataSetHeader header = new SimpleDataSetHeader(selectItems);
- return NativeElasticSearchUtils.createRow(source, documentId, header);
+ return ElasticSearchUtils.createRow(source, documentId, header);
}
@Override
@@ -353,13 +262,11 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
return null;
}
final String documentType = table.getName();
- final CountResponse response = elasticSearchClient.prepareCount(indexName)
- .setQuery(QueryBuilders.termQuery("_type", documentType)).execute().actionGet();
- return response.getCount();
- }
-
- private boolean limitMaxRowsIsSet(int maxRows) {
- return (maxRows != -1);
+ final TermQueryBuilder query = QueryBuilders.termQuery("_type", documentType);
+ final SearchResponse searchResponse =
+ getElasticSearchClient().prepareSearch(indexName).setSource(new SearchSourceBuilder().size(0).query(query))
+ .execute().actionGet();
+ return searchResponse.getHits().getTotalHits();
}
@Override
@@ -370,6 +277,13 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
return callback.getUpdateSummary();
}
+ @Override
+ protected void onSchemaCacheRefreshed() {
+ getElasticSearchClient().admin().indices().prepareRefresh(indexName).get();
+
+ detectSchema();
+ }
+
/**
* Gets the {@link Client} that this {@link DataContext} is wrapping.
*
@@ -378,13 +292,4 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
public Client getElasticSearchClient() {
return elasticSearchClient;
}
-
- /**
- * Gets the name of the index that this {@link DataContext} is working on.
- *
- * @return
- */
- public String getIndexName() {
- return indexName;
- }
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java
index 94359c4..a6f6953 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java
@@ -18,6 +18,9 @@
*/
package org.apache.metamodel.elasticsearch.nativeclient;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
import org.apache.metamodel.ConnectionException;
import org.apache.metamodel.DataContext;
import org.apache.metamodel.factory.DataContextFactory;
@@ -27,12 +30,11 @@ import org.apache.metamodel.factory.UnsupportedDataContextPropertiesException;
import org.apache.metamodel.util.SimpleTableDef;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.ImmutableSettings.Builder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Factory for ElasticSearch data context of native type.
@@ -59,6 +61,7 @@ import org.elasticsearch.node.NodeBuilder;
* </ul>
*/
public class ElasticSearchDataContextFactory implements DataContextFactory {
+ private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContextFactory.class);
@Override
public boolean accepts(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry) {
@@ -120,46 +123,22 @@ public class ElasticSearchDataContextFactory implements DataContextFactory {
@Override
public DataContext create(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry)
throws UnsupportedDataContextPropertiesException, ConnectionException {
- final String clientType = getClientType(properties);
final Client client;
- if ("node".equals(clientType)) {
- client = createNodeClient(properties);
- } else {
client = createTransportClient(properties);
- }
final String indexName = getIndex(properties);
final SimpleTableDef[] tableDefinitions = properties.getTableDefs();
return new ElasticSearchDataContext(client, indexName, tableDefinitions);
}
private Client createTransportClient(DataContextProperties properties) {
- final Builder settingsBuilder = ImmutableSettings.builder();
- settingsBuilder.put("name", "MetaModel");
- settingsBuilder.put("cluster.name", getCluster(properties));
- if (properties.getUsername() != null && properties.getPassword() != null) {
- settingsBuilder.put("shield.user", properties.getUsername() + ":" + properties.getPassword());
- if ("true".equals(properties.toMap().get("ssl"))) {
- if (properties.toMap().get("keystorePath") != null) {
- settingsBuilder.put("shield.ssl.keystore.path", properties.toMap().get("keystorePath"));
- settingsBuilder.put("shield.ssl.keystore.password", properties.toMap().get("keystorePassword"));
- }
- settingsBuilder.put("shield.transport.ssl", "true");
- }
+ final Settings settings = Settings.builder().put().put("name", "MetaModel").put("cluster.name", getCluster(properties)).build();
+ final TransportClient client = new PreBuiltTransportClient(settings);
+ try {
+ client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(properties.getHostname()), properties.getPort()));
+ } catch (UnknownHostException e) {
+ logger.warn("no IP address for the host with name \"{}\" could be found.", properties.getHostname());
}
- final Settings settings = settingsBuilder.build();
-
- final TransportClient client = new TransportClient(settings);
- client.addTransportAddress(new InetSocketTransportAddress(properties.getHostname(), properties.getPort()));
return client;
}
- private Client createNodeClient(DataContextProperties properties) {
- final Builder settingsBuilder = ImmutableSettings.builder();
- settingsBuilder.put("name", "MetaModel");
- settingsBuilder.put("shield.enabled", false);
- final Settings settings = settingsBuilder.build();
- final Node node = NodeBuilder.nodeBuilder().clusterName(getCluster(properties)).client(true).settings(settings)
- .node();
- return node.client();
- }
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
index 4ced2c8..b616eb2 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
@@ -19,104 +19,38 @@
package org.apache.metamodel.elasticsearch.nativeclient;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.metamodel.data.AbstractDataSet;
import org.apache.metamodel.data.DataSet;
-import org.apache.metamodel.data.Row;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataSet;
import org.apache.metamodel.query.SelectItem;
+import org.elasticsearch.action.search.ClearScrollAction;
import org.elasticsearch.action.search.ClearScrollRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
-import org.elasticsearch.search.SearchHit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* {@link DataSet} implementation for ElasticSearch
*/
-final class ElasticSearchDataSet extends AbstractDataSet {
-
- private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataSet.class);
+final class ElasticSearchDataSet extends AbstractElasticSearchDataSet {
private final Client _client;
- private final AtomicBoolean _closed;
-
- private SearchResponse _searchResponse;
- private SearchHit _currentHit;
- private int _hitIndex = 0;
- public ElasticSearchDataSet(Client client, SearchResponse searchResponse, List<SelectItem> selectItems,
- boolean queryPostProcessed) {
- super(selectItems);
+ public ElasticSearchDataSet(final Client client, final SearchResponse searchResponse,
+ final List<SelectItem> selectItems) {
+ super(searchResponse, selectItems);
_client = client;
- _searchResponse = searchResponse;
- _closed = new AtomicBoolean(false);
}
-
@Override
- public void close() {
- super.close();
- boolean closeNow = _closed.compareAndSet(true, false);
- if (closeNow) {
- ClearScrollRequestBuilder scrollRequestBuilder = new ClearScrollRequestBuilder(_client)
- .addScrollId(_searchResponse.getScrollId());
- scrollRequestBuilder.execute();
- }
+ public void closeNow() {
+ ClearScrollRequestBuilder scrollRequestBuilder = new ClearScrollRequestBuilder(_client,
+ ClearScrollAction.INSTANCE).addScrollId(_searchResponse.getScrollId());
+ scrollRequestBuilder.execute();
}
@Override
- protected void finalize() throws Throwable {
- super.finalize();
- if (!_closed.get()) {
- logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this);
- close();
- }
- }
-
- @Override
- public boolean next() {
- final SearchHit[] hits = _searchResponse.getHits().hits();
- if (hits.length == 0) {
- // break condition for the scroll
- _currentHit = null;
- return false;
- }
-
- if (_hitIndex < hits.length) {
- // pick the next hit within this search response
- _currentHit = hits[_hitIndex];
- _hitIndex++;
- return true;
- }
-
- final String scrollId = _searchResponse.getScrollId();
- if (scrollId == null) {
- // this search response is not scrolleable - then it's the end.
- _currentHit = null;
- return false;
- }
-
- // try to scroll to the next set of hits
- _searchResponse = _client.prepareSearchScroll(scrollId).setScroll(ElasticSearchDataContext.TIMEOUT_SCROLL)
+ protected SearchResponse scrollSearchResponse(final String scrollId) {
+ return _client.prepareSearchScroll(scrollId).setScroll(ElasticSearchDataContext.TIMEOUT_SCROLL)
.execute().actionGet();
-
- // start over (recursively)
- _hitIndex = 0;
- return next();
- }
-
- @Override
- public Row getRow() {
- if (_currentHit == null) {
- return null;
- }
-
- final Map<String, Object> source = _currentHit.getSource();
- final String documentId = _currentHit.getId();
- final Row row = NativeElasticSearchUtils.createRow(source, documentId, getHeader());
- return row;
}
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
index 0de2a71..2db8e8c 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.metamodel.elasticsearch.nativeclient;
+import java.util.Iterator;
import java.util.List;
import org.apache.metamodel.MetaModelException;
@@ -27,9 +28,11 @@ import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.LogicalOperator;
import org.apache.metamodel.schema.Table;
-import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,10 +60,6 @@ final class ElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder {
final Client client = dataContext.getElasticSearchClient();
final String indexName = dataContext.getIndexName();
- final DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = new DeleteByQueryRequestBuilder(client);
- deleteByQueryRequestBuilder.setIndices(indexName);
- deleteByQueryRequestBuilder.setTypes(documentType);
-
final List<FilterItem> whereItems = getWhereItems();
// delete by query - note that creteQueryBuilderForSimpleWhere may
@@ -74,9 +73,21 @@ final class ElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder {
throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: "
+ whereItems);
}
- deleteByQueryRequestBuilder.setQuery(queryBuilder);
- deleteByQueryRequestBuilder.execute().actionGet();
- logger.debug("Deleted documents by query.");
+ final SearchResponse response =
+ client.prepareSearch(indexName).setQuery(queryBuilder).setTypes(documentType).execute()
+ .actionGet();
+
+ client.admin().indices().prepareRefresh(indexName).execute().actionGet();
+ final Iterator<SearchHit> iterator = response.getHits().iterator();
+ while (iterator.hasNext()) {
+ final SearchHit hit = iterator.next();
+ final String typeId = hit.getId();
+ final DeleteResponse deleteResponse =
+ client.prepareDelete().setIndex(indexName).setType(documentType).setId(typeId).execute()
+ .actionGet();
+ logger.debug("Deleted documents by query." + deleteResponse.getResult());
+ }
+ client.admin().indices().prepareRefresh(indexName).execute().actionGet();
}
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java
deleted file mode 100644
index d66b240..0000000
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient;
-
-import java.lang.reflect.Method;
-
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.drop.AbstractTableDropBuilder;
-import org.apache.metamodel.drop.TableDropBuilder;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.Table;
-import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder;
-import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.IndicesAdminClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link TableDropBuilder} for dropping tables (document types) in an
- * ElasticSearch index.
- */
-final class ElasticSearchDropTableBuilder extends AbstractTableDropBuilder {
-
- private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDropTableBuilder.class);
-
- private final ElasticSearchUpdateCallback _updateCallback;
-
- public ElasticSearchDropTableBuilder(ElasticSearchUpdateCallback updateCallback, Table table) {
- super(table);
- _updateCallback = updateCallback;
- }
-
- @Override
- public void execute() throws MetaModelException {
- final ElasticSearchDataContext dataContext = _updateCallback.getDataContext();
- final Table table = getTable();
- final String documentType = table.getName();
- logger.info("Deleting mapping / document type: {}", documentType);
- final Client client = dataContext.getElasticSearchClient();
- final IndicesAdminClient indicesAdminClient = client.admin().indices();
- final String indexName = dataContext.getIndexName();
-
- final DeleteMappingRequestBuilder requestBuilder = new DeleteMappingRequestBuilder(indicesAdminClient)
- .setIndices(indexName);
- setType(requestBuilder, documentType);
-
- final DeleteMappingResponse result = requestBuilder.execute().actionGet();
- logger.debug("Delete mapping response: acknowledged={}", result.isAcknowledged());
-
- final MutableSchema schema = (MutableSchema) table.getSchema();
- schema.removeTable(table);
- }
-
- /**
- * Invokes the {@link DeleteMappingRequestBuilder#setType(String...)} method
- * using reflection. This is done because the API of ElasticSearch was
- * changed and the method signature differes between different versions.
- *
- * @param requestBuilder
- * @param documentType
- */
- private void setType(DeleteMappingRequestBuilder requestBuilder, String documentType) {
- Object argument;
- Method method;
- try {
- try {
- method = requestBuilder.getClass().getDeclaredMethod("setType", String[].class);
- argument = new String[] {documentType};
- } catch (NoSuchMethodException e) {
- logger.debug("No setType(String[]) method found, trying with a single String instead", e);
- method = requestBuilder.getClass().getDeclaredMethod("setType", String.class);
- argument = documentType;
- }
- } catch (Exception e) {
- logger.error("Failed to resolve DeleteMappingRequestBuilder.setType(...) method", e);
- throw new IllegalStateException("Failed to resolve DeleteMappingRequestBuilder.setType(...) method", e);
- }
- try {
- method.setAccessible(true);
- method.invoke(requestBuilder, argument);
- } catch (Exception e) {
- logger.error("Failed to invoke {}", method, e);
- throw new IllegalStateException("Failed to invoke " + method, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
index 70d31b4..a1c7f69 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
@@ -26,6 +26,7 @@ import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
@@ -46,7 +47,8 @@ final class ElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<Elast
final Client client = dataContext.getElasticSearchClient();
final String indexName = dataContext.getIndexName();
final String documentType = getTable().getName();
- final IndexRequestBuilder requestBuilder = new IndexRequestBuilder(client, indexName).setType(documentType);
+ final IndexRequestBuilder requestBuilder =
+ new IndexRequestBuilder(client, IndexAction.INSTANCE).setIndex(indexName).setType(documentType);
final Map<String, Object> valueMap = new HashMap<>();
final Column[] columns = getColumns();
@@ -68,11 +70,11 @@ final class ElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<Elast
assert !valueMap.isEmpty();
requestBuilder.setSource(valueMap);
- requestBuilder.setCreate(true);
final IndexResponse result = requestBuilder.execute().actionGet();
logger.debug("Inserted document: id={}", result.getId());
- }
+ client.admin().indices().prepareRefresh(indexName).execute().actionGet();
+ }
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java
deleted file mode 100644
index c0a1232..0000000
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient;
-
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.schema.ColumnType;
-
-/**
- * Parser that transforms the ElasticSearch metadata response (json-like format)
- * into an ElasticSearchMetaData object.
- */
-public class ElasticSearchMetaDataParser {
-
- /**
- * Parses the ElasticSearch meta data info into an ElasticSearchMetaData
- * object. This method makes much easier to create the ElasticSearch schema.
- *
- * @param metaDataInfo
- * ElasticSearch mapping metadata in Map format
- * @return An ElasticSearchMetaData object
- */
- public static ElasticSearchMetaData parse(Map<String, ?> metaDataInfo) {
- final String[] fieldNames = new String[metaDataInfo.size() + 1];
- final ColumnType[] columnTypes = new ColumnType[metaDataInfo.size() + 1];
-
- // add the document ID field (fixed)
- fieldNames[0] = ElasticSearchUtils.FIELD_ID;
- columnTypes[0] = ColumnType.STRING;
-
- int i = 1;
- for (Entry<String, ?> metaDataField : metaDataInfo.entrySet()) {
- @SuppressWarnings("unchecked")
- final Map<String, ?> fieldMetadata = (Map<String, ?>) metaDataField.getValue();
-
- fieldNames[i] = metaDataField.getKey();
- columnTypes[i] = getColumnTypeFromMetadataField(fieldMetadata);
- i++;
-
- }
- return new ElasticSearchMetaData(fieldNames, columnTypes);
- }
-
- private static ColumnType getColumnTypeFromMetadataField(Map<String, ?> fieldMetadata) {
- final String metaDataFieldType = getMetaDataFieldTypeFromMetaDataField(fieldMetadata);
-
- if (metaDataFieldType == null) {
- return ColumnType.STRING;
- }
-
- return ElasticSearchUtils.getColumnTypeFromElasticSearchType(metaDataFieldType);
- }
-
- private static String getMetaDataFieldTypeFromMetaDataField(Map<String, ?> metaDataField) {
- final Object type = metaDataField.get("type");
- if (type == null) {
- return null;
- }
- return type.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java
new file mode 100644
index 0000000..eeee6fc
--- /dev/null
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.query.LogicalOperator;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.Table;
+import org.apache.metamodel.update.AbstractRowUpdationBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateAction;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchUpdateBuilder extends AbstractRowUpdationBuilder {
+
+ private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUpdateBuilder.class);
+
+ private final ElasticSearchUpdateCallback _updateCallback;
+
+ public ElasticSearchUpdateBuilder(ElasticSearchUpdateCallback updateCallback, Table table) {
+ super(table);
+ _updateCallback = updateCallback;
+ }
+
+ @Override
+ public void execute() throws MetaModelException {
+
+ final Table table = getTable();
+ final String documentType = table.getName();
+
+ final ElasticSearchDataContext dataContext = _updateCallback.getDataContext();
+ final Client client = dataContext.getElasticSearchClient();
+ final String indexName = dataContext.getIndexName();
+ final List<FilterItem> whereItems = getWhereItems();
+
+ // delete by query - note that creteQueryBuilderForSimpleWhere may
+ // return matchAllQuery() if no where items are present.
+ final QueryBuilder queryBuilder =
+ ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems, LogicalOperator.AND);
+ if (queryBuilder == null) {
+ // TODO: The where items could not be pushed down to a query. We
+ // could solve this by running a query first, gather all
+ // document IDs and then delete by IDs.
+ throw new UnsupportedOperationException(
+ "Could not push down WHERE items to delete by query request: " + whereItems);
+ }
+
+ final SearchResponse response = client.prepareSearch(indexName).setQuery(queryBuilder).execute().actionGet();
+
+ final Iterator<SearchHit> iterator = response.getHits().iterator();
+ while (iterator.hasNext()) {
+ final SearchHit hit = iterator.next();
+ final String typeId = hit.getId();
+
+ final UpdateRequestBuilder requestBuilder =
+ new UpdateRequestBuilder(client, UpdateAction.INSTANCE).setIndex(indexName).setType(documentType)
+ .setId(typeId);
+
+ final Map<String, Object> valueMap = new HashMap<>();
+ final Column[] columns = getColumns();
+ final Object[] values = getValues();
+ for (int i = 0; i < columns.length; i++) {
+ if (isSet(columns[i])) {
+ final String name = columns[i].getName();
+ final Object value = values[i];
+ if (ElasticSearchUtils.FIELD_ID.equals(name)) {
+ if (value != null) {
+ requestBuilder.setId(value.toString());
+ }
+ } else {
+ valueMap.put(name, value);
+ }
+ }
+ }
+
+ assert !valueMap.isEmpty();
+
+ requestBuilder.setDoc(valueMap);
+
+ final UpdateResponse updateResponse = requestBuilder.execute().actionGet();
+
+ logger.debug("Update document: id={}", updateResponse.getId());
+
+ client.admin().indices().prepareRefresh(indexName).get();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
index b81c9c7..c4cbbac 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
@@ -26,6 +26,7 @@ import org.apache.metamodel.drop.TableDropBuilder;
import org.apache.metamodel.insert.RowInsertionBuilder;
import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
+import org.apache.metamodel.update.RowUpdationBuilder;
import org.elasticsearch.client.Client;
/**
@@ -50,13 +51,13 @@ final class ElasticSearchUpdateCallback extends AbstractUpdateCallback {
@Override
public boolean isDropTableSupported() {
- return true;
+ return false;
}
@Override
public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
UnsupportedOperationException {
- return new ElasticSearchDropTableBuilder(this, table);
+ throw new UnsupportedOperationException();
}
@Override
@@ -76,6 +77,11 @@ final class ElasticSearchUpdateCallback extends AbstractUpdateCallback {
return new ElasticSearchDeleteBuilder(this, table);
}
+ @Override
+ public RowUpdationBuilder update(final Table table) {
+ return new ElasticSearchUpdateBuilder(this, table);
+ }
+
public void onExecuteUpdateFinished() {
// force refresh of the index
final ElasticSearchDataContext dataContext = getDataContext();
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
deleted file mode 100644
index 822ef1b..0000000
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient;
-
-import java.util.Date;
-import java.util.Map;
-
-import org.apache.metamodel.data.DataSetHeader;
-import org.apache.metamodel.data.DefaultRow;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchDateConverter;
-import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.ColumnType;
-
-/**
- * Shared/common util functions for the ElasticSearch MetaModel module.
- */
-final class NativeElasticSearchUtils {
-
- public static Row createRow(Map<String, Object> sourceMap, String documentId, DataSetHeader header) {
- final Object[] values = new Object[header.size()];
- for (int i = 0; i < values.length; i++) {
- final SelectItem selectItem = header.getSelectItem(i);
- final Column column = selectItem.getColumn();
-
- assert column != null;
- assert selectItem.getAggregateFunction() == null;
- assert selectItem.getScalarFunction() == null;
-
- if (column.isPrimaryKey()) {
- values[i] = documentId;
- } else {
- Object value = sourceMap.get(column.getName());
-
- if (column.getType() == ColumnType.DATE) {
- Date valueToDate = ElasticSearchDateConverter.tryToConvert((String) value);
- if (valueToDate == null) {
- values[i] = value;
- } else {
- values[i] = valueToDate;
- }
- } else {
- values[i] = value;
- }
- }
- }
-
- return new DefaultRow(header, values);
- }
-}