You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metamodel.apache.org by ka...@apache.org on 2018/01/26 04:13:31 UTC

[3/4] metamodel git commit: METAMODEL-1179: Upgraded ElasticSearch REST module to new client.

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
index ec5ecba..983ba5e 100644
--- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
+++ b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
@@ -19,12 +19,6 @@
 package org.apache.metamodel.elasticsearch.nativeclient;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -45,9 +39,7 @@ import org.apache.metamodel.data.DataSetTableModel;
 import org.apache.metamodel.data.InMemoryDataSet;
 import org.apache.metamodel.data.Row;
 import org.apache.metamodel.delete.DeleteFrom;
-import org.apache.metamodel.drop.DropTable;
 import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.elasticsearch.nativeclient.utils.EmbeddedElasticsearchServer;
 import org.apache.metamodel.query.FunctionType;
 import org.apache.metamodel.query.Query;
 import org.apache.metamodel.query.SelectItem;
@@ -58,20 +50,18 @@ import org.apache.metamodel.schema.Schema;
 import org.apache.metamodel.schema.Table;
 import org.apache.metamodel.update.Update;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.client.IndicesAdminClient;
 import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
-public class ElasticSearchDataContextTest {
+public class ElasticSearchDataContextTest extends ESSingleNodeTestCase {
 
     private static final String indexName = "twitter";
     private static final String indexType1 = "tweet1";
@@ -81,14 +71,15 @@ public class ElasticSearchDataContextTest {
     private static final String bulkIndexType = "bulktype";
     private static final String peopleIndexType = "peopletype";
     private static final String mapping = "{\"date_detection\":\"false\",\"properties\":{\"message\":{\"type\":\"string\",\"index\":\"not_analyzed\",\"doc_values\":\"true\"}}}";
-    private static EmbeddedElasticsearchServer embeddedElasticsearchServer;
-    private static Client client;
-    private static UpdateableDataContext dataContext;
-
-    @BeforeClass
-    public static void beforeTests() throws Exception {
-        embeddedElasticsearchServer = new EmbeddedElasticsearchServer();
-        client = embeddedElasticsearchServer.getClient();
+    private Client client;
+    private UpdateableDataContext dataContext;
+
+    @Before
+    public void beforeTests() throws Exception {
+        client = client();
+
+        dataContext = new ElasticSearchDataContext(client, indexName);
+
         indexTweeterDocument(indexType1, 1);
         indexTweeterDocument(indexType2, 1);
         indexTweeterDocument(indexType2, 2, null);
@@ -96,15 +87,10 @@ public class ElasticSearchDataContextTest {
         indexTweeterDocument(indexType2, 1);
         indexBulkDocuments(indexName, bulkIndexType, 10);
 
-        // The refresh API allows to explicitly refresh one or more index,
-        // making all operations performed since the last refresh available for
-        // search
-        embeddedElasticsearchServer.getClient().admin().indices().prepareRefresh().execute().actionGet();
-        dataContext = new ElasticSearchDataContext(client, indexName);
-        System.out.println("Embedded ElasticSearch server created!");
+        dataContext.refreshSchemas();
     }
 
-    private static void insertPeopleDocuments() throws IOException {
+    private void insertPeopleDocuments() throws IOException {
         indexOnePeopleDocument("female", 20, 5);
         indexOnePeopleDocument("female", 17, 8);
         indexOnePeopleDocument("female", 18, 9);
@@ -116,10 +102,9 @@ public class ElasticSearchDataContextTest {
         indexOnePeopleDocument("male", 18, 4);
     }
 
-    @AfterClass
-    public static void afterTests() {
-        embeddedElasticsearchServer.shutdown();
-        System.out.println("Embedded ElasticSearch server shut down!");
+    @After
+    public void afterTests() {
+        client.admin().indices().delete(new DeleteIndexRequest("_all")).actionGet();
     }
 
     @Test
@@ -128,9 +113,15 @@ public class ElasticSearchDataContextTest {
                 Arrays.toString(dataContext.getDefaultSchema().getTableNames().toArray()));
 
         Table table = dataContext.getDefaultSchema().getTableByName("tweet1");
+        try (DataSet ds = dataContext.query().from(indexType1).select("_id").execute()) {
+            assertEquals(ElasticSearchDataSet.class, ds.getClass());
+            assertTrue(ds.next());
+            assertEquals("Row[values=[tweet_tweet1_1]]",ds.getRow().toString());
+        }
 
         assertEquals("[_id, message, postDate, user]", Arrays.toString(table.getColumnNames().toArray()));
 
+        assertEquals(ColumnType.STRING, table.getColumnByName("_id").getType());
         assertEquals(ColumnType.STRING, table.getColumnByName("user").getType());
         assertEquals(ColumnType.DATE, table.getColumnByName("postDate").getType());
         assertEquals(ColumnType.BIGINT, table.getColumnByName("message").getType());
@@ -201,7 +192,7 @@ public class ElasticSearchDataContextTest {
     }
 
     @Test
-    public void testCreateTableInsertQueryAndDrop() throws Exception {
+    public void testCreateTableAndInsertQuery() throws Exception {
         final Schema schema = dataContext.getDefaultSchema();
         final CreateTable createTable = new CreateTable(schema, "testCreateTable");
         createTable.withColumn("foo").ofType(ColumnType.STRING);
@@ -235,42 +226,42 @@ public class ElasticSearchDataContextTest {
             assertNotNull(ds.getRow().getValue(idColumn));
             assertFalse(ds.next());
         }
-
-        dataContext.executeUpdate(new DropTable(table));
-
-        dataContext.refreshSchemas();
-
-        assertNull(dataContext.getTableByQualifiedLabel(table.getName()));
     }
 
     @Test
-    public void testDetectOutsideChanges() throws Exception {
-        ElasticSearchDataContext elasticSearchDataContext = (ElasticSearchDataContext) dataContext;
+    public void testDeleteFromWithWhere() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final String tableName = "testCreateTableDelete";
+        final CreateTable createTable = new CreateTable(schema, tableName);
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.DOUBLE);
+        dataContext.executeUpdate(createTable);
 
-        // Create the type in ES
-        final IndicesAdminClient indicesAdmin = elasticSearchDataContext.getElasticSearchClient().admin().indices();
-        final String tableType = "outsideTable";
+        final Table table = schema.getTableByName(tableName);
 
-        Object[] sourceProperties = { "testA", "type=string, store=true", "testB", "type=string, store=true" };
+        dataContext.executeUpdate(new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+            }
+        });
 
-        new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).setSource(sourceProperties)
-                .execute().actionGet();
+        dataContext.executeUpdate(new DeleteFrom(table).where("bar").eq(42));
 
-        dataContext.refreshSchemas();
+        final Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).selectCount()
+                .toQuery());
 
-        assertNotNull(dataContext.getDefaultSchema().getTableByName(tableType));
+        assertEquals("Row[values=[1]]", row.toString());
 
-        new DeleteMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).execute().actionGet();
-        dataContext.refreshSchemas();
-        assertNull(dataContext.getTableByQualifiedLabel(tableType));
     }
 
     @Test
-    public void testDeleteAll() throws Exception {
+    public void testDeleteNoWhere() throws Exception {
         final Schema schema = dataContext.getDefaultSchema();
         final CreateTable createTable = new CreateTable(schema, "testCreateTable");
         createTable.withColumn("foo").ofType(ColumnType.STRING);
-        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        createTable.withColumn("bar").ofType(ColumnType.DOUBLE);
         dataContext.executeUpdate(createTable);
 
         final Table table = schema.getTableByName("testCreateTable");
@@ -288,8 +279,6 @@ public class ElasticSearchDataContextTest {
         Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).selectCount()
                 .toQuery());
         assertEquals("Row[values=[0]]", row.toString());
-
-        dataContext.executeUpdate(new DropTable(table));
     }
 
     @Test
@@ -315,8 +304,6 @@ public class ElasticSearchDataContextTest {
         Row row = MetaModelHelper.executeSingleRowQuery(dataContext,
                 dataContext.query().from(table).select("foo", "bar").toQuery());
         assertEquals("Row[values=[world, 43]]", row.toString());
-
-        dataContext.executeUpdate(new DropTable(table));
     }
 
     @Test
@@ -328,27 +315,22 @@ public class ElasticSearchDataContextTest {
         dataContext.executeUpdate(createTable);
 
         final Table table = schema.getTableByName("testCreateTable");
-        try {
 
-            dataContext.executeUpdate(new UpdateScript() {
-                @Override
-                public void run(UpdateCallback callback) {
-                    callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
-                    callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
-                }
-            });
-
-            // greater than is not yet supported
-            try {
-                dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40));
-                fail("Exception expected");
-            } catch (UnsupportedOperationException e) {
-                assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]",
-                        e.getMessage());
+        dataContext.executeUpdate(new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
             }
+        });
 
