You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metamodel.apache.org by ka...@apache.org on 2016/05/13 15:48:31 UTC

metamodel git commit: METAMODEL-1082: Fixed

Repository: metamodel
Updated Branches:
  refs/heads/master 152f8487a -> 2e39b5010


METAMODEL-1082: Fixed

Closes #98

Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/2e39b501
Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/2e39b501
Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/2e39b501

Branch: refs/heads/master
Commit: 2e39b501091e530ffc39a32767a9e650c1fdcab8
Parents: 152f848
Author: Kasper S�rensen <i....@gmail.com>
Authored: Fri May 13 08:48:23 2016 -0700
Committer: Kasper S�rensen <i....@gmail.com>
Committed: Fri May 13 08:48:23 2016 -0700

----------------------------------------------------------------------
 CHANGES.md                                      |   1 +
 .../common/ElasticSearchUtils.java              |  91 +++++--
 .../ElasticSearchCreateTableBuilder.java        |  10 +-
 .../nativeclient/NativeElasticSearchUtils.java  |   3 +-
 elasticsearch/rest/pom.xml                      |   2 +-
 .../rest/ElasticSearchRestDataContext.java      |   4 +-
 .../JestElasticSearchCreateTableBuilder.java    |  20 +-
 .../rest/JestElasticSearchDataSet.java          |   2 +-
 .../rest/JestElasticSearchDeleteBuilder.java    |   2 +-
 .../rest/JestElasticSearchDropTableBuilder.java |   5 +-
 .../rest/JestElasticSearchInsertBuilder.java    |  33 ++-
 .../rest/JestElasticSearchUpdateCallback.java   | 249 ++++++++++++-------
 12 files changed, 274 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/CHANGES.md
----------------------------------------------------------------------
diff --git a/CHANGES.md b/CHANGES.md
index 25a7ed4..66d1917 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -7,6 +7,7 @@
  * [METAMODEL-247] - Added FixedWidthConfigurationReader for reading fixed width file metadata from external files.
  * [METAMODEL-159] - DataContextFactory misses methods to create HBase and POJO data contexts.
  * [METAMODEL-252] - Fixed a bug that caused JDBC updates to unnecessarily refresh schema objects.
+ * [METAMODEL-1082] - Improved performance of batch ElasticSearch operations by using bulk API.
 
 ### Apache MetaModel 4.5.2
 

http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
index 11d35bd..b298d11 100644
--- a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
@@ -21,7 +21,10 @@ package org.apache.metamodel.elasticsearch.common;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.metamodel.query.FilterItem;
 import org.apache.metamodel.query.LogicalOperator;
@@ -40,29 +43,34 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ElasticSearchUtils {
-    public static final String FIELD_ID = "_id";
+
     private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtils.class);
 
