You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metamodel.apache.org by ka...@apache.org on 2018/01/26 04:13:32 UTC

[4/4] metamodel git commit: METAMODEL-1179: Upgraded ElasticSearch REST module to new client.

METAMODEL-1179: Upgraded ElasticSearch REST module to new client.

Using the official elastic REST high level client for ElasticSearch.

Closes #177

Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/bda8d764
Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/bda8d764
Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/bda8d764

Branch: refs/heads/master
Commit: bda8d764f65acdf3b1f520d21cb51a7396e23c7d
Parents: c57d508
Author: Arjan Seijkens <ar...@gmail.com>
Authored: Thu Jan 25 20:13:08 2018 -0800
Committer: Kasper Sørensen <i....@gmail.com>
Committed: Thu Jan 25 20:13:08 2018 -0800

----------------------------------------------------------------------
 .travis.yml                                     |   5 +-
 CHANGES.md                                      |   3 +-
 .../AbstractElasticSearchDataContext.java       | 140 +++++
 .../AbstractElasticSearchDataSet.java           | 123 ++++
 .../common/ElasticSearchDateConverter.java      |  17 +-
 .../common/ElasticSearchMetaDataParser.java     |  79 +++
 .../common/ElasticSearchUtils.java              |  80 +--
 .../common/ElasticSearchUtilsTest.java          |  63 ++
 elasticsearch/native/pom.xml                    |  36 ++
 .../ElasticSearchCreateTableBuilder.java        |   8 +-
 .../nativeclient/ElasticSearchDataContext.java  | 171 ++----
 .../ElasticSearchDataContextFactory.java        |  47 +-
 .../nativeclient/ElasticSearchDataSet.java      |  90 +--
 .../ElasticSearchDeleteBuilder.java             |  27 +-
 .../ElasticSearchDropTableBuilder.java          | 103 ----
 .../ElasticSearchInsertBuilder.java             |   8 +-
 .../ElasticSearchMetaDataParser.java            |  81 ---
 .../ElasticSearchUpdateBuilder.java             | 116 ++++
 .../ElasticSearchUpdateCallback.java            |  10 +-
 .../nativeclient/NativeElasticSearchUtils.java  |  67 --
 .../ElasticSearchDataContextTest.java           | 231 +++----
 .../ElasticSearchMetaDataParserTest.java        |   1 +
 .../nativeclient/ElasticSearchUtilsTest.java    |  63 --
 .../utils/EmbeddedElasticsearchServer.java      |  71 ---
 elasticsearch/pom.xml                           |   2 +-
 elasticsearch/rest/pom.xml                      | 114 +++-
 .../rest/ElasticSearchRestClient.java           | 134 ++++
 .../ElasticSearchRestCreateTableBuilder.java    |  55 ++
 .../rest/ElasticSearchRestDataContext.java      | 226 ++-----
 .../ElasticSearchRestDataContextFactory.java    |  46 +-
 .../rest/ElasticSearchRestDataSet.java          |  65 ++
 .../rest/ElasticSearchRestDeleteBuilder.java    |  96 +++
 .../rest/ElasticSearchRestInsertBuilder.java    |  72 +++
 .../rest/ElasticSearchRestUpdateCallback.java   | 167 +++++
 .../elasticsearch/rest/JestClientExecutor.java  |  51 --
 .../elasticsearch/rest/JestDeleteScroll.java    |  57 --
 .../JestElasticSearchCreateTableBuilder.java    |  56 --
 .../rest/JestElasticSearchDataSet.java          | 124 ----
 .../rest/JestElasticSearchDeleteBuilder.java    |  76 ---
 .../rest/JestElasticSearchDropTableBuilder.java |  62 --
 .../rest/JestElasticSearchInsertBuilder.java    |  74 ---
 .../rest/JestElasticSearchMetaDataParser.java   |  75 ---
 .../rest/JestElasticSearchUpdateCallback.java   | 164 -----
 .../rest/JestElasticSearchUtils.java            |  90 ---
 .../ElasticSearchRestDataContexFactoryIT.java   | 131 ++++
 .../rest/ElasticSearchRestDataContextIT.java    | 535 ++++++++++++++++
 .../rest/JestElasticSearchDataContextTest.java  | 615 -------------------
 .../JestElasticSearchMetaDataParserTest.java    |  70 ---
 .../rest/JestElasticSearchUtilsTest.java        | 188 ------
 .../rest/utils/EmbeddedElasticsearchServer.java |  72 ---
 .../rest/src/test/resources/Dockerfile          |   5 +
 .../rest/src/test/resources/elasticsearch.yml   |  13 +
 .../apache/metamodel/DataContextFactory.java    |   6 +-
 hbase/pom.xml                                   |   4 +
 pom.xml                                         |   1 +
 55 files changed, 2276 insertions(+), 2810 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 1b1e342..65df979 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -15,7 +15,10 @@ before_install:
 services:
   - couchdb
   - mongodb
-  
+  - docker
+
+script: "mvn clean verify -P integration-test"  
+
 after_success:
   - mvn test javadoc:javadoc
  

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/CHANGES.md
----------------------------------------------------------------------
diff --git a/CHANGES.md b/CHANGES.md
index 5af3fc4..7ab1d22 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,6 @@
-### WIP
+### Apache MetaModel 5.1.0 (WIP)
 