-        } finally {
-            dataContext.executeUpdate(new DropTable(table));
+        // greater than is not yet supported
+        try {
+            dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40));
+            fail("Exception expected");
+        } catch (UnsupportedOperationException e) {
+            assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]",
+                    e.getMessage());
         }
     }
 
@@ -361,54 +343,24 @@ public class ElasticSearchDataContextTest {
         dataContext.executeUpdate(createTable);
 
         final Table table = schema.getTableByName("testCreateTable");
-        try {
 
-            dataContext.executeUpdate(new UpdateScript() {
-                @Override
-                public void run(UpdateCallback callback) {
-                    callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
-                    callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
-                }
-            });
-
-            dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42));
-
-            DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute();
-            assertTrue(dataSet.next());
-            assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString());
-            assertTrue(dataSet.next());
-            assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString());
-            assertFalse(dataSet.next());
-            dataSet.close();
-        } finally {
-            dataContext.executeUpdate(new DropTable(table));
-        }
-    }
-
-    @Test
-    public void testDropTable() throws Exception {
-        Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+        dataContext.executeUpdate(new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+            }
+        });
 
-        // assert that the table was there to begin with
-        {
-            DataSet ds = dataContext.query().from(table).selectCount().execute();
-            ds.next();
-            assertEquals("Row[values=[9]]", ds.getRow().toString());
-            ds.close();
-        }
+        dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42));
 
-        dataContext.executeUpdate(new DropTable(table));
-        try {
-            DataSet ds = dataContext.query().from(table).selectCount().execute();
-            ds.next();
-            assertEquals("Row[values=[0]]", ds.getRow().toString());
-            ds.close();
-        } finally {
-            // restore the people documents for the next tests
-            insertPeopleDocuments();
-            client.admin().indices().prepareRefresh().execute().actionGet();
-            dataContext = new ElasticSearchDataContext(client, indexName);
-        }
+        DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute();
+        assertTrue(dataSet.next());
+        assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString());
+        assertTrue(dataSet.next());
+        assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString());
+        assertFalse(dataSet.next());
+        dataSet.close();
     }
 
     @Test
@@ -549,26 +501,19 @@ public class ElasticSearchDataContextTest {
 
     @Test
     public void testNonDynamicMapingTableNames() throws Exception {
-        createIndex();
+        CreateIndexRequest cir = new CreateIndexRequest(indexName2);
+        client.admin().indices().create(cir).actionGet();
+        
+        PutMappingRequest pmr = new PutMappingRequest(indexName2).type(indexType3).source(mapping, XContentType.JSON);
+        
+        client.admin().indices().putMapping(pmr).actionGet();
 
         ElasticSearchDataContext dataContext2 = new ElasticSearchDataContext(client, indexName2);
 
         assertEquals("[tweet3]", Arrays.toString(dataContext2.getDefaultSchema().getTableNames().toArray()));
     }
 
-    private static void createIndex() {
-        CreateIndexRequest cir = new CreateIndexRequest(indexName2);
-        CreateIndexResponse response = client.admin().indices().create(cir).actionGet();
-
-        System.out.println("create index: " + response.isAcknowledged());
-
-        PutMappingRequest pmr = new PutMappingRequest(indexName2).type(indexType3).source(mapping);
-
-        PutMappingResponse response2 = client.admin().indices().putMapping(pmr).actionGet();
-        System.out.println("put mapping: " + response2.isAcknowledged());
-    }
-
-    private static void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) {
+    private void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) {
         BulkRequestBuilder bulkRequest = client.prepareBulk();
 
         for (int i = 0; i < numberOfDocuments; i++) {
@@ -578,17 +523,17 @@ public class ElasticSearchDataContextTest {
         bulkRequest.execute().actionGet();
     }
 
-    private static void indexTweeterDocument(String indexType, int id, Date date) {
-        client.prepareIndex(indexName, indexType).setSource(buildTweeterJson(id, date))
-                .setId("tweet_" + indexType + "_" + id).execute().actionGet();
+    private void indexTweeterDocument(String indexType, int id, Date date) {
+        final String id1 = "tweet_" + indexType + "_" + id;
+        client.prepareIndex(indexName, indexType, id1).setSource(buildTweeterJson(id, date)).execute().actionGet();
     }
 
-    private static void indexTweeterDocument(String indexType, int id) {
+    private void indexTweeterDocument(String indexType, int id) {
         client.prepareIndex(indexName, indexType).setSource(buildTweeterJson(id))
                 .setId("tweet_" + indexType + "_" + id).execute().actionGet();
     }
 
-    private static void indexOnePeopleDocument(String gender, int age, int id) throws IOException {
+    private void indexOnePeopleDocument(String gender, int age, int id) throws IOException {
         client.prepareIndex(indexName, peopleIndexType).setSource(buildPeopleJson(gender, age, id)).execute()
                 .actionGet();
     }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
index 8b5eb50..e08f715 100644
--- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
+++ b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import junit.framework.TestCase;
 
 import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser;
 import org.apache.metamodel.schema.ColumnType;
 import org.elasticsearch.common.collect.MapBuilder;
 

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java
deleted file mode 100644
index 9ffc6b8..0000000
--- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient;
-
-import junit.framework.TestCase;
-import org.apache.metamodel.data.DataSetHeader;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.data.SimpleDataSetHeader;
-import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.schema.ColumnType;
-import org.apache.metamodel.schema.MutableColumn;
-
-import java.util.*;
-
-public class ElasticSearchUtilsTest extends TestCase {
-
-    public void testAssignDocumentIdForPrimaryKeys() throws Exception {
-        MutableColumn primaryKeyColumn = new MutableColumn("value1", ColumnType.STRING).setPrimaryKey(true);
-        SelectItem primaryKeyItem = new SelectItem(primaryKeyColumn);
-        List<SelectItem> selectItems1 = Collections.singletonList(primaryKeyItem);
-        String documentId = "doc1";
-        DataSetHeader header = new SimpleDataSetHeader(selectItems1);
-        Map<String, Object> values = new HashMap<>();
-        values.put("value1", "theValue");
-        Row row = NativeElasticSearchUtils.createRow(values, documentId, header);
-        String primaryKeyValue = (String) row.getValue(primaryKeyItem);
-
-        assertEquals(primaryKeyValue, documentId);
-    }
-
-    public void testCreateRowWithParsableDates() throws Exception {
-        SelectItem item1 = new SelectItem(new MutableColumn("value1", ColumnType.STRING));
-        SelectItem item2 = new SelectItem(new MutableColumn("value2", ColumnType.DATE));
-        List<SelectItem> selectItems1 = Arrays.asList(item1, item2);
-        String documentId = "doc1";
-        DataSetHeader header = new SimpleDataSetHeader(selectItems1);
-        Map<String, Object> values = new HashMap<>();
-        values.put("value1", "theValue");
-        values.put("value2", "2013-01-04T15:55:51.217+01:00");
-        Row row = NativeElasticSearchUtils.createRow(values, documentId, header);
-        Object stringValue = row.getValue(item1);
-        Object dateValue = row.getValue(item2);
-
-        assertTrue(stringValue instanceof String);
-        assertTrue(dateValue instanceof Date);
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java
deleted file mode 100644
index b94d0ab..0000000
--- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient.utils;
-
-import org.apache.commons.io.FileUtils;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.node.Node;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
-
-public class EmbeddedElasticsearchServer {
-
-    private static final String DEFAULT_DATA_DIRECTORY = "target/elasticsearch-data";
-
-    private final Node node;
-    private final String dataDirectory;
-
-    public EmbeddedElasticsearchServer() {
-        this(DEFAULT_DATA_DIRECTORY);
-    }
-
-    public EmbeddedElasticsearchServer(String dataDirectory) {
-        this.dataDirectory = dataDirectory;
-
-        ImmutableSettings.Builder elasticsearchSettings = ImmutableSettings.settingsBuilder()
-                .put("http.enabled", "true")
-                .put("path.data", dataDirectory);
-
-        node = nodeBuilder()
-                .local(true)
-                .settings(elasticsearchSettings.build())
-                .node();
-    }
-
-    public Client getClient() {
-        return node.client();
-    }
-
-    public void shutdown() {
-        node.close();
-        deleteDataDirectory();
-    }
-
-    private void deleteDataDirectory() {
-        try {
-            FileUtils.deleteDirectory(new File(dataDirectory));
-        } catch (IOException e) {
-            throw new RuntimeException("Could not delete data directory of embedded elasticsearch server", e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
index 9a3b1d8..46f930c 100644
--- a/elasticsearch/pom.xml
+++ b/elasticsearch/pom.xml
@@ -21,7 +21,7 @@
 	<name>MetaModel module for Elasticsearch</name>
 
 	<properties>
-		<elasticsearch.version>1.4.4</elasticsearch.version>
+		<elasticsearch.version>5.6.3</elasticsearch.version>
 	</properties>
 
 	<modules>

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/pom.xml b/elasticsearch/rest/pom.xml
index 936cf4c..8f556e6 100644
--- a/elasticsearch/rest/pom.xml
+++ b/elasticsearch/rest/pom.xml
@@ -26,11 +26,6 @@ under the License.
 
 	<modelVersion>4.0.0</modelVersion>
 
-	<properties>
-		<jest.version>2.0.2</jest.version>
-		<elasticsearch.version>1.4.4</elasticsearch.version>
-	</properties>
-
 	<artifactId>MetaModel-elasticsearch-rest</artifactId>
 	<name>MetaModel module for ElasticSearch via REST client</name>
 
@@ -52,34 +47,28 @@ under the License.
 			<artifactId>commons-io</artifactId>
 		</dependency>
 
-		<!-- Jest -->
-		<dependency>
-			<groupId>io.searchbox</groupId>
-			<artifactId>jest</artifactId>
-			<version>${jest.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>commons-logging</groupId>
-					<artifactId>commons-logging</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
 		<dependency>
 			<groupId>org.slf4j</groupId>
 			<artifactId>jcl-over-slf4j</artifactId>
 		</dependency>
 		<!-- elasticsearch -->
 		<dependency>
-			<groupId>org.elasticsearch</groupId>
-			<artifactId>elasticsearch</artifactId>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>elasticsearch-rest-high-level-client</artifactId>
 			<version>${elasticsearch.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>commons-logging</groupId>
+					<artifactId>commons-logging</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 
 		<!-- test -->
-		<dependency>
-			<groupId>org.slf4j</groupId>
-			<artifactId>slf4j-log4j12</artifactId>
+		<dependency><!-- required by elasticsearch -->
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-core</artifactId>
+			<version>2.9.1</version>
 			<scope>test</scope>
 		</dependency>
 		<dependency>
@@ -87,5 +76,84 @@ under the License.
 			<artifactId>junit</artifactId>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.hamcrest</groupId>
+		    <artifactId>hamcrest-all</artifactId>
+		    <version>1.3</version>
+		    <scope>test</scope>
+		</dependency>
 	</dependencies>
+	<profiles>
+		<profile>
+			<id>integration-test</id>
+			<build>
+				<plugins>
+					<plugin>
+						<artifactId>maven-failsafe-plugin</artifactId>
+						<version>2.19.1</version>
+						<executions>
+							<execution>
+								<goals>
+									<goal>integration-test</goal>
+									<goal>verify</goal>
+								</goals>
+							</execution>
+						</executions>
+					</plugin>
+
+					<plugin>
+						<groupId>io.fabric8</groupId>
+						<artifactId>docker-maven-plugin</artifactId>
+						<version>${docker-maven-plugin.version}</version>
+						<configuration>
+							<logDate>default</logDate>
+							<autoPull>true</autoPull>
+							<images>
+								<image>
+									<name>elasticsearch-metamodel</name>
+									<build>
+										<dockerFileDir>${project.build.directory}/test-classes</dockerFileDir>
+									</build>
+									<run>
+										<ports>
+											<port>9200:9200</port>
+											<port>9300:9300</port>
+										</ports>
+										<env>
+											<ES_JAVA_OPTS>-Xms1g -Xmx1g</ES_JAVA_OPTS>
+											<cluster.name>docker-cluster</cluster.name>
+											<bootstrap.memory_lock>true</bootstrap.memory_lock>
+										    <xpack.security.enabled>false</xpack.security.enabled>
+										</env>
+										<wait>
+											<url>http://${docker.host.address}:9200</url>
+											<time>300000</time>
+										</wait>
+									</run>
+								</image>
+							</images>
+						</configuration>
+						<executions>
+							<execution>
+								<id>start</id>
+								<phase>pre-integration-test</phase>
+								<goals>
+									<goal>build</goal>
+									<goal>start</goal>
+								</goals>
+							</execution>
+							<execution>
+								<id>stop</id>
+								<phase>post-integration-test</phase>
+								<goals>
+									<goal>stop</goal>
+								</goals>
+							</execution>
+						</executions>
+					</plugin>
+				</plugins>
+			</build>
+		</profile>
+	</profiles>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java
new file mode 100644
index 0000000..ddd7e17
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import static java.util.Collections.emptySet;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.http.Header;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.main.MainRequest;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchRestClient extends RestHighLevelClient {
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestClient.class);
+
+    public ElasticSearchRestClient(final RestClient restClient) {
+        super(restClient);
+    }
+
+    public final boolean refresh(final String indexName, final Header... headers) {
+        try {
+            return performRequest(new MainRequest(), request -> refresh(indexName),
+                    ElasticSearchRestClient::convertResponse, emptySet(), headers);
+        } catch (IOException e) {
+            logger.info("Failed to refresh index \"{}\"", indexName, e);
+        }
+        return false;
+    }
+
+    private static Request refresh(final String indexName) {
+        return new Request(HttpPost.METHOD_NAME, "/" + indexName + "/_refresh", Collections.emptyMap(), null);
+    }
+
+    public final boolean delete(final String indexName, final Header... headers) throws IOException {
+        return performRequest(new MainRequest(), request -> delete(indexName),
+                ElasticSearchRestClient::convertResponse, emptySet(), headers);
+    }
+
+    private static Request delete(final String indexName) {
+        return new Request(HttpDelete.METHOD_NAME, "/" + indexName, Collections.emptyMap(), null);
+    }
+
+    public Set<Entry<String, Object>> getMappings(final String indexName, final Header... headers) throws IOException {
+        return performRequestAndParseEntity(new GetIndexRequest(), request -> getMappings(indexName), (
+                response) -> parseMappings(response, indexName), emptySet(), headers);
+    }
+
+    private static Request getMappings(final String indexName) {
+        return new Request(HttpGet.METHOD_NAME, "/" + indexName, Collections.emptyMap(), null);
+    }
+
+    public final boolean createMapping(final PutMappingRequest putMappingRequest, final Header... headers)
+            throws IOException {
+        return performRequest(putMappingRequest, request -> putMapping(putMappingRequest),
+                ElasticSearchRestClient::convertResponse, emptySet(), headers);
+    }
+
+    private static Request putMapping(final PutMappingRequest putMappingRequest) {
+        final String endpoint = "/" + putMappingRequest.indices()[0] + "/_mapping/" + putMappingRequest.type();
+        final ByteArrayEntity entity = new ByteArrayEntity(putMappingRequest.source().getBytes(),
+                ContentType.APPLICATION_JSON);
+        return new Request(HttpPut.METHOD_NAME, endpoint, Collections.emptyMap(), entity);
+    }
+
+    // Carbon copy of RestHighLevelClient#convertExistsResponse(Response) method, which is unaccessible from this class.
+    private static boolean convertResponse(final Response response) {
+        return response.getStatusLine().getStatusCode() == 200;
+    }
+
+    @SuppressWarnings("unchecked")
+    static Set<Entry<String, Object>> parseMappings(final XContentParser response, final String indexName) throws IOException {
+        Map<String, Object> schema = (Map<String, Object>) response.map().get(indexName);
+        Map<String, Object> tables = (Map<String, Object>) schema.get("mappings");
+
+        return tables.entrySet();
+    }
+
+    ActionResponse execute(final ActionRequest request) throws IOException {
+        if (request instanceof BulkRequest) {
+            return bulk((BulkRequest) request);
+        } else if (request instanceof IndexRequest) {
+            return index((IndexRequest) request);
+        } else if (request instanceof DeleteRequest) {
+            return delete((DeleteRequest) request);
+        } else if (request instanceof ClearScrollRequest) {
+            return clearScroll((ClearScrollRequest) request);
+        } else if (request instanceof SearchScrollRequest) {
+            return searchScroll((SearchScrollRequest) request);
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java
new file mode 100644
index 0000000..91842f5
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.util.Map;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.create.AbstractTableCreationBuilder;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+
+final class ElasticSearchRestCreateTableBuilder extends AbstractTableCreationBuilder<ElasticSearchRestUpdateCallback> {
+
+    public ElasticSearchRestCreateTableBuilder(final ElasticSearchRestUpdateCallback updateCallback,
+            final Schema schema, final String name) {
+        super(updateCallback, schema, name);
+    }
+
+    @Override
+    public Table execute() throws MetaModelException {
+        final MutableTable table = getTable();
+        final Map<String, ?> source = ElasticSearchUtils.getMappingSource(table);
+
+        final ElasticSearchRestDataContext dataContext = getUpdateCallback().getDataContext();
+        final String indexName = dataContext.getIndexName();
+
+        final PutMappingRequest putMapping = new PutMappingRequest(indexName).type(table.getName()).source(source);
+        getUpdateCallback().execute(putMapping);
+
+        final MutableSchema schema = (MutableSchema) getSchema();
+        schema.addTable(table);
+        return table;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
index c5a5696..5b32d14 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
@@ -18,57 +18,43 @@
  */
 package org.apache.metamodel.elasticsearch.rest;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.metamodel.BatchUpdateScript;
 import org.apache.metamodel.DataContext;
 import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.QueryPostprocessDataContext;
 import org.apache.metamodel.UpdateScript;
 import org.apache.metamodel.UpdateSummary;
-import org.apache.metamodel.UpdateableDataContext;
 import org.apache.metamodel.data.DataSet;
 import org.apache.metamodel.data.DataSetHeader;
 import org.apache.metamodel.data.Row;
 import org.apache.metamodel.data.SimpleDataSetHeader;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext;
 import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser;
 import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
 import org.apache.metamodel.query.FilterItem;
 import org.apache.metamodel.query.LogicalOperator;
 import org.apache.metamodel.query.SelectItem;
 import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.MutableColumn;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.MutableTable;
-import org.apache.metamodel.schema.Schema;
 import org.apache.metamodel.schema.Table;
 import org.apache.metamodel.util.SimpleTableDef;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestResult;
-import io.searchbox.core.Count;
-import io.searchbox.core.CountResult;
-import io.searchbox.core.Get;
-import io.searchbox.core.Search;
-import io.searchbox.core.SearchResult;
-import io.searchbox.indices.mapping.GetMapping;
-import io.searchbox.params.Parameters;
-
 /**
  * DataContext implementation for ElasticSearch analytics engine.
  *
@@ -86,28 +72,14 @@ import io.searchbox.params.Parameters;
  * This implementation supports either automatic discovery of a schema or manual
  * specification of a schema, through the {@link SimpleTableDef} class.
  */
-public class ElasticSearchRestDataContext extends QueryPostprocessDataContext implements DataContext,
-        UpdateableDataContext {
+public class ElasticSearchRestDataContext extends AbstractElasticSearchDataContext {
 
     private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDataContext.class);
 
-    public static final String FIELD_ID = "_id";
-
-    // 1 minute timeout
-    public static final String TIMEOUT_SCROLL = "1m";
-
     // we scroll when more than 400 rows are expected
     private static final int SCROLL_THRESHOLD = 400;
 
-    private final JestClient elasticSearchClient;
-
-    private final String indexName;
-    // Table definitions that are set from the beginning, not supposed to be
-    // changed.
-    private final List<SimpleTableDef> staticTableDefinitions;
-
-    // Table definitions that are discovered, these can change
-    private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
+    private final ElasticSearchRestClient elasticSearchClient;
 
     /**
      * Constructs a {@link ElasticSearchRestDataContext}. This constructor
@@ -122,18 +94,14 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
      *            an array of {@link SimpleTableDef}s, which define the table
      *            and column model of the ElasticSearch index.
      */
-    public ElasticSearchRestDataContext(JestClient client, String indexName, SimpleTableDef... tableDefinitions) {
-        super(false);
+    public ElasticSearchRestDataContext(final ElasticSearchRestClient client, final String indexName,
+            final SimpleTableDef... tableDefinitions) {
+        super(indexName, tableDefinitions);
+
         if (client == null) {
             throw new IllegalArgumentException("ElasticSearch Client cannot be null");
         }
-        if (indexName == null || indexName.trim().length() == 0) {
-            throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName);
-        }
         this.elasticSearchClient = client;
-        this.indexName = indexName;
-        this.staticTableDefinitions = (tableDefinitions == null || tableDefinitions.length == 0 ? Collections
-                .<SimpleTableDef> emptyList() : Arrays.asList(tableDefinitions));
         this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
     }
 
@@ -147,65 +115,51 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
      * @param indexName
      *            the name of the ElasticSearch index to represent
      */
-    public ElasticSearchRestDataContext(JestClient client, String indexName) {
+    public ElasticSearchRestDataContext(final ElasticSearchRestClient client, String indexName) {
         this(client, indexName, new SimpleTableDef[0]);
     }
 
-    /**
-     * Performs an analysis of the available indexes in an ElasticSearch cluster
-     * {@link JestClient} instance and detects the elasticsearch types structure
-     * based on the metadata provided by the ElasticSearch java client.
-     *
-     * @see {@link #detectTable(JsonObject, String)}
-     * @return a mutable schema instance, useful for further fine tuning by the
-     *         user.
-     */
-    private SimpleTableDef[] detectSchema() {
+    @Override
+    protected SimpleTableDef[] detectSchema() {
         logger.info("Detecting schema for index '{}'", indexName);
 
-        final JestResult jestResult;
+        final Set<Entry<String, Object>> mappings;
         try {
-            final GetMapping getMapping = new GetMapping.Builder().addIndex(indexName).build();
-            jestResult = elasticSearchClient.execute(getMapping);
-        } catch (Exception e) {
+            mappings = getElasticSearchClient().getMappings(indexName);
+        } catch (IOException e) {
             logger.error("Failed to retrieve mappings", e);
             throw new MetaModelException("Failed to execute request for index information needed to detect schema", e);
         }
 
-        if (!jestResult.isSucceeded()) {
-            logger.error("Failed to retrieve mappings; {}", jestResult.getErrorMessage());
-            throw new MetaModelException("Failed to retrieve mappings; " + jestResult.getErrorMessage());
-        }
-
         final List<SimpleTableDef> result = new ArrayList<>();
 
-        final Set<Map.Entry<String, JsonElement>> mappings = jestResult.getJsonObject().getAsJsonObject(indexName)
-                .getAsJsonObject("mappings").entrySet();
-        if (mappings.size() == 0) {
+        if (mappings.isEmpty()) {
             logger.warn("No metadata returned for index name '{}' - no tables will be detected.");
         } else {
+            for (Entry<String, Object> mapping : mappings) {
+                final String documentType = mapping.getKey();
 
-            for (Map.Entry<String, JsonElement> entry : mappings) {
-                final String documentType = entry.getKey();
+                @SuppressWarnings("unchecked")
+                Map<String, Object> mappingConfiguration = (Map<String, Object>) mapping.getValue();
+                @SuppressWarnings("unchecked")
+                Map<String, Object> properties = (Map<String, Object>) mappingConfiguration.get("properties");
 
                 try {
-                    final SimpleTableDef table = detectTable(entry.getValue().getAsJsonObject().get("properties")
-                            .getAsJsonObject(), documentType);
+                    final SimpleTableDef table = detectTable(properties, documentType);
                     result.add(table);
                 } catch (Exception e) {
                     logger.error("Unexpected error during detectTable for document type '{}'", documentType, e);
                 }
             }
         }
-        final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]);
-        Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() {
-            @Override
-            public int compare(SimpleTableDef o1, SimpleTableDef o2) {
-                return o1.getName().compareTo(o2.getName());
-            }
-        });
+        return sortTables(result);
+    }
+
+    @Override
+    protected void onSchemaCacheRefreshed() {
+        getElasticSearchClient().refresh(indexName);
 
-        return tableDefArray;
+        detectSchema();
     }
 
     /**
@@ -219,60 +173,22 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
      *            the name of the index type
      * @return a table definition for ElasticSearch.
      */
-    private static SimpleTableDef detectTable(JsonObject metadataProperties, String documentType) {
-        final ElasticSearchMetaData metaData = JestElasticSearchMetaDataParser.parse(metadataProperties);
+    private static SimpleTableDef detectTable(final Map<String, Object> metadataProperties, final String documentType) {
+        final ElasticSearchMetaData metaData = ElasticSearchMetaDataParser.parse(metadataProperties);
         return new SimpleTableDef(documentType, metaData.getColumnNames(), metaData.getColumnTypes());
     }
 
     @Override
-    protected Schema getMainSchema() throws MetaModelException {
-        final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
-        for (final SimpleTableDef tableDef : staticTableDefinitions) {
-            addTable(theSchema, tableDef);
-        }
-
-        final SimpleTableDef[] tables = detectSchema();
-        synchronized (this) {
-            dynamicTableDefinitions.clear();
-            dynamicTableDefinitions.addAll(Arrays.asList(tables));
-            for (final SimpleTableDef tableDef : dynamicTableDefinitions) {
-                final List<String> tableNames = theSchema.getTableNames();
-
-                if (!tableNames.contains(tableDef.getName())) {
-                    addTable(theSchema, tableDef);
-                }
-            }
-        }
-
-        return theSchema;
-    }
-
-    private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
-        final MutableTable table = tableDef.toTable().setSchema(theSchema);
-        final Column idColumn = table.getColumnByName(FIELD_ID);
-        if (idColumn != null && idColumn instanceof MutableColumn) {
-            final MutableColumn mutableColumn = (MutableColumn) idColumn;
-            mutableColumn.setPrimaryKey(true);
-        }
-        theSchema.addTable(table);
-    }
-
-    @Override
-    protected String getMainSchemaName() throws MetaModelException {
-        return indexName;
-    }
-
-    @Override
-    protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems,
-            List<FilterItem> whereItems, int firstRow, int maxRows) {
+    protected DataSet materializeMainSchemaTable(final Table table, final List<SelectItem> selectItems,
+            final List<FilterItem> whereItems, final int firstRow, final int maxRows) {
         final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
                 LogicalOperator.AND);
         if (queryBuilder != null) {
             // where clause can be pushed down to an ElasticSearch query
             SearchSourceBuilder searchSourceBuilder = createSearchRequest(firstRow, maxRows, queryBuilder);
-            SearchResult result = executeSearch(table, searchSourceBuilder, scrollNeeded(maxRows));
+            SearchResponse result = executeSearch(table, searchSourceBuilder, scrollNeeded(maxRows));
 
-            return new JestElasticSearchDataSet(elasticSearchClient, result, selectItems);
+            return new ElasticSearchRestDataSet(getElasticSearchClient(), result, selectItems);
         }
         return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
     }
@@ -282,30 +198,30 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
         return !limitMaxRowsIsSet(maxRows) || maxRows > SCROLL_THRESHOLD;
     }
 
-    private SearchResult executeSearch(Table table, SearchSourceBuilder searchSourceBuilder, boolean scroll) {
-        Search.Builder builder = new Search.Builder(searchSourceBuilder.toString()).addIndex(getIndexName()).addType(
-                table.getName());
+    private SearchResponse executeSearch(final Table table, final SearchSourceBuilder searchSourceBuilder,
+            final boolean scroll) {
+        final SearchRequest searchRequest = new SearchRequest(new String[] { getIndexName() }, searchSourceBuilder)
+                .types(table.getName());
+
         if (scroll) {
-            builder.setParameter(Parameters.SCROLL, TIMEOUT_SCROLL);
+            searchRequest.scroll(TIMEOUT_SCROLL);
         }
 
-        Search search = builder.build();
-        SearchResult result;
         try {
-            result = elasticSearchClient.execute(search);
-        } catch (Exception e) {
+            return getElasticSearchClient().search(searchRequest);
+        } catch (IOException e) {
             logger.warn("Could not execute ElasticSearch query", e);
             throw new MetaModelException("Could not execute ElasticSearch query", e);
         }
-        return result;
     }
 
     @Override
     protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
-        SearchResult searchResult = executeSearch(table, createSearchRequest(1, maxRows, null), scrollNeeded(
+        SearchResponse searchResult = executeSearch(table, createSearchRequest(1, maxRows, null), scrollNeeded(
                 maxRows));
 
-        return new JestElasticSearchDataSet(elasticSearchClient, searchResult, columns.stream().map(SelectItem::new).collect(Collectors.toList()));
+        return new ElasticSearchRestDataSet(getElasticSearchClient(), searchResult, columns.stream()
+                .map(SelectItem::new).collect(Collectors.toList()));
     }
 
     private SearchSourceBuilder createSearchRequest(int firstRow, int maxRows, QueryBuilder queryBuilder) {
@@ -317,7 +233,7 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
         if (limitMaxRowsIsSet(maxRows)) {
             searchRequest.size(maxRows);
         } else {
-            searchRequest.size(Integer.MAX_VALUE);
+            searchRequest.size(SCROLL_THRESHOLD);
         }
 
         if (queryBuilder != null) {
@@ -337,12 +253,16 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
         final String documentType = table.getName();
         final String id = keyValue.toString();
 
-        final Get get = new Get.Builder(indexName, id).type(documentType).build();
-        final JestResult getResult = JestClientExecutor.execute(elasticSearchClient, get);
-
         final DataSetHeader header = new SimpleDataSetHeader(selectItems);
 
-        return JestElasticSearchUtils.createRow(getResult.getJsonObject().get("_source").getAsJsonObject(), id, header);
+        try {
+            return ElasticSearchUtils.createRow(getElasticSearchClient()
+                    .get(new GetRequest(getIndexName(), documentType, id))
+                    .getSource(), id, header);
+        } catch (IOException e) {
+            logger.warn("Could not execute ElasticSearch query", e);
+            throw new MetaModelException("Could not execute ElasticSearch query", e);
+        }
     }
 
     @Override
@@ -352,30 +272,23 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
             return null;
         }
         final String documentType = table.getName();
-        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
+        final SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
         sourceBuilder.query(QueryBuilders.termQuery("_type", documentType));
+        sourceBuilder.size(0);
 
-        Count count = new Count.Builder().addIndex(indexName).query(sourceBuilder.toString()).build();
-
-        CountResult countResult;
         try {
-            countResult = elasticSearchClient.execute(count);
+            return getElasticSearchClient().search(new SearchRequest(new String[] { getIndexName() }, sourceBuilder))
+                    .getHits().getTotalHits();
         } catch (Exception e) {
             logger.warn("Could not execute ElasticSearch get query", e);
             throw new MetaModelException("Could not execute ElasticSearch get query", e);
         }
-
-        return countResult.getCount();
-    }
-
-    private boolean limitMaxRowsIsSet(int maxRows) {
-        return (maxRows != -1);
     }
 
     @Override
     public UpdateSummary executeUpdate(UpdateScript update) {
         final boolean isBatch = update instanceof BatchUpdateScript;
-        final JestElasticSearchUpdateCallback callback = new JestElasticSearchUpdateCallback(this, isBatch);
+        final ElasticSearchRestUpdateCallback callback = new ElasticSearchRestUpdateCallback(this, isBatch);
         update.run(callback);
         callback.onExecuteUpdateFinished();
         return callback.getUpdateSummary();
@@ -384,14 +297,7 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
     /**
      * Gets the {@link JestClient} that this {@link DataContext} is wrapping.
      */
-    public JestClient getElasticSearchClient() {
+    public ElasticSearchRestClient getElasticSearchClient() {
         return elasticSearchClient;
     }
-
-    /**
-     * Gets the name of the index that this {@link DataContext} is working on.
-     */
-    public String getIndexName() {
-        return indexName;
-    }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java
index b2dc4c3..b1756b7 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java
@@ -18,6 +18,14 @@
  */
 package org.apache.metamodel.elasticsearch.rest;
 
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.metamodel.ConnectionException;
 import org.apache.metamodel.DataContext;
 import org.apache.metamodel.factory.DataContextFactory;
@@ -25,10 +33,8 @@ import org.apache.metamodel.factory.DataContextProperties;
 import org.apache.metamodel.factory.ResourceFactoryRegistry;
 import org.apache.metamodel.factory.UnsupportedDataContextPropertiesException;
 import org.apache.metamodel.util.SimpleTableDef;
-
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestClientFactory;
-import io.searchbox.client.config.HttpClientConfig;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
 
 /**
  * Factory for ElasticSearch data context of REST type.
@@ -72,18 +78,20 @@ public class ElasticSearchRestDataContextFactory implements DataContextFactory {
         return true;
     }
 
-    private JestClient createClient(DataContextProperties properties) {
-        final String serverUri = properties.getUrl();
-        final HttpClientConfig.Builder builder = new HttpClientConfig.Builder(serverUri);
+    private ElasticSearchRestClient createClient(final DataContextProperties properties) throws MalformedURLException {
+        final URL url = new URL(properties.getUrl());
+        final RestClientBuilder builder = RestClient.builder(new HttpHost(url.getHost(), url.getPort()));
+        
         if (properties.getUsername() != null) {
-            builder.defaultCredentials(properties.getUsername(), properties.getPassword());
+            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(properties.getUsername(),
+                    properties.getPassword()));
+
+            builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(
+                    credentialsProvider));
         }
 
-        final JestClientFactory clientFactory = new JestClientFactory();
-        final HttpClientConfig httpClientConfig = new HttpClientConfig(builder);
-        clientFactory.setHttpClientConfig(httpClientConfig);
-        final JestClient client = clientFactory.getObject();
-        return client;
+        return new ElasticSearchRestClient(builder.build());
     }
 
     private String getIndex(DataContextProperties properties) {
@@ -97,10 +105,14 @@ public class ElasticSearchRestDataContextFactory implements DataContextFactory {
     @Override
     public DataContext create(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry)
             throws UnsupportedDataContextPropertiesException, ConnectionException {
-        final JestClient client = createClient(properties);
-        final String indexName = getIndex(properties);
-        final SimpleTableDef[] tableDefinitions = properties.getTableDefs();
-        return new ElasticSearchRestDataContext(client, indexName, tableDefinitions);
+        try {
+            ElasticSearchRestClient client = createClient(properties);
+            final String indexName = getIndex(properties);
+            final SimpleTableDef[] tableDefinitions = properties.getTableDefs();
+            return new ElasticSearchRestDataContext(client, indexName, tableDefinitions);
+        } catch (MalformedURLException e) {
+            throw new UnsupportedDataContextPropertiesException(e);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java
new file mode 100644
index 0000000..d79b271
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataSet;
+import org.apache.metamodel.query.SelectItem;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DataSet} implementation for ElasticSearch
+ */
+final class ElasticSearchRestDataSet extends AbstractElasticSearchDataSet {
+
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDataSet.class);
+
+    private final ElasticSearchRestClient _client;
+
+    public ElasticSearchRestDataSet(final ElasticSearchRestClient client, final SearchResponse searchResponse, final List<SelectItem> selectItems) {
+        super(searchResponse, selectItems);
+        _client = client;
+    }
+
+    @Override
+    public void closeNow() {
+        final String scrollId = _searchResponse.getScrollId();
+        final ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+        clearScrollRequest.addScrollId(scrollId);
+        try {
+            _client.execute(clearScrollRequest);
+        } catch (IOException e) {
+            logger.warn("Could not clear scroll.", e);
+        }
+    }
+
+    @Override
+    protected SearchResponse scrollSearchResponse(final String scrollId) throws IOException {
+        return _client.searchScroll(new SearchScrollRequest(scrollId).scroll(
+                AbstractElasticSearchDataContext.TIMEOUT_SCROLL));
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java
new file mode 100644
index 0000000..f8caa2d
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.delete.AbstractRowDeletionBuilder;
+import org.apache.metamodel.delete.RowDeletionBuilder;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.query.LogicalOperator;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+/**
+ * {@link RowDeletionBuilder} implementation for
+ * {@link ElasticSearchRestDataContext}.
+ */
+final class ElasticSearchRestDeleteBuilder extends AbstractRowDeletionBuilder {
+    private final ElasticSearchRestUpdateCallback _updateCallback;
+
+    public ElasticSearchRestDeleteBuilder(final ElasticSearchRestUpdateCallback updateCallback, final Table table) {
+        super(table);
+        _updateCallback = updateCallback;
+    }
+
+    @Override
+    public void execute() throws MetaModelException {
+        final Table table = getTable();
+        final String documentType = table.getName();
+
+        final ElasticSearchRestDataContext dataContext = _updateCallback.getDataContext();
+        final String indexName = dataContext.getIndexName();
+
+        final List<FilterItem> whereItems = getWhereItems();
+
+        // delete by query - note that creteQueryBuilderForSimpleWhere may
+        // return matchAllQuery() if no where items are present.
+        final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
+                LogicalOperator.AND);
+        if (queryBuilder == null) {
+            // TODO: The where items could not be pushed down to a query. We
+            // could solve this by running a query first, gather all
+            // document IDs and then delete by IDs.
+            throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: "
+                    + whereItems);
+        }
+
+        final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.query(queryBuilder);
+
+        SearchRequest searchRequest = new SearchRequest(indexName);
+        searchRequest.types(documentType);
+        searchRequest.source(searchSourceBuilder);
+
+        try {
+            final SearchResponse response = dataContext.getElasticSearchClient().search(searchRequest);
+
+            final Iterator<SearchHit> iterator = response.getHits().iterator();
+            while (iterator.hasNext()) {
+                final SearchHit hit = iterator.next();
+                final String typeId = hit.getId();
+
+                DeleteRequest deleteRequest = new DeleteRequest(indexName, documentType, typeId);
+
+                _updateCallback.execute(deleteRequest);
+            }
+        } catch (IOException e) {
+            throw new MetaModelException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java
new file mode 100644
index 0000000..0ba4f66
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.index.IndexRequest;
+
+final class ElasticSearchRestInsertBuilder extends AbstractRowInsertionBuilder<ElasticSearchRestUpdateCallback> {
+
+    public ElasticSearchRestInsertBuilder(final ElasticSearchRestUpdateCallback updateCallback, final Table table) {
+        super(updateCallback, table);
+    }
+
+    @Override
+    public void execute() throws MetaModelException {
+        final ElasticSearchRestUpdateCallback updateCallback = getUpdateCallback();
+        final ElasticSearchRestDataContext dataContext = updateCallback.getDataContext();
+        final String indexName = dataContext.getIndexName();
+        final String documentType = getTable().getName();
+
+        final Map<String, Object> source = new HashMap<>();
+        final Column[] columns = getColumns();
+        final Object[] values = getValues();
+        String id = null;
+        for (int i = 0; i < columns.length; i++) {
+            if (isSet(columns[i])) {
+                final String columnName = columns[i].getName();
+
+                final Object value = values[i];
+                if (ElasticSearchUtils.FIELD_ID.equals(columnName)) {
+                    if (value != null) {
+                        id = value.toString();
+                    }
+                } else {
+                    final String fieldName = ElasticSearchUtils.getValidatedFieldName(columnName);
+                    source.put(fieldName, value);
+                }
+            }
+        }
+
+        assert !source.isEmpty();
+
+        IndexRequest indexRequest = new IndexRequest(indexName, documentType, id);
+        indexRequest.source(source);
+
+        getUpdateCallback().execute(indexRequest);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java
new file mode 100644
index 0000000..defd18f
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.io.IOException;
+
+import org.apache.metamodel.AbstractUpdateCallback;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.UpdateCallback;
+import org.apache.metamodel.create.TableCreationBuilder;
+import org.apache.metamodel.delete.RowDeletionBuilder;
+import org.apache.metamodel.drop.TableDropBuilder;
+import org.apache.metamodel.insert.RowInsertionBuilder;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link UpdateCallback} implementation for
+ * {@link ElasticSearchRestDataContext}.
+ */
+final class ElasticSearchRestUpdateCallback extends AbstractUpdateCallback {
+
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestUpdateCallback.class);
+
+    private static final int BULK_BUFFER_SIZE = 1000;
+
+    private BulkRequest bulkRequest;
+    private int bulkActionCount = 0;
+    private final boolean isBatch;
+
+    public ElasticSearchRestUpdateCallback(final ElasticSearchRestDataContext dataContext, final boolean isBatch) {
+        super(dataContext);
+        this.isBatch = isBatch;
+    }
+
+    private boolean isBatch() {
+        return isBatch;
+    }
+
+    @Override
+    public ElasticSearchRestDataContext getDataContext() {
+        return (ElasticSearchRestDataContext) super.getDataContext();
+    }
+
+    @Override
+    public TableCreationBuilder createTable(final Schema schema, final String name) throws IllegalArgumentException,
+            IllegalStateException {
+        return new ElasticSearchRestCreateTableBuilder(this, schema, name);
+    }
+
+    @Override
+    public boolean isDropTableSupported() {
+        return false;
+    }
+
+    @Override
+    public TableDropBuilder dropTable(final Table table) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public RowInsertionBuilder insertInto(final Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        return new ElasticSearchRestInsertBuilder(this, table);
+    }
+
+    @Override
+    public boolean isDeleteSupported() {
+        return true;
+    }
+
+    @Override
+    public RowDeletionBuilder deleteFrom(final Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        return new ElasticSearchRestDeleteBuilder(this, table);
+    }
+
+    public void onExecuteUpdateFinished() {
+        if (isBatch()) {
+            flushBulkActions();
+        }
+
+        getDataContext().refreshSchemas();
+    }
+
+    private void flushBulkActions() {
+        if (bulkRequest == null || bulkActionCount == 0) {
+            // nothing to flush
+            return;
+        }
+
+        logger.info("Flushing {} actions to ElasticSearch index {}", bulkActionCount, getDataContext().getIndexName());
+        executeBlocking(bulkRequest);
+
+        bulkActionCount = 0;
+        bulkRequest = null;
+    }
+
+    public void execute(final ActionRequest action) {
+        if (isBatch() && (action instanceof DocWriteRequest<?>)) {
+            getBulkRequest().add((DocWriteRequest<?>) action);
+            bulkActionCount++;
+            if (bulkActionCount == BULK_BUFFER_SIZE) {
+                flushBulkActions();
+            }
+        } else {
+            executeBlocking(action);
+        }
+    }
+
+    private void executeBlocking(final ActionRequest action) {
+        try {
+            if (action instanceof PutMappingRequest) {
+                getDataContext().getElasticSearchClient().createMapping((PutMappingRequest) action);
+            } else {
+                final ActionResponse result = getDataContext().getElasticSearchClient().execute(action);
+
+                if (result instanceof BulkResponse && ((BulkResponse) result).hasFailures()) {
+                    BulkItemResponse[] failedItems = ((BulkResponse) result).getItems();
+                    for (int i = 0; i < failedItems.length; i++) {
+                        if (failedItems[i].isFailed()) {
+                            final BulkItemResponse failedItem = failedItems[i];
+                            logger.error("Bulk failed with item no. {} of {}: id={} op={} status={} error={}", i + 1,
+                                    failedItems.length, failedItem.getId(), failedItem.getOpType(), failedItem.status(),
+                                    failedItem.getFailureMessage());
+                        }
+                    }
+                }
+            }
+        } catch (IOException e) {
+            logger.warn("Could not execute command {} ", action, e);
+            throw new MetaModelException("Could not execute " + action, e);
+        }
+    }
+
+    private BulkRequest getBulkRequest() {
+        if (bulkRequest == null) {
+            bulkRequest = new BulkRequest();
+        }
+        return bulkRequest;
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java
deleted file mode 100644
index 1bb026d..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import io.searchbox.action.Action;
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestResult;
-import org.apache.metamodel.MetaModelException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-final class JestClientExecutor {
-    private static final Logger logger = LoggerFactory.getLogger(JestClientExecutor.class);
-
-    static <T extends JestResult> T execute(JestClient jestClient, Action<T> clientRequest) {
-        return execute(jestClient, clientRequest, true);
-    }
-
-    static <T extends JestResult> T execute(JestClient jestClient, Action<T> clientRequest, boolean doThrow) {
-        try {
-            final T result = jestClient.execute(clientRequest);
-            logger.debug("{} response: acknowledged={}", clientRequest, result.isSucceeded());
-            return result;
-        } catch (IOException e) {
-            logger.warn("Could not execute command {} ", clientRequest, e);
-            if (doThrow) {
-                throw new MetaModelException("Could not execute command " + clientRequest, e);
-            }
-        }
-
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java
deleted file mode 100644
index cc42b07..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import io.searchbox.action.GenericResultAbstractAction;
-
-public class JestDeleteScroll extends GenericResultAbstractAction {
-    private JestDeleteScroll(Builder builder) {
-        super(builder);
-        this.payload = builder.getScrollId();
-        setURI(buildURI());
-    }
-
-    @Override
-    public String getRestMethodName() {
-        return "DELETE";
-    }
-
-    @Override
-    protected String buildURI() {
-        return super.buildURI() + "/_search/scroll";
-    }
-
-    public static class Builder extends GenericResultAbstractAction.Builder<JestDeleteScroll, Builder> {
-        private final String scrollId;
-
-        public Builder(String scrollId) {
-            this.scrollId = scrollId;
-        }
-
-        @Override
-        public JestDeleteScroll build() {
-            return new JestDeleteScroll(this);
-        }
-
-        public String getScrollId() {
-            return scrollId;
-        }
-    }
-
-}