You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metamodel.apache.org by ka...@apache.org on 2016/05/13 15:48:31 UTC
metamodel git commit: METAMODEL-1082: Fixed
Repository: metamodel
Updated Branches:
refs/heads/master 152f8487a -> 2e39b5010
METAMODEL-1082: Fixed
Closes #98
Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/2e39b501
Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/2e39b501
Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/2e39b501
Branch: refs/heads/master
Commit: 2e39b501091e530ffc39a32767a9e650c1fdcab8
Parents: 152f848
Author: Kasper S�rensen <i....@gmail.com>
Authored: Fri May 13 08:48:23 2016 -0700
Committer: Kasper S�rensen <i....@gmail.com>
Committed: Fri May 13 08:48:23 2016 -0700
----------------------------------------------------------------------
CHANGES.md | 1 +
.../common/ElasticSearchUtils.java | 91 +++++--
.../ElasticSearchCreateTableBuilder.java | 10 +-
.../nativeclient/NativeElasticSearchUtils.java | 3 +-
elasticsearch/rest/pom.xml | 2 +-
.../rest/ElasticSearchRestDataContext.java | 4 +-
.../JestElasticSearchCreateTableBuilder.java | 20 +-
.../rest/JestElasticSearchDataSet.java | 2 +-
.../rest/JestElasticSearchDeleteBuilder.java | 2 +-
.../rest/JestElasticSearchDropTableBuilder.java | 5 +-
.../rest/JestElasticSearchInsertBuilder.java | 33 ++-
.../rest/JestElasticSearchUpdateCallback.java | 249 ++++++++++++-------
12 files changed, 274 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/CHANGES.md
----------------------------------------------------------------------
diff --git a/CHANGES.md b/CHANGES.md
index 25a7ed4..66d1917 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,6 +7,7 @@
* [METAMODEL-247] - Added FixedWidthConfigurationReader for reading fixed width file metadata from external files.
* [METAMODEL-159] - DataContextFactory misses methods to create HBase and POJO data contexts.
* [METAMODEL-252] - Fixed a bug that caused JDBC updates to unnecessarily refresh schema objects.
+ * [METAMODEL-1082] - Improved performance of batch ElasticSearch operations by using bulk API.
### Apache MetaModel 4.5.2
http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
index 11d35bd..b298d11 100644
--- a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
@@ -21,7 +21,10 @@ package org.apache.metamodel.elasticsearch.common;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.LogicalOperator;
@@ -40,29 +43,34 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ElasticSearchUtils {
- public static final String FIELD_ID = "_id";
+
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtils.class);
+ public static final String FIELD_ID = "_id";
+ public static final String SYSTEM_PROPERTY_STRIP_INVALID_FIELD_CHARS = "metamodel.elasticsearch.strip_invalid_field_chars";
+
/**
* Gets a "filter" query which is both 1.x and 2.x compatible.
*/
private static QueryBuilder getFilteredQuery(String prefix, String fieldName) {
- // 1.x: itemQueryBuilder = QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(fieldName));
- // 2.x: itemQueryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.missingQuery(fieldName));
+ // 1.x: itemQueryBuilder = QueryBuilders.filteredQuery(null,
+ // FilterBuilders.missingFilter(fieldName));
+ // 2.x: itemQueryBuilder =
+ // QueryBuilders.boolQuery().must(QueryBuilders.missingQuery(fieldName));
try {
try {
Method method = QueryBuilders.class.getDeclaredMethod(prefix + "Query", String.class);
method.setAccessible(true);
return QueryBuilders.boolQuery().must((QueryBuilder) method.invoke(null, fieldName));
} catch (NoSuchMethodException e) {
- Class<?> clazz = ElasticSearchUtils.class.getClassLoader()
- .loadClass("org.elasticsearch.index.query.FilterBuilders");
+ Class<?> clazz = ElasticSearchUtils.class.getClassLoader().loadClass(
+ "org.elasticsearch.index.query.FilterBuilders");
Method filterBuilderMethod = clazz.getDeclaredMethod(prefix + "Filter", String.class);
filterBuilderMethod.setAccessible(true);
- Method queryBuildersFilteredQueryMethod =
- QueryBuilders.class.getDeclaredMethod("filteredQuery", QueryBuilder.class, FilterBuilder.class);
- return (QueryBuilder) queryBuildersFilteredQueryMethod.invoke(null, null,
- filterBuilderMethod.invoke(null, fieldName));
+ Method queryBuildersFilteredQueryMethod = QueryBuilders.class.getDeclaredMethod("filteredQuery",
+ QueryBuilder.class, FilterBuilder.class);
+ return (QueryBuilder) queryBuildersFilteredQueryMethod.invoke(null, null, filterBuilderMethod.invoke(
+ null, fieldName));
}
} catch (Exception e) {
logger.error("Failed to resolve/invoke filtering method", e);
@@ -78,34 +86,63 @@ public class ElasticSearchUtils {
return getFilteredQuery("exists", fieldName);
}
- public static List<Object> getSourceProperties(final MutableTable table) {
+ public static Map<String, ?> getMappingSource(final MutableTable table) {
if (table.getColumnByName(FIELD_ID) == null) {
- final MutableColumn idColumn = new MutableColumn(FIELD_ID, ColumnType.STRING)
- .setTable(table).setPrimaryKey(true);
+ final MutableColumn idColumn = new MutableColumn(FIELD_ID, ColumnType.STRING).setTable(table).setPrimaryKey(
+ true);
table.addColumn(0, idColumn);
}
- final List<Object> sourceProperties = new ArrayList<>();
-
+ final Map<String, Map<String, String>> propertiesMap = new LinkedHashMap<>();
+
for (Column column : table.getColumns()) {
- // each column is defined as a property pair of the form: ("field1",
- // "type=string,store=true")
final String columnName = column.getName();
if (FIELD_ID.equals(columnName)) {
// do nothing - the ID is a client-side construct
continue;
}
- sourceProperties.add(columnName);
+
+ final String fieldName = getValidatedFieldName(columnName);
+ final Map<String, String> propertyMap = new HashMap<>();
+ final String type = getType(column);
+ propertyMap.put("type", type);
+
+ propertiesMap.put(fieldName, propertyMap);
+ }
- String type = getType(column);
- if (type == null) {
- sourceProperties.add("store=true");
+ HashMap<String, Map<String, Map<String, String>>> docTypeMap = new HashMap<>();
+ docTypeMap.put("properties", propertiesMap);
+
+ final Map<String, Map<String, Map<String, Map<String, String>>>> mapping = new HashMap<>();
+ mapping.put(table.getName(), docTypeMap);
+ return mapping;
+ }
+
+ /**
+ * Field name special characters are:
+ *
+ * . (used for navigation between name components)
+ *
+ * # (for delimiting name components in _uid, should work, but is
+ * discouraged)
+ *
+ * * (for matching names)
+ *
+ * @param fieldName
+ * @return
+ */
+ public static String getValidatedFieldName(String fieldName) {
+ if (fieldName == null || fieldName.isEmpty()) {
+ throw new IllegalArgumentException("Field name cannot be null or empty");
+ }
+ if (fieldName.contains(".") || fieldName.contains("#") || fieldName.contains("*")) {
+ if ("true".equalsIgnoreCase(System.getProperty(SYSTEM_PROPERTY_STRIP_INVALID_FIELD_CHARS, "true"))) {
+ fieldName = fieldName.replace('.', '_').replace('#', '_').replace('*', '_');
} else {
- sourceProperties.add("type=" + type + ",store=true");
+ throw new IllegalArgumentException("Field name '" + fieldName + "' contains illegal character (.#*)");
}
}
-
- return sourceProperties;
+ return fieldName;
}
/**
@@ -154,8 +191,8 @@ public class ElasticSearchUtils {
return "object";
}
- throw new UnsupportedOperationException("Unsupported column type '" + type.getName() + "' of column '"
- + column.getName() + "' - cannot translate to an ElasticSearch type.");
+ throw new UnsupportedOperationException("Unsupported column type '" + type.getName() + "' of column '" + column
+ .getName() + "' - cannot translate to an ElasticSearch type.");
}
/**
@@ -204,8 +241,8 @@ public class ElasticSearchUtils {
if (operand == null) {
itemQueryBuilder = getExistsQuery(fieldName);
} else {
- itemQueryBuilder = QueryBuilders.boolQuery().mustNot(
- QueryBuilders.termQuery(fieldName, operand));
+ itemQueryBuilder = QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(fieldName,
+ operand));
}
} else if (OperatorType.IN.equals(operator)) {
final List<?> operands = CollectionUtils.toList(operand);
http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
index 5d6701c..f27e8ac 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
@@ -18,7 +18,7 @@
*/
package org.apache.metamodel.elasticsearch.nativeclient;
-import java.util.List;
+import java.util.Map;
import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.create.AbstractTableCreationBuilder;
@@ -44,15 +44,15 @@ final class ElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder
@Override
public Table execute() throws MetaModelException {
final MutableTable table = getTable();
- final List<Object> sourceProperties = ElasticSearchUtils.getSourceProperties(table);
+ final Map<String, ?> source = ElasticSearchUtils.getMappingSource(table);
final ElasticSearchDataContext dataContext = getUpdateCallback().getDataContext();
final IndicesAdminClient indicesAdmin = dataContext.getElasticSearchClient().admin().indices();
final String indexName = dataContext.getIndexName();
- final PutMappingRequestBuilder requestBuilder = new PutMappingRequestBuilder(indicesAdmin)
- .setIndices(indexName).setType(table.getName());
- requestBuilder.setSource(sourceProperties.toArray());
+ final PutMappingRequestBuilder requestBuilder = new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName)
+ .setType(table.getName());
+ requestBuilder.setSource(source);
final PutMappingResponse result = requestBuilder.execute().actionGet();
logger.debug("PutMapping response: acknowledged={}", result.isAcknowledged());
http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
index 1efb0c8..822ef1b 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
@@ -41,7 +41,8 @@ final class NativeElasticSearchUtils {
final Column column = selectItem.getColumn();
assert column != null;
- assert selectItem.getFunction() == null;
+ assert selectItem.getAggregateFunction() == null;
+ assert selectItem.getScalarFunction() == null;
if (column.isPrimaryKey()) {
values[i] = documentId;
http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/rest/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/pom.xml b/elasticsearch/rest/pom.xml
index 16c0555..92f7393 100644
--- a/elasticsearch/rest/pom.xml
+++ b/elasticsearch/rest/pom.xml
@@ -27,7 +27,7 @@ under the License.
<modelVersion>4.0.0</modelVersion>
<properties>
- <jest.version>0.1.7</jest.version>
+ <jest.version>2.0.2</jest.version>
<elasticsearch.version>1.4.4</elasticsearch.version>
</properties>
http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
index 6b8ac51..c452d7b 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.metamodel.BatchUpdateScript;
import org.apache.metamodel.DataContext;
import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.QueryPostprocessDataContext;
@@ -354,7 +355,8 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
@Override
public void executeUpdate(UpdateScript update) {
- final JestElasticSearchUpdateCallback callback = new JestElasticSearchUpdateCallback(this);
+ final boolean isBatch = update instanceof BatchUpdateScript;
+ final JestElasticSearchUpdateCallback callback = new JestElasticSearchUpdateCallback(this, isBatch);
update.run(callback);
callback.onExecuteUpdateFinished();
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
index cc26c8d..3e71c4d 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
@@ -18,29 +18,35 @@
*/
package org.apache.metamodel.elasticsearch.rest;
-import io.searchbox.indices.mapping.PutMapping;
+import java.util.Map;
+
import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.create.AbstractTableCreationBuilder;
import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.schema.*;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
-import java.util.List;
+import io.searchbox.indices.mapping.PutMapping;
final class JestElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder<JestElasticSearchUpdateCallback> {
- public JestElasticSearchCreateTableBuilder(JestElasticSearchUpdateCallback updateCallback, Schema schema, String name) {
+
+ public JestElasticSearchCreateTableBuilder(JestElasticSearchUpdateCallback updateCallback, Schema schema,
+ String name) {
super(updateCallback, schema, name);
}
@Override
public Table execute() throws MetaModelException {
final MutableTable table = getTable();
- final List<Object> sourceProperties = ElasticSearchUtils.getSourceProperties(table);
+ final Map<String, ?> source = ElasticSearchUtils.getMappingSource(table);
final ElasticSearchRestDataContext dataContext = getUpdateCallback().getDataContext();
final String indexName = dataContext.getIndexName();
- final PutMapping putMapping = new PutMapping.Builder(indexName, table.getName(), sourceProperties).build();
- JestClientExecutor.execute(dataContext.getElasticSearchClient(), putMapping);
+ final PutMapping putMapping = new PutMapping.Builder(indexName, table.getName(), source).build();
+ getUpdateCallback().execute(putMapping);
final MutableSchema schema = (MutableSchema) getSchema();
schema.addTable(table);
http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
index 9678b48..7f485ba 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
@@ -106,7 +106,7 @@ final class JestElasticSearchDataSet extends AbstractDataSet {
}
// try to scroll to the next set of hits
- SearchScroll scroll = new SearchScroll.Builder(scrollId.getAsString(), ElasticSearchRestDataContext.TIMEOUT_SCROLL).build();
+ final SearchScroll scroll = new SearchScroll.Builder(scrollId.getAsString(), ElasticSearchRestDataContext.TIMEOUT_SCROLL).build();
_searchResponse = JestClientExecutor.execute(_client, scroll);
http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
index a4c0c03..cc1c3e7 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
@@ -71,6 +71,6 @@ final class JestElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder {
new DeleteByQuery.Builder(searchSourceBuilder.toString()).addIndex(indexName).addType(
documentType).build();
- JestClientExecutor.execute(dataContext.getElasticSearchClient(), deleteByQuery);
+ _updateCallback.execute(deleteByQuery);
}
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
index d4ddd19..8a1ac71 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
@@ -18,7 +18,6 @@
*/
package org.apache.metamodel.elasticsearch.rest;
-import io.searchbox.indices.mapping.DeleteMapping;
import org.apache.metamodel.MetaModelException;
import org.apache.metamodel.drop.AbstractTableDropBuilder;
import org.apache.metamodel.drop.TableDropBuilder;
@@ -27,6 +26,8 @@ import org.apache.metamodel.schema.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.searchbox.indices.mapping.DeleteMapping;
+
/**
* {@link TableDropBuilder} for dropping tables (document types) in an
* ElasticSearch index.
@@ -52,7 +53,7 @@ final class JestElasticSearchDropTableBuilder extends AbstractTableDropBuilder {
final DeleteMapping deleteIndex = new DeleteMapping.Builder(dataContext.getIndexName(), documentType).build();
- JestClientExecutor.execute(dataContext.getElasticSearchClient(), deleteIndex);
+ _updateCallback.execute(deleteIndex);
final MutableSchema schema = (MutableSchema) table.getSchema();
schema.removeTable(table);
http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
index 327d7d3..746538d 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
@@ -18,60 +18,57 @@
*/
package org.apache.metamodel.elasticsearch.rest;
-import io.searchbox.core.DocumentResult;
-import io.searchbox.core.Index;
-import io.searchbox.params.Parameters;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
import org.apache.metamodel.schema.Column;
import org.apache.metamodel.schema.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
+import io.searchbox.core.Index;
+import io.searchbox.params.Parameters;
final class JestElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<JestElasticSearchUpdateCallback> {
- private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchInsertBuilder.class);
-
public JestElasticSearchInsertBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) {
super(updateCallback, table);
}
@Override
public void execute() throws MetaModelException {
- final ElasticSearchRestDataContext dataContext = getUpdateCallback().getDataContext();
+ final JestElasticSearchUpdateCallback updateCallback = getUpdateCallback();
+ final ElasticSearchRestDataContext dataContext = updateCallback.getDataContext();
final String indexName = dataContext.getIndexName();
final String documentType = getTable().getName();
-
final Map<String, Object> source = new HashMap<>();
final Column[] columns = getColumns();
final Object[] values = getValues();
String id = null;
for (int i = 0; i < columns.length; i++) {
if (isSet(columns[i])) {
- final String name = columns[i].getName();
+ final String columnName = columns[i].getName();
+
final Object value = values[i];
- if (ElasticSearchRestDataContext.FIELD_ID.equals(name)) {
+ if (ElasticSearchRestDataContext.FIELD_ID.equals(columnName)) {
if (value != null) {
id = value.toString();
}
} else {
- source.put(name, value);
+ final String fieldName = ElasticSearchUtils.getValidatedFieldName(columnName);
+ source.put(fieldName, value);
}
}
}
assert !source.isEmpty();
- Index index = new Index.Builder(source).index(indexName).type(documentType).id(id).setParameter(
+ final Index index = new Index.Builder(source).index(indexName).type(documentType).id(id).setParameter(
Parameters.OP_TYPE, "create").build();
- final DocumentResult result = JestClientExecutor.execute(dataContext.getElasticSearchClient(), index);
-
- logger.debug("Inserted document: id={}", result.getId());
+ getUpdateCallback().execute(index);
}
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
index ca2ed13..521955d 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
@@ -1,84 +1,165 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import io.searchbox.indices.Refresh;
-import org.apache.metamodel.AbstractUpdateCallback;
-import org.apache.metamodel.UpdateCallback;
-import org.apache.metamodel.create.TableCreationBuilder;
-import org.apache.metamodel.delete.RowDeletionBuilder;
-import org.apache.metamodel.drop.TableDropBuilder;
-import org.apache.metamodel.insert.RowInsertionBuilder;
-import org.apache.metamodel.schema.Schema;
-import org.apache.metamodel.schema.Table;
-
-/**
- * {@link UpdateCallback} implementation for {@link ElasticSearchRestDataContext}.
- */
-final class JestElasticSearchUpdateCallback extends AbstractUpdateCallback {
- public JestElasticSearchUpdateCallback(ElasticSearchRestDataContext dataContext) {
- super(dataContext);
- }
-
- @Override
- public ElasticSearchRestDataContext getDataContext() {
- return (ElasticSearchRestDataContext) super.getDataContext();
- }
-
- @Override
- public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException,
- IllegalStateException {
- return new JestElasticSearchCreateTableBuilder(this, schema, name);
- }
-
- @Override
- public boolean isDropTableSupported() {
- return true;
- }
-
- @Override
- public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
- UnsupportedOperationException {
- return new JestElasticSearchDropTableBuilder(this, table);
- }
-
- @Override
- public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException,
- UnsupportedOperationException {
- return new JestElasticSearchInsertBuilder(this, table);
- }
-
- @Override
- public boolean isDeleteSupported() {
- return true;
- }
-
- @Override
- public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException,
- UnsupportedOperationException {
- return new JestElasticSearchDeleteBuilder(this, table);
- }
-
- public void onExecuteUpdateFinished() {
- final String indexName = getDataContext().getIndexName();
- Refresh refresh = new Refresh.Builder().addIndex(indexName).build();
-
- JestClientExecutor.execute(getDataContext().getElasticSearchClient(), refresh, false);
- }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.util.List;
+
+import org.apache.metamodel.AbstractUpdateCallback;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.UpdateCallback;
+import org.apache.metamodel.create.TableCreationBuilder;
+import org.apache.metamodel.delete.RowDeletionBuilder;
+import org.apache.metamodel.drop.TableDropBuilder;
+import org.apache.metamodel.insert.RowInsertionBuilder;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.searchbox.action.Action;
+import io.searchbox.action.BulkableAction;
+import io.searchbox.client.JestResult;
+import io.searchbox.core.Bulk;
+import io.searchbox.core.Bulk.Builder;
+import io.searchbox.core.BulkResult;
+import io.searchbox.core.BulkResult.BulkResultItem;
+import io.searchbox.indices.Refresh;
+
+/**
+ * {@link UpdateCallback} implementation for
+ * {@link ElasticSearchRestDataContext}.
+ */
+final class JestElasticSearchUpdateCallback extends AbstractUpdateCallback {
+
+ private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchUpdateCallback.class);
+
+ private static final int BULK_BUFFER_SIZE = 1000;
+
+ private Bulk.Builder bulkBuilder;
+ private int bulkActionCount = 0;
+ private final boolean isBatch;
+
+ public JestElasticSearchUpdateCallback(ElasticSearchRestDataContext dataContext, boolean isBatch) {
+ super(dataContext);
+ this.isBatch = isBatch;
+ }
+
+ private boolean isBatch() {
+ return isBatch;
+ }
+
+ @Override
+ public ElasticSearchRestDataContext getDataContext() {
+ return (ElasticSearchRestDataContext) super.getDataContext();
+ }
+
+ @Override
+ public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException,
+ IllegalStateException {
+ return new JestElasticSearchCreateTableBuilder(this, schema, name);
+ }
+
+ @Override
+ public boolean isDropTableSupported() {
+ return true;
+ }
+
+ @Override
+ public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
+ UnsupportedOperationException {
+ return new JestElasticSearchDropTableBuilder(this, table);
+ }
+
+ @Override
+ public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException,
+ UnsupportedOperationException {
+ return new JestElasticSearchInsertBuilder(this, table);
+ }
+
+ @Override
+ public boolean isDeleteSupported() {
+ return true;
+ }
+
+ @Override
+ public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException,
+ UnsupportedOperationException {
+ return new JestElasticSearchDeleteBuilder(this, table);
+ }
+
+ public void onExecuteUpdateFinished() {
+ if (isBatch()) {
+ flushBulkActions();
+ }
+
+ final String indexName = getDataContext().getIndexName();
+ final Refresh refresh = new Refresh.Builder().addIndex(indexName).build();
+
+ JestClientExecutor.execute(getDataContext().getElasticSearchClient(), refresh, false);
+ }
+
+ private void flushBulkActions() {
+ if (bulkBuilder == null || bulkActionCount == 0) {
+ // nothing to flush
+ return;
+ }
+ final Bulk bulk = getBulkBuilder().build();
+ logger.info("Flushing {} actions to ElasticSearch index {}", bulkActionCount, getDataContext().getIndexName());
+ executeBlocking(bulk);
+
+ bulkActionCount = 0;
+ bulkBuilder = null;
+ }
+
+ public void execute(Action<?> action) {
+ if (isBatch() && action instanceof BulkableAction) {
+ final Bulk.Builder bulkBuilder = getBulkBuilder();
+ bulkBuilder.addAction((BulkableAction<?>) action);
+ bulkActionCount++;
+ if (bulkActionCount == BULK_BUFFER_SIZE) {
+ flushBulkActions();
+ }
+ } else {
+ executeBlocking(action);
+ }
+ }
+
+ private void executeBlocking(Action<?> action) {
+ final JestResult result = JestClientExecutor.execute(getDataContext().getElasticSearchClient(), action);
+ if (!result.isSucceeded()) {
+ if (result instanceof BulkResult) {
+ final List<BulkResultItem> failedItems = ((BulkResult) result).getFailedItems();
+ for (int i = 0; i < failedItems.size(); i++) {
+ final BulkResultItem failedItem = failedItems.get(i);
+ logger.error("Bulk failed with item no. {} of {}: id={} op={} status={} error={}", i+1, failedItems.size(), failedItem.id, failedItem.operation, failedItem.status, failedItem.error);
+ }
+ }
+ throw new MetaModelException(result.getResponseCode() + " - " + result.getErrorMessage());
+ }
+ }
+
+ private Builder getBulkBuilder() {
+ if (bulkBuilder == null) {
+ bulkBuilder = new Bulk.Builder();
+ bulkBuilder.defaultIndex(getDataContext().getIndexName());
+ }
+ return bulkBuilder;
+ }
+}