+    public static final String FIELD_ID = "_id";
+    public static final String SYSTEM_PROPERTY_STRIP_INVALID_FIELD_CHARS = "metamodel.elasticsearch.strip_invalid_field_chars";
+
     /**
      * Gets a "filter" query which is both 1.x and 2.x compatible.
      */
     private static QueryBuilder getFilteredQuery(String prefix, String fieldName) {
-        // 1.x: itemQueryBuilder = QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(fieldName));
-        // 2.x: itemQueryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.missingQuery(fieldName));
+        // 1.x: itemQueryBuilder = QueryBuilders.filteredQuery(null,
+        // FilterBuilders.missingFilter(fieldName));
+        // 2.x: itemQueryBuilder =
+        // QueryBuilders.boolQuery().must(QueryBuilders.missingQuery(fieldName));
         try {
             try {
                 Method method = QueryBuilders.class.getDeclaredMethod(prefix + "Query", String.class);
                 method.setAccessible(true);
                 return QueryBuilders.boolQuery().must((QueryBuilder) method.invoke(null, fieldName));
             } catch (NoSuchMethodException e) {
-                Class<?> clazz = ElasticSearchUtils.class.getClassLoader()
-                        .loadClass("org.elasticsearch.index.query.FilterBuilders");
+                Class<?> clazz = ElasticSearchUtils.class.getClassLoader().loadClass(
+                        "org.elasticsearch.index.query.FilterBuilders");
                 Method filterBuilderMethod = clazz.getDeclaredMethod(prefix + "Filter", String.class);
                 filterBuilderMethod.setAccessible(true);
-                Method queryBuildersFilteredQueryMethod =
-                        QueryBuilders.class.getDeclaredMethod("filteredQuery", QueryBuilder.class, FilterBuilder.class);
-                return (QueryBuilder) queryBuildersFilteredQueryMethod.invoke(null, null,
-                        filterBuilderMethod.invoke(null, fieldName));
+                Method queryBuildersFilteredQueryMethod = QueryBuilders.class.getDeclaredMethod("filteredQuery",
+                        QueryBuilder.class, FilterBuilder.class);
+                return (QueryBuilder) queryBuildersFilteredQueryMethod.invoke(null, null, filterBuilderMethod.invoke(
+                        null, fieldName));
             }
         } catch (Exception e) {
             logger.error("Failed to resolve/invoke filtering method", e);
@@ -78,34 +86,63 @@ public class ElasticSearchUtils {
         return getFilteredQuery("exists", fieldName);
     }
 
-    public static List<Object> getSourceProperties(final MutableTable table) {
+    public static Map<String, ?> getMappingSource(final MutableTable table) {
         if (table.getColumnByName(FIELD_ID) == null) {
-            final MutableColumn idColumn = new MutableColumn(FIELD_ID, ColumnType.STRING)
-                    .setTable(table).setPrimaryKey(true);
+            final MutableColumn idColumn = new MutableColumn(FIELD_ID, ColumnType.STRING).setTable(table).setPrimaryKey(
+                    true);
             table.addColumn(0, idColumn);
         }
 
-        final List<Object> sourceProperties = new ArrayList<>();
-
+        final Map<String, Map<String, String>> propertiesMap = new LinkedHashMap<>();
+        
         for (Column column : table.getColumns()) {
-            // each column is defined as a property pair of the form: ("field1",
-            // "type=string,store=true")
             final String columnName = column.getName();
             if (FIELD_ID.equals(columnName)) {
                 // do nothing - the ID is a client-side construct
                 continue;
             }
-            sourceProperties.add(columnName);
+            
+            final String fieldName = getValidatedFieldName(columnName);
+            final Map<String, String> propertyMap = new HashMap<>();
+            final String type = getType(column);
+            propertyMap.put("type", type);
+            
+            propertiesMap.put(fieldName, propertyMap);
+        }
 
-            String type = getType(column);
-            if (type == null) {
-                sourceProperties.add("store=true");
+        HashMap<String, Map<String, Map<String, String>>> docTypeMap = new HashMap<>();
+        docTypeMap.put("properties", propertiesMap);
+        
+        final Map<String, Map<String, Map<String, Map<String, String>>>> mapping = new HashMap<>();
+        mapping.put(table.getName(), docTypeMap);
+        return mapping;
+    }
+
+    /**
+     * Field name special characters are:
+     * 
+     * . (used for navigation between name components)
+     * 
+     * # (for delimiting name components in _uid, should work, but is
+     * discouraged)
+     * 
+     * * (for matching names)
+     * 
+     * @param fieldName
+     * @return
+     */
+    public static String getValidatedFieldName(String fieldName) {
+        if (fieldName == null || fieldName.isEmpty()) {
+            throw new IllegalArgumentException("Field name cannot be null or empty");
+        }
+        if (fieldName.contains(".") || fieldName.contains("#") || fieldName.contains("*")) {
+            if ("true".equalsIgnoreCase(System.getProperty(SYSTEM_PROPERTY_STRIP_INVALID_FIELD_CHARS, "true"))) {
+                fieldName = fieldName.replace('.', '_').replace('#', '_').replace('*', '_');
             } else {
-                sourceProperties.add("type=" + type + ",store=true");
+                throw new IllegalArgumentException("Field name '" + fieldName + "' contains illegal character (.#*)");
             }
         }
-
-        return sourceProperties;
+        return fieldName;
     }
 
     /**
@@ -154,8 +191,8 @@ public class ElasticSearchUtils {
             return "object";
         }
 
-        throw new UnsupportedOperationException("Unsupported column type '" + type.getName() + "' of column '"
-                + column.getName() + "' - cannot translate to an ElasticSearch type.");
+        throw new UnsupportedOperationException("Unsupported column type '" + type.getName() + "' of column '" + column
+                .getName() + "' - cannot translate to an ElasticSearch type.");
     }
 
     /**
@@ -204,8 +241,8 @@ public class ElasticSearchUtils {
                     if (operand == null) {
                         itemQueryBuilder = getExistsQuery(fieldName);
                     } else {
-                        itemQueryBuilder = QueryBuilders.boolQuery().mustNot(
-                                QueryBuilders.termQuery(fieldName, operand));
+                        itemQueryBuilder = QueryBuilders.boolQuery().mustNot(QueryBuilders.termQuery(fieldName,
+                                operand));
                     }
                 } else if (OperatorType.IN.equals(operator)) {
                     final List<?> operands = CollectionUtils.toList(operand);

http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
index 5d6701c..f27e8ac 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
@@ -18,7 +18,7 @@
  */
 package org.apache.metamodel.elasticsearch.nativeclient;
 
-import java.util.List;
+import java.util.Map;
 
 import org.apache.metamodel.MetaModelException;
 import org.apache.metamodel.create.AbstractTableCreationBuilder;
@@ -44,15 +44,15 @@ final class ElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder
     @Override
     public Table execute() throws MetaModelException {
         final MutableTable table = getTable();
-        final List<Object> sourceProperties = ElasticSearchUtils.getSourceProperties(table);
+        final Map<String, ?> source = ElasticSearchUtils.getMappingSource(table);
 
         final ElasticSearchDataContext dataContext = getUpdateCallback().getDataContext();
         final IndicesAdminClient indicesAdmin = dataContext.getElasticSearchClient().admin().indices();
         final String indexName = dataContext.getIndexName();
 
-        final PutMappingRequestBuilder requestBuilder = new PutMappingRequestBuilder(indicesAdmin)
-                .setIndices(indexName).setType(table.getName());
-        requestBuilder.setSource(sourceProperties.toArray());
+        final PutMappingRequestBuilder requestBuilder = new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName)
+                .setType(table.getName());
+        requestBuilder.setSource(source);
         final PutMappingResponse result = requestBuilder.execute().actionGet();
 
         logger.debug("PutMapping response: acknowledged={}", result.isAcknowledged());

http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
index 1efb0c8..822ef1b 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
@@ -41,7 +41,8 @@ final class NativeElasticSearchUtils {
             final Column column = selectItem.getColumn();
 
             assert column != null;
-            assert selectItem.getFunction() == null;
+            assert selectItem.getAggregateFunction() == null;
+            assert selectItem.getScalarFunction() == null;
 
             if (column.isPrimaryKey()) {
                 values[i] = documentId;

http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/rest/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/pom.xml b/elasticsearch/rest/pom.xml
index 16c0555..92f7393 100644
--- a/elasticsearch/rest/pom.xml
+++ b/elasticsearch/rest/pom.xml
@@ -27,7 +27,7 @@ under the License.
 	<modelVersion>4.0.0</modelVersion>
 
 	<properties>
-		<jest.version>0.1.7</jest.version>
+		<jest.version>2.0.2</jest.version>
 		<elasticsearch.version>1.4.4</elasticsearch.version>
 	</properties>
 

http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
index 6b8ac51..c452d7b 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.metamodel.BatchUpdateScript;
 import org.apache.metamodel.DataContext;
 import org.apache.metamodel.MetaModelException;
 import org.apache.metamodel.QueryPostprocessDataContext;
@@ -354,7 +355,8 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
 
     @Override
     public void executeUpdate(UpdateScript update) {
-        final JestElasticSearchUpdateCallback callback = new JestElasticSearchUpdateCallback(this);
+        final boolean isBatch = update instanceof BatchUpdateScript;
+        final JestElasticSearchUpdateCallback callback = new JestElasticSearchUpdateCallback(this, isBatch);
         update.run(callback);
         callback.onExecuteUpdateFinished();
     }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
index cc26c8d..3e71c4d 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
@@ -18,29 +18,35 @@
  */
 package org.apache.metamodel.elasticsearch.rest;
 
-import io.searchbox.indices.mapping.PutMapping;
+import java.util.Map;
+
 import org.apache.metamodel.MetaModelException;
 import org.apache.metamodel.create.AbstractTableCreationBuilder;
 import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.schema.*;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
 
-import java.util.List;
+import io.searchbox.indices.mapping.PutMapping;
 
 final class JestElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder<JestElasticSearchUpdateCallback> {
-    public JestElasticSearchCreateTableBuilder(JestElasticSearchUpdateCallback updateCallback, Schema schema, String name) {
+    
+    public JestElasticSearchCreateTableBuilder(JestElasticSearchUpdateCallback updateCallback, Schema schema,
+            String name) {
         super(updateCallback, schema, name);
     }
 
     @Override
     public Table execute() throws MetaModelException {
         final MutableTable table = getTable();
-        final List<Object> sourceProperties = ElasticSearchUtils.getSourceProperties(table);
+        final Map<String, ?> source = ElasticSearchUtils.getMappingSource(table);
 
         final ElasticSearchRestDataContext dataContext = getUpdateCallback().getDataContext();
         final String indexName = dataContext.getIndexName();
 
-        final PutMapping putMapping = new PutMapping.Builder(indexName, table.getName(), sourceProperties).build();
-        JestClientExecutor.execute(dataContext.getElasticSearchClient(), putMapping);
+        final PutMapping putMapping = new PutMapping.Builder(indexName, table.getName(), source).build();
+        getUpdateCallback().execute(putMapping);
 
         final MutableSchema schema = (MutableSchema) getSchema();
         schema.addTable(table);

http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
index 9678b48..7f485ba 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
@@ -106,7 +106,7 @@ final class JestElasticSearchDataSet extends AbstractDataSet {
         }
 
         // try to scroll to the next set of hits
-        SearchScroll scroll = new SearchScroll.Builder(scrollId.getAsString(), ElasticSearchRestDataContext.TIMEOUT_SCROLL).build();
+        final SearchScroll scroll = new SearchScroll.Builder(scrollId.getAsString(), ElasticSearchRestDataContext.TIMEOUT_SCROLL).build();
 
         _searchResponse = JestClientExecutor.execute(_client, scroll);
 

http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
index a4c0c03..cc1c3e7 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
@@ -71,6 +71,6 @@ final class JestElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder {
                 new DeleteByQuery.Builder(searchSourceBuilder.toString()).addIndex(indexName).addType(
                         documentType).build();
 
-        JestClientExecutor.execute(dataContext.getElasticSearchClient(), deleteByQuery);
+        _updateCallback.execute(deleteByQuery);
     }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
index d4ddd19..8a1ac71 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
@@ -18,7 +18,6 @@
  */
 package org.apache.metamodel.elasticsearch.rest;
 
-import io.searchbox.indices.mapping.DeleteMapping;
 import org.apache.metamodel.MetaModelException;
 import org.apache.metamodel.drop.AbstractTableDropBuilder;
 import org.apache.metamodel.drop.TableDropBuilder;
@@ -27,6 +26,8 @@ import org.apache.metamodel.schema.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.searchbox.indices.mapping.DeleteMapping;
+
 /**
  * {@link TableDropBuilder} for dropping tables (document types) in an
  * ElasticSearch index.
@@ -52,7 +53,7 @@ final class JestElasticSearchDropTableBuilder extends AbstractTableDropBuilder {
 
         final DeleteMapping deleteIndex = new DeleteMapping.Builder(dataContext.getIndexName(), documentType).build();
 
-        JestClientExecutor.execute(dataContext.getElasticSearchClient(), deleteIndex);
+        _updateCallback.execute(deleteIndex);
 
         final MutableSchema schema = (MutableSchema) table.getSchema();
         schema.removeTable(table);

http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
index 327d7d3..746538d 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
@@ -18,60 +18,57 @@
  */
 package org.apache.metamodel.elasticsearch.rest;
 
-import io.searchbox.core.DocumentResult;
-import io.searchbox.core.Index;
-import io.searchbox.params.Parameters;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
 import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
 import org.apache.metamodel.schema.Column;
 import org.apache.metamodel.schema.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.Map;
+import io.searchbox.core.Index;
+import io.searchbox.params.Parameters;
 
 final class JestElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<JestElasticSearchUpdateCallback> {
 
-    private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchInsertBuilder.class);
-
     public JestElasticSearchInsertBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) {
         super(updateCallback, table);
     }
 
     @Override
     public void execute() throws MetaModelException {
-        final ElasticSearchRestDataContext dataContext = getUpdateCallback().getDataContext();
+        final JestElasticSearchUpdateCallback updateCallback = getUpdateCallback();
+        final ElasticSearchRestDataContext dataContext = updateCallback.getDataContext();
         final String indexName = dataContext.getIndexName();
         final String documentType = getTable().getName();
 
-
         final Map<String, Object> source = new HashMap<>();
         final Column[] columns = getColumns();
         final Object[] values = getValues();
         String id = null;
         for (int i = 0; i < columns.length; i++) {
             if (isSet(columns[i])) {
-                final String name = columns[i].getName();
+                final String columnName = columns[i].getName();
+
                 final Object value = values[i];
-                if (ElasticSearchRestDataContext.FIELD_ID.equals(name)) {
+                if (ElasticSearchRestDataContext.FIELD_ID.equals(columnName)) {
                     if (value != null) {
                         id = value.toString();
                     }
                 } else {
-                    source.put(name, value);
+                    final String fieldName = ElasticSearchUtils.getValidatedFieldName(columnName);
+                    source.put(fieldName, value);
                 }
             }
         }
 
         assert !source.isEmpty();
 
-        Index index = new Index.Builder(source).index(indexName).type(documentType).id(id).setParameter(
+        final Index index = new Index.Builder(source).index(indexName).type(documentType).id(id).setParameter(
                 Parameters.OP_TYPE, "create").build();
 
-        final DocumentResult result = JestClientExecutor.execute(dataContext.getElasticSearchClient(), index);
-
-        logger.debug("Inserted document: id={}", result.getId());
+        getUpdateCallback().execute(index);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/2e39b501/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
index ca2ed13..521955d 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
@@ -1,84 +1,165 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import io.searchbox.indices.Refresh;
-import org.apache.metamodel.AbstractUpdateCallback;
-import org.apache.metamodel.UpdateCallback;
-import org.apache.metamodel.create.TableCreationBuilder;
-import org.apache.metamodel.delete.RowDeletionBuilder;
-import org.apache.metamodel.drop.TableDropBuilder;
-import org.apache.metamodel.insert.RowInsertionBuilder;
-import org.apache.metamodel.schema.Schema;
-import org.apache.metamodel.schema.Table;
-
-/**
- * {@link UpdateCallback} implementation for {@link ElasticSearchRestDataContext}.
- */
-final class JestElasticSearchUpdateCallback extends AbstractUpdateCallback {
-    public JestElasticSearchUpdateCallback(ElasticSearchRestDataContext dataContext) {
-        super(dataContext);
-    }
-
-    @Override
-    public ElasticSearchRestDataContext getDataContext() {
-        return (ElasticSearchRestDataContext) super.getDataContext();
-    }
-
-    @Override
-    public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException,
-            IllegalStateException {
-        return new JestElasticSearchCreateTableBuilder(this, schema, name);
-    }
-
-    @Override
-    public boolean isDropTableSupported() {
-        return true;
-    }
-
-    @Override
-    public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
-            UnsupportedOperationException {
-        return new JestElasticSearchDropTableBuilder(this, table);
-    }
-
-    @Override
-    public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException,
-            UnsupportedOperationException {
-        return new JestElasticSearchInsertBuilder(this, table);
-    }
-
-    @Override
-    public boolean isDeleteSupported() {
-        return true;
-    }
-
-    @Override
-    public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException,
-            UnsupportedOperationException {
-        return new JestElasticSearchDeleteBuilder(this, table);
-    }
-
-    public void onExecuteUpdateFinished() {
-        final String indexName = getDataContext().getIndexName();
-        Refresh refresh = new Refresh.Builder().addIndex(indexName).build();
-
-        JestClientExecutor.execute(getDataContext().getElasticSearchClient(), refresh, false);
-    }
-}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.util.List;
+
+import org.apache.metamodel.AbstractUpdateCallback;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.UpdateCallback;
+import org.apache.metamodel.create.TableCreationBuilder;
+import org.apache.metamodel.delete.RowDeletionBuilder;
+import org.apache.metamodel.drop.TableDropBuilder;
+import org.apache.metamodel.insert.RowInsertionBuilder;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.searchbox.action.Action;
+import io.searchbox.action.BulkableAction;
+import io.searchbox.client.JestResult;
+import io.searchbox.core.Bulk;
+import io.searchbox.core.Bulk.Builder;
+import io.searchbox.core.BulkResult;
+import io.searchbox.core.BulkResult.BulkResultItem;
+import io.searchbox.indices.Refresh;
+
+/**
+ * {@link UpdateCallback} implementation for
+ * {@link ElasticSearchRestDataContext}.
+ */
+final class JestElasticSearchUpdateCallback extends AbstractUpdateCallback {
+
+    private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchUpdateCallback.class);
+
+    private static final int BULK_BUFFER_SIZE = 1000;
+
+    private Bulk.Builder bulkBuilder;
+    private int bulkActionCount = 0;
+    private final boolean isBatch;
+
+    public JestElasticSearchUpdateCallback(ElasticSearchRestDataContext dataContext, boolean isBatch) {
+        super(dataContext);
+        this.isBatch = isBatch;
+    }
+
+    private boolean isBatch() {
+        return isBatch;
+    }
+
+    @Override
+    public ElasticSearchRestDataContext getDataContext() {
+        return (ElasticSearchRestDataContext) super.getDataContext();
+    }
+
+    @Override
+    public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException,
+            IllegalStateException {
+        return new JestElasticSearchCreateTableBuilder(this, schema, name);
+    }
+
+    @Override
+    public boolean isDropTableSupported() {
+        return true;
+    }
+
+    @Override
+    public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        return new JestElasticSearchDropTableBuilder(this, table);
+    }
+
+    @Override
+    public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        return new JestElasticSearchInsertBuilder(this, table);
+    }
+
+    @Override
+    public boolean isDeleteSupported() {
+        return true;
+    }
+
+    @Override
+    public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        return new JestElasticSearchDeleteBuilder(this, table);
+    }
+
+    public void onExecuteUpdateFinished() {
+        if (isBatch()) {
+            flushBulkActions();
+        }
+
+        final String indexName = getDataContext().getIndexName();
+        final Refresh refresh = new Refresh.Builder().addIndex(indexName).build();
+
+        JestClientExecutor.execute(getDataContext().getElasticSearchClient(), refresh, false);
+    }
+
+    private void flushBulkActions() {
+        if (bulkBuilder == null || bulkActionCount == 0) {
+            // nothing to flush
+            return;
+        }
+        final Bulk bulk = getBulkBuilder().build();
+        logger.info("Flushing {} actions to ElasticSearch index {}", bulkActionCount, getDataContext().getIndexName());
+        executeBlocking(bulk);
+
+        bulkActionCount = 0;
+        bulkBuilder = null;
+    }
+
+    public void execute(Action<?> action) {
+        if (isBatch() && action instanceof BulkableAction) {
+            final Bulk.Builder bulkBuilder = getBulkBuilder();
+            bulkBuilder.addAction((BulkableAction<?>) action);
+            bulkActionCount++;
+            if (bulkActionCount == BULK_BUFFER_SIZE) {
+                flushBulkActions();
+            }
+        } else {
+            executeBlocking(action);
+        }
+    }
+
+    private void executeBlocking(Action<?> action) {
+        final JestResult result = JestClientExecutor.execute(getDataContext().getElasticSearchClient(), action);
+        if (!result.isSucceeded()) {
+            if (result instanceof BulkResult) {
+                final List<BulkResultItem> failedItems = ((BulkResult) result).getFailedItems();
+                for (int i = 0; i < failedItems.size(); i++) {
+                    final BulkResultItem failedItem = failedItems.get(i);
+                    logger.error("Bulk failed with item no. {} of {}: id={} op={} status={} error={}", i+1, failedItems.size(), failedItem.id, failedItem.operation, failedItem.status, failedItem.error);
+                }
+            }
+            throw new MetaModelException(result.getResponseCode() + " - " + result.getErrorMessage());
+        }
+    }
+
+    private Builder getBulkBuilder() {
+        if (bulkBuilder == null) {
+            bulkBuilder = new Bulk.Builder();
+            bulkBuilder.defaultIndex(getDataContext().getIndexName());
+        }
+        return bulkBuilder;
+    }
+}