+ * [METAMODEL-1179] - Refactored ElasticSearch REST module to use new official REST based client from Elastic.
  * [METAMODEL-1177] - Made TableType.TABLE the default table type, replacing null values.
 
 ### Apache MetaModel 5.0.1

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java
new file mode 100644
index 0000000..c8ffae3
--- /dev/null
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.metamodel.DataContext;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.QueryPostprocessDataContext;
+import org.apache.metamodel.UpdateableDataContext;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.MutableColumn;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.util.SimpleTableDef;
+import org.elasticsearch.common.unit.TimeValue;
+
+public abstract class AbstractElasticSearchDataContext extends QueryPostprocessDataContext implements DataContext,
+        UpdateableDataContext {
+
+    public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60);
+
+    protected final String indexName;
+
+    // Table definitions that are set from the beginning, not supposed to be
+    // changed.
+    protected final List<SimpleTableDef> staticTableDefinitions;
+
+    // Table definitions that are discovered, these can change
+    protected final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
+
+    /**
+     * Constructs the shared base of the ElasticSearch data contexts. This
+     * constructor accepts a custom array of {@link SimpleTableDef}s which allows
+     * the user to define their own view on the indexes in the engine.
+     *
+     * @param indexName
+     *            the name of the ElasticSearch index to represent; must not be
+     *            null or blank, otherwise IllegalArgumentException is thrown
+     * @param tableDefinitions
+     *            table and column model of the index; the array is copied
+     */
+    public AbstractElasticSearchDataContext(String indexName, SimpleTableDef... tableDefinitions) {
+        super(false);
+        if (indexName == null || indexName.trim().length() == 0) {
+            throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName);
+        }
+        this.indexName = indexName;
+        this.staticTableDefinitions = (tableDefinitions == null || tableDefinitions.length == 0 ? Collections
+                .<SimpleTableDef> emptyList() : Arrays.asList(tableDefinitions.clone()));
+    }
+
+    /**
+     * Detects the table structure of the ElasticSearch index. How detection is
+     * performed is left to the subclass. The result is used by
+     * {@link #getMainSchema()} to refresh the dynamic table definitions;
+     * detected tables whose name is already present in the schema are not
+     * added a second time.
+     *
+     * @return the detected table definitions
+     */
+    protected abstract SimpleTableDef[] detectSchema();
+
+    @Override
+    protected Schema getMainSchema() throws MetaModelException {
+        final MutableSchema schema = new MutableSchema(getMainSchemaName());
+        // static definitions go first, so they win over detected tables of the same name
+        for (final SimpleTableDef staticDef : staticTableDefinitions) {
+            addTable(schema, staticDef);
+        }
+
+        final SimpleTableDef[] detected = detectSchema();
+        synchronized (this) {
+            dynamicTableDefinitions.clear();
+            dynamicTableDefinitions.addAll(Arrays.asList(detected));
+            for (final SimpleTableDef dynamicDef : dynamicTableDefinitions) {
+                // names are re-read every round because addTable grows the schema
+                final boolean nameTaken = schema.getTableNames().contains(dynamicDef.getName());
+                if (!nameTaken) {
+                    addTable(schema, dynamicDef);
+                }
+            }
+        }
+        return schema;
+    }
+
+    // Adds the table built from tableDef to theSchema, flagging its FIELD_ID column as primary key.
+    private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
+        final MutableTable table = tableDef.toTable().setSchema(theSchema);
+        final Column candidate = table.getColumnByName(ElasticSearchUtils.FIELD_ID);
+        if (candidate instanceof MutableColumn) {
+            ((MutableColumn) candidate).setPrimaryKey(true);
+        }
+        theSchema.addTable(table);
+    }
+
+    // The index name doubles as the name of the main schema.
+    @Override
+    protected String getMainSchemaName() throws MetaModelException {
+        return this.indexName;
+    }
+
+    /** Gets the name of the index that this {@link DataContext} is working on. */
+    public String getIndexName() {
+        return this.indexName;
+    }
+
+    // -1 is the only value treated as "no row limit"; every other value counts as set.
+    protected boolean limitMaxRowsIsSet(int maxRows) {
+        return maxRows != -1;
+    }
+    
+    protected static SimpleTableDef[] sortTables(final List<SimpleTableDef> result) {
+        final SimpleTableDef[] sorted = result.toArray(new SimpleTableDef[0]); // copy; the list itself is untouched
+        Arrays.sort(sorted, (left, right) -> left.getName().compareTo(right.getName())); // ascending by name
+        return sorted;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java
new file mode 100644
index 0000000..fea2190
--- /dev/null
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.metamodel.data.AbstractDataSet;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.SelectItem;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DataSet} implementation for ElasticSearch
+ */
+public abstract class AbstractElasticSearchDataSet extends AbstractDataSet {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractElasticSearchDataSet.class);
+
+    protected final AtomicBoolean _closed;
+
+    protected SearchResponse _searchResponse;
+    protected SearchHit _currentHit;
+    protected int _hitIndex = 0;
+
+    public AbstractElasticSearchDataSet(final SearchResponse searchResponse, final List<SelectItem> selectItems) {
+        super(selectItems);
+        this._closed = new AtomicBoolean(); // AtomicBoolean starts out false: not closed yet
+        this._searchResponse = searchResponse; // first response; next() replaces it while scrolling
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        // flip false -> true exactly once, so closeNow() runs on the first close() and never again
+        if (_closed.compareAndSet(false, true)) {
+            closeNow();
+        }
+    }
+
+    protected abstract void closeNow();
+    
+    @Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+        if (!_closed.get()) { // flag still false: treat the data set as not closed
+            logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this);
+            close(); // last-resort cleanup during garbage collection
+        }
+    }
+
+    /** Advances to the next hit, scrolling to the next search response when the current one is used up. */
+    @Override
+    public boolean next() {
+        final SearchHit[] hits = _searchResponse.getHits().getHits();
+        if (hits.length == 0) {
+            // an empty response is the break condition for the scroll
+            _currentHit = null;
+            return false;
+        }
+
+        if (_hitIndex < hits.length) {
+            // pick the next hit within this search response
+            _currentHit = hits[_hitIndex++];
+            return true;
+        }
+
+        final String scrollId = _searchResponse.getScrollId();
+        if (scrollId == null) {
+            // this search response is not scrollable - then it's the end.
+            _currentHit = null;
+            return false;
+        }
+
+        try {
+            _searchResponse = scrollSearchResponse(scrollId);
+        } catch (IOException e) {
+            logger.warn("Failed to scroll to the next search response set.", e);
+            // drop the stale hit so getRow() yields null, as on every other end-of-data path
+            _currentHit = null;
+            return false;
+        }
+        // start over (recursively) on the freshly fetched response
+        _hitIndex = 0;
+        return next();
+    }
+    
+    protected abstract SearchResponse scrollSearchResponse(final String scrollId) throws IOException;
+
+    // Converts the current hit into a Row; returns null when there is no current hit.
+    @Override
+    public Row getRow() {
+        final SearchHit hit = _currentHit;
+        if (hit == null) {
+            return null;
+        }
+        // the document id is handed over separately from the source map
+        return ElasticSearchUtils.createRow(hit.getSource(), hit.getId(), getHeader());
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
index a6ce656..652fbe6 100644
--- a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
@@ -20,6 +20,7 @@ package org.apache.metamodel.elasticsearch.common;
 
 import org.apache.metamodel.util.TimeComparator;
 
+import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
@@ -30,12 +31,22 @@ import java.util.Date;
  */
 public final class ElasticSearchDateConverter {
 
+    private static final String ZONED_MILLIS_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSX";
+    private static final String LOCAL_SECONDS_PATTERN = "yyyy-MM-dd'T'HH:mm:ss";
+    /** Converts a date string; null stays null. Formats are built per call: SimpleDateFormat is not thread-safe. */
     public static Date tryToConvert(String dateAsString) {
+        if (dateAsString == null) {
+            return null;
+        }
+        // longest pattern first: parse() accepts a matching prefix, so the short one would drop millis and zone
         try {
-            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
-            return dateFormat.parse(dateAsString);
+            return new SimpleDateFormat(ZONED_MILLIS_PATTERN).parse(dateAsString);
         } catch (ParseException e) {
-            return TimeComparator.toDate(dateAsString);
+            try {
+                return new SimpleDateFormat(LOCAL_SECONDS_PATTERN).parse(dateAsString);
+            } catch (ParseException e1) {
+                return TimeComparator.toDate(dateAsString);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java
new file mode 100644
index 0000000..32f07ff
--- /dev/null
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.common;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.metamodel.schema.ColumnType;
+
+/**
+ * Parser that transforms the ElasticSearch metadata response (json-like format)
+ * into an ElasticSearchMetaData object.
+ */
+public class ElasticSearchMetaDataParser {
+
+    /**
+     * Builds an ElasticSearchMetaData object from ElasticSearch mapping
+     * metadata. The document ID field always occupies index 0 as a STRING
+     * column; the mapped fields follow in the iteration order of the map.
+     *
+     * @param metaDataInfo
+     *            mapping metadata keyed by field name; each value is cast to a Map
+     * @return An ElasticSearchMetaData object
+     */
+    public static ElasticSearchMetaData parse(Map<String, ?> metaDataInfo) {
+        final int columnCount = metaDataInfo.size() + 1;
+        final String[] names = new String[columnCount];
+        final ColumnType[] types = new ColumnType[columnCount];
+
+        // slot 0 is reserved for the document ID field (fixed)
+        names[0] = ElasticSearchUtils.FIELD_ID;
+        types[0] = ColumnType.STRING;
+
+        int slot = 0;
+        for (Entry<String, ?> field : metaDataInfo.entrySet()) {
+            slot++;
+            @SuppressWarnings("unchecked")
+            final Map<String, ?> fieldMetadata = (Map<String, ?>) field.getValue();
+            names[slot] = field.getKey();
+            types[slot] = getColumnTypeFromMetadataField(fieldMetadata);
+        }
+        return new ElasticSearchMetaData(names, types);
+    }
+
+    /**
+     * Resolves the column type of a single mapped field. A field for which
+     * no type could be read from its metadata is treated as a STRING column.
+     */
+    private static ColumnType getColumnTypeFromMetadataField(Map<String, ?> fieldMetadata) {
+        final String esType = getMetaDataFieldTypeFromMetaDataField(fieldMetadata);
+        return esType == null ? ColumnType.STRING
+                : ElasticSearchUtils.getColumnTypeFromElasticSearchType(esType);
+    }
+
+    /**
+     * Reads the "type" entry of a field's metadata as a string, or null if it is absent.
+     */
+    private static String getMetaDataFieldTypeFromMetaDataField(Map<String, ?> metaDataField) {
+        final Object rawType = metaDataField.get("type");
+        return rawType == null ? null : rawType.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
index b298d11..9128182 100644
--- a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
@@ -18,72 +18,43 @@
  */
 package org.apache.metamodel.elasticsearch.common;
 
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.metamodel.data.DataSetHeader;
+import org.apache.metamodel.data.DefaultRow;
+import org.apache.metamodel.data.Row;
 import org.apache.metamodel.query.FilterItem;
 import org.apache.metamodel.query.LogicalOperator;
 import org.apache.metamodel.query.OperatorType;
+import org.apache.metamodel.query.SelectItem;
 import org.apache.metamodel.schema.Column;
 import org.apache.metamodel.schema.ColumnType;
 import org.apache.metamodel.schema.MutableColumn;
 import org.apache.metamodel.schema.MutableTable;
 import org.apache.metamodel.util.CollectionUtils;
-import org.elasticsearch.common.base.Strings;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.FilterBuilder;
+import org.elasticsearch.index.query.ExistsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class ElasticSearchUtils {
 
-    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtils.class);
-
     public static final String FIELD_ID = "_id";
     public static final String SYSTEM_PROPERTY_STRIP_INVALID_FIELD_CHARS = "metamodel.elasticsearch.strip_invalid_field_chars";
 
-    /**
-     * Gets a "filter" query which is both 1.x and 2.x compatible.
-     */
-    private static QueryBuilder getFilteredQuery(String prefix, String fieldName) {
-        // 1.x: itemQueryBuilder = QueryBuilders.filteredQuery(null,
-        // FilterBuilders.missingFilter(fieldName));
-        // 2.x: itemQueryBuilder =
-        // QueryBuilders.boolQuery().must(QueryBuilders.missingQuery(fieldName));
-        try {
-            try {
-                Method method = QueryBuilders.class.getDeclaredMethod(prefix + "Query", String.class);
-                method.setAccessible(true);
-                return QueryBuilders.boolQuery().must((QueryBuilder) method.invoke(null, fieldName));
-            } catch (NoSuchMethodException e) {
-                Class<?> clazz = ElasticSearchUtils.class.getClassLoader().loadClass(
-                        "org.elasticsearch.index.query.FilterBuilders");
-                Method filterBuilderMethod = clazz.getDeclaredMethod(prefix + "Filter", String.class);
-                filterBuilderMethod.setAccessible(true);
-                Method queryBuildersFilteredQueryMethod = QueryBuilders.class.getDeclaredMethod("filteredQuery",
-                        QueryBuilder.class, FilterBuilder.class);
-                return (QueryBuilder) queryBuildersFilteredQueryMethod.invoke(null, null, filterBuilderMethod.invoke(
-                        null, fieldName));
-            }
-        } catch (Exception e) {
-            logger.error("Failed to resolve/invoke filtering method", e);
-            throw new IllegalStateException("Failed to resolve filtering method", e);
-        }
-    }
-
     public static QueryBuilder getMissingQuery(String fieldName) {
-        return getFilteredQuery("missing", fieldName);
+        return new BoolQueryBuilder().mustNot(new ExistsQueryBuilder(fieldName)); // "missing" = exists must NOT match
     }
 
     public static QueryBuilder getExistsQuery(String fieldName) {
-        return getFilteredQuery("exists", fieldName);
+        return new ExistsQueryBuilder(fieldName); // built directly, replacing the reflective getFilteredQuery lookup
     }
 
     public static Map<String, ?> getMappingSource(final MutableTable table) {
@@ -170,7 +141,7 @@ public class ElasticSearchUtils {
         }
 
         if (type.isLiteral()) {
-            return "string";
+            return "text";
         } else if (type == ColumnType.FLOAT) {
             return "float";
         } else if (type == ColumnType.DOUBLE || type == ColumnType.NUMERIC || type == ColumnType.NUMBER) {
@@ -294,4 +265,35 @@ public class ElasticSearchUtils {
         }
         return columnType;
     }
+
+    /**
+     * Builds a row for the header's columns: primary key columns get the document id, others the source value.
+     */
+    public static Row createRow(final Map<String, Object> sourceMap, final String documentId, final DataSetHeader header) {
+        final Object[] values = new Object[header.size()];
+        for (int i = 0; i < values.length; i++) {
+            final SelectItem selectItem = header.getSelectItem(i);
+            final Column column = selectItem.getColumn();
+
+            assert column != null;
+            assert selectItem.getAggregateFunction() == null;
+            assert selectItem.getScalarFunction() == null;
+
+            if (column.isPrimaryKey()) {
+                values[i] = documentId;
+                continue;
+            }
+
+            final Object value = sourceMap.get(column.getName());
+            Object converted = null;
+            if (column.getType() == ColumnType.DATE && value instanceof String) {
+                // only strings can be parsed; blindly casting a non-String value would throw ClassCastException
+                converted = ElasticSearchDateConverter.tryToConvert((String) value);
+            }
+            // keep the raw source value whenever no conversion applied or it produced nothing
+            values[i] = (converted == null ? value : converted);
+        }
+
+        return new DefaultRow(header, values);
+    }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java b/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java
new file mode 100644
index 0000000..9fb7e03
--- /dev/null
+++ b/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.common;
+
+import junit.framework.TestCase;
+import org.apache.metamodel.data.DataSetHeader;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.data.SimpleDataSetHeader;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.ColumnType;
+import org.apache.metamodel.schema.MutableColumn;
+
+import java.util.*;
+
+public class ElasticSearchUtilsTest extends TestCase {
+
+    // A primary key column must yield the document id, not the same-named value from the source map.
+    public void testAssignDocumentIdForPrimaryKeys() throws Exception {
+        final String documentId = "doc1";
+        final MutableColumn primaryKeyColumn = new MutableColumn("value1", ColumnType.STRING).setPrimaryKey(true);
+        final SelectItem primaryKeyItem = new SelectItem(primaryKeyColumn);
+        final DataSetHeader header = new SimpleDataSetHeader(Collections.singletonList(primaryKeyItem));
+        final Map<String, Object> source = new HashMap<>();
+        source.put("value1", "theValue");
+        final Row row = ElasticSearchUtils.createRow(source, documentId, header);
+
+        // expected value goes first so a failure message reads correctly
+        assertEquals(documentId, row.getValue(primaryKeyItem));
+    }
+
+    // A DATE column holding a parsable string comes back as a Date; a STRING column stays a String.
+    public void testCreateRowWithParsableDates() throws Exception {
+        final SelectItem textItem = new SelectItem(new MutableColumn("value1", ColumnType.STRING));
+        final SelectItem dateItem = new SelectItem(new MutableColumn("value2", ColumnType.DATE));
+        final DataSetHeader header = new SimpleDataSetHeader(Arrays.asList(textItem, dateItem));
+
+        final Map<String, Object> source = new HashMap<>();
+        source.put("value1", "theValue");
+        source.put("value2", "2013-01-04T15:55:51.217+01:00");
+
+        final Row row = ElasticSearchUtils.createRow(source, "doc1", header);
+        final Object textValue = row.getValue(textItem);
+        final Object dateValue = row.getValue(dateItem);
+        assertTrue(textValue instanceof String);
+        assertTrue(dateValue instanceof Date);
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/native/pom.xml b/elasticsearch/native/pom.xml
index 4c1abcf..6adca36 100644
--- a/elasticsearch/native/pom.xml
+++ b/elasticsearch/native/pom.xml
@@ -43,6 +43,17 @@
 			<artifactId>elasticsearch</artifactId>
 			<version>${elasticsearch.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>transport</artifactId>
+			<version>${elasticsearch.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>commons-logging</groupId>
+					<artifactId>commons-logging</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
 
 		<!-- test -->
 		<dependency>
@@ -53,7 +64,32 @@
 		<dependency>
 			<groupId>junit</groupId>
 			<artifactId>junit</artifactId>
+			<version>4.12</version>
 			<scope>test</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>org.hamcrest</groupId>
+					<artifactId>hamcrest-core</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+		    <groupId>org.elasticsearch.test</groupId>
+		    <artifactId>framework</artifactId>
+			<version>${elasticsearch.version}</version>
+		    <scope>test</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>commons-logging</groupId>
+					<artifactId>commons-logging</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+		    <groupId>org.apache.logging.log4j</groupId>
+		    <artifactId>log4j-core</artifactId>
+		    <version>2.9.1</version>
+		    <scope>test</scope>
 		</dependency>
 	</dependencies>
 

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
index f27e8ac..4e5873c 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
@@ -27,6 +27,7 @@ import org.apache.metamodel.schema.MutableSchema;
 import org.apache.metamodel.schema.MutableTable;
 import org.apache.metamodel.schema.Schema;
 import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
 import org.elasticsearch.client.IndicesAdminClient;
@@ -50,13 +51,16 @@ final class ElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder
         final IndicesAdminClient indicesAdmin = dataContext.getElasticSearchClient().admin().indices();
         final String indexName = dataContext.getIndexName();
 
-        final PutMappingRequestBuilder requestBuilder = new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName)
-                .setType(table.getName());
+        final PutMappingRequestBuilder requestBuilder =
+                new PutMappingRequestBuilder(indicesAdmin, PutMappingAction.INSTANCE).setIndices(indexName)
+                        .setType(table.getName());
         requestBuilder.setSource(source);
         final PutMappingResponse result = requestBuilder.execute().actionGet();
 
         logger.debug("PutMapping response: acknowledged={}", result.isAcknowledged());
 
+        dataContext.getElasticSearchClient().admin().indices().prepareRefresh(indexName).get();
+
         final MutableSchema schema = (MutableSchema) getSchema();
         schema.addTable(table);
         return table;

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
index d2dfe4b..3df0ce1 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
@@ -18,39 +18,30 @@
  */
 package org.apache.metamodel.elasticsearch.nativeclient;
 
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.apache.metamodel.DataContext;
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.QueryPostprocessDataContext;
 import org.apache.metamodel.UpdateScript;
 import org.apache.metamodel.UpdateSummary;
-import org.apache.metamodel.UpdateableDataContext;
 import org.apache.metamodel.data.DataSet;
 import org.apache.metamodel.data.DataSetHeader;
 import org.apache.metamodel.data.Row;
 import org.apache.metamodel.data.SimpleDataSetHeader;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext;
 import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser;
 import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
 import org.apache.metamodel.query.FilterItem;
 import org.apache.metamodel.query.LogicalOperator;
 import org.apache.metamodel.query.SelectItem;
 import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.MutableColumn;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.MutableTable;
-import org.apache.metamodel.schema.Schema;
 import org.apache.metamodel.schema.Table;
 import org.apache.metamodel.util.SimpleTableDef;
-import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
-import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
@@ -59,14 +50,16 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.common.hppc.ObjectLookupContainer;
-import org.elasticsearch.common.hppc.cursors.ObjectCursor;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.carrotsearch.hppc.ObjectLookupContainer;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+
 /**
  * DataContext implementation for ElasticSearch analytics engine.
  *
@@ -84,20 +77,11 @@ import org.slf4j.LoggerFactory;
  * This implementation supports either automatic discovery of a schema or manual
  * specification of a schema, through the {@link SimpleTableDef} class.
  */
-public class ElasticSearchDataContext extends QueryPostprocessDataContext implements DataContext, UpdateableDataContext {
+public class ElasticSearchDataContext extends AbstractElasticSearchDataContext {
 
     private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContext.class);
 
-    public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60);
-
     private final Client elasticSearchClient;
-    private final String indexName;
-    // Table definitions that are set from the beginning, not supposed to be
-    // changed.
-    private final List<SimpleTableDef> staticTableDefinitions;
-
-    // Table definitions that are discovered, these can change
-    private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
 
     /**
      * Constructs a {@link ElasticSearchDataContext}. This constructor accepts a
@@ -113,16 +97,12 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
      *            and column model of the ElasticSearch index.
      */
     public ElasticSearchDataContext(Client client, String indexName, SimpleTableDef... tableDefinitions) {
-        super(false);
+        super(indexName, tableDefinitions);
+
         if (client == null) {
             throw new IllegalArgumentException("ElasticSearch Client cannot be null");
         }
-        if (indexName == null || indexName.trim().length() == 0) {
-            throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName);
-        }
         this.elasticSearchClient = client;
-        this.indexName = indexName;
-        this.staticTableDefinitions = Arrays.asList(tableDefinitions);
         this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
     }
 
@@ -140,40 +120,14 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
         this(client, indexName, new SimpleTableDef[0]);
     }
 
-    /**
-     * Performs an analysis of the available indexes in an ElasticSearch cluster
-     * {@link Client} instance and detects the elasticsearch types structure
-     * based on the metadata provided by the ElasticSearch java client.
-     *
-     * @see {@link #detectTable(ClusterState, String, String)}
-     * @return a mutable schema instance, useful for further fine tuning by the
-     *         user.
-     */
-    private SimpleTableDef[] detectSchema() {
+    @Override
+    protected SimpleTableDef[] detectSchema() {
         logger.info("Detecting schema for index '{}'", indexName);
 
-        final ClusterState cs;
         final ClusterStateRequestBuilder clusterStateRequestBuilder = getElasticSearchClient().admin().cluster()
-                .prepareState();
-
-        // different methods here to set the index name, so we have to use
-        // reflection :-/
-        try {
-            final byte majorVersion = Version.CURRENT.major;
-            final Object methodArgument = new String[] { indexName };
-            if (majorVersion == 0) {
-                final Method method = ClusterStateRequestBuilder.class.getMethod("setFilterIndices", String[].class);
-                method.invoke(clusterStateRequestBuilder, methodArgument);
-            } else {
-                final Method method = ClusterStateRequestBuilder.class.getMethod("setIndices", String[].class);
-                method.invoke(clusterStateRequestBuilder, methodArgument);
-            }
-        } catch (Exception e) {
-            logger.error("Failed to set index name on ClusterStateRequestBuilder, version {}", Version.CURRENT, e);
-            throw new MetaModelException("Failed to create request for index information needed to detect schema", e);
-        }
-        cs = clusterStateRequestBuilder.execute().actionGet().getState();
-
+                .prepareState().setIndices(indexName);
+        final ClusterState cs = clusterStateRequestBuilder.execute().actionGet().getState();
+        
         final List<SimpleTableDef> result = new ArrayList<>();
 
         final IndexMetaData imd = cs.getMetaData().index(indexName);
@@ -183,9 +137,8 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
         } else {
             final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings();
             final ObjectLookupContainer<String> documentTypes = mappings.keys();
-
-            for (final Object documentTypeCursor : documentTypes) {
-                final String documentType = ((ObjectCursor<?>) documentTypeCursor).value.toString();
+            for (final ObjectCursor<?> documentTypeCursor : documentTypes) {
+                final String documentType = documentTypeCursor.value.toString();
                 try {
                     final SimpleTableDef table = detectTable(cs, indexName, documentType);
                     result.add(table);
@@ -194,15 +147,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
                 }
             }
         }
-        final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]);
-        Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() {
-            @Override
-            public int compare(SimpleTableDef o1, SimpleTableDef o2) {
-                return o1.getName().compareTo(o2.getName());
-            }
-        });
-
-        return tableDefArray;
+        return sortTables(result);
     }
 
     /**
@@ -225,7 +170,8 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
             // index does not exist
             throw new IllegalArgumentException("No such index: " + indexName);
         }
-        final MappingMetaData mappingMetaData = imd.mapping(documentType);
+        final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings();
+        final MappingMetaData mappingMetaData = mappings.get(documentType);
         if (mappingMetaData == null) {
             throw new IllegalArgumentException("No such document type in index '" + indexName + "': " + documentType);
         }
@@ -244,44 +190,6 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
     }
 
     @Override
-    protected Schema getMainSchema() throws MetaModelException {
-        final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
-        for (final SimpleTableDef tableDef : staticTableDefinitions) {
-            addTable(theSchema, tableDef);
-        }
-
-        final SimpleTableDef[] tables = detectSchema();
-        synchronized (this) {
-            dynamicTableDefinitions.clear();
-            dynamicTableDefinitions.addAll(Arrays.asList(tables));
-            for (final SimpleTableDef tableDef : dynamicTableDefinitions) {
-                final List<String> tableNames = theSchema.getTableNames();
-
-                if (!tableNames.contains(tableDef.getName())) {
-                    addTable(theSchema, tableDef);
-                }
-            }
-        }
-
-        return theSchema;
-    }
-
-    private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
-        final MutableTable table = tableDef.toTable().setSchema(theSchema);
-        final Column idColumn = table.getColumnByName(ElasticSearchUtils.FIELD_ID);
-        if (idColumn != null && idColumn instanceof MutableColumn) {
-            final MutableColumn mutableColumn = (MutableColumn) idColumn;
-            mutableColumn.setPrimaryKey(true);
-        }
-        theSchema.addTable(table);
-    }
-
-    @Override
-    protected String getMainSchemaName() throws MetaModelException {
-        return indexName;
-    }
-
-    @Override
     protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems,
             List<FilterItem> whereItems, int firstRow, int maxRows) {
         final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
@@ -290,7 +198,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
             // where clause can be pushed down to an ElasticSearch query
             final SearchRequestBuilder searchRequest = createSearchRequest(table, firstRow, maxRows, queryBuilder);
             final SearchResponse response = searchRequest.execute().actionGet();
-            return new ElasticSearchDataSet(elasticSearchClient, response, selectItems, false);
+            return new ElasticSearchDataSet(getElasticSearchClient(), response, selectItems);
         }
         return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
     }
@@ -299,12 +207,13 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
     protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
         final SearchRequestBuilder searchRequest = createSearchRequest(table, 1, maxRows, null);
         final SearchResponse response = searchRequest.execute().actionGet();
-        return new ElasticSearchDataSet(elasticSearchClient, response, columns.stream().map(SelectItem::new).collect(Collectors.toList()), false);
+        return new ElasticSearchDataSet(getElasticSearchClient(), response, columns.stream().map(SelectItem::new)
+                .collect(Collectors.toList()));
     }
 
     private SearchRequestBuilder createSearchRequest(Table table, int firstRow, int maxRows, QueryBuilder queryBuilder) {
         final String documentType = table.getName();
-        final SearchRequestBuilder searchRequest = elasticSearchClient.prepareSearch(indexName).setTypes(documentType);
+        final SearchRequestBuilder searchRequest = getElasticSearchClient().prepareSearch(indexName).setTypes(documentType);
         if (firstRow > 1) {
             final int zeroBasedFrom = firstRow - 1;
             searchRequest.setFrom(zeroBasedFrom);
@@ -332,7 +241,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
         final String documentType = table.getName();
         final String id = keyValue.toString();
 
-        final GetResponse response = elasticSearchClient.prepareGet(indexName, documentType, id).execute().actionGet();
+        final GetResponse response = getElasticSearchClient().prepareGet(indexName, documentType, id).execute().actionGet();
 
         if (!response.isExists()) {
             return null;
@@ -343,7 +252,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
 
         final DataSetHeader header = new SimpleDataSetHeader(selectItems);
 
-        return NativeElasticSearchUtils.createRow(source, documentId, header);
+        return ElasticSearchUtils.createRow(source, documentId, header);
     }
 
     @Override
@@ -353,13 +262,11 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
             return null;
         }
         final String documentType = table.getName();
-        final CountResponse response = elasticSearchClient.prepareCount(indexName)
-                .setQuery(QueryBuilders.termQuery("_type", documentType)).execute().actionGet();
-        return response.getCount();
-    }
-
-    private boolean limitMaxRowsIsSet(int maxRows) {
-        return (maxRows != -1);
+        final TermQueryBuilder query = QueryBuilders.termQuery("_type", documentType);
+        final SearchResponse searchResponse =
+                getElasticSearchClient().prepareSearch(indexName).setSource(new SearchSourceBuilder().size(0).query(query))
+                        .execute().actionGet();
+        return searchResponse.getHits().getTotalHits();
     }
 
     @Override
@@ -370,6 +277,13 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
         return callback.getUpdateSummary();
     }
 
+    @Override
+    protected void onSchemaCacheRefreshed() {
+        getElasticSearchClient().admin().indices().prepareRefresh(indexName).get();
+        
+        detectSchema();
+    }
+
     /**
      * Gets the {@link Client} that this {@link DataContext} is wrapping.
      *
@@ -378,13 +292,4 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
     public Client getElasticSearchClient() {
         return elasticSearchClient;
     }
-
-    /**
-     * Gets the name of the index that this {@link DataContext} is working on.
-     *
-     * @return
-     */
-    public String getIndexName() {
-        return indexName;
-    }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java
index 94359c4..a6f6953 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.metamodel.elasticsearch.nativeclient;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
 import org.apache.metamodel.ConnectionException;
 import org.apache.metamodel.DataContext;
 import org.apache.metamodel.factory.DataContextFactory;
@@ -27,12 +30,11 @@ import org.apache.metamodel.factory.UnsupportedDataContextPropertiesException;
 import org.apache.metamodel.util.SimpleTableDef;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.ImmutableSettings.Builder;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Factory for ElasticSearch data context of native type.
@@ -59,6 +61,7 @@ import org.elasticsearch.node.NodeBuilder;
  * </ul>
  */
 public class ElasticSearchDataContextFactory implements DataContextFactory {
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContextFactory.class);
 
     @Override
     public boolean accepts(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry) {
@@ -120,46 +123,22 @@ public class ElasticSearchDataContextFactory implements DataContextFactory {
     @Override
     public DataContext create(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry)
             throws UnsupportedDataContextPropertiesException, ConnectionException {
-        final String clientType = getClientType(properties);
         final Client client;
-        if ("node".equals(clientType)) {
-            client = createNodeClient(properties);
-        } else {
             client = createTransportClient(properties);
-        }
         final String indexName = getIndex(properties);
         final SimpleTableDef[] tableDefinitions = properties.getTableDefs();
         return new ElasticSearchDataContext(client, indexName, tableDefinitions);
     }
 
     private Client createTransportClient(DataContextProperties properties) {
-        final Builder settingsBuilder = ImmutableSettings.builder();
-        settingsBuilder.put("name", "MetaModel");
-        settingsBuilder.put("cluster.name", getCluster(properties));
-        if (properties.getUsername() != null && properties.getPassword() != null) {
-            settingsBuilder.put("shield.user", properties.getUsername() + ":" + properties.getPassword());
-            if ("true".equals(properties.toMap().get("ssl"))) {
-                if (properties.toMap().get("keystorePath") != null) {
-                    settingsBuilder.put("shield.ssl.keystore.path", properties.toMap().get("keystorePath"));
-                    settingsBuilder.put("shield.ssl.keystore.password", properties.toMap().get("keystorePassword"));
-                }
-                settingsBuilder.put("shield.transport.ssl", "true");
-            }
+        final Settings settings = Settings.builder().put().put("name", "MetaModel").put("cluster.name", getCluster(properties)).build();
+        final TransportClient client = new PreBuiltTransportClient(settings);
+        try {
+            client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(properties.getHostname()), properties.getPort()));
+        } catch (UnknownHostException e) {
+            logger.warn("no IP address for the host with name \"{}\" could be found.", properties.getHostname());
         }
-        final Settings settings = settingsBuilder.build();
-
-        final TransportClient client = new TransportClient(settings);
-        client.addTransportAddress(new InetSocketTransportAddress(properties.getHostname(), properties.getPort()));
         return client;
     }
 
-    private Client createNodeClient(DataContextProperties properties) {
-        final Builder settingsBuilder = ImmutableSettings.builder();
-        settingsBuilder.put("name", "MetaModel");
-        settingsBuilder.put("shield.enabled", false);
-        final Settings settings = settingsBuilder.build();
-        final Node node = NodeBuilder.nodeBuilder().clusterName(getCluster(properties)).client(true).settings(settings)
-                .node();
-        return node.client();
-    }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
index 4ced2c8..b616eb2 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
@@ -19,104 +19,38 @@
 package org.apache.metamodel.elasticsearch.nativeclient;
 
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.metamodel.data.AbstractDataSet;
 import org.apache.metamodel.data.DataSet;
-import org.apache.metamodel.data.Row;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataSet;
 import org.apache.metamodel.query.SelectItem;
+import org.elasticsearch.action.search.ClearScrollAction;
 import org.elasticsearch.action.search.ClearScrollRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.search.SearchHit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * {@link DataSet} implementation for ElasticSearch
  */
-final class ElasticSearchDataSet extends AbstractDataSet {
-
-    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataSet.class);
+final class ElasticSearchDataSet extends AbstractElasticSearchDataSet {
 
     private final Client _client;
-    private final AtomicBoolean _closed;
-
-    private SearchResponse _searchResponse;
-    private SearchHit _currentHit;
-    private int _hitIndex = 0;
 
-    public ElasticSearchDataSet(Client client, SearchResponse searchResponse, List<SelectItem> selectItems,
-            boolean queryPostProcessed) {
-        super(selectItems);
+    public ElasticSearchDataSet(final Client client, final SearchResponse searchResponse,
+            final List<SelectItem> selectItems) {
+        super(searchResponse, selectItems);
         _client = client;
-        _searchResponse = searchResponse;
-        _closed = new AtomicBoolean(false);
     }
 
-
     @Override
-    public void close() {
-        super.close();
-        boolean closeNow = _closed.compareAndSet(true, false);
-        if (closeNow) {
-            ClearScrollRequestBuilder scrollRequestBuilder = new ClearScrollRequestBuilder(_client)
-                    .addScrollId(_searchResponse.getScrollId());
-            scrollRequestBuilder.execute();
-        }
+    public void closeNow() {
+        ClearScrollRequestBuilder scrollRequestBuilder = new ClearScrollRequestBuilder(_client,
+                ClearScrollAction.INSTANCE).addScrollId(_searchResponse.getScrollId());
+        scrollRequestBuilder.execute();
     }
 
     @Override
-    protected void finalize() throws Throwable {
-        super.finalize();
-        if (!_closed.get()) {
-            logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this);
-            close();
-        }
-    }
-
-    @Override
-    public boolean next() {
-        final SearchHit[] hits = _searchResponse.getHits().hits();
-        if (hits.length == 0) {
-            // break condition for the scroll
-            _currentHit = null;
-            return false;
-        }
-
-        if (_hitIndex < hits.length) {
-            // pick the next hit within this search response
-            _currentHit = hits[_hitIndex];
-            _hitIndex++;
-            return true;
-        }
-
-        final String scrollId = _searchResponse.getScrollId();
-        if (scrollId == null) {
-            // this search response is not scrolleable - then it's the end.
-            _currentHit = null;
-            return false;
-        }
-
-        // try to scroll to the next set of hits
-        _searchResponse = _client.prepareSearchScroll(scrollId).setScroll(ElasticSearchDataContext.TIMEOUT_SCROLL)
+    protected SearchResponse scrollSearchResponse(final String scrollId) {
+        return _client.prepareSearchScroll(scrollId).setScroll(ElasticSearchDataContext.TIMEOUT_SCROLL)
                 .execute().actionGet();
-
-        // start over (recursively)
-        _hitIndex = 0;
-        return next();
-    }
-
-    @Override
-    public Row getRow() {
-        if (_currentHit == null) {
-            return null;
-        }
-
-        final Map<String, Object> source = _currentHit.getSource();
-        final String documentId = _currentHit.getId();
-        final Row row = NativeElasticSearchUtils.createRow(source, documentId, getHeader());
-        return row;
     }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
index 0de2a71..2db8e8c 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.metamodel.elasticsearch.nativeclient;
 
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.metamodel.MetaModelException;
@@ -27,9 +28,11 @@ import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
 import org.apache.metamodel.query.FilterItem;
 import org.apache.metamodel.query.LogicalOperator;
 import org.apache.metamodel.schema.Table;
-import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,10 +60,6 @@ final class ElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder {
         final Client client = dataContext.getElasticSearchClient();
         final String indexName = dataContext.getIndexName();
 
-        final DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = new DeleteByQueryRequestBuilder(client);
-        deleteByQueryRequestBuilder.setIndices(indexName);
-        deleteByQueryRequestBuilder.setTypes(documentType);
-
         final List<FilterItem> whereItems = getWhereItems();
 
         // delete by query - note that creteQueryBuilderForSimpleWhere may
@@ -74,9 +73,21 @@ final class ElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder {
             throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: "
                     + whereItems);
         }
-        deleteByQueryRequestBuilder.setQuery(queryBuilder);
-        deleteByQueryRequestBuilder.execute().actionGet();
 
-        logger.debug("Deleted documents by query.");
+        final SearchResponse response =
+                client.prepareSearch(indexName).setQuery(queryBuilder).setTypes(documentType).execute()
+                        .actionGet();
+
+        client.admin().indices().prepareRefresh(indexName).execute().actionGet();
+        final Iterator<SearchHit> iterator = response.getHits().iterator();
+        while (iterator.hasNext()) {
+            final SearchHit hit = iterator.next();
+            final String typeId = hit.getId();
+            final DeleteResponse deleteResponse =
+                    client.prepareDelete().setIndex(indexName).setType(documentType).setId(typeId).execute()
+                            .actionGet();
+            logger.debug("Deleted documents by query." + deleteResponse.getResult());
+        }
+        client.admin().indices().prepareRefresh(indexName).execute().actionGet();
     }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java
deleted file mode 100644
index d66b240..0000000
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient;
-
-import java.lang.reflect.Method;
-
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.drop.AbstractTableDropBuilder;
-import org.apache.metamodel.drop.TableDropBuilder;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.Table;
-import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder;
-import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.IndicesAdminClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link TableDropBuilder} for dropping tables (document types) in an
- * ElasticSearch index.
- */
-final class ElasticSearchDropTableBuilder extends AbstractTableDropBuilder {
-
-    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDropTableBuilder.class);
-
-    private final ElasticSearchUpdateCallback _updateCallback;
-
-    public ElasticSearchDropTableBuilder(ElasticSearchUpdateCallback updateCallback, Table table) {
-        super(table);
-        _updateCallback = updateCallback;
-    }
-
-    @Override
-    public void execute() throws MetaModelException {
-        final ElasticSearchDataContext dataContext = _updateCallback.getDataContext();
-        final Table table = getTable();
-        final String documentType = table.getName();
-        logger.info("Deleting mapping / document type: {}", documentType);
-        final Client client = dataContext.getElasticSearchClient();
-        final IndicesAdminClient indicesAdminClient = client.admin().indices();
-        final String indexName = dataContext.getIndexName();
-
-        final DeleteMappingRequestBuilder requestBuilder = new DeleteMappingRequestBuilder(indicesAdminClient)
-                .setIndices(indexName);
-        setType(requestBuilder, documentType);
-
-        final DeleteMappingResponse result = requestBuilder.execute().actionGet();
-        logger.debug("Delete mapping response: acknowledged={}", result.isAcknowledged());
-
-        final MutableSchema schema = (MutableSchema) table.getSchema();
-        schema.removeTable(table);
-    }
-
-    /**
-     * Invokes the {@link DeleteMappingRequestBuilder#setType(String...)} method
-     * using reflection. This is done because the API of ElasticSearch was
-     * changed and the method signature differes between different versions.
-     * 
-     * @param requestBuilder
-     * @param documentType
-     */
-    private void setType(DeleteMappingRequestBuilder requestBuilder, String documentType) {
-        Object argument;
-        Method method;
-        try {
-            try {
-                method = requestBuilder.getClass().getDeclaredMethod("setType", String[].class);
-                argument = new String[] {documentType};
-            } catch (NoSuchMethodException e) {
-                logger.debug("No setType(String[]) method found, trying with a single String instead", e);
-                method = requestBuilder.getClass().getDeclaredMethod("setType", String.class);
-                argument = documentType;
-            }
-        } catch (Exception e) {
-            logger.error("Failed to resolve DeleteMappingRequestBuilder.setType(...) method", e);
-            throw new IllegalStateException("Failed to resolve DeleteMappingRequestBuilder.setType(...) method", e);
-        }
-        try {
-            method.setAccessible(true);
-            method.invoke(requestBuilder, argument);
-        } catch (Exception e) {
-            logger.error("Failed to invoke {}", method, e);
-            throw new IllegalStateException("Failed to invoke " + method, e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
index 70d31b4..a1c7f69 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
@@ -26,6 +26,7 @@ import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
 import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
 import org.apache.metamodel.schema.Column;
 import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.index.IndexAction;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.Client;
@@ -46,7 +47,8 @@ final class ElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<Elast
         final Client client = dataContext.getElasticSearchClient();
         final String indexName = dataContext.getIndexName();
         final String documentType = getTable().getName();
-        final IndexRequestBuilder requestBuilder = new IndexRequestBuilder(client, indexName).setType(documentType);
+        final IndexRequestBuilder requestBuilder =
+                new IndexRequestBuilder(client, IndexAction.INSTANCE).setIndex(indexName).setType(documentType);
 
         final Map<String, Object> valueMap = new HashMap<>();
         final Column[] columns = getColumns();
@@ -68,11 +70,11 @@ final class ElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<Elast
         assert !valueMap.isEmpty();
 
         requestBuilder.setSource(valueMap);
-        requestBuilder.setCreate(true);
 
         final IndexResponse result = requestBuilder.execute().actionGet();
         
         logger.debug("Inserted document: id={}", result.getId());
-    }
 
+        client.admin().indices().prepareRefresh(indexName).execute().actionGet();
+    }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java
deleted file mode 100644
index c0a1232..0000000
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient;
-
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.schema.ColumnType;
-
-/**
- * Parser that transforms the ElasticSearch metadata response (json-like format)
- * into an ElasticSearchMetaData object.
- */
-public class ElasticSearchMetaDataParser {
-
-    /**
-     * Parses the ElasticSearch meta data info into an ElasticSearchMetaData
-     * object. This method makes much easier to create the ElasticSearch schema.
-     *
-     * @param metaDataInfo
-     *            ElasticSearch mapping metadata in Map format
-     * @return An ElasticSearchMetaData object
-     */
-    public static ElasticSearchMetaData parse(Map<String, ?> metaDataInfo) {
-        final String[] fieldNames = new String[metaDataInfo.size() + 1];
-        final ColumnType[] columnTypes = new ColumnType[metaDataInfo.size() + 1];
-
-        // add the document ID field (fixed)
-        fieldNames[0] = ElasticSearchUtils.FIELD_ID;
-        columnTypes[0] = ColumnType.STRING;
-
-        int i = 1;
-        for (Entry<String, ?> metaDataField : metaDataInfo.entrySet()) {
-            @SuppressWarnings("unchecked")
-            final Map<String, ?> fieldMetadata = (Map<String, ?>) metaDataField.getValue();
-
-            fieldNames[i] = metaDataField.getKey();
-            columnTypes[i] = getColumnTypeFromMetadataField(fieldMetadata);
-            i++;
-
-        }
-        return new ElasticSearchMetaData(fieldNames, columnTypes);
-    }
-
-    private static ColumnType getColumnTypeFromMetadataField(Map<String, ?> fieldMetadata) {
-        final String metaDataFieldType = getMetaDataFieldTypeFromMetaDataField(fieldMetadata);
-
-        if (metaDataFieldType == null) {
-            return ColumnType.STRING;
-        }
-
-        return ElasticSearchUtils.getColumnTypeFromElasticSearchType(metaDataFieldType);
-    }
-
-    private static String getMetaDataFieldTypeFromMetaDataField(Map<String, ?> metaDataField) {
-        final Object type = metaDataField.get("type");
-        if (type == null) {
-            return null;
-        }
-        return type.toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java
new file mode 100644
index 0000000..eeee6fc
--- /dev/null
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.query.LogicalOperator;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.Table;
+import org.apache.metamodel.update.AbstractRowUpdationBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateAction;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchUpdateBuilder extends AbstractRowUpdationBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUpdateBuilder.class);
+
+    private final ElasticSearchUpdateCallback _updateCallback;
+
+    public ElasticSearchUpdateBuilder(ElasticSearchUpdateCallback updateCallback, Table table) {
+        super(table);
+        _updateCallback = updateCallback;
+    }
+
+    @Override
+    public void execute() throws MetaModelException {
+
+        final Table table = getTable();
+        final String documentType = table.getName();
+
+        final ElasticSearchDataContext dataContext = _updateCallback.getDataContext();
+        final Client client = dataContext.getElasticSearchClient();
+        final String indexName = dataContext.getIndexName();
+        final List<FilterItem> whereItems = getWhereItems();
+
+        // find the documents to update - note that createQueryBuilderForSimpleWhere
+        // may return matchAllQuery() if no where items are present.
+        final QueryBuilder queryBuilder =
+                ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems, LogicalOperator.AND);
+        if (queryBuilder == null) {
+            // TODO: The where items could not be pushed down to a query. We
+            // could solve this by running a query first, gathering all
+            // document IDs and then updating by IDs.
+            throw new UnsupportedOperationException(
+                    "Could not push down WHERE items to delete by query request: " + whereItems);
+        }
+
+        final SearchResponse response = client.prepareSearch(indexName).setQuery(queryBuilder).execute().actionGet();
+
+        final Iterator<SearchHit> iterator = response.getHits().iterator();
+        while (iterator.hasNext()) {
+            final SearchHit hit = iterator.next();
+            final String typeId = hit.getId();
+
+            final UpdateRequestBuilder requestBuilder =
+                    new UpdateRequestBuilder(client, UpdateAction.INSTANCE).setIndex(indexName).setType(documentType)
+                            .setId(typeId);
+
+            final Map<String, Object> valueMap = new HashMap<>();
+            final Column[] columns = getColumns();
+            final Object[] values = getValues();
+            for (int i = 0; i < columns.length; i++) {
+                if (isSet(columns[i])) {
+                    final String name = columns[i].getName();
+                    final Object value = values[i];
+                    if (ElasticSearchUtils.FIELD_ID.equals(name)) {
+                        if (value != null) {
+                            requestBuilder.setId(value.toString());
+                        }
+                    } else {
+                        valueMap.put(name, value);
+                    }
+                }
+            }
+
+            assert !valueMap.isEmpty();
+
+            requestBuilder.setDoc(valueMap);
+
+            final UpdateResponse updateResponse = requestBuilder.execute().actionGet();
+
+            logger.debug("Update document: id={}", updateResponse.getId());
+
+            client.admin().indices().prepareRefresh(indexName).get();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
index b81c9c7..c4cbbac 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
@@ -26,6 +26,7 @@ import org.apache.metamodel.drop.TableDropBuilder;
 import org.apache.metamodel.insert.RowInsertionBuilder;
 import org.apache.metamodel.schema.Schema;
 import org.apache.metamodel.schema.Table;
+import org.apache.metamodel.update.RowUpdationBuilder;
 import org.elasticsearch.client.Client;
 
 /**
@@ -50,13 +51,13 @@ final class ElasticSearchUpdateCallback extends AbstractUpdateCallback {
 
     @Override
     public boolean isDropTableSupported() {
-        return true;
+        return false;
     }
 
     @Override
     public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
             UnsupportedOperationException {
-        return new ElasticSearchDropTableBuilder(this, table);
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -76,6 +77,11 @@ final class ElasticSearchUpdateCallback extends AbstractUpdateCallback {
         return new ElasticSearchDeleteBuilder(this, table);
     }
 
+    @Override
+    public RowUpdationBuilder update(final Table table) {
+        return new ElasticSearchUpdateBuilder(this, table);
+    }
+
     public void onExecuteUpdateFinished() {
         // force refresh of the index
         final ElasticSearchDataContext dataContext = getDataContext();

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
deleted file mode 100644
index 822ef1b..0000000
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient;
-
-import java.util.Date;
-import java.util.Map;
-
-import org.apache.metamodel.data.DataSetHeader;
-import org.apache.metamodel.data.DefaultRow;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchDateConverter;
-import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.ColumnType;
-
-/**
- * Shared/common util functions for the ElasticSearch MetaModel module.
- */
-final class NativeElasticSearchUtils {
-
-    public static Row createRow(Map<String, Object> sourceMap, String documentId, DataSetHeader header) {
-        final Object[] values = new Object[header.size()];
-        for (int i = 0; i < values.length; i++) {
-            final SelectItem selectItem = header.getSelectItem(i);
-            final Column column = selectItem.getColumn();
-
-            assert column != null;
-            assert selectItem.getAggregateFunction() == null;
-            assert selectItem.getScalarFunction() == null;
-
-            if (column.isPrimaryKey()) {
-                values[i] = documentId;
-            } else {
-                Object value = sourceMap.get(column.getName());
-
-                if (column.getType() == ColumnType.DATE) {
-                    Date valueToDate = ElasticSearchDateConverter.tryToConvert((String) value);
-                    if (valueToDate == null) {
-                        values[i] = value;
-                    } else {
-                        values[i] = valueToDate;
-                    }
-                } else {
-                    values[i] = value;
-                }
-            }
-        }
-
-        return new DefaultRow(header, values);
-    }
-}