You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metamodel.apache.org by ka...@apache.org on 2018/01/26 04:13:29 UTC

[1/4] metamodel git commit: METAMODEL-1179: Upgraded ElasticSearch REST module to new client.

Repository: metamodel
Updated Branches:
  refs/heads/master c57d50805 -> bda8d764f


http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataContextTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataContextTest.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataContextTest.java
deleted file mode 100644
index 53dbdf6..0000000
--- a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataContextTest.java
+++ /dev/null
@@ -1,615 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestClientFactory;
-import io.searchbox.client.config.HttpClientConfig;
-import org.apache.metamodel.MetaModelHelper;
-import org.apache.metamodel.UpdateCallback;
-import org.apache.metamodel.UpdateScript;
-import org.apache.metamodel.UpdateableDataContext;
-import org.apache.metamodel.create.CreateTable;
-import org.apache.metamodel.data.DataSet;
-import org.apache.metamodel.data.DataSetTableModel;
-import org.apache.metamodel.data.InMemoryDataSet;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.delete.DeleteFrom;
-import org.apache.metamodel.drop.DropTable;
-import org.apache.metamodel.elasticsearch.rest.utils.EmbeddedElasticsearchServer;
-import org.apache.metamodel.query.FunctionType;
-import org.apache.metamodel.query.Query;
-import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.query.parser.QueryParserException;
-import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.ColumnType;
-import org.apache.metamodel.schema.Schema;
-import org.apache.metamodel.schema.Table;
-import org.apache.metamodel.update.Update;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.client.IndicesAdminClient;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import javax.swing.table.TableModel;
-import java.io.IOException;
-import java.util.*;
-
-import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.junit.Assert.*;
-
-public class JestElasticSearchDataContextTest {
-
-    private static final String indexName = "twitter";
-    private static final String indexType1 = "tweet1";
-    private static final String indexType2 = "tweet2";
-    private static final String indexName2 = "twitter2";
-    private static final String indexType3 = "tweet3";
-    private static final String bulkIndexType = "bulktype";
-    private static final String peopleIndexType = "peopletype";
-    private static final String mapping =
-            "{\"date_detection\":\"false\",\"properties\":{\"message\":{\"type\":\"string\",\"index\":\"not_analyzed\",\"doc_values\":\"true\"}}}";
-    private static EmbeddedElasticsearchServer embeddedElasticsearchServer;
-    private static JestClient client;
-    private static UpdateableDataContext dataContext;
-
-    @BeforeClass
-    public static void beforeTests() throws Exception {
-        embeddedElasticsearchServer = new EmbeddedElasticsearchServer();
-        final int port = Integer.parseInt(embeddedElasticsearchServer.getClient().settings().get("http.port"));
-        JestClientFactory factory = new JestClientFactory();
-        factory.setHttpClientConfig(new HttpClientConfig
-                .Builder("http://localhost:" + port)
-                .multiThreaded(true)
-                .build());
-        client = factory.getObject();
-
-        indexTweeterDocument(indexType1, 1);
-        indexTweeterDocument(indexType2, 1);
-        indexTweeterDocument(indexType2, 2, null);
-        insertPeopleDocuments();
-        indexTweeterDocument(indexType2, 1);
-        indexBulkDocuments(indexName, bulkIndexType, 10);
-
-        // The refresh API allows to explicitly refresh one or more index,
-        // making all operations performed since the last refresh available for
-        // search
-        dataContext = new ElasticSearchRestDataContext(client, indexName);
-        Thread.sleep(1000);
-        System.out.println("Embedded ElasticSearch server created!");
-    }
-
-    private static void insertPeopleDocuments() throws IOException {
-        indexOnePeopleDocument("female", 20, 5);
-        indexOnePeopleDocument("female", 17, 8);
-        indexOnePeopleDocument("female", 18, 9);
-        indexOnePeopleDocument("female", 19, 10);
-        indexOnePeopleDocument("female", 20, 11);
-        indexOnePeopleDocument("male", 19, 1);
-        indexOnePeopleDocument("male", 17, 2);
-        indexOnePeopleDocument("male", 18, 3);
-        indexOnePeopleDocument("male", 18, 4);
-    }
-
-    @AfterClass
-    public static void afterTests() {
-        embeddedElasticsearchServer.shutdown();
-        System.out.println("Embedded ElasticSearch server shut down!");
-    }
-
-    @Test
-    public void testSimpleQuery() throws Exception {
-        assertEquals("[bulktype, peopletype, tweet1, tweet2]",
-                Arrays.toString(dataContext.getDefaultSchema().getTableNames().toArray()));
-
-        Table table = dataContext.getDefaultSchema().getTableByName("tweet1");
-
-        assertEquals("[_id, message, postDate, user]", Arrays.toString(table.getColumnNames().toArray()));
-
-        assertEquals(ColumnType.STRING, table.getColumnByName("user").getType());
-        assertEquals(ColumnType.DATE, table.getColumnByName("postDate").getType());
-        assertEquals(ColumnType.BIGINT, table.getColumnByName("message").getType());
-
-        try (DataSet ds = dataContext.query().from(indexType1).select("user").and("message").execute()) {
-            assertEquals(JestElasticSearchDataSet.class, ds.getClass());
-
-            assertTrue(ds.next());
-            assertEquals("Row[values=[user1, 1]]", ds.getRow().toString());
-        }
-    }
-
-    @Test
-    public void testDocumentIdAsPrimaryKey() throws Exception {
-        Table table = dataContext.getDefaultSchema().getTableByName("tweet2");
-        Column[] pks = table.getPrimaryKeys().toArray(new Column[0]);
-        assertEquals(1, pks.length);
-        assertEquals("_id", pks[0].getName());
-
-        try (DataSet ds = dataContext.query().from(table).select("user", "_id").orderBy("_id").asc().execute()) {
-            assertTrue(ds.next());
-            assertEquals("Row[values=[user1, tweet_tweet2_1]]", ds.getRow().toString());
-        }
-    }
-
-    @Test
-    public void testExecutePrimaryKeyLookupQuery() throws Exception {
-        Table table = dataContext.getDefaultSchema().getTableByName("tweet2");
-        Column[] pks = table.getPrimaryKeys().toArray(new Column[0]);
-
-        try (DataSet ds = dataContext.query().from(table).selectAll().where(pks[0]).eq("tweet_tweet2_1").execute()) {
-            assertTrue(ds.next());
-            Object dateValue = ds.getRow().getValue(2);
-            assertEquals("Row[values=[tweet_tweet2_1, 1, " + dateValue + ", user1]]", ds.getRow().toString());
-
-            assertFalse(ds.next());
-
-            assertEquals(InMemoryDataSet.class, ds.getClass());
-        }
-    }
-
-    @Test
-    public void testDateIsHandledAsDate() throws Exception {
-        Table table = dataContext.getDefaultSchema().getTableByName("tweet1");
-        Column column = table.getColumnByName("postDate");
-        ColumnType type = column.getType();
-        assertEquals(ColumnType.DATE, type);
-
-        DataSet dataSet = dataContext.query().from(table).select(column).execute();
-        while (dataSet.next()) {
-            Object value = dataSet.getRow().getValue(column);
-            assertTrue("Got class: " + value.getClass() + ", expected Date (or subclass)", value instanceof Date);
-        }
-    }
-
-    @Test
-    public void testNumberIsHandledAsNumber() throws Exception {
-        Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType);
-        Column column = table.getColumnByName("age");
-        ColumnType type = column.getType();
-        assertEquals(ColumnType.BIGINT, type);
-
-        DataSet dataSet = dataContext.query().from(table).select(column).execute();
-        while (dataSet.next()) {
-            Object value = dataSet.getRow().getValue(column);
-            assertTrue("Got class: " + value.getClass() + ", expected Number (or subclass)", value instanceof Number);
-        }
-    }
-
-    @Test
-    public void testCreateTableInsertQueryAndDrop() throws Exception {
-        final Schema schema = dataContext.getDefaultSchema();
-        final CreateTable createTable = new CreateTable(schema, "testCreateTable");
-        createTable.withColumn("foo").ofType(ColumnType.STRING);
-        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
-        dataContext.executeUpdate(createTable);
-
-        final Table table = schema.getTableByName("testCreateTable");
-        assertNotNull(table);
-        assertEquals("[" + ElasticSearchRestDataContext.FIELD_ID + ", foo, bar]", Arrays.toString(table.getColumnNames().toArray()));
-
-        final Column fooColumn = table.getColumnByName("foo");
-        final Column idColumn = table.getPrimaryKeys().get(0);
-        assertEquals("Column[name=_id,columnNumber=0,type=STRING,nullable=null,nativeType=null,columnSize=null]",
-                idColumn.toString());
-
-        dataContext.executeUpdate(new UpdateScript() {
-            @Override
-            public void run(UpdateCallback callback) {
-                callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
-                callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
-            }
-        });
-
-        dataContext.refreshSchemas();
-
-
-        try (DataSet ds = dataContext.query().from(table).selectAll().orderBy("bar").execute()) {
-            assertTrue(ds.next());
-            assertEquals("hello", ds.getRow().getValue(fooColumn).toString());
-            assertNotNull(ds.getRow().getValue(idColumn));
-            assertTrue(ds.next());
-            assertEquals("world", ds.getRow().getValue(fooColumn).toString());
-            assertNotNull(ds.getRow().getValue(idColumn));
-            assertFalse(ds.next());
-        }
-
-        dataContext.executeUpdate(new DropTable(table));
-
-        dataContext.refreshSchemas();
-
-        assertNull(dataContext.getTableByQualifiedLabel(table.getName()));
-    }
-
-    @Test
-    public void testDetectOutsideChanges() throws Exception {
-        // Create the type in ES
-        final IndicesAdminClient indicesAdmin = embeddedElasticsearchServer.getClient().admin().indices();
-        final String tableType = "outsideTable";
-
-        Object[] sourceProperties = { "testA", "type=string, store=true", "testB", "type=string, store=true" };
-
-        new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).setSource(sourceProperties)
-                .execute().actionGet();
-
-        dataContext.refreshSchemas();
-
-        assertNotNull(dataContext.getDefaultSchema().getTableByName(tableType));
-
-        new DeleteMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).execute().actionGet();
-        dataContext.refreshSchemas();
-        assertNull(dataContext.getTableByQualifiedLabel(tableType));
-    }
-
-    @Test
-    public void testDeleteAll() throws Exception {
-        final Schema schema = dataContext.getDefaultSchema();
-        final CreateTable createTable = new CreateTable(schema, "testCreateTable");
-        createTable.withColumn("foo").ofType(ColumnType.STRING);
-        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
-        dataContext.executeUpdate(createTable);
-
-        final Table table = schema.getTableByName("testCreateTable");
-
-        dataContext.executeUpdate(new UpdateScript() {
-            @Override
-            public void run(UpdateCallback callback) {
-                callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
-                callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
-            }
-        });
-
-        dataContext.executeUpdate(new DeleteFrom(table));
-
-        Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).selectCount()
-                .toQuery());
-        assertEquals("Count is wrong", 0, ((Number) row.getValue(0)).intValue());
-
-        dataContext.executeUpdate(new DropTable(table));
-    }
-
-    @Test
-    public void testDeleteByQuery() throws Exception {
-        final Schema schema = dataContext.getDefaultSchema();
-        final CreateTable createTable = new CreateTable(schema, "testCreateTable");
-        createTable.withColumn("foo").ofType(ColumnType.STRING);
-        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
-        dataContext.executeUpdate(createTable);
-
-        final Table table = schema.getTableByName("testCreateTable");
-
-        dataContext.executeUpdate(new UpdateScript() {
-            @Override
-            public void run(UpdateCallback callback) {
-                callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
-                callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
-            }
-        });
-
-        dataContext.executeUpdate(new DeleteFrom(table).where("foo").eq("hello").where("bar").eq(42));
-
-        Row row = MetaModelHelper.executeSingleRowQuery(dataContext,
-                dataContext.query().from(table).select("foo", "bar").toQuery());
-        assertEquals("Row[values=[world, 43]]", row.toString());
-
-        dataContext.executeUpdate(new DropTable(table));
-    }
-
-    @Test
-    public void testDeleteUnsupportedQueryType() throws Exception {
-        final Schema schema = dataContext.getDefaultSchema();
-        final CreateTable createTable = new CreateTable(schema, "testCreateTable");
-        createTable.withColumn("foo").ofType(ColumnType.STRING);
-        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
-        dataContext.executeUpdate(createTable);
-
-        final Table table = schema.getTableByName("testCreateTable");
-        try {
-
-            dataContext.executeUpdate(new UpdateScript() {
-                @Override
-                public void run(UpdateCallback callback) {
-                    callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
-                    callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
-                }
-            });
-
-            // greater than is not yet supported
-            try {
-                dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40));
-                fail("Exception expected");
-            } catch (UnsupportedOperationException e) {
-                assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]",
-                        e.getMessage());
-            }
-
-        } finally {
-            dataContext.executeUpdate(new DropTable(table));
-        }
-    }
-
-    @Test
-    public void testUpdateRow() throws Exception {
-        final Schema schema = dataContext.getDefaultSchema();
-        final CreateTable createTable = new CreateTable(schema, "testCreateTable");
-        createTable.withColumn("foo").ofType(ColumnType.STRING);
-        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
-        dataContext.executeUpdate(createTable);
-
-        final Table table = schema.getTableByName("testCreateTable");
-        try {
-
-            dataContext.executeUpdate(new UpdateScript() {
-                @Override
-                public void run(UpdateCallback callback) {
-                    callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
-                    callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
-                }
-            });
-
-            dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42));
-
-            DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute();
-            assertTrue(dataSet.next());
-            assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString());
-            assertTrue(dataSet.next());
-            assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString());
-            assertFalse(dataSet.next());
-            dataSet.close();
-        } finally {
-            dataContext.executeUpdate(new DropTable(table));
-        }
-    }
-
-    @Test
-    public void testDropTable() throws Exception {
-        Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType);
-
-        // assert that the table was there to begin with
-        {
-            DataSet ds = dataContext.query().from(table).selectCount().execute();
-            ds.next();
-            assertEquals("Count is wrong", 9, ((Number) ds.getRow().getValue(0)).intValue());
-            ds.close();
-        }
-
-        dataContext.executeUpdate(new DropTable(table));
-        try {
-            DataSet ds = dataContext.query().from(table).selectCount().execute();
-            ds.next();
-            assertEquals("Count is wrong", 0, ((Number) ds.getRow().getValue(0)).intValue());
-            ds.close();
-        } finally {
-            // restore the people documents for the next tests
-            insertPeopleDocuments();
-            embeddedElasticsearchServer.getClient().admin().indices().prepareRefresh().execute().actionGet();
-            dataContext = new ElasticSearchRestDataContext(client, indexName);
-        }
-    }
-
-    @Test
-    public void testWhereColumnEqualsValues() throws Exception {
-        try (DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user")
-                .isEquals("user4").execute()) {
-            assertEquals(JestElasticSearchDataSet.class, ds.getClass());
-
-            assertTrue(ds.next());
-            assertEquals("Row[values=[user4, 4]]", ds.getRow().toString());
-            assertFalse(ds.next());
-        }
-    }
-
-    @Test
-    public void testWhereColumnIsNullValues() throws Exception {
-        try (DataSet ds = dataContext.query().from(indexType2).select("message").where("postDate")
-                .isNull().execute()) {
-            assertEquals(JestElasticSearchDataSet.class, ds.getClass());
-
-            assertTrue(ds.next());
-            assertEquals("Row[values=[2]]", ds.getRow().toString());
-            assertFalse(ds.next());
-        }
-    }
-
-    @Test
-    public void testWhereColumnIsNotNullValues() throws Exception {
-        try (DataSet ds = dataContext.query().from(indexType2).select("message").where("postDate")
-                .isNotNull().execute()) {
-            assertEquals(JestElasticSearchDataSet.class, ds.getClass());
-
-            assertTrue(ds.next());
-            assertEquals("Row[values=[1]]", ds.getRow().toString());
-            assertFalse(ds.next());
-        }
-    }
-
-    @Test
-    public void testWhereMultiColumnsEqualValues() throws Exception {
-        try (DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user")
-                .isEquals("user4").and("message").ne(5).execute()) {
-            assertEquals(JestElasticSearchDataSet.class, ds.getClass());
-
-            assertTrue(ds.next());
-            assertEquals("Row[values=[user4, 4]]", ds.getRow().toString());
-            assertFalse(ds.next());
-        }
-    }
-
-    @Test
-    public void testWhereColumnInValues() throws Exception {
-        try (DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user")
-                .in("user4", "user5").orderBy("message").execute()) {
-            assertTrue(ds.next());
-
-            String row1 = ds.getRow().toString();
-            assertEquals("Row[values=[user4, 4]]", row1);
-            assertTrue(ds.next());
-
-            String row2 = ds.getRow().toString();
-            assertEquals("Row[values=[user5, 5]]", row2);
-
-            assertFalse(ds.next());
-        }
-    }
-
-    @Test
-    public void testGroupByQuery() throws Exception {
-        Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType);
-
-        Query q = new Query();
-        q.from(table);
-        q.groupBy(table.getColumnByName("gender"));
-        q.select(new SelectItem(table.getColumnByName("gender")),
-                new SelectItem(FunctionType.MAX, table.getColumnByName("age")),
-                new SelectItem(FunctionType.MIN, table.getColumnByName("age")), new SelectItem(FunctionType.COUNT, "*",
-                        "total"), new SelectItem(FunctionType.MIN, table.getColumnByName("id")).setAlias("firstId"));
-        q.orderBy("gender");
-        DataSet data = dataContext.executeQuery(q);
-        assertEquals(
-                "[peopletype.gender, MAX(peopletype.age), MIN(peopletype.age), COUNT(*) AS total, MIN(peopletype.id) AS firstId]",
-                Arrays.toString(data.getSelectItems().toArray()));
-
-        assertTrue(data.next());
-        assertEquals("Row[values=[female, 20, 17, 5, 5]]", data.getRow().toString());
-        assertTrue(data.next());
-        assertEquals("Row[values=[male, 19, 17, 4, 1]]", data.getRow().toString());
-        assertFalse(data.next());
-    }
-
-    @Test
-    public void testFilterOnNumberColumn() {
-        Table table = dataContext.getDefaultSchema().getTableByName(bulkIndexType);
-        Query q = dataContext.query().from(table).select("user").where("message").greaterThan(7).toQuery();
-        DataSet data = dataContext.executeQuery(q);
-        String[] expectations = new String[] { "Row[values=[user8]]", "Row[values=[user9]]" };
-
-        assertTrue(data.next());
-        assertTrue(Arrays.asList(expectations).contains(data.getRow().toString()));
-        assertTrue(data.next());
-        assertTrue(Arrays.asList(expectations).contains(data.getRow().toString()));
-        assertFalse(data.next());
-    }
-
-    @Test
-    public void testMaxRows() throws Exception {
-        Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType);
-        Query query = new Query().from(table).select(table.getColumns()).setMaxRows(5);
-        DataSet dataSet = dataContext.executeQuery(query);
-
-        TableModel tableModel = new DataSetTableModel(dataSet);
-        assertEquals(5, tableModel.getRowCount());
-    }
-
-    @Test
-    public void testCountQuery() throws Exception {
-        Table table = dataContext.getDefaultSchema().getTableByName(bulkIndexType);
-        Query q = new Query().selectCount().from(table);
-
-        List<Object[]> data = dataContext.executeQuery(q).toObjectArrays();
-        assertEquals(1, data.size());
-        Object[] row = data.get(0);
-        assertEquals(1, row.length);
-        assertEquals(10, ((Number) row[0]).intValue());
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testQueryForANonExistingTable() throws Exception {
-        dataContext.query().from("nonExistingTable").select("user").and("message").execute();
-    }
-
-    @Test(expected = QueryParserException.class)
-    public void testQueryForAnExistingTableAndNonExistingField() throws Exception {
-        indexTweeterDocument(indexType1, 1);
-        dataContext.query().from(indexType1).select("nonExistingField").execute();
-    }
-
-    @Test
-    public void testNonDynamicMapingTableNames() throws Exception {
-        createIndex();
-
-        ElasticSearchRestDataContext dataContext2 = new ElasticSearchRestDataContext(client, indexName2);
-
-        assertEquals("[tweet3]", Arrays.toString(dataContext2.getDefaultSchema().getTableNames().toArray()));
-    }
-
-    private static void createIndex() {
-        CreateIndexRequest cir = new CreateIndexRequest(indexName2);
-        CreateIndexResponse response =
-                embeddedElasticsearchServer.getClient().admin().indices().create(cir).actionGet();
-
-        System.out.println("create index: " + response.isAcknowledged());
-
-        PutMappingRequest pmr = new PutMappingRequest(indexName2).type(indexType3).source(mapping);
-
-        PutMappingResponse response2 =
-                embeddedElasticsearchServer.getClient().admin().indices().putMapping(pmr).actionGet();
-        System.out.println("put mapping: " + response2.isAcknowledged());
-    }
-
-    private static void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) {
-        BulkRequestBuilder bulkRequest = embeddedElasticsearchServer.getClient().prepareBulk();
-
-        for (int i = 0; i < numberOfDocuments; i++) {
-            bulkRequest.add(embeddedElasticsearchServer.getClient().prepareIndex(indexName, indexType,
-                    Integer.toString(i)).setSource(
-                    buildTweeterJson(i)));
-        }
-        bulkRequest.execute().actionGet();
-    }
-
-    private static void indexTweeterDocument(String indexType, int id, Date date) {
-        embeddedElasticsearchServer.getClient().prepareIndex(indexName, indexType).setSource(buildTweeterJson(id, date))
-                .setId("tweet_" + indexType + "_" + id).execute().actionGet();
-    }
-
-    private static void indexTweeterDocument(String indexType, int id) {
-        embeddedElasticsearchServer.getClient().prepareIndex(indexName, indexType).setSource(buildTweeterJson(id))
-                .setId("tweet_" + indexType + "_" + id).execute().actionGet();
-    }
-
-    private static void indexOnePeopleDocument(String gender, int age, int id) throws IOException {
-        embeddedElasticsearchServer.getClient().prepareIndex(indexName, peopleIndexType)
-                .setSource(buildPeopleJson(gender, age, id)).execute()
-                .actionGet();
-    }
-
-    private static Map<String, Object> buildTweeterJson(int elementId) {
-        return buildTweeterJson(elementId, new Date());
-    }
-
-    private static Map<String, Object> buildTweeterJson(int elementId, Date date) {
-        Map<String, Object> map = new LinkedHashMap<>();
-        map.put("user", "user" + elementId);
-        map.put("postDate", date);
-        map.put("message", elementId);
-        return map;
-    }
-
-    private static XContentBuilder buildPeopleJson(String gender, int age, int elementId) throws IOException {
-        return jsonBuilder().startObject().field("gender", gender).field("age", age).field("id", elementId).endObject();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParserTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParserTest.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParserTest.java
deleted file mode 100644
index 6eeac6a..0000000
--- a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParserTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-import junit.framework.TestCase;
-
-import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
-import org.apache.metamodel.schema.ColumnType;
-import org.elasticsearch.common.collect.MapBuilder;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-
-public class JestElasticSearchMetaDataParserTest extends TestCase {
-
-    public void testParseMetadataInfo() throws Exception {
-        Map<String, Object> metadata = new LinkedHashMap<>();
-        metadata.put("message", MapBuilder.newMapBuilder().put("type", "long").immutableMap());
-        metadata.put("postDate", MapBuilder.newMapBuilder().put("type", "date").put("format", "dateOptionalTime").immutableMap());
-        metadata.put("anotherDate", MapBuilder.newMapBuilder().put("type", "date").put("format", "dateOptionalTime").immutableMap());
-        metadata.put("user", MapBuilder.newMapBuilder().put("type", "string").immutableMap());
-        metadata.put("critical", MapBuilder.newMapBuilder().put("type", "boolean").immutableMap());
-        metadata.put("income", MapBuilder.newMapBuilder().put("type", "double").immutableMap());
-        metadata.put("untypedthingie", MapBuilder.newMapBuilder().put("foo", "bar").immutableMap());
-        final Gson gson = new Gson();
-        ElasticSearchMetaData metaData = JestElasticSearchMetaDataParser
-                .parse((JsonObject) gson.toJsonTree(metadata));
-        String[] columnNames = metaData.getColumnNames();
-        ColumnType[] columnTypes = metaData.getColumnTypes();
-
-        assertTrue(columnNames.length == 8);
-        assertEquals(columnNames[0], "_id");
-        assertEquals(columnNames[1], "message");
-        assertEquals(columnNames[2], "postDate");
-        assertEquals(columnNames[3], "anotherDate");
-        assertEquals(columnNames[4], "user");
-        assertEquals(columnNames[5], "critical");
-        assertEquals(columnNames[6], "income");
-        assertEquals(columnNames[7], "untypedthingie");
-        
-        assertTrue(columnTypes.length == 8);
-        assertEquals(columnTypes[0], ColumnType.STRING);
-        assertEquals(columnTypes[1], ColumnType.BIGINT);
-        assertEquals(columnTypes[2], ColumnType.DATE);
-        assertEquals(columnTypes[3], ColumnType.DATE);
-        assertEquals(columnTypes[4], ColumnType.STRING);
-        assertEquals(columnTypes[5], ColumnType.BOOLEAN);
-        assertEquals(columnTypes[6], ColumnType.DOUBLE);
-        assertEquals(columnTypes[7], ColumnType.STRING);
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtilsTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtilsTest.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtilsTest.java
deleted file mode 100644
index 4c8cca1..0000000
--- a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtilsTest.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-import com.google.common.collect.Lists;
-import org.apache.metamodel.data.DataSetHeader;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.data.SimpleDataSetHeader;
-import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.ColumnType;
-import org.apache.metamodel.schema.MutableColumn;
-import org.junit.Test;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-
-public class JestElasticSearchUtilsTest {
-
-    @Test
-    public void testAssignDocumentIdForPrimaryKeys() throws Exception {
-        MutableColumn primaryKeyColumn = new MutableColumn("value1", ColumnType.STRING).setPrimaryKey(true);
-        SelectItem primaryKeyItem = new SelectItem(primaryKeyColumn);
-        List<SelectItem> selectItems1 = Collections.singletonList(primaryKeyItem);
-        String documentId = "doc1";
-        DataSetHeader header = new SimpleDataSetHeader(selectItems1);
-        JsonObject values = new JsonObject();
-
-        values.addProperty("value1", "theValue");
-        Row row = JestElasticSearchUtils.createRow(values, documentId, header);
-        String primaryKeyValue = (String) row.getValue(primaryKeyItem);
-
-        assertEquals(primaryKeyValue, documentId);
-    }
-
-    @Test
-    public void testCreateRowWithNullValues() throws Exception {
-        final Column col1 = new MutableColumn("col1", ColumnType.STRING);
-        final Column col2 = new MutableColumn("col2", ColumnType.STRING);
-        final DataSetHeader header = new SimpleDataSetHeader(Lists.newArrayList(col1, col2).stream().map(SelectItem::new).collect(Collectors.toList()));
-        final JsonObject source = new JsonObject();
-        source.addProperty("col1", "foo");
-        source.addProperty("col2", (String) null);
-        final String documentId = "row1";
-
-        final Row row = JestElasticSearchUtils.createRow(source, documentId, header);
-        assertEquals("Row[values=[foo, null]]", row.toString());
-    }
-
-    @Test
-    public void testCreateRowWithNumberValueAndStringType() throws Exception {
-        final Column col1 = new MutableColumn("col1", ColumnType.STRING);
-        final DataSetHeader header =  SimpleDataSetHeader.fromColumns(Lists.newArrayList(col1));
-        final JsonObject source = new JsonObject();
-        source.addProperty("col1", 42);
-        final String documentId = "row1";
-
-        final Row row = JestElasticSearchUtils.createRow(source, documentId, header);
-        assertEquals("Row[values=[42]]", row.toString());
-    }
-
-    @Test
-    public void testCreateRowWithStringValueAndNumberType() throws Exception {
-        final Column col1 = new MutableColumn("col1", ColumnType.NUMBER);
-        final DataSetHeader header = SimpleDataSetHeader.fromColumns(Lists.newArrayList(col1));
-        final JsonObject source = new JsonObject();
-        source.addProperty("col1", "hello world");
-        final String documentId = "row1";
-
-        final Row row = JestElasticSearchUtils.createRow(source, documentId, header);
-
-        // whether or not 'null' should be returned (bad value, but preserves
-        // type) or 'hello world' should be returned (correct value, breaks
-        // type) can be debated. For now it is added here as an assertion to
-        // keep track of any regressions.
-        assertEquals("Row[values=[null]]", row.toString());
-    }
-
-    @Test
-    public void testCreateRowWithJsonObject() throws Exception {
-        final Column col1 = new MutableColumn("col1", ColumnType.MAP);
-        final DataSetHeader header = SimpleDataSetHeader.fromColumns(Lists.newArrayList(col1));
-        final JsonObject source = new JsonObject();
-        final JsonObject value = new JsonObject();
-        value.addProperty("foo1", "bar");
-        value.addProperty("foo2", 42);
-        source.add("col1", value);
-        final String documentId = "row1";
-
-        final Row row = JestElasticSearchUtils.createRow(source, documentId, header);
-        assertEquals("Row[values=[{foo1=bar, foo2=42.0}]]", row.toString());
-
-        final Map<?, ?> rowValue = (Map<?, ?>) row.getValue(col1);
-        assertEquals("bar", rowValue.get("foo1"));
-    }
-
-    @Test
-    public void testCreateRowWithJsonArray() throws Exception {
-        final Column col1 = new MutableColumn("col1", ColumnType.LIST);
-        final DataSetHeader header = SimpleDataSetHeader.fromColumns(Lists.newArrayList(col1));
-        final JsonObject source = new JsonObject();
-        final JsonArray value = new JsonArray();
-        value.add(new JsonPrimitive("foo"));
-        value.add(new JsonPrimitive("bar"));
-        source.add("col1", value);
-        final String documentId = "row1";
-
-        final Row row = JestElasticSearchUtils.createRow(source, documentId, header);
-        assertEquals("Row[values=[[foo, bar]]]", row.toString());
-
-        final List<?> rowValue = (List<?>) row.getValue(col1);
-        assertEquals("foo", rowValue.get(0));
-    }
-
-    @Test
-    public void testCreateRowWithDeepNesting() throws Exception {
-        final Column col1 = new MutableColumn("col1", ColumnType.LIST);
-        final DataSetHeader header = SimpleDataSetHeader.fromColumns(Lists.newArrayList(col1));
-        final JsonObject source = new JsonObject();
-
-        final JsonObject obj2 = new JsonObject();
-        obj2.addProperty("foo", 43);
-
-        final JsonArray arr1 = new JsonArray();
-        arr1.add(new JsonPrimitive("foo"));
-        arr1.add(new JsonPrimitive("bar"));
-        arr1.add(obj2);
-
-        final JsonObject obj1 = new JsonObject();
-        obj1.addProperty("mybool", true);
-        obj1.add("arr1", arr1);
-        source.add("col1", obj1);
-        final String documentId = "row1";
-
-        final Row row = JestElasticSearchUtils.createRow(source, documentId, header);
-        assertEquals("Row[values=[{mybool=true, arr1=[foo, bar, {foo=43.0}]}]]", row.toString());
-
-        final Map<?, ?> rowObj1 = (Map<?, ?>) row.getValue(col1);
-        final List<?> rowList = (List<?>) rowObj1.get("arr1");
-        final Map<?, ?> rowObj2 = (Map<?, ?>) rowList.get(2);
-        assertEquals(43.0, rowObj2.get("foo"));
-    }
-
-    @Test
-    public void testCreateRowWithParseableDates() throws Exception {
-        SelectItem item1 = new SelectItem(new MutableColumn("value1", ColumnType.STRING));
-        SelectItem item2 = new SelectItem(new MutableColumn("value2", ColumnType.DATE));
-        List<SelectItem> selectItems1 = Arrays.asList(item1, item2);
-        String documentId = "doc1";
-        DataSetHeader header = new SimpleDataSetHeader(selectItems1);
-        JsonObject values = new JsonObject();
-        values.addProperty("value1", "theValue");
-        values.addProperty("value2", "2013-01-04T15:55:51.217+01:00");
-        Row row = JestElasticSearchUtils.createRow(values, documentId, header);
-        Object stringValue = row.getValue(item1);
-        Object dateValue = row.getValue(item2);
-
-        assertTrue(stringValue instanceof String);
-        assertTrue(dateValue instanceof Date);
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/utils/EmbeddedElasticsearchServer.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/utils/EmbeddedElasticsearchServer.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/utils/EmbeddedElasticsearchServer.java
deleted file mode 100644
index 11e7eb5..0000000
--- a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/utils/EmbeddedElasticsearchServer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest.utils;
-
-import org.apache.commons.io.FileUtils;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.node.Node;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
-
-public class EmbeddedElasticsearchServer {
-
-    private static final String DEFAULT_DATA_DIRECTORY = "target/elasticsearch-data";
-
-    private final Node node;
-    private final String dataDirectory;
-
-    public EmbeddedElasticsearchServer() {
-        this(DEFAULT_DATA_DIRECTORY);
-    }
-
-    public EmbeddedElasticsearchServer(String dataDirectory) {
-        this.dataDirectory = dataDirectory;
-
-        ImmutableSettings.Builder elasticsearchSettings = ImmutableSettings.settingsBuilder()
-                .put("http.enabled", "true")
-                .put("path.data", dataDirectory)
-                .put("http.port", 9292);
-
-        node = nodeBuilder()
-                .local(true)
-                .settings(elasticsearchSettings.build())
-                .node();
-    }
-
-    public Client getClient() {
-        return node.client();
-    }
-
-    public void shutdown() {
-        node.close();
-        deleteDataDirectory();
-    }
-
-    private void deleteDataDirectory() {
-        try {
-            FileUtils.deleteDirectory(new File(dataDirectory));
-        } catch (IOException e) {
-            throw new RuntimeException("Could not delete data directory of embedded elasticsearch server", e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/test/resources/Dockerfile
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/test/resources/Dockerfile b/elasticsearch/rest/src/test/resources/Dockerfile
new file mode 100644
index 0000000..6c10f8e
--- /dev/null
+++ b/elasticsearch/rest/src/test/resources/Dockerfile
@@ -0,0 +1,5 @@
+FROM docker.elastic.co/elasticsearch/elasticsearch:5.6.3
+ADD elasticsearch.yml /usr/share/elasticsearch/config/
+USER root
+RUN chown elasticsearch:elasticsearch config/elasticsearch.yml
+USER elasticsearch
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/test/resources/elasticsearch.yml
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/test/resources/elasticsearch.yml b/elasticsearch/rest/src/test/resources/elasticsearch.yml
new file mode 100644
index 0000000..ba7c07f
--- /dev/null
+++ b/elasticsearch/rest/src/test/resources/elasticsearch.yml
@@ -0,0 +1,13 @@
+bootstrap.memory_lock: true
+cluster.name: docker-cluster
+http.port: 9200
+node.data: true
+node.ingest: true
+node.master: true
+node.max_local_storage_nodes: 1
+node.name: estest
+path.data: /usr/share/elasticsearch/data
+path.logs: /usr/share/elasticsearch/logs
+transport.tcp.port: 9300
+discovery.type: single-node
+network.host: 0.0.0.0

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/full/src/main/java/org/apache/metamodel/DataContextFactory.java
----------------------------------------------------------------------
diff --git a/full/src/main/java/org/apache/metamodel/DataContextFactory.java b/full/src/main/java/org/apache/metamodel/DataContextFactory.java
index 427101e..eeb7df5 100644
--- a/full/src/main/java/org/apache/metamodel/DataContextFactory.java
+++ b/full/src/main/java/org/apache/metamodel/DataContextFactory.java
@@ -33,6 +33,7 @@ import org.apache.metamodel.couchdb.CouchDbDataContext;
 import org.apache.metamodel.csv.CsvConfiguration;
 import org.apache.metamodel.csv.CsvDataContext;
 import org.apache.metamodel.elasticsearch.nativeclient.ElasticSearchDataContext;
+import org.apache.metamodel.elasticsearch.rest.ElasticSearchRestClient;
 import org.apache.metamodel.elasticsearch.rest.ElasticSearchRestDataContext;
 import org.apache.metamodel.excel.ExcelConfiguration;
 import org.apache.metamodel.excel.ExcelDataContext;
@@ -64,8 +65,6 @@ import com.mongodb.MongoCredential;
 import com.mongodb.ServerAddress;
 import com.mongodb.client.MongoDatabase;
 
-import io.searchbox.client.JestClient;
-
 /**
  * A factory for DataContext objects. This class substantially easens the task
  * of creating and initializing DataContext objects and/or their strategies for
@@ -681,7 +680,8 @@ public class DataContextFactory {
      *       The ElasticSearch index name
      * @return a DataContext object that matches the request
      */
-    public static UpdateableDataContext createElasticSearchDataContext(JestClient client, String indexName) {
+    public static UpdateableDataContext createElasticSearchDataContext(final ElasticSearchRestClient client,
+            final String indexName) {
         return new ElasticSearchRestDataContext(client, indexName);
     }
 

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/hbase/pom.xml
----------------------------------------------------------------------
diff --git a/hbase/pom.xml b/hbase/pom.xml
index 172e2f6..6659dad 100644
--- a/hbase/pom.xml
+++ b/hbase/pom.xml
@@ -111,6 +111,10 @@
 					<artifactId>netty</artifactId>
 				</exclusion>
 				<exclusion>
+					<groupId>io.netty</groupId>
+					<artifactId>netty-all</artifactId>
+				</exclusion>
+				<exclusion>
 					<groupId>commons-httpclient</groupId>
 					<artifactId>commons-httpclient</artifactId>
 				</exclusion>

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4caae5b..3df13d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,6 +32,7 @@ under the License.
 		<spring.version>4.2.6.RELEASE</spring.version>
 		<httpcomponents.version>4.4.1</httpcomponents.version>
 		<checksum-maven-plugin.version>1.2</checksum-maven-plugin.version>
+		<docker-maven-plugin.version>0.23.0</docker-maven-plugin.version>
 		<skipTests>false</skipTests>
 	</properties>
 	<parent>


[2/4] metamodel git commit: METAMODEL-1179: Upgraded ElasticSearch REST module to new client.

Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
deleted file mode 100644
index 3e71c4d..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchCreateTableBuilder.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import java.util.Map;
-
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.create.AbstractTableCreationBuilder;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.MutableTable;
-import org.apache.metamodel.schema.Schema;
-import org.apache.metamodel.schema.Table;
-
-import io.searchbox.indices.mapping.PutMapping;
-
-final class JestElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder<JestElasticSearchUpdateCallback> {
-    
-    public JestElasticSearchCreateTableBuilder(JestElasticSearchUpdateCallback updateCallback, Schema schema,
-            String name) {
-        super(updateCallback, schema, name);
-    }
-
-    @Override
-    public Table execute() throws MetaModelException {
-        final MutableTable table = getTable();
-        final Map<String, ?> source = ElasticSearchUtils.getMappingSource(table);
-
-        final ElasticSearchRestDataContext dataContext = getUpdateCallback().getDataContext();
-        final String indexName = dataContext.getIndexName();
-
-        final PutMapping putMapping = new PutMapping.Builder(indexName, table.getName(), source).build();
-        getUpdateCallback().execute(putMapping);
-
-        final MutableSchema schema = (MutableSchema) getSchema();
-        schema.addTable(table);
-        return table;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
deleted file mode 100644
index 37e06dc..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDataSet.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.metamodel.data.AbstractDataSet;
-import org.apache.metamodel.data.DataSet;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.query.SelectItem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestResult;
-import io.searchbox.core.SearchScroll;
-
-/**
- * {@link DataSet} implementation for ElasticSearch
- */
-final class JestElasticSearchDataSet extends AbstractDataSet {
-
-    private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchDataSet.class);
-
-    private final JestClient _client;
-    private final AtomicBoolean _closed;
-
-    private JestResult _searchResponse;
-    private JsonObject _currentHit;
-    private int _hitIndex = 0;
-
-    public JestElasticSearchDataSet(JestClient client, JestResult searchResponse, List<SelectItem> selectItems) {
-        super(selectItems);
-        _client = client;
-        _searchResponse = searchResponse;
-        _closed = new AtomicBoolean(false);
-    }
-
-
-    @Override
-    public void close() {
-        super.close();
-        boolean closeNow = _closed.compareAndSet(true, false);
-        if (closeNow) {
-            final String scrollId = _searchResponse.getJsonObject().getAsJsonPrimitive("_scroll_id").getAsString();
-            JestClientExecutor.execute(_client, new JestDeleteScroll.Builder(scrollId).build(), false);
-        }
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-        super.finalize();
-        if (!_closed.get()) {
-            logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this);
-            close();
-        }
-    }
-
-    @Override
-    public boolean next() {
-        final JsonArray hits = _searchResponse.getJsonObject().getAsJsonObject("hits").getAsJsonArray("hits");
-        if (hits.size() == 0) {
-            // break condition for the scroll
-            _currentHit = null;
-            return false;
-        }
-
-        if (_hitIndex < hits.size()) {
-            // pick the next hit within this search response
-            _currentHit = hits.get(_hitIndex).getAsJsonObject();
-            _hitIndex++;
-            return true;
-        }
-
-        final JsonPrimitive scrollId = _searchResponse.getJsonObject().getAsJsonPrimitive("_scroll_id");
-        if (scrollId == null) {
-            // this search response is not scrollable - then it's the end.
-            _currentHit = null;
-            return false;
-        }
-
-        // try to scroll to the next set of hits
-        final SearchScroll scroll = new SearchScroll.Builder(scrollId.getAsString(), ElasticSearchRestDataContext.TIMEOUT_SCROLL).build();
-
-        _searchResponse = JestClientExecutor.execute(_client, scroll);
-
-        // start over (recursively)
-        _hitIndex = 0;
-        return next();
-    }
-
-    @Override
-    public Row getRow() {
-        if (_currentHit == null) {
-            return null;
-        }
-
-        final JsonObject source = _currentHit.getAsJsonObject("_source");
-        final String documentId = _currentHit.get("_id").getAsString();
-        return JestElasticSearchUtils.createRow(source, documentId, getHeader());
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
deleted file mode 100644
index cc1c3e7..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDeleteBuilder.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import io.searchbox.core.DeleteByQuery;
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.delete.AbstractRowDeletionBuilder;
-import org.apache.metamodel.delete.RowDeletionBuilder;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.query.FilterItem;
-import org.apache.metamodel.query.LogicalOperator;
-import org.apache.metamodel.schema.Table;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-
-import java.util.List;
-
-/**
- * {@link RowDeletionBuilder} implementation for
- * {@link ElasticSearchRestDataContext}.
- */
-final class JestElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder {
-    private final JestElasticSearchUpdateCallback _updateCallback;
-
-    public JestElasticSearchDeleteBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) {
-        super(table);
-        _updateCallback = updateCallback;
-    }
-
-    @Override
-    public void execute() throws MetaModelException {
-        final Table table = getTable();
-        final String documentType = table.getName();
-
-        final ElasticSearchRestDataContext dataContext = _updateCallback.getDataContext();
-        final String indexName = dataContext.getIndexName();
-
-        final List<FilterItem> whereItems = getWhereItems();
-
-        // delete by query - note that creteQueryBuilderForSimpleWhere may
-        // return matchAllQuery() if no where items are present.
-        final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
-                LogicalOperator.AND);
-        if (queryBuilder == null) {
-            // TODO: The where items could not be pushed down to a query. We
-            // could solve this by running a query first, gather all
-            // document IDs and then delete by IDs.
-            throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: "
-                    + whereItems);
-        }
-        final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
-        searchSourceBuilder.query(queryBuilder);
-
-        final DeleteByQuery deleteByQuery =
-                new DeleteByQuery.Builder(searchSourceBuilder.toString()).addIndex(indexName).addType(
-                        documentType).build();
-
-        _updateCallback.execute(deleteByQuery);
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
deleted file mode 100644
index 8a1ac71..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchDropTableBuilder.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.drop.AbstractTableDropBuilder;
-import org.apache.metamodel.drop.TableDropBuilder;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.searchbox.indices.mapping.DeleteMapping;
-
-/**
- * {@link TableDropBuilder} for dropping tables (document types) in an
- * ElasticSearch index.
- */
-final class JestElasticSearchDropTableBuilder extends AbstractTableDropBuilder {
-
-    private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchDropTableBuilder.class);
-
-    private final JestElasticSearchUpdateCallback _updateCallback;
-
-    public JestElasticSearchDropTableBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) {
-        super(table);
-        _updateCallback = updateCallback;
-    }
-
-    @Override
-    public void execute() throws MetaModelException {
-
-        final ElasticSearchRestDataContext dataContext = _updateCallback.getDataContext();
-        final Table table = getTable();
-        final String documentType = table.getName();
-        logger.info("Deleting mapping / document type: {}", documentType);
-
-        final DeleteMapping deleteIndex = new DeleteMapping.Builder(dataContext.getIndexName(), documentType).build();
-
-        _updateCallback.execute(deleteIndex);
-
-        final MutableSchema schema = (MutableSchema) table.getSchema();
-        schema.removeTable(table);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
deleted file mode 100644
index 746538d..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchInsertBuilder.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
-import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.Table;
-
-import io.searchbox.core.Index;
-import io.searchbox.params.Parameters;
-
-final class JestElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<JestElasticSearchUpdateCallback> {
-
-    public JestElasticSearchInsertBuilder(JestElasticSearchUpdateCallback updateCallback, Table table) {
-        super(updateCallback, table);
-    }
-
-    @Override
-    public void execute() throws MetaModelException {
-        final JestElasticSearchUpdateCallback updateCallback = getUpdateCallback();
-        final ElasticSearchRestDataContext dataContext = updateCallback.getDataContext();
-        final String indexName = dataContext.getIndexName();
-        final String documentType = getTable().getName();
-
-        final Map<String, Object> source = new HashMap<>();
-        final Column[] columns = getColumns();
-        final Object[] values = getValues();
-        String id = null;
-        for (int i = 0; i < columns.length; i++) {
-            if (isSet(columns[i])) {
-                final String columnName = columns[i].getName();
-
-                final Object value = values[i];
-                if (ElasticSearchRestDataContext.FIELD_ID.equals(columnName)) {
-                    if (value != null) {
-                        id = value.toString();
-                    }
-                } else {
-                    final String fieldName = ElasticSearchUtils.getValidatedFieldName(columnName);
-                    source.put(fieldName, value);
-                }
-            }
-        }
-
-        assert !source.isEmpty();
-
-        final Index index = new Index.Builder(source).index(indexName).type(documentType).id(id).setParameter(
-                Parameters.OP_TYPE, "create").build();
-
-        getUpdateCallback().execute(index);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java
deleted file mode 100644
index 074de2e..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchMetaDataParser.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.schema.ColumnType;
-
-import java.util.Map.Entry;
-
-/**
- * Parser that transforms the ElasticSearch metadata response (json-like format)
- * into an ElasticSearchMetaData object.
- */
-final class JestElasticSearchMetaDataParser {
-
-    /**
-     * Parses the ElasticSearch meta data info into an ElasticSearchMetaData
-     * object. This method makes much easier to create the ElasticSearch schema.
-     *
-     * @param metaDataInfo
-     *            ElasticSearch mapping metadata in Map format
-     * @return An ElasticSearchMetaData object
-     */
-    public static ElasticSearchMetaData parse(JsonObject metaDataInfo) {
-        final int columns = metaDataInfo.entrySet().size() + 1;
-        final String[] fieldNames = new String[columns];
-        final ColumnType[] columnTypes = new ColumnType[columns];
-
-        // add the document ID field (fixed)
-        fieldNames[0] = ElasticSearchRestDataContext.FIELD_ID;
-        columnTypes[0] = ColumnType.STRING;
-
-        int i = 1;
-        for (Entry<String, JsonElement> metaDataField : metaDataInfo.entrySet()) {
-            JsonElement fieldMetadata = metaDataField.getValue();
-
-            fieldNames[i] = metaDataField.getKey();
-            columnTypes[i] = getColumnTypeFromMetadataField(fieldMetadata);
-            i++;
-
-        }
-        return new ElasticSearchMetaData(fieldNames, columnTypes);
-    }
-
-    private static ColumnType getColumnTypeFromMetadataField(JsonElement fieldMetadata) {
-        final JsonElement typeElement = ((JsonObject) fieldMetadata).get("type");
-        if (typeElement != null) {
-            String metaDataFieldType = typeElement.getAsString();
-
-            return ElasticSearchUtils.getColumnTypeFromElasticSearchType(metaDataFieldType);
-        } else {
-            return ColumnType.STRING;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
deleted file mode 100644
index a61280a..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUpdateCallback.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import java.util.List;
-
-import org.apache.metamodel.AbstractUpdateCallback;
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.UpdateCallback;
-import org.apache.metamodel.create.TableCreationBuilder;
-import org.apache.metamodel.delete.RowDeletionBuilder;
-import org.apache.metamodel.drop.TableDropBuilder;
-import org.apache.metamodel.insert.RowInsertionBuilder;
-import org.apache.metamodel.schema.Schema;
-import org.apache.metamodel.schema.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.searchbox.action.Action;
-import io.searchbox.action.BulkableAction;
-import io.searchbox.client.JestResult;
-import io.searchbox.core.Bulk;
-import io.searchbox.core.Bulk.Builder;
-import io.searchbox.core.BulkResult;
-import io.searchbox.core.BulkResult.BulkResultItem;
-import io.searchbox.indices.Refresh;
-
-/**
- * {@link UpdateCallback} implementation for
- * {@link ElasticSearchRestDataContext}.
- */
-final class JestElasticSearchUpdateCallback extends AbstractUpdateCallback {
-
-    private static final Logger logger = LoggerFactory.getLogger(JestElasticSearchUpdateCallback.class);
-
-    private static final int BULK_BUFFER_SIZE = 1000;
-
-    private Bulk.Builder bulkBuilder;
-    private int bulkActionCount = 0;
-    private final boolean isBatch;
-
-    public JestElasticSearchUpdateCallback(ElasticSearchRestDataContext dataContext, boolean isBatch) {
-        super(dataContext);
-        this.isBatch = isBatch;
-    }
-
-    private boolean isBatch() {
-        return isBatch;
-    }
-
-    @Override
-    public ElasticSearchRestDataContext getDataContext() {
-        return (ElasticSearchRestDataContext) super.getDataContext();
-    }
-
-    @Override
-    public TableCreationBuilder createTable(Schema schema, String name) throws IllegalArgumentException,
-            IllegalStateException {
-        return new JestElasticSearchCreateTableBuilder(this, schema, name);
-    }
-
-    @Override
-    public boolean isDropTableSupported() {
-        return true;
-    }
-
-    @Override
-    public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
-            UnsupportedOperationException {
-        return new JestElasticSearchDropTableBuilder(this, table);
-    }
-
-    @Override
-    public RowInsertionBuilder insertInto(Table table) throws IllegalArgumentException, IllegalStateException,
-            UnsupportedOperationException {
-        return new JestElasticSearchInsertBuilder(this, table);
-    }
-
-    @Override
-    public boolean isDeleteSupported() {
-        return true;
-    }
-
-    @Override
-    public RowDeletionBuilder deleteFrom(Table table) throws IllegalArgumentException, IllegalStateException,
-            UnsupportedOperationException {
-        return new JestElasticSearchDeleteBuilder(this, table);
-    }
-
-    public void onExecuteUpdateFinished() {
-        if (isBatch()) {
-            flushBulkActions();
-        }
-
-        final String indexName = getDataContext().getIndexName();
-        final Refresh refresh = new Refresh.Builder().addIndex(indexName).build();
-
-        JestClientExecutor.execute(getDataContext().getElasticSearchClient(), refresh, false);
-    }
-
-    private void flushBulkActions() {
-        if (bulkBuilder == null || bulkActionCount == 0) {
-            // nothing to flush
-            return;
-        }
-        final Bulk bulk = getBulkBuilder().build();
-        logger.info("Flushing {} actions to ElasticSearch index {}", bulkActionCount, getDataContext().getIndexName());
-        executeBlocking(bulk);
-
-        bulkActionCount = 0;
-        bulkBuilder = null;
-    }
-
-    public void execute(Action<?> action) {
-        if (isBatch() && action instanceof BulkableAction) {
-            final Bulk.Builder bulkBuilder = getBulkBuilder();
-            bulkBuilder.addAction((BulkableAction<?>) action);
-            bulkActionCount++;
-            if (bulkActionCount == BULK_BUFFER_SIZE) {
-                flushBulkActions();
-            }
-        } else {
-            executeBlocking(action);
-        }
-    }
-
-    private void executeBlocking(Action<?> action) {
-        final JestResult result = JestClientExecutor.execute(getDataContext().getElasticSearchClient(), action);
-        if (!result.isSucceeded()) {
-            if (result instanceof BulkResult) {
-                final List<BulkResultItem> failedItems = ((BulkResult) result).getFailedItems();
-                for (int i = 0; i < failedItems.size(); i++) {
-                    final BulkResultItem failedItem = failedItems.get(i);
-                    logger.error("Bulk failed with item no. {} of {}: id={} op={} status={} error={}", i+1, failedItems.size(), failedItem.id, failedItem.operation, failedItem.status, failedItem.error);
-                }
-            }
-            throw new MetaModelException(result.getResponseCode() + " - " + result.getErrorMessage());
-        }
-    }
-
-    private Builder getBulkBuilder() {
-        if (bulkBuilder == null) {
-            bulkBuilder = new Bulk.Builder();
-            bulkBuilder.defaultIndex(getDataContext().getIndexName());
-        }
-        return bulkBuilder;
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java
deleted file mode 100644
index 11a79b7..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestElasticSearchUtils.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.metamodel.data.DataSetHeader;
-import org.apache.metamodel.data.DefaultRow;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchDateConverter;
-import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.ColumnType;
-import org.apache.metamodel.util.NumberComparator;
-
-import com.google.gson.Gson;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-/**
- * Shared/common util functions for the ElasticSearch MetaModel module.
- */
-final class JestElasticSearchUtils {
-    public static Row createRow(JsonObject source, String documentId, DataSetHeader header) {
-        final Object[] values = new Object[header.size()];
-        for (int i = 0; i < values.length; i++) {
-            final SelectItem selectItem = header.getSelectItem(i);
-            final Column column = selectItem.getColumn();
-
-            assert column != null;
-            assert !selectItem.hasFunction();
-
-            if (column.isPrimaryKey()) {
-                values[i] = documentId;
-            } else {
-                values[i] = getDataFromColumnType(source.get(column.getName()), column.getType());
-            }
-        }
-
-        return new DefaultRow(header, values);
-    }
-
-    private static Object getDataFromColumnType(JsonElement field, ColumnType type) {
-        if (field == null || field.isJsonNull()) {
-            return null;
-        }
-
-        if (field.isJsonObject()) {
-            return new Gson().fromJson(field, Map.class);
-        }
-        if (field.isJsonArray()) {
-            return new Gson().fromJson(field, List.class);
-        }
-
-        if (type.isNumber()) {
-            // Pretty terrible workaround to avoid LazilyParsedNumber
-            // (which is happily output, but not recognized by Jest/GSON).
-            return NumberComparator.toNumber(field.getAsString());
-        } else if (type.isTimeBased()) {
-            final Date valueToDate = ElasticSearchDateConverter.tryToConvert(field.getAsString());
-            if (valueToDate == null) {
-                return field.getAsString();
-            } else {
-                return valueToDate;
-            }
-        } else if (type.isBoolean()) {
-            return field.getAsBoolean();
-        } else {
-            return field.getAsString();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContexFactoryIT.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContexFactoryIT.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContexFactoryIT.java
new file mode 100644
index 0000000..4705585
--- /dev/null
+++ b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContexFactoryIT.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.HttpHost;
+import org.apache.metamodel.BatchUpdateScript;
+import org.apache.metamodel.UpdateCallback;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.factory.DataContextFactory;
+import org.apache.metamodel.factory.DataContextPropertiesImpl;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.ColumnType;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RestClient;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ElasticSearchRestDataContexFactoryIT {
+    private static final String INDEX_NAME = "myindex";
+
+    private static ElasticSearchRestClient externalClient;
+
+    private String dockerHostAddress;
+
+    private DataContextFactory factory;
+
+    @Before
+    public void setUp() throws Exception {
+        dockerHostAddress = ElasticSearchRestDataContextIT.determineHostName();
+
+        externalClient = new ElasticSearchRestClient(RestClient.builder(new HttpHost(dockerHostAddress, 9200)).build());
+
+        final Map<String, Object> source = new LinkedHashMap<>();
+        source.put("mytext", "dummy");
+
+        final IndexRequest indexRequest = new IndexRequest(INDEX_NAME, "text");
+        indexRequest.source(source);
+
+        externalClient.index(indexRequest);
+
+        factory = new ElasticSearchRestDataContextFactory();
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        externalClient.delete(INDEX_NAME);
+    }
+
+    @Test
+    public void testAccepts() throws Exception {
+        final DataContextPropertiesImpl properties = new DataContextPropertiesImpl();
+        properties.setDataContextType("elasticsearch");
+        properties.put(DataContextPropertiesImpl.PROPERTY_URL, "http://" + dockerHostAddress + ":9200");
+        properties.put(DataContextPropertiesImpl.PROPERTY_DATABASE, INDEX_NAME);
+
+        assertTrue(factory.accepts(properties, null));
+    }
+
+    @Test
+    public void testCreateContextAndBulkScript() throws Exception {
+        final DataContextPropertiesImpl properties = new DataContextPropertiesImpl();
+        properties.setDataContextType("es-rest");
+        properties.put(DataContextPropertiesImpl.PROPERTY_URL, "http://" + dockerHostAddress + ":9200");
+        properties.put(DataContextPropertiesImpl.PROPERTY_DATABASE, INDEX_NAME);
+
+        assertTrue(factory.accepts(properties, null));
+
+        final ElasticSearchRestDataContext dataContext = (ElasticSearchRestDataContext) factory.create(properties,
+                null);
+
+        dataContext.executeUpdate(new BatchUpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.createTable(INDEX_NAME, "persons")
+                        .withColumn("name").ofType(ColumnType.STRING)
+                        .withColumn("age").ofType(ColumnType.INTEGER)
+                        .execute();
+            }
+        });
+
+        dataContext.executeUpdate(new BatchUpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto("persons").value("name", "John Doe").value("age", 42).execute();
+                callback.insertInto("persons").value("name", "Jane Doe").value("age", 41).execute();
+            }
+        });
+
+        dataContext.refreshSchemas();
+
+        final DataSet persons = dataContext.executeQuery("SELECT name, age FROM persons");
+        final List<Row> personData = persons.toRows();
+
+        assertEquals(2, personData.size());
+
+        // Sort person data, so we can validate each row's values.
+        Column ageColumn = dataContext.getSchemaByName(INDEX_NAME).getTableByName("persons").getColumnByName("age");
+        personData.sort((row1, row2) -> ((Integer) row1.getValue(ageColumn)).compareTo(((Integer) row2.getValue(
+                ageColumn))));
+
+        assertThat(Arrays.asList(personData.get(0).getValues()), containsInAnyOrder("Jane Doe", 41));
+        assertThat(Arrays.asList(personData.get(1).getValues()), containsInAnyOrder("John Doe", 42));
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextIT.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextIT.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextIT.java
new file mode 100644
index 0000000..7d5eb12
--- /dev/null
+++ b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextIT.java
@@ -0,0 +1,535 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.swing.table.TableModel;
+
+import org.apache.http.HttpHost;
+import org.apache.metamodel.MetaModelHelper;
+import org.apache.metamodel.UpdateCallback;
+import org.apache.metamodel.UpdateScript;
+import org.apache.metamodel.UpdateableDataContext;
+import org.apache.metamodel.create.CreateTable;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.DataSetTableModel;
+import org.apache.metamodel.data.InMemoryDataSet;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.delete.DeleteFrom;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.FunctionType;
+import org.apache.metamodel.query.Query;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.query.parser.QueryParserException;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.ColumnType;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.apache.metamodel.update.Update;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ElasticSearchRestDataContextIT {
+    private static final String DEFAULT_DOCKER_HOST_NAME = "localhost";
+
+    private static final String indexName = "twitter";
+    private static final String indexType1 = "tweet1";
+    private static final String indexType2 = "tweet2";
+    private static final String bulkIndexType = "bulktype";
+    private static final String peopleIndexType = "peopletype";
+
+    private static ElasticSearchRestClient client;
+
+    private static UpdateableDataContext dataContext;
+    
+    public static String determineHostName() throws URISyntaxException {
+        final String dockerHost = System.getenv("DOCKER_HOST");
+
+        if (dockerHost == null) {
+            // If no value is returned for the DOCKER_HOST environment variable fall back to a default.
+            return DEFAULT_DOCKER_HOST_NAME;
+        } else {
+            return (new URI(dockerHost)).getHost();
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        final String dockerHostAddress = determineHostName();
+        
+        client = new ElasticSearchRestClient(RestClient.builder(new HttpHost(dockerHostAddress, 9200)).build()); 
+
+        indexTweeterDocument(indexType1, 1);
+        indexTweeterDocument(indexType2, 1);
+        indexTweeterDocument(indexType2, 2, null);
+        insertPeopleDocuments();
+        indexTweeterDocument(indexType2, 1);
+        indexBulkDocuments(indexName, bulkIndexType, 10);
+        
+        client.refresh(indexName);
+
+        dataContext = new ElasticSearchRestDataContext(client, indexName);
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        client.delete(indexName);
+    }
+
+    private static void insertPeopleDocuments() throws IOException {
+        indexOnePeopleDocument("female", 20, 5);
+        indexOnePeopleDocument("female", 17, 8);
+        indexOnePeopleDocument("female", 18, 9);
+        indexOnePeopleDocument("female", 19, 10);
+        indexOnePeopleDocument("female", 20, 11);
+        indexOnePeopleDocument("male", 19, 1);
+        indexOnePeopleDocument("male", 17, 2);
+        indexOnePeopleDocument("male", 18, 3);
+        indexOnePeopleDocument("male", 18, 4);
+    }
+
+    @Test
+    public void testSimpleQuery() throws Exception {
+        assertEquals("[bulktype, peopletype, tweet1, tweet2]",
+                Arrays.toString(dataContext.getDefaultSchema().getTableNames().toArray()));
+
+        Table table = dataContext.getDefaultSchema().getTableByName("tweet1");
+
+        assertThat(table.getColumnNames(), containsInAnyOrder("_id", "message", "postDate", "user"));
+
+        assertEquals(ColumnType.STRING, table.getColumnByName("user").getType());
+        assertEquals(ColumnType.DATE, table.getColumnByName("postDate").getType());
+        assertEquals(ColumnType.BIGINT, table.getColumnByName("message").getType());
+
+        try (DataSet ds = dataContext.query().from(indexType1).select("user").and("message").execute()) {
+            assertEquals(ElasticSearchRestDataSet.class, ds.getClass());
+
+            assertTrue(ds.next());
+            assertEquals("Row[values=[user1, 1]]", ds.getRow().toString());
+        }
+    }
+
+    @Test
+    public void testDocumentIdAsPrimaryKey() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName("tweet2");
+        Column[] pks = table.getPrimaryKeys().toArray(new Column[0]);
+        assertEquals(1, pks.length);
+        assertEquals("_id", pks[0].getName());
+
+        try (DataSet ds = dataContext.query().from(table).select("user", "_id").orderBy("_id").asc().execute()) {
+            assertTrue(ds.next());
+            assertEquals("Row[values=[user1, tweet_tweet2_1]]", ds.getRow().toString());
+        }
+    }
+
+    @Test
+    public void testExecutePrimaryKeyLookupQuery() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName("tweet2");
+        Column[] pks = table.getPrimaryKeys().toArray(new Column[0]);
+
+        try (DataSet ds = dataContext.query().from(table).selectAll().where(pks[0]).eq("tweet_tweet2_1").execute()) {
+            assertTrue(ds.next());
+            Object dateValue = ds.getRow().getValue(1);
+            assertEquals("Row[values=[tweet_tweet2_1, " + dateValue + ", 1, user1]]", ds.getRow().toString());
+
+            assertFalse(ds.next());
+
+            assertEquals(InMemoryDataSet.class, ds.getClass());
+        }
+    }
+
+    @Test
+    public void testDateIsHandledAsDate() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName("tweet1");
+        Column column = table.getColumnByName("postDate");
+        ColumnType type = column.getType();
+        assertEquals(ColumnType.DATE, type);
+
+        DataSet dataSet = dataContext.query().from(table).select(column).execute();
+        while (dataSet.next()) {
+            Object value = dataSet.getRow().getValue(column);
+            assertTrue("Got class: " + value.getClass() + ", expected Date (or subclass)", value instanceof Date);
+        }
+    }
+
+    /**
+     * Verifies that the "age" column of the people table is typed as {@link ColumnType#BIGINT} and that
+     * every value read from it is a {@link Number} (or a subclass).
+     */
+    @Test
+    public void testNumberIsHandledAsNumber() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+        Column column = table.getColumnByName("age");
+        ColumnType type = column.getType();
+        assertEquals(ColumnType.BIGINT, type);
+
+        // try-with-resources: the data set is released even if an assertion fails mid-iteration
+        try (DataSet dataSet = dataContext.query().from(table).select(column).execute()) {
+            while (dataSet.next()) {
+                Object value = dataSet.getRow().getValue(column);
+                assertTrue("Got class: " + value.getClass() + ", expected Number (or subclass)", value instanceof Number);
+            }
+        }
+    }
+
+    /**
+     * Creates the table "testCreateTable" with the columns "foo" and "bar", inserts two rows and reads them
+     * back ordered by "bar", checking the "foo" values and that each row carries a non-null id.
+     */
+    @Test
+    public void testCreateTableAndInsertQuery() throws Exception {
+        final String tableName = "testCreateTable";
+        final Schema defaultSchema = dataContext.getDefaultSchema();
+
+        final CreateTable tableDefinition = new CreateTable(defaultSchema, tableName);
+        tableDefinition.withColumn("foo").ofType(ColumnType.STRING);
+        tableDefinition.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(tableDefinition);
+
+        final Table table = defaultSchema.getTableByName(tableName);
+        assertNotNull(table);
+        final String expectedColumnNames = "[" + ElasticSearchUtils.FIELD_ID + ", foo, bar]";
+        assertEquals(expectedColumnNames, Arrays.toString(table.getColumnNames().toArray()));
+
+        final Column fooColumn = table.getColumnByName("foo");
+        final Column idColumn = table.getPrimaryKeys().get(0);
+        assertEquals("Column[name=_id,columnNumber=0,type=STRING,nullable=null,nativeType=null,columnSize=null]",
+                idColumn.toString());
+
+        final UpdateScript insertTwoRows = new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+            }
+        };
+        dataContext.executeUpdate(insertTwoRows);
+
+        dataContext.refreshSchemas();
+
+        try (DataSet dataSet = dataContext.query().from(table).selectAll().orderBy("bar").execute()) {
+            // first row: bar = 42
+            assertTrue(dataSet.next());
+            assertEquals("hello", dataSet.getRow().getValue(fooColumn).toString());
+            assertNotNull(dataSet.getRow().getValue(idColumn));
+
+            // second row: bar = 43
+            assertTrue(dataSet.next());
+            assertEquals("world", dataSet.getRow().getValue(fooColumn).toString());
+            assertNotNull(dataSet.getRow().getValue(idColumn));
+
+            assertFalse(dataSet.next());
+        }
+    }
+
+    /**
+     * Inserts two rows into a freshly created table, deletes everything with an unfiltered
+     * {@link DeleteFrom} and checks that a count query then reports zero rows.
+     */
+    @Test
+    public void testDeleteAll() throws Exception {
+        final String tableName = "testCreateTable";
+        final Schema defaultSchema = dataContext.getDefaultSchema();
+
+        final CreateTable tableDefinition = new CreateTable(defaultSchema, tableName);
+        tableDefinition.withColumn("foo").ofType(ColumnType.STRING);
+        tableDefinition.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(tableDefinition);
+
+        final Table table = defaultSchema.getTableByName(tableName);
+
+        final UpdateScript insertTwoRows = new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+            }
+        };
+        dataContext.executeUpdate(insertTwoRows);
+
+        final DeleteFrom deleteEverything = new DeleteFrom(table);
+        dataContext.executeUpdate(deleteEverything);
+
+        final Query countQuery = dataContext.query().from(table).selectCount().toQuery();
+        final Row countRow = MetaModelHelper.executeSingleRowQuery(dataContext, countQuery);
+        final Number remaining = (Number) countRow.getValue(0);
+        assertEquals("Count is wrong", 0, remaining.intValue());
+    }
+
+    /**
+     * Inserts two rows, deletes the one matching foo = "hello" and bar = 42, and checks that the row
+     * (world, 43) is the one returned afterwards.
+     */
+    @Test
+    public void testDeleteByQuery() throws Exception {
+        final String tableName = "testCreateTable";
+        final Schema defaultSchema = dataContext.getDefaultSchema();
+
+        final CreateTable tableDefinition = new CreateTable(defaultSchema, tableName);
+        tableDefinition.withColumn("foo").ofType(ColumnType.STRING);
+        tableDefinition.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(tableDefinition);
+
+        final Table table = defaultSchema.getTableByName(tableName);
+
+        final UpdateScript insertTwoRows = new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+            }
+        };
+        dataContext.executeUpdate(insertTwoRows);
+
+        // Both conditions are equality filters on the inserted "hello" row.
+        dataContext.executeUpdate(
+                new DeleteFrom(table).where("foo").eq("hello").where("bar").eq(42));
+
+        final Query remainingRowQuery = dataContext.query().from(table).select("foo", "bar").toQuery();
+        final Row remainingRow = MetaModelHelper.executeSingleRowQuery(dataContext, remainingRowQuery);
+        assertEquals("Row[values=[world, 43]]", remainingRow.toString());
+    }
+
+    /**
+     * Checks that a delete with a "greater than" filter is rejected with an
+     * {@link UnsupportedOperationException} carrying the expected message.
+     */
+    @Test
+    public void testDeleteUnsupportedQueryType() throws Exception {
+        final String tableName = "testCreateTable";
+        final Schema defaultSchema = dataContext.getDefaultSchema();
+
+        final CreateTable tableDefinition = new CreateTable(defaultSchema, tableName);
+        tableDefinition.withColumn("foo").ofType(ColumnType.STRING);
+        tableDefinition.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(tableDefinition);
+
+        final Table table = defaultSchema.getTableByName(tableName);
+
+        final UpdateScript insertTwoRows = new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+            }
+        };
+        dataContext.executeUpdate(insertTwoRows);
+
+        // greater than is not supported
+        try {
+            dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40));
+            fail("Exception expected");
+        } catch (UnsupportedOperationException expected) {
+            final String expectedMessage =
+                    "Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]";
+            assertEquals(expectedMessage, expected.getMessage());
+        }
+    }
+
+    /**
+     * Inserts two rows, updates "foo" to "howdy" on the row where bar = 42, and checks both rows
+     * (ordered by "bar") afterwards.
+     */
+    @Test
+    public void testUpdateRow() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final CreateTable createTable = new CreateTable(schema, "testCreateTable");
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        dataContext.executeUpdate(createTable);
+
+        final Table table = schema.getTableByName("testCreateTable");
+
+        dataContext.executeUpdate(new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+            }
+        });
+
+        dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42));
+
+        // try-with-resources instead of a trailing close(): the data set is released even when one of
+        // the assertions below fails
+        try (DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute()) {
+            assertTrue(dataSet.next());
+            assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString());
+            assertTrue(dataSet.next());
+            assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString());
+            assertFalse(dataSet.next());
+        }
+    }
+
+    /**
+     * Filters the bulk table on user = "user4" and expects exactly one row, served by an
+     * {@link ElasticSearchRestDataSet}.
+     */
+    @Test
+    public void testWhereColumnEqualsValues() throws Exception {
+        try (DataSet dataSet = dataContext.query().from(bulkIndexType).select("user").and("message")
+                .where("user").isEquals("user4").execute()) {
+            assertEquals(ElasticSearchRestDataSet.class, dataSet.getClass());
+
+            assertTrue(dataSet.next());
+            final String onlyRow = dataSet.getRow().toString();
+            assertEquals("Row[values=[user4, 4]]", onlyRow);
+            assertFalse(dataSet.next());
+        }
+    }
+
+    /**
+     * Selects "message" from the second tweet table where "postDate" is null and expects the single
+     * row with message 2, served by an {@link ElasticSearchRestDataSet}.
+     */
+    @Test
+    public void testWhereColumnIsNullValues() throws Exception {
+        try (DataSet dataSet = dataContext.query().from(indexType2).select("message")
+                .where("postDate").isNull().execute()) {
+            assertEquals(ElasticSearchRestDataSet.class, dataSet.getClass());
+
+            assertTrue(dataSet.next());
+            final String onlyRow = dataSet.getRow().toString();
+            assertEquals("Row[values=[2]]", onlyRow);
+            assertFalse(dataSet.next());
+        }
+    }
+
+    /**
+     * Selects "message" from the second tweet table where "postDate" is not null and expects the single
+     * row with message 1, served by an {@link ElasticSearchRestDataSet}.
+     */
+    @Test
+    public void testWhereColumnIsNotNullValues() throws Exception {
+        try (DataSet dataSet = dataContext.query().from(indexType2).select("message")
+                .where("postDate").isNotNull().execute()) {
+            assertEquals(ElasticSearchRestDataSet.class, dataSet.getClass());
+
+            assertTrue(dataSet.next());
+            final String onlyRow = dataSet.getRow().toString();
+            assertEquals("Row[values=[1]]", onlyRow);
+            assertFalse(dataSet.next());
+        }
+    }
+
+    /**
+     * Combines an equality filter on "user" with a not-equals filter on "message" and expects the single
+     * row (user4, 4), served by an {@link ElasticSearchRestDataSet}.
+     */
+    @Test
+    public void testWhereMultiColumnsEqualValues() throws Exception {
+        try (DataSet dataSet = dataContext.query().from(bulkIndexType).select("user").and("message")
+                .where("user").isEquals("user4")
+                .and("message").ne(5).execute()) {
+            assertEquals(ElasticSearchRestDataSet.class, dataSet.getClass());
+
+            assertTrue(dataSet.next());
+            final String onlyRow = dataSet.getRow().toString();
+            assertEquals("Row[values=[user4, 4]]", onlyRow);
+            assertFalse(dataSet.next());
+        }
+    }
+
+    /**
+     * Filters the bulk table with an IN clause on "user" and expects the rows for user4 and user5,
+     * in "message" order.
+     */
+    @Test
+    public void testWhereColumnInValues() throws Exception {
+        try (DataSet dataSet = dataContext.query().from(bulkIndexType).select("user").and("message")
+                .where("user").in("user4", "user5").orderBy("message").execute()) {
+            assertTrue(dataSet.next());
+            final String firstRow = dataSet.getRow().toString();
+            assertEquals("Row[values=[user4, 4]]", firstRow);
+
+            assertTrue(dataSet.next());
+            final String secondRow = dataSet.getRow().toString();
+            assertEquals("Row[values=[user5, 5]]", secondRow);
+
+            assertFalse(dataSet.next());
+        }
+    }
+
+    /**
+     * Groups the people table by "gender" and checks the MAX/MIN of "age", the row count and the MIN of
+     * "id" per group, as well as the select items reported by the resulting data set.
+     */
+    @Test
+    public void testGroupByQuery() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+
+        Query q = new Query();
+        q.from(table);
+        q.groupBy(table.getColumnByName("gender"));
+        q.select(new SelectItem(table.getColumnByName("gender")),
+                new SelectItem(FunctionType.MAX, table.getColumnByName("age")),
+                new SelectItem(FunctionType.MIN, table.getColumnByName("age")), new SelectItem(FunctionType.COUNT, "*",
+                        "total"), new SelectItem(FunctionType.MIN, table.getColumnByName("id")).setAlias("firstId"));
+        q.orderBy("gender");
+
+        // try-with-resources: the data set is released even when one of the assertions below fails
+        try (DataSet data = dataContext.executeQuery(q)) {
+            assertEquals(
+                    "[peopletype.gender, MAX(peopletype.age), MIN(peopletype.age), COUNT(*) AS total, MIN(peopletype.id) AS firstId]",
+                    Arrays.toString(data.getSelectItems().toArray()));
+
+            assertTrue(data.next());
+            assertEquals("Row[values=[female, 20, 17, 5, 5]]", data.getRow().toString());
+            assertTrue(data.next());
+            assertEquals("Row[values=[male, 19, 17, 4, 1]]", data.getRow().toString());
+            assertFalse(data.next());
+        }
+    }
+
+    /**
+     * Filters the bulk table on message &gt; 7 and expects exactly the rows for user8 and user9.
+     * The two rows are accepted in either order because the query has no ORDER BY.
+     */
+    @Test
+    public void testFilterOnNumberColumn() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName(bulkIndexType);
+        Query q = dataContext.query().from(table).select("user").where("message").greaterThan(7).toQuery();
+        // built once and reused for both membership checks
+        List<String> expectations = Arrays.asList("Row[values=[user8]]", "Row[values=[user9]]");
+
+        // try-with-resources: the data set is released even when one of the assertions below fails
+        try (DataSet data = dataContext.executeQuery(q)) {
+            assertTrue(data.next());
+            assertTrue(expectations.contains(data.getRow().toString()));
+            assertTrue(data.next());
+            assertTrue(expectations.contains(data.getRow().toString()));
+            assertFalse(data.next());
+        }
+    }
+
+    /**
+     * Limits a query over all columns of the people table to five rows and checks the row count reported
+     * by a {@link DataSetTableModel} built on the result.
+     */
+    @Test
+    public void testMaxRows() throws Exception {
+        final Table peopleTable = dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+
+        final Query limitedQuery = new Query();
+        limitedQuery.from(peopleTable);
+        limitedQuery.select(peopleTable.getColumns());
+        limitedQuery.setMaxRows(5);
+
+        final TableModel model = new DataSetTableModel(dataContext.executeQuery(limitedQuery));
+        assertEquals(5, model.getRowCount());
+    }
+
+    /**
+     * Runs a COUNT(*) query against the bulk table and expects a single row with a single value of 10.
+     */
+    @Test
+    public void testCountQuery() throws Exception {
+        final Table bulkTable = dataContext.getDefaultSchema().getTableByName(bulkIndexType);
+
+        final Query countQuery = new Query();
+        countQuery.selectCount();
+        countQuery.from(bulkTable);
+
+        final List<Object[]> rows = dataContext.executeQuery(countQuery).toObjectArrays();
+        assertEquals(1, rows.size());
+
+        final Object[] onlyRow = rows.get(0);
+        assertEquals(1, onlyRow.length);
+
+        final Number count = (Number) onlyRow[0];
+        assertEquals(10, count.intValue());
+    }
+
+    /**
+     * Querying a table name that does not exist is expected to fail with an
+     * {@link IllegalArgumentException}.
+     */
+    @Test(expected = IllegalArgumentException.class)
+    public void testQueryForANonExistingTable() throws Exception {
+        dataContext.query()
+                .from("nonExistingTable")
+                .select("user").and("message")
+                .execute();
+    }
+
+    /**
+     * Selecting a field that does not exist on an existing table is expected to fail with a
+     * {@link QueryParserException}.
+     */
+    @Test(expected = QueryParserException.class)
+    public void testQueryForAnExistingTableAndNonExistingField() throws Exception {
+        // index a document first so that the table being queried is present
+        indexTweeterDocument(indexType1, 1);
+
+        dataContext.query()
+                .from(indexType1)
+                .select("nonExistingField")
+                .execute();
+    }
+
+    /**
+     * Indexes {@code numberOfDocuments} tweet documents in one bulk request. Document ids are the
+     * decimal strings "0" to "numberOfDocuments - 1".
+     *
+     * @param indexName the index to write to
+     * @param indexType the type the documents are indexed under
+     * @param numberOfDocuments how many documents to create
+     * @throws IOException if the bulk call fails
+     */
+    private static void indexBulkDocuments(final String indexName, final String indexType, final int numberOfDocuments)
+            throws IOException {
+        final BulkRequest bulk = new BulkRequest();
+
+        for (int documentNumber = 0; documentNumber < numberOfDocuments; documentNumber++) {
+            final String documentId = Integer.toString(documentNumber);
+            final IndexRequest request = new IndexRequest(indexName, indexType, documentId);
+            request.source(buildTweeterJson(documentNumber));
+
+            bulk.add(request);
+        }
+
+        client.bulk(bulk);
+    }
+
+    /**
+     * Indexes one tweet document with the given post date under the id
+     * {@code "tweet_" + indexType + "_" + id}.
+     *
+     * @param indexType the type the document is indexed under
+     * @param id the numeric part of the document id, also used to build the document content
+     * @param date the value stored in the document's "postDate" field
+     * @throws IOException if the index call fails
+     */
+    private static void indexTweeterDocument(final String indexType, final int id, final Date date) throws IOException {
+        final String documentId = "tweet_" + indexType + "_" + id;
+
+        final IndexRequest request = new IndexRequest(indexName, indexType, documentId);
+        request.source(buildTweeterJson(id, date));
+
+        client.index(request);
+    }
+
+    /**
+     * Indexes one tweet document whose post date is the current time, under the id
+     * {@code "tweet_" + indexType + "_" + id}.
+     *
+     * @param indexType the type the document is indexed under
+     * @param id the numeric part of the document id, also used to build the document content
+     * @throws IOException if the index call fails
+     */
+    private static void indexTweeterDocument(String indexType, int id) throws IOException {
+        // Delegates to the three-argument overload instead of repeating the request-building code;
+        // "new Date()" is the same date that buildTweeterJson(int) would have supplied.
+        indexTweeterDocument(indexType, id, new Date());
+    }
+
+    /**
+     * Indexes one people document. No document id is passed to the request.
+     *
+     * @param gender value of the "gender" field
+     * @param age value of the "age" field
+     * @param id value of the "id" field
+     * @throws IOException if building the document or the index call fails
+     */
+    private static void indexOnePeopleDocument(String gender, int age, int id) throws IOException {
+        final IndexRequest request = new IndexRequest(indexName, peopleIndexType);
+        request.source(buildPeopleJson(gender, age, id));
+
+        client.index(request);
+    }
+
+    /**
+     * Builds the content of a tweet document whose "postDate" is the current time.
+     *
+     * @param elementId number used for the "user" and "message" fields
+     * @return the document content as a field-name to value map
+     */
+    private static Map<String, Object> buildTweeterJson(int elementId) {
+        final Date now = new Date();
+        return buildTweeterJson(elementId, now);
+    }
+
+    /**
+     * Builds the content of a tweet document with the fields "user", "postDate" and "message",
+     * in that insertion order.
+     *
+     * @param elementId number appended to "user" for the "user" field and stored as the "message" field
+     * @param date value of the "postDate" field
+     * @return the document content as a field-name to value map
+     */
+    private static Map<String, Object> buildTweeterJson(int elementId, Date date) {
+        final Map<String, Object> document = new LinkedHashMap<>();
+        final String userName = "user" + elementId;
+
+        document.put("user", userName);
+        document.put("postDate", date);
+        document.put("message", elementId);
+
+        return document;
+    }
+
+    /**
+     * Builds the content of a people document: one JSON object with the fields "gender", "age" and "id".
+     *
+     * @param gender value of the "gender" field
+     * @param age value of the "age" field
+     * @param elementId value of the "id" field
+     * @return the builder holding the finished object
+     * @throws IOException if the builder fails while writing
+     */
+    private static XContentBuilder buildPeopleJson(String gender, int age, int elementId) throws IOException {
+        return jsonBuilder()
+                .startObject()
+                .field("gender", gender)
+                .field("age", age)
+                .field("id", elementId)
+                .endObject();
+    }
+
+}


[3/4] metamodel git commit: METAMODEL-1179: Upgraded ElasticSearch REST module to new client.

Posted by ka...@apache.org.
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
index ec5ecba..983ba5e 100644
--- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
+++ b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
@@ -19,12 +19,6 @@
 package org.apache.metamodel.elasticsearch.nativeclient;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -45,9 +39,7 @@ import org.apache.metamodel.data.DataSetTableModel;
 import org.apache.metamodel.data.InMemoryDataSet;
 import org.apache.metamodel.data.Row;
 import org.apache.metamodel.delete.DeleteFrom;
-import org.apache.metamodel.drop.DropTable;
 import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.elasticsearch.nativeclient.utils.EmbeddedElasticsearchServer;
 import org.apache.metamodel.query.FunctionType;
 import org.apache.metamodel.query.Query;
 import org.apache.metamodel.query.SelectItem;
@@ -58,20 +50,18 @@ import org.apache.metamodel.schema.Schema;
 import org.apache.metamodel.schema.Table;
 import org.apache.metamodel.update.Update;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.client.IndicesAdminClient;
 import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
-public class ElasticSearchDataContextTest {
+public class ElasticSearchDataContextTest extends ESSingleNodeTestCase {
 
     private static final String indexName = "twitter";
     private static final String indexType1 = "tweet1";
@@ -81,14 +71,15 @@ public class ElasticSearchDataContextTest {
     private static final String bulkIndexType = "bulktype";
     private static final String peopleIndexType = "peopletype";
     private static final String mapping = "{\"date_detection\":\"false\",\"properties\":{\"message\":{\"type\":\"string\",\"index\":\"not_analyzed\",\"doc_values\":\"true\"}}}";
-    private static EmbeddedElasticsearchServer embeddedElasticsearchServer;
-    private static Client client;
-    private static UpdateableDataContext dataContext;
-
-    @BeforeClass
-    public static void beforeTests() throws Exception {
-        embeddedElasticsearchServer = new EmbeddedElasticsearchServer();
-        client = embeddedElasticsearchServer.getClient();
+    private Client client;
+    private UpdateableDataContext dataContext;
+
+    @Before
+    public void beforeTests() throws Exception {
+        client = client();
+
+        dataContext = new ElasticSearchDataContext(client, indexName);
+
         indexTweeterDocument(indexType1, 1);
         indexTweeterDocument(indexType2, 1);
         indexTweeterDocument(indexType2, 2, null);
@@ -96,15 +87,10 @@ public class ElasticSearchDataContextTest {
         indexTweeterDocument(indexType2, 1);
         indexBulkDocuments(indexName, bulkIndexType, 10);
 
-        // The refresh API allows to explicitly refresh one or more index,
-        // making all operations performed since the last refresh available for
-        // search
-        embeddedElasticsearchServer.getClient().admin().indices().prepareRefresh().execute().actionGet();
-        dataContext = new ElasticSearchDataContext(client, indexName);
-        System.out.println("Embedded ElasticSearch server created!");
+        dataContext.refreshSchemas();
     }
 
-    private static void insertPeopleDocuments() throws IOException {
+    private void insertPeopleDocuments() throws IOException {
         indexOnePeopleDocument("female", 20, 5);
         indexOnePeopleDocument("female", 17, 8);
         indexOnePeopleDocument("female", 18, 9);
@@ -116,10 +102,9 @@ public class ElasticSearchDataContextTest {
         indexOnePeopleDocument("male", 18, 4);
     }
 
-    @AfterClass
-    public static void afterTests() {
-        embeddedElasticsearchServer.shutdown();
-        System.out.println("Embedded ElasticSearch server shut down!");
+    @After
+    public void afterTests() {
+        client.admin().indices().delete(new DeleteIndexRequest("_all")).actionGet();
     }
 
     @Test
@@ -128,9 +113,15 @@ public class ElasticSearchDataContextTest {
                 Arrays.toString(dataContext.getDefaultSchema().getTableNames().toArray()));
 
         Table table = dataContext.getDefaultSchema().getTableByName("tweet1");
+        try (DataSet ds = dataContext.query().from(indexType1).select("_id").execute()) {
+            assertEquals(ElasticSearchDataSet.class, ds.getClass());
+            assertTrue(ds.next());
+            assertEquals("Row[values=[tweet_tweet1_1]]",ds.getRow().toString());
+        }
 
         assertEquals("[_id, message, postDate, user]", Arrays.toString(table.getColumnNames().toArray()));
 
+        assertEquals(ColumnType.STRING, table.getColumnByName("_id").getType());
         assertEquals(ColumnType.STRING, table.getColumnByName("user").getType());
         assertEquals(ColumnType.DATE, table.getColumnByName("postDate").getType());
         assertEquals(ColumnType.BIGINT, table.getColumnByName("message").getType());
@@ -201,7 +192,7 @@ public class ElasticSearchDataContextTest {
     }
 
     @Test
-    public void testCreateTableInsertQueryAndDrop() throws Exception {
+    public void testCreateTableAndInsertQuery() throws Exception {
         final Schema schema = dataContext.getDefaultSchema();
         final CreateTable createTable = new CreateTable(schema, "testCreateTable");
         createTable.withColumn("foo").ofType(ColumnType.STRING);
@@ -235,42 +226,42 @@ public class ElasticSearchDataContextTest {
             assertNotNull(ds.getRow().getValue(idColumn));
             assertFalse(ds.next());
         }
-
-        dataContext.executeUpdate(new DropTable(table));
-
-        dataContext.refreshSchemas();
-
-        assertNull(dataContext.getTableByQualifiedLabel(table.getName()));
     }
 
     @Test
-    public void testDetectOutsideChanges() throws Exception {
-        ElasticSearchDataContext elasticSearchDataContext = (ElasticSearchDataContext) dataContext;
+    public void testDeleteFromWithWhere() throws Exception {
+        final Schema schema = dataContext.getDefaultSchema();
+        final String tableName = "testCreateTableDelete";
+        final CreateTable createTable = new CreateTable(schema, tableName);
+        createTable.withColumn("foo").ofType(ColumnType.STRING);
+        createTable.withColumn("bar").ofType(ColumnType.DOUBLE);
+        dataContext.executeUpdate(createTable);
 
-        // Create the type in ES
-        final IndicesAdminClient indicesAdmin = elasticSearchDataContext.getElasticSearchClient().admin().indices();
-        final String tableType = "outsideTable";
+        final Table table = schema.getTableByName(tableName);
 
-        Object[] sourceProperties = { "testA", "type=string, store=true", "testB", "type=string, store=true" };
+        dataContext.executeUpdate(new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+            }
+        });
 
-        new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).setSource(sourceProperties)
-                .execute().actionGet();
+        dataContext.executeUpdate(new DeleteFrom(table).where("bar").eq(42));
 
-        dataContext.refreshSchemas();
+        final Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).selectCount()
+                .toQuery());
 
-        assertNotNull(dataContext.getDefaultSchema().getTableByName(tableType));
+        assertEquals("Row[values=[1]]", row.toString());
 
-        new DeleteMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).execute().actionGet();
-        dataContext.refreshSchemas();
-        assertNull(dataContext.getTableByQualifiedLabel(tableType));
     }
 
     @Test
-    public void testDeleteAll() throws Exception {
+    public void testDeleteNoWhere() throws Exception {
         final Schema schema = dataContext.getDefaultSchema();
         final CreateTable createTable = new CreateTable(schema, "testCreateTable");
         createTable.withColumn("foo").ofType(ColumnType.STRING);
-        createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+        createTable.withColumn("bar").ofType(ColumnType.DOUBLE);
         dataContext.executeUpdate(createTable);
 
         final Table table = schema.getTableByName("testCreateTable");
@@ -288,8 +279,6 @@ public class ElasticSearchDataContextTest {
         Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).selectCount()
                 .toQuery());
         assertEquals("Row[values=[0]]", row.toString());
-
-        dataContext.executeUpdate(new DropTable(table));
     }
 
     @Test
@@ -315,8 +304,6 @@ public class ElasticSearchDataContextTest {
         Row row = MetaModelHelper.executeSingleRowQuery(dataContext,
                 dataContext.query().from(table).select("foo", "bar").toQuery());
         assertEquals("Row[values=[world, 43]]", row.toString());
-
-        dataContext.executeUpdate(new DropTable(table));
     }
 
     @Test
@@ -328,27 +315,22 @@ public class ElasticSearchDataContextTest {
         dataContext.executeUpdate(createTable);
 
         final Table table = schema.getTableByName("testCreateTable");
-        try {
 
-            dataContext.executeUpdate(new UpdateScript() {
-                @Override
-                public void run(UpdateCallback callback) {
-                    callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
-                    callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
-                }
-            });
-
-            // greater than is not yet supported
-            try {
-                dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40));
-                fail("Exception expected");
-            } catch (UnsupportedOperationException e) {
-                assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]",
-                        e.getMessage());
+        dataContext.executeUpdate(new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
             }
+        });
 
-        } finally {
-            dataContext.executeUpdate(new DropTable(table));
+        // greater than is not yet supported
+        try {
+            dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40));
+            fail("Exception expected");
+        } catch (UnsupportedOperationException e) {
+            assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]",
+                    e.getMessage());
         }
     }
 
@@ -361,54 +343,24 @@ public class ElasticSearchDataContextTest {
         dataContext.executeUpdate(createTable);
 
         final Table table = schema.getTableByName("testCreateTable");
-        try {
 
-            dataContext.executeUpdate(new UpdateScript() {
-                @Override
-                public void run(UpdateCallback callback) {
-                    callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
-                    callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
-                }
-            });
-
-            dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42));
-
-            DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute();
-            assertTrue(dataSet.next());
-            assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString());
-            assertTrue(dataSet.next());
-            assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString());
-            assertFalse(dataSet.next());
-            dataSet.close();
-        } finally {
-            dataContext.executeUpdate(new DropTable(table));
-        }
-    }
-
-    @Test
-    public void testDropTable() throws Exception {
-        Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+        dataContext.executeUpdate(new UpdateScript() {
+            @Override
+            public void run(UpdateCallback callback) {
+                callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+                callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+            }
+        });
 
-        // assert that the table was there to begin with
-        {
-            DataSet ds = dataContext.query().from(table).selectCount().execute();
-            ds.next();
-            assertEquals("Row[values=[9]]", ds.getRow().toString());
-            ds.close();
-        }
+        dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42));
 
-        dataContext.executeUpdate(new DropTable(table));
-        try {
-            DataSet ds = dataContext.query().from(table).selectCount().execute();
-            ds.next();
-            assertEquals("Row[values=[0]]", ds.getRow().toString());
-            ds.close();
-        } finally {
-            // restore the people documents for the next tests
-            insertPeopleDocuments();
-            client.admin().indices().prepareRefresh().execute().actionGet();
-            dataContext = new ElasticSearchDataContext(client, indexName);
-        }
+        DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute();
+        assertTrue(dataSet.next());
+        assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString());
+        assertTrue(dataSet.next());
+        assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString());
+        assertFalse(dataSet.next());
+        dataSet.close();
     }
 
     @Test
@@ -549,26 +501,19 @@ public class ElasticSearchDataContextTest {
 
     @Test
     public void testNonDynamicMapingTableNames() throws Exception {
-        createIndex();
+        CreateIndexRequest cir = new CreateIndexRequest(indexName2);
+        client.admin().indices().create(cir).actionGet();
+        
+        PutMappingRequest pmr = new PutMappingRequest(indexName2).type(indexType3).source(mapping, XContentType.JSON);
+        
+        client.admin().indices().putMapping(pmr).actionGet();
 
         ElasticSearchDataContext dataContext2 = new ElasticSearchDataContext(client, indexName2);
 
         assertEquals("[tweet3]", Arrays.toString(dataContext2.getDefaultSchema().getTableNames().toArray()));
     }
 
-    private static void createIndex() {
-        CreateIndexRequest cir = new CreateIndexRequest(indexName2);
-        CreateIndexResponse response = client.admin().indices().create(cir).actionGet();
-
-        System.out.println("create index: " + response.isAcknowledged());
-
-        PutMappingRequest pmr = new PutMappingRequest(indexName2).type(indexType3).source(mapping);
-
-        PutMappingResponse response2 = client.admin().indices().putMapping(pmr).actionGet();
-        System.out.println("put mapping: " + response2.isAcknowledged());
-    }
-
-    private static void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) {
+    private void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) {
         BulkRequestBuilder bulkRequest = client.prepareBulk();
 
         for (int i = 0; i < numberOfDocuments; i++) {
@@ -578,17 +523,17 @@ public class ElasticSearchDataContextTest {
         bulkRequest.execute().actionGet();
     }
 
-    private static void indexTweeterDocument(String indexType, int id, Date date) {
-        client.prepareIndex(indexName, indexType).setSource(buildTweeterJson(id, date))
-                .setId("tweet_" + indexType + "_" + id).execute().actionGet();
+    private void indexTweeterDocument(String indexType, int id, Date date) {
+        final String id1 = "tweet_" + indexType + "_" + id;
+        client.prepareIndex(indexName, indexType, id1).setSource(buildTweeterJson(id, date)).execute().actionGet();
     }
 
-    private static void indexTweeterDocument(String indexType, int id) {
+    private void indexTweeterDocument(String indexType, int id) {
         client.prepareIndex(indexName, indexType).setSource(buildTweeterJson(id))
                 .setId("tweet_" + indexType + "_" + id).execute().actionGet();
     }
 
-    private static void indexOnePeopleDocument(String gender, int age, int id) throws IOException {
+    private void indexOnePeopleDocument(String gender, int age, int id) throws IOException {
         client.prepareIndex(indexName, peopleIndexType).setSource(buildPeopleJson(gender, age, id)).execute()
                 .actionGet();
     }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
index 8b5eb50..e08f715 100644
--- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
+++ b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import junit.framework.TestCase;
 
 import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser;
 import org.apache.metamodel.schema.ColumnType;
 import org.elasticsearch.common.collect.MapBuilder;
 

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java
deleted file mode 100644
index 9ffc6b8..0000000
--- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient;
-
-import junit.framework.TestCase;
-import org.apache.metamodel.data.DataSetHeader;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.data.SimpleDataSetHeader;
-import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.schema.ColumnType;
-import org.apache.metamodel.schema.MutableColumn;
-
-import java.util.*;
-
-public class ElasticSearchUtilsTest extends TestCase {
-
-    public void testAssignDocumentIdForPrimaryKeys() throws Exception {
-        MutableColumn primaryKeyColumn = new MutableColumn("value1", ColumnType.STRING).setPrimaryKey(true);
-        SelectItem primaryKeyItem = new SelectItem(primaryKeyColumn);
-        List<SelectItem> selectItems1 = Collections.singletonList(primaryKeyItem);
-        String documentId = "doc1";
-        DataSetHeader header = new SimpleDataSetHeader(selectItems1);
-        Map<String, Object> values = new HashMap<>();
-        values.put("value1", "theValue");
-        Row row = NativeElasticSearchUtils.createRow(values, documentId, header);
-        String primaryKeyValue = (String) row.getValue(primaryKeyItem);
-
-        assertEquals(primaryKeyValue, documentId);
-    }
-
-    public void testCreateRowWithParsableDates() throws Exception {
-        SelectItem item1 = new SelectItem(new MutableColumn("value1", ColumnType.STRING));
-        SelectItem item2 = new SelectItem(new MutableColumn("value2", ColumnType.DATE));
-        List<SelectItem> selectItems1 = Arrays.asList(item1, item2);
-        String documentId = "doc1";
-        DataSetHeader header = new SimpleDataSetHeader(selectItems1);
-        Map<String, Object> values = new HashMap<>();
-        values.put("value1", "theValue");
-        values.put("value2", "2013-01-04T15:55:51.217+01:00");
-        Row row = NativeElasticSearchUtils.createRow(values, documentId, header);
-        Object stringValue = row.getValue(item1);
-        Object dateValue = row.getValue(item2);
-
-        assertTrue(stringValue instanceof String);
-        assertTrue(dateValue instanceof Date);
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java
deleted file mode 100644
index b94d0ab..0000000
--- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient.utils;
-
-import org.apache.commons.io.FileUtils;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.node.Node;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
-
-public class EmbeddedElasticsearchServer {
-
-    private static final String DEFAULT_DATA_DIRECTORY = "target/elasticsearch-data";
-
-    private final Node node;
-    private final String dataDirectory;
-
-    public EmbeddedElasticsearchServer() {
-        this(DEFAULT_DATA_DIRECTORY);
-    }
-
-    public EmbeddedElasticsearchServer(String dataDirectory) {
-        this.dataDirectory = dataDirectory;
-
-        ImmutableSettings.Builder elasticsearchSettings = ImmutableSettings.settingsBuilder()
-                .put("http.enabled", "true")
-                .put("path.data", dataDirectory);
-
-        node = nodeBuilder()
-                .local(true)
-                .settings(elasticsearchSettings.build())
-                .node();
-    }
-
-    public Client getClient() {
-        return node.client();
-    }
-
-    public void shutdown() {
-        node.close();
-        deleteDataDirectory();
-    }
-
-    private void deleteDataDirectory() {
-        try {
-            FileUtils.deleteDirectory(new File(dataDirectory));
-        } catch (IOException e) {
-            throw new RuntimeException("Could not delete data directory of embedded elasticsearch server", e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
index 9a3b1d8..46f930c 100644
--- a/elasticsearch/pom.xml
+++ b/elasticsearch/pom.xml
@@ -21,7 +21,7 @@
 	<name>MetaModel module for Elasticsearch</name>
 
 	<properties>
-		<elasticsearch.version>1.4.4</elasticsearch.version>
+		<elasticsearch.version>5.6.3</elasticsearch.version>
 	</properties>
 
 	<modules>

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/pom.xml b/elasticsearch/rest/pom.xml
index 936cf4c..8f556e6 100644
--- a/elasticsearch/rest/pom.xml
+++ b/elasticsearch/rest/pom.xml
@@ -26,11 +26,6 @@ under the License.
 
 	<modelVersion>4.0.0</modelVersion>
 
-	<properties>
-		<jest.version>2.0.2</jest.version>
-		<elasticsearch.version>1.4.4</elasticsearch.version>
-	</properties>
-
 	<artifactId>MetaModel-elasticsearch-rest</artifactId>
 	<name>MetaModel module for ElasticSearch via REST client</name>
 
@@ -52,34 +47,28 @@ under the License.
 			<artifactId>commons-io</artifactId>
 		</dependency>
 
-		<!-- Jest -->
-		<dependency>
-			<groupId>io.searchbox</groupId>
-			<artifactId>jest</artifactId>
-			<version>${jest.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>commons-logging</groupId>
-					<artifactId>commons-logging</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
 		<dependency>
 			<groupId>org.slf4j</groupId>
 			<artifactId>jcl-over-slf4j</artifactId>
 		</dependency>
 		<!-- elasticsearch -->
 		<dependency>
-			<groupId>org.elasticsearch</groupId>
-			<artifactId>elasticsearch</artifactId>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>elasticsearch-rest-high-level-client</artifactId>
 			<version>${elasticsearch.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>commons-logging</groupId>
+					<artifactId>commons-logging</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 
 		<!-- test -->
-		<dependency>
-			<groupId>org.slf4j</groupId>
-			<artifactId>slf4j-log4j12</artifactId>
+		<dependency><!-- required by elasticsearch -->
+			<groupId>org.apache.logging.log4j</groupId>
+			<artifactId>log4j-core</artifactId>
+			<version>2.9.1</version>
 			<scope>test</scope>
 		</dependency>
 		<dependency>
@@ -87,5 +76,84 @@ under the License.
 			<artifactId>junit</artifactId>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.hamcrest</groupId>
+		    <artifactId>hamcrest-all</artifactId>
+		    <version>1.3</version>
+		    <scope>test</scope>
+		</dependency>
 	</dependencies>
+	<profiles>
+		<profile>
+			<id>integration-test</id>
+			<build>
+				<plugins>
+					<plugin>
+						<artifactId>maven-failsafe-plugin</artifactId>
+						<version>2.19.1</version>
+						<executions>
+							<execution>
+								<goals>
+									<goal>integration-test</goal>
+									<goal>verify</goal>
+								</goals>
+							</execution>
+						</executions>
+					</plugin>
+
+					<plugin>
+						<groupId>io.fabric8</groupId>
+						<artifactId>docker-maven-plugin</artifactId>
+						<version>${docker-maven-plugin.version}</version>
+						<configuration>
+							<logDate>default</logDate>
+							<autoPull>true</autoPull>
+							<images>
+								<image>
+									<name>elasticsearch-metamodel</name>
+									<build>
+										<dockerFileDir>${project.build.directory}/test-classes</dockerFileDir>
+									</build>
+									<run>
+										<ports>
+											<port>9200:9200</port>
+											<port>9300:9300</port>
+										</ports>
+										<env>
+											<ES_JAVA_OPTS>-Xms1g -Xmx1g</ES_JAVA_OPTS>
+											<cluster.name>docker-cluster</cluster.name>
+											<bootstrap.memory_lock>true</bootstrap.memory_lock>
+										    <xpack.security.enabled>false</xpack.security.enabled>
+										</env>
+										<wait>
+											<url>http://${docker.host.address}:9200</url>
+											<time>300000</time>
+										</wait>
+									</run>
+								</image>
+							</images>
+						</configuration>
+						<executions>
+							<execution>
+								<id>start</id>
+								<phase>pre-integration-test</phase>
+								<goals>
+									<goal>build</goal>
+									<goal>start</goal>
+								</goals>
+							</execution>
+							<execution>
+								<id>stop</id>
+								<phase>post-integration-test</phase>
+								<goals>
+									<goal>stop</goal>
+								</goals>
+							</execution>
+						</executions>
+					</plugin>
+				</plugins>
+			</build>
+		</profile>
+	</profiles>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java
new file mode 100644
index 0000000..ddd7e17
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import static java.util.Collections.emptySet;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.http.Header;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.main.MainRequest;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticSearchRestClient extends RestHighLevelClient {
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestClient.class);
+
+    public ElasticSearchRestClient(final RestClient restClient) {
+        super(restClient);
+    }
+
+    public final boolean refresh(final String indexName, final Header... headers) {
+        try {
+            return performRequest(new MainRequest(), request -> refresh(indexName),
+                    ElasticSearchRestClient::convertResponse, emptySet(), headers);
+        } catch (IOException e) {
+            logger.info("Failed to refresh index \"{}\"", indexName, e);
+        }
+        return false;
+    }
+
+    private static Request refresh(final String indexName) {
+        return new Request(HttpPost.METHOD_NAME, "/" + indexName + "/_refresh", Collections.emptyMap(), null);
+    }
+
+    public final boolean delete(final String indexName, final Header... headers) throws IOException {
+        return performRequest(new MainRequest(), request -> delete(indexName),
+                ElasticSearchRestClient::convertResponse, emptySet(), headers);
+    }
+
+    private static Request delete(final String indexName) {
+        return new Request(HttpDelete.METHOD_NAME, "/" + indexName, Collections.emptyMap(), null);
+    }
+
+    public Set<Entry<String, Object>> getMappings(final String indexName, final Header... headers) throws IOException {
+        return performRequestAndParseEntity(new GetIndexRequest(), request -> getMappings(indexName), (
+                response) -> parseMappings(response, indexName), emptySet(), headers);
+    }
+
+    private static Request getMappings(final String indexName) {
+        return new Request(HttpGet.METHOD_NAME, "/" + indexName, Collections.emptyMap(), null);
+    }
+
+    public final boolean createMapping(final PutMappingRequest putMappingRequest, final Header... headers)
+            throws IOException {
+        return performRequest(putMappingRequest, request -> putMapping(putMappingRequest),
+                ElasticSearchRestClient::convertResponse, emptySet(), headers);
+    }
+
+    private static Request putMapping(final PutMappingRequest putMappingRequest) {
+        final String endpoint = "/" + putMappingRequest.indices()[0] + "/_mapping/" + putMappingRequest.type();
+        final ByteArrayEntity entity = new ByteArrayEntity( // UTF-8 per APPLICATION_JSON, not platform default
+                putMappingRequest.source().getBytes(java.nio.charset.StandardCharsets.UTF_8), ContentType.APPLICATION_JSON);
+        return new Request(HttpPut.METHOD_NAME, endpoint, Collections.emptyMap(), entity);
+    }
+
+    // Carbon copy of RestHighLevelClient#convertExistsResponse(Response) method, which is inaccessible from this class.
+    private static boolean convertResponse(final Response response) {
+        return response.getStatusLine().getStatusCode() == 200;
+    }
+
+    @SuppressWarnings("unchecked")
+    static Set<Entry<String, Object>> parseMappings(final XContentParser response, final String indexName) throws IOException {
+        final Map<String, Object> schema = (Map<String, Object>) response.map().get(indexName);
+        final Map<String, Object> tables = schema == null ? null : (Map<String, Object>) schema.get("mappings");
+        // No entry for indexName, or no "mappings" in it: report no mappings instead of a NullPointerException.
+        return tables == null ? Collections.<Entry<String, Object>> emptySet() : tables.entrySet();
+    }
+
+    ActionResponse execute(final ActionRequest request) throws IOException {
+        if (request instanceof BulkRequest) {
+            return bulk((BulkRequest) request);
+        } else if (request instanceof IndexRequest) {
+            return index((IndexRequest) request);
+        } else if (request instanceof DeleteRequest) {
+            return delete((DeleteRequest) request);
+        } else if (request instanceof ClearScrollRequest) {
+            return clearScroll((ClearScrollRequest) request);
+        } else if (request instanceof SearchScrollRequest) {
+            return searchScroll((SearchScrollRequest) request);
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java
new file mode 100644
index 0000000..91842f5
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.util.Map;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.create.AbstractTableCreationBuilder;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+
+final class ElasticSearchRestCreateTableBuilder extends AbstractTableCreationBuilder<ElasticSearchRestUpdateCallback> {
+
+    public ElasticSearchRestCreateTableBuilder(final ElasticSearchRestUpdateCallback updateCallback,
+            final Schema schema, final String name) {
+        super(updateCallback, schema, name);
+    }
+
+    @Override
+    public Table execute() throws MetaModelException {
+        final MutableTable table = getTable();
+        final Map<String, ?> source = ElasticSearchUtils.getMappingSource(table);
+
+        final ElasticSearchRestDataContext dataContext = getUpdateCallback().getDataContext();
+        final String indexName = dataContext.getIndexName();
+
+        final PutMappingRequest putMapping = new PutMappingRequest(indexName).type(table.getName()).source(source);
+        getUpdateCallback().execute(putMapping);
+
+        final MutableSchema schema = (MutableSchema) getSchema();
+        schema.addTable(table);
+        return table;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
index c5a5696..5b32d14 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
@@ -18,57 +18,43 @@
  */
 package org.apache.metamodel.elasticsearch.rest;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.metamodel.BatchUpdateScript;
 import org.apache.metamodel.DataContext;
 import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.QueryPostprocessDataContext;
 import org.apache.metamodel.UpdateScript;
 import org.apache.metamodel.UpdateSummary;
-import org.apache.metamodel.UpdateableDataContext;
 import org.apache.metamodel.data.DataSet;
 import org.apache.metamodel.data.DataSetHeader;
 import org.apache.metamodel.data.Row;
 import org.apache.metamodel.data.SimpleDataSetHeader;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext;
 import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser;
 import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
 import org.apache.metamodel.query.FilterItem;
 import org.apache.metamodel.query.LogicalOperator;
 import org.apache.metamodel.query.SelectItem;
 import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.MutableColumn;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.MutableTable;
-import org.apache.metamodel.schema.Schema;
 import org.apache.metamodel.schema.Table;
 import org.apache.metamodel.util.SimpleTableDef;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestResult;
-import io.searchbox.core.Count;
-import io.searchbox.core.CountResult;
-import io.searchbox.core.Get;
-import io.searchbox.core.Search;
-import io.searchbox.core.SearchResult;
-import io.searchbox.indices.mapping.GetMapping;
-import io.searchbox.params.Parameters;
-
 /**
  * DataContext implementation for ElasticSearch analytics engine.
  *
@@ -86,28 +72,14 @@ import io.searchbox.params.Parameters;
  * This implementation supports either automatic discovery of a schema or manual
  * specification of a schema, through the {@link SimpleTableDef} class.
  */
-public class ElasticSearchRestDataContext extends QueryPostprocessDataContext implements DataContext,
-        UpdateableDataContext {
+public class ElasticSearchRestDataContext extends AbstractElasticSearchDataContext {
 
     private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDataContext.class);
 
-    public static final String FIELD_ID = "_id";
-
-    // 1 minute timeout
-    public static final String TIMEOUT_SCROLL = "1m";
-
     // we scroll when more than 400 rows are expected
     private static final int SCROLL_THRESHOLD = 400;
 
-    private final JestClient elasticSearchClient;
-
-    private final String indexName;
-    // Table definitions that are set from the beginning, not supposed to be
-    // changed.
-    private final List<SimpleTableDef> staticTableDefinitions;
-
-    // Table definitions that are discovered, these can change
-    private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
+    private final ElasticSearchRestClient elasticSearchClient;
 
     /**
      * Constructs a {@link ElasticSearchRestDataContext}. This constructor
@@ -122,18 +94,14 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
      *            an array of {@link SimpleTableDef}s, which define the table
      *            and column model of the ElasticSearch index.
      */
-    public ElasticSearchRestDataContext(JestClient client, String indexName, SimpleTableDef... tableDefinitions) {
-        super(false);
+    public ElasticSearchRestDataContext(final ElasticSearchRestClient client, final String indexName,
+            final SimpleTableDef... tableDefinitions) {
+        super(indexName, tableDefinitions);
+
         if (client == null) {
             throw new IllegalArgumentException("ElasticSearch Client cannot be null");
         }
-        if (indexName == null || indexName.trim().length() == 0) {
-            throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName);
-        }
         this.elasticSearchClient = client;
-        this.indexName = indexName;
-        this.staticTableDefinitions = (tableDefinitions == null || tableDefinitions.length == 0 ? Collections
-                .<SimpleTableDef> emptyList() : Arrays.asList(tableDefinitions));
         this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
     }
 
@@ -147,65 +115,51 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
      * @param indexName
      *            the name of the ElasticSearch index to represent
      */
-    public ElasticSearchRestDataContext(JestClient client, String indexName) {
+    public ElasticSearchRestDataContext(final ElasticSearchRestClient client, String indexName) {
         this(client, indexName, new SimpleTableDef[0]);
     }
 
-    /**
-     * Performs an analysis of the available indexes in an ElasticSearch cluster
-     * {@link JestClient} instance and detects the elasticsearch types structure
-     * based on the metadata provided by the ElasticSearch java client.
-     *
-     * @see {@link #detectTable(JsonObject, String)}
-     * @return a mutable schema instance, useful for further fine tuning by the
-     *         user.
-     */
-    private SimpleTableDef[] detectSchema() {
+    @Override
+    protected SimpleTableDef[] detectSchema() {
         logger.info("Detecting schema for index '{}'", indexName);
 
-        final JestResult jestResult;
+        final Set<Entry<String, Object>> mappings;
         try {
-            final GetMapping getMapping = new GetMapping.Builder().addIndex(indexName).build();
-            jestResult = elasticSearchClient.execute(getMapping);
-        } catch (Exception e) {
+            mappings = getElasticSearchClient().getMappings(indexName);
+        } catch (IOException e) {
             logger.error("Failed to retrieve mappings", e);
             throw new MetaModelException("Failed to execute request for index information needed to detect schema", e);
         }
 
-        if (!jestResult.isSucceeded()) {
-            logger.error("Failed to retrieve mappings; {}", jestResult.getErrorMessage());
-            throw new MetaModelException("Failed to retrieve mappings; " + jestResult.getErrorMessage());
-        }
-
         final List<SimpleTableDef> result = new ArrayList<>();
 
-        final Set<Map.Entry<String, JsonElement>> mappings = jestResult.getJsonObject().getAsJsonObject(indexName)
-                .getAsJsonObject("mappings").entrySet();
-        if (mappings.size() == 0) {
+        if (mappings.isEmpty()) {
             logger.warn("No metadata returned for index name '{}' - no tables will be detected.");
         } else {
+            for (Entry<String, Object> mapping : mappings) {
+                final String documentType = mapping.getKey();
 
-            for (Map.Entry<String, JsonElement> entry : mappings) {
-                final String documentType = entry.getKey();
+                @SuppressWarnings("unchecked")
+                Map<String, Object> mappingConfiguration = (Map<String, Object>) mapping.getValue();
+                @SuppressWarnings("unchecked")
+                Map<String, Object> properties = (Map<String, Object>) mappingConfiguration.get("properties");
 
                 try {
-                    final SimpleTableDef table = detectTable(entry.getValue().getAsJsonObject().get("properties")
-                            .getAsJsonObject(), documentType);
+                    final SimpleTableDef table = detectTable(properties, documentType);
                     result.add(table);
                 } catch (Exception e) {
                     logger.error("Unexpected error during detectTable for document type '{}'", documentType, e);
                 }
             }
         }
-        final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]);
-        Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() {
-            @Override
-            public int compare(SimpleTableDef o1, SimpleTableDef o2) {
-                return o1.getName().compareTo(o2.getName());
-            }
-        });
+        return sortTables(result);
+    }
+
+    @Override
+    protected void onSchemaCacheRefreshed() {
+        getElasticSearchClient().refresh(indexName);
 
-        return tableDefArray;
+        detectSchema();
     }
 
     /**
@@ -219,60 +173,22 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
      *            the name of the index type
      * @return a table definition for ElasticSearch.
      */
-    private static SimpleTableDef detectTable(JsonObject metadataProperties, String documentType) {
-        final ElasticSearchMetaData metaData = JestElasticSearchMetaDataParser.parse(metadataProperties);
+    private static SimpleTableDef detectTable(final Map<String, Object> metadataProperties, final String documentType) {
+        final ElasticSearchMetaData metaData = ElasticSearchMetaDataParser.parse(metadataProperties);
         return new SimpleTableDef(documentType, metaData.getColumnNames(), metaData.getColumnTypes());
     }
 
     @Override
-    protected Schema getMainSchema() throws MetaModelException {
-        final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
-        for (final SimpleTableDef tableDef : staticTableDefinitions) {
-            addTable(theSchema, tableDef);
-        }
-
-        final SimpleTableDef[] tables = detectSchema();
-        synchronized (this) {
-            dynamicTableDefinitions.clear();
-            dynamicTableDefinitions.addAll(Arrays.asList(tables));
-            for (final SimpleTableDef tableDef : dynamicTableDefinitions) {
-                final List<String> tableNames = theSchema.getTableNames();
-
-                if (!tableNames.contains(tableDef.getName())) {
-                    addTable(theSchema, tableDef);
-                }
-            }
-        }
-
-        return theSchema;
-    }
-
-    private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
-        final MutableTable table = tableDef.toTable().setSchema(theSchema);
-        final Column idColumn = table.getColumnByName(FIELD_ID);
-        if (idColumn != null && idColumn instanceof MutableColumn) {
-            final MutableColumn mutableColumn = (MutableColumn) idColumn;
-            mutableColumn.setPrimaryKey(true);
-        }
-        theSchema.addTable(table);
-    }
-
-    @Override
-    protected String getMainSchemaName() throws MetaModelException {
-        return indexName;
-    }
-
-    @Override
-    protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems,
-            List<FilterItem> whereItems, int firstRow, int maxRows) {
+    protected DataSet materializeMainSchemaTable(final Table table, final List<SelectItem> selectItems,
+            final List<FilterItem> whereItems, final int firstRow, final int maxRows) {
         final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
                 LogicalOperator.AND);
         if (queryBuilder != null) {
             // where clause can be pushed down to an ElasticSearch query
             SearchSourceBuilder searchSourceBuilder = createSearchRequest(firstRow, maxRows, queryBuilder);
-            SearchResult result = executeSearch(table, searchSourceBuilder, scrollNeeded(maxRows));
+            SearchResponse result = executeSearch(table, searchSourceBuilder, scrollNeeded(maxRows));
 
-            return new JestElasticSearchDataSet(elasticSearchClient, result, selectItems);
+            return new ElasticSearchRestDataSet(getElasticSearchClient(), result, selectItems);
         }
         return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
     }
@@ -282,30 +198,30 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
         return !limitMaxRowsIsSet(maxRows) || maxRows > SCROLL_THRESHOLD;
     }
 
-    private SearchResult executeSearch(Table table, SearchSourceBuilder searchSourceBuilder, boolean scroll) {
-        Search.Builder builder = new Search.Builder(searchSourceBuilder.toString()).addIndex(getIndexName()).addType(
-                table.getName());
+    private SearchResponse executeSearch(final Table table, final SearchSourceBuilder searchSourceBuilder,
+            final boolean scroll) {
+        final SearchRequest searchRequest = new SearchRequest(new String[] { getIndexName() }, searchSourceBuilder)
+                .types(table.getName());
+
         if (scroll) {
-            builder.setParameter(Parameters.SCROLL, TIMEOUT_SCROLL);
+            searchRequest.scroll(TIMEOUT_SCROLL);
         }
 
-        Search search = builder.build();
-        SearchResult result;
         try {
-            result = elasticSearchClient.execute(search);
-        } catch (Exception e) {
+            return getElasticSearchClient().search(searchRequest);
+        } catch (IOException e) {
             logger.warn("Could not execute ElasticSearch query", e);
             throw new MetaModelException("Could not execute ElasticSearch query", e);
         }
-        return result;
     }
 
     @Override
     protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
-        SearchResult searchResult = executeSearch(table, createSearchRequest(1, maxRows, null), scrollNeeded(
+        SearchResponse searchResult = executeSearch(table, createSearchRequest(1, maxRows, null), scrollNeeded(
                 maxRows));
 
-        return new JestElasticSearchDataSet(elasticSearchClient, searchResult, columns.stream().map(SelectItem::new).collect(Collectors.toList()));
+        return new ElasticSearchRestDataSet(getElasticSearchClient(), searchResult, columns.stream()
+                .map(SelectItem::new).collect(Collectors.toList()));
     }
 
     private SearchSourceBuilder createSearchRequest(int firstRow, int maxRows, QueryBuilder queryBuilder) {
@@ -317,7 +233,7 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
         if (limitMaxRowsIsSet(maxRows)) {
             searchRequest.size(maxRows);
         } else {
-            searchRequest.size(Integer.MAX_VALUE);
+            searchRequest.size(SCROLL_THRESHOLD);
         }
 
         if (queryBuilder != null) {
@@ -337,12 +253,16 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
         final String documentType = table.getName();
         final String id = keyValue.toString();
 
-        final Get get = new Get.Builder(indexName, id).type(documentType).build();
-        final JestResult getResult = JestClientExecutor.execute(elasticSearchClient, get);
-
         final DataSetHeader header = new SimpleDataSetHeader(selectItems);
 
-        return JestElasticSearchUtils.createRow(getResult.getJsonObject().get("_source").getAsJsonObject(), id, header);
+        try {
+            return ElasticSearchUtils.createRow(getElasticSearchClient()
+                    .get(new GetRequest(getIndexName(), documentType, id))
+                    .getSource(), id, header);
+        } catch (IOException e) {
+            logger.warn("Could not execute ElasticSearch query", e);
+            throw new MetaModelException("Could not execute ElasticSearch query", e);
+        }
     }
 
     @Override
@@ -352,30 +272,23 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
             return null;
         }
         final String documentType = table.getName();
-        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
+        final SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
         sourceBuilder.query(QueryBuilders.termQuery("_type", documentType));
+        sourceBuilder.size(0);
 
-        Count count = new Count.Builder().addIndex(indexName).query(sourceBuilder.toString()).build();
-
-        CountResult countResult;
         try {
-            countResult = elasticSearchClient.execute(count);
+            return getElasticSearchClient().search(new SearchRequest(new String[] { getIndexName() }, sourceBuilder))
+                    .getHits().getTotalHits();
         } catch (Exception e) {
             logger.warn("Could not execute ElasticSearch get query", e);
             throw new MetaModelException("Could not execute ElasticSearch get query", e);
         }
-
-        return countResult.getCount();
-    }
-
-    private boolean limitMaxRowsIsSet(int maxRows) {
-        return (maxRows != -1);
     }
 
     @Override
     public UpdateSummary executeUpdate(UpdateScript update) {
         final boolean isBatch = update instanceof BatchUpdateScript;
-        final JestElasticSearchUpdateCallback callback = new JestElasticSearchUpdateCallback(this, isBatch);
+        final ElasticSearchRestUpdateCallback callback = new ElasticSearchRestUpdateCallback(this, isBatch);
         update.run(callback);
         callback.onExecuteUpdateFinished();
         return callback.getUpdateSummary();
@@ -384,14 +297,7 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
     /**
-     * Gets the {@link JestClient} that this {@link DataContext} is wrapping.
+     * Gets the {@link ElasticSearchRestClient} that this {@link DataContext} is wrapping.
      */
-    public JestClient getElasticSearchClient() {
+    public ElasticSearchRestClient getElasticSearchClient() {
         return elasticSearchClient;
     }
-
-    /**
-     * Gets the name of the index that this {@link DataContext} is working on.
-     */
-    public String getIndexName() {
-        return indexName;
-    }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java
index b2dc4c3..b1756b7 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java
@@ -18,6 +18,14 @@
  */
 package org.apache.metamodel.elasticsearch.rest;
 
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.metamodel.ConnectionException;
 import org.apache.metamodel.DataContext;
 import org.apache.metamodel.factory.DataContextFactory;
@@ -25,10 +33,8 @@ import org.apache.metamodel.factory.DataContextProperties;
 import org.apache.metamodel.factory.ResourceFactoryRegistry;
 import org.apache.metamodel.factory.UnsupportedDataContextPropertiesException;
 import org.apache.metamodel.util.SimpleTableDef;
-
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestClientFactory;
-import io.searchbox.client.config.HttpClientConfig;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
 
 /**
  * Factory for ElasticSearch data context of REST type.
@@ -72,18 +78,20 @@ public class ElasticSearchRestDataContextFactory implements DataContextFactory {
         return true;
     }
 
-    private JestClient createClient(DataContextProperties properties) {
-        final String serverUri = properties.getUrl();
-        final HttpClientConfig.Builder builder = new HttpClientConfig.Builder(serverUri);
+    private ElasticSearchRestClient createClient(final DataContextProperties properties) throws MalformedURLException {
+        final URL url = new URL(properties.getUrl());
+        final HttpHost host = new HttpHost(url.getHost(), url.getPort(), url.getProtocol()); // keep http vs. https
+        final RestClientBuilder builder = RestClient.builder(host);
         if (properties.getUsername() != null) {
-            builder.defaultCredentials(properties.getUsername(), properties.getPassword());
+            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(properties.getUsername(),
+                    properties.getPassword()));
+
+            builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(
+                    credentialsProvider));
         }
 
-        final JestClientFactory clientFactory = new JestClientFactory();
-        final HttpClientConfig httpClientConfig = new HttpClientConfig(builder);
-        clientFactory.setHttpClientConfig(httpClientConfig);
-        final JestClient client = clientFactory.getObject();
-        return client;
+        return new ElasticSearchRestClient(builder.build());
     }
 
     private String getIndex(DataContextProperties properties) {
@@ -97,10 +105,14 @@ public class ElasticSearchRestDataContextFactory implements DataContextFactory {
     @Override
     public DataContext create(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry)
             throws UnsupportedDataContextPropertiesException, ConnectionException {
-        final JestClient client = createClient(properties);
-        final String indexName = getIndex(properties);
-        final SimpleTableDef[] tableDefinitions = properties.getTableDefs();
-        return new ElasticSearchRestDataContext(client, indexName, tableDefinitions);
+        try {
+            ElasticSearchRestClient client = createClient(properties);
+            final String indexName = getIndex(properties);
+            final SimpleTableDef[] tableDefinitions = properties.getTableDefs();
+            return new ElasticSearchRestDataContext(client, indexName, tableDefinitions);
+        } catch (MalformedURLException e) {
+            throw new UnsupportedDataContextPropertiesException(e);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java
new file mode 100644
index 0000000..d79b271
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataSet;
+import org.apache.metamodel.query.SelectItem;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DataSet} implementation for ElasticSearch search responses obtained through an
+ * {@link ElasticSearchRestClient}.
+ */
+final class ElasticSearchRestDataSet extends AbstractElasticSearchDataSet {
+
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDataSet.class);
+
+    private final ElasticSearchRestClient _client;
+
+    /**
+     * @param client the client used for the follow-up scroll and clear-scroll requests
+     * @param searchResponse the initial search response, handed on to the superclass
+     * @param selectItems the select items, handed on to the superclass
+     */
+    public ElasticSearchRestDataSet(final ElasticSearchRestClient client, final SearchResponse searchResponse,
+            final List<SelectItem> selectItems) {
+        super(searchResponse, selectItems);
+        _client = client;
+    }
+
+    /**
+     * Clears the scroll of the wrapped search response, if it has one. A failure to clear the scroll is
+     * logged and otherwise ignored.
+     */
+    @Override
+    public void closeNow() {
+        final String scrollId = _searchResponse.getScrollId();
+        if (scrollId == null) {
+            // the search was executed without scrolling, so there is nothing to clear
+            return;
+        }
+
+        final ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+        clearScrollRequest.addScrollId(scrollId);
+        try {
+            _client.execute(clearScrollRequest);
+        } catch (IOException e) {
+            logger.warn("Could not clear scroll.", e);
+        }
+    }
+
+    /**
+     * Requests the next page of the scroll identified by {@code scrollId}, using the shared scroll timeout.
+     */
+    @Override
+    protected SearchResponse scrollSearchResponse(final String scrollId) throws IOException {
+        return _client.searchScroll(new SearchScrollRequest(scrollId).scroll(
+                AbstractElasticSearchDataContext.TIMEOUT_SCROLL));
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java
new file mode 100644
index 0000000..f8caa2d
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.delete.AbstractRowDeletionBuilder;
+import org.apache.metamodel.delete.RowDeletionBuilder;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.query.LogicalOperator;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link RowDeletionBuilder} implementation for
+ * {@link ElasticSearchRestDataContext}.
+ */
+final class ElasticSearchRestDeleteBuilder extends AbstractRowDeletionBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDeleteBuilder.class);
+
+    private final ElasticSearchRestUpdateCallback _updateCallback;
+
+    public ElasticSearchRestDeleteBuilder(final ElasticSearchRestUpdateCallback updateCallback, final Table table) {
+        super(table);
+        _updateCallback = updateCallback;
+    }
+
+    /**
+     * Searches for the documents matching the WHERE items and issues one delete request per hit through the
+     * update callback. The search is scrolled so that every matching document is visited, not only the first
+     * page of results.
+     *
+     * @throws UnsupportedOperationException if the WHERE items cannot be expressed as an ElasticSearch query
+     * @throws MetaModelException if a search or scroll request fails with an {@link IOException}
+     */
+    @Override
+    public void execute() throws MetaModelException {
+        final String documentType = getTable().getName();
+
+        final ElasticSearchRestDataContext dataContext = _updateCallback.getDataContext();
+        final ElasticSearchRestClient client = dataContext.getElasticSearchClient();
+        final String indexName = dataContext.getIndexName();
+
+        final List<FilterItem> whereItems = getWhereItems();
+
+        // delete by query - note that createQueryBuilderForSimpleWhere may
+        // return matchAllQuery() if no where items are present.
+        final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
+                LogicalOperator.AND);
+        if (queryBuilder == null) {
+            // TODO: The where items could not be pushed down to a query. We
+            // could solve this by running a query first, gather all
+            // document IDs and then delete by IDs.
+            throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: "
+                    + whereItems);
+        }
+
+        final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.query(queryBuilder);
+
+        final SearchRequest searchRequest = new SearchRequest(indexName);
+        searchRequest.types(documentType);
+        searchRequest.source(searchSourceBuilder);
+        // Without scrolling only the first page (10 hits unless a size is set) comes back and any further
+        // matching documents would silently survive the delete.
+        searchRequest.scroll(AbstractElasticSearchDataContext.TIMEOUT_SCROLL);
+
+        try {
+            SearchResponse response = client.search(searchRequest);
+            try {
+                while (response.getHits().getHits().length > 0) {
+                    for (final SearchHit hit : response.getHits()) {
+                        _updateCallback.execute(new DeleteRequest(indexName, documentType, hit.getId()));
+                    }
+                    response = client.searchScroll(new SearchScrollRequest(response.getScrollId()).scroll(
+                            AbstractElasticSearchDataContext.TIMEOUT_SCROLL));
+                }
+            } finally {
+                clearScroll(client, response.getScrollId());
+            }
+        } catch (IOException e) {
+            throw new MetaModelException(e);
+        }
+    }
+
+    /**
+     * Releases the scroll on a best-effort basis; a failure is logged but does not fail the delete.
+     */
+    private static void clearScroll(final ElasticSearchRestClient client, final String scrollId) {
+        if (scrollId == null) {
+            return;
+        }
+        final ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+        clearScrollRequest.addScrollId(scrollId);
+        try {
+            client.execute(clearScrollRequest);
+        } catch (IOException e) {
+            logger.warn("Could not clear scroll.", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java
new file mode 100644
index 0000000..0ba4f66
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.index.IndexRequest;
+
+/**
+ * Row insertion builder that turns the values set on it into an {@link IndexRequest} for the table's
+ * document type and hands that request to the {@link ElasticSearchRestUpdateCallback}.
+ */
+final class ElasticSearchRestInsertBuilder extends AbstractRowInsertionBuilder<ElasticSearchRestUpdateCallback> {
+
+    public ElasticSearchRestInsertBuilder(final ElasticSearchRestUpdateCallback updateCallback, final Table table) {
+        super(updateCallback, table);
+    }
+
+    /**
+     * Builds the document source from all columns that have been set and submits an index request for it.
+     * The {@link ElasticSearchUtils#FIELD_ID} column never goes into the source; its value, when non-null,
+     * becomes the document id.
+     */
+    @Override
+    public void execute() throws MetaModelException {
+        final Column[] columns = getColumns();
+        final Object[] values = getValues();
+
+        final Map<String, Object> source = new HashMap<>();
+        String id = null;
+        for (int i = 0; i < columns.length; i++) {
+            final Column column = columns[i];
+            if (!isSet(column)) {
+                continue;
+            }
+
+            final String columnName = column.getName();
+            final Object value = values[i];
+            if (!ElasticSearchUtils.FIELD_ID.equals(columnName)) {
+                source.put(ElasticSearchUtils.getValidatedFieldName(columnName), value);
+            } else if (value != null) {
+                id = value.toString();
+            }
+        }
+
+        assert !source.isEmpty();
+
+        final ElasticSearchRestUpdateCallback updateCallback = getUpdateCallback();
+        final String indexName = updateCallback.getDataContext().getIndexName();
+        final String documentType = getTable().getName();
+
+        final IndexRequest indexRequest = new IndexRequest(indexName, documentType, id);
+        indexRequest.source(source);
+
+        updateCallback.execute(indexRequest);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java
new file mode 100644
index 0000000..defd18f
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.io.IOException;
+
+import org.apache.metamodel.AbstractUpdateCallback;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.UpdateCallback;
+import org.apache.metamodel.create.TableCreationBuilder;
+import org.apache.metamodel.delete.RowDeletionBuilder;
+import org.apache.metamodel.drop.TableDropBuilder;
+import org.apache.metamodel.insert.RowInsertionBuilder;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link UpdateCallback} of the {@link ElasticSearchRestDataContext}. Requests are executed right away, except
+ * for document write requests in batch mode, which are collected into bulk requests.
+ */
+final class ElasticSearchRestUpdateCallback extends AbstractUpdateCallback {
+
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestUpdateCallback.class);
+
+    /** Number of buffered actions at which the pending bulk request is sent. */
+    private static final int BULK_BUFFER_SIZE = 1000;
+
+    private final boolean isBatch;
+    private BulkRequest bulkRequest;
+    private int bulkActionCount = 0;
+
+    public ElasticSearchRestUpdateCallback(final ElasticSearchRestDataContext dataContext, final boolean isBatch) {
+        super(dataContext);
+        this.isBatch = isBatch;
+    }
+
+    @Override
+    public ElasticSearchRestDataContext getDataContext() {
+        return (ElasticSearchRestDataContext) super.getDataContext();
+    }
+
+    @Override
+    public TableCreationBuilder createTable(final Schema schema, final String name) throws IllegalArgumentException,
+            IllegalStateException {
+        return new ElasticSearchRestCreateTableBuilder(this, schema, name);
+    }
+
+    /** Dropping tables is not supported by this callback. */
+    @Override
+    public boolean isDropTableSupported() {
+        return false;
+    }
+
+    @Override
+    public TableDropBuilder dropTable(final Table table) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public RowInsertionBuilder insertInto(final Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        return new ElasticSearchRestInsertBuilder(this, table);
+    }
+
+    @Override
+    public boolean isDeleteSupported() {
+        return true;
+    }
+
+    @Override
+    public RowDeletionBuilder deleteFrom(final Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        return new ElasticSearchRestDeleteBuilder(this, table);
+    }
+
+    /**
+     * Sends any still-buffered bulk actions (batch mode only) and then refreshes the schemas of the data context.
+     */
+    public void onExecuteUpdateFinished() {
+        if (isBatch) {
+            flushBulkActions();
+        }
+
+        getDataContext().refreshSchemas();
+    }
+
+    /**
+     * Executes the request right away, unless this is a batch and the request is a {@link DocWriteRequest}: those
+     * are queued and sent as one bulk request once {@code BULK_BUFFER_SIZE} actions are pending.
+     */
+    public void execute(final ActionRequest action) {
+        if (!isBatch || !(action instanceof DocWriteRequest<?>)) {
+            executeBlocking(action);
+            return;
+        }
+
+        if (bulkRequest == null) {
+            bulkRequest = new BulkRequest();
+        }
+        bulkRequest.add((DocWriteRequest<?>) action);
+        bulkActionCount++;
+        if (bulkActionCount == BULK_BUFFER_SIZE) {
+            flushBulkActions();
+        }
+    }
+
+    /** Sends the pending bulk request, if it holds any actions, and starts over with an empty buffer. */
+    private void flushBulkActions() {
+        if (bulkRequest != null && bulkActionCount != 0) {
+            logger.info("Flushing {} actions to ElasticSearch index {}", bulkActionCount, getDataContext()
+                    .getIndexName());
+            executeBlocking(bulkRequest);
+
+            bulkRequest = null;
+            bulkActionCount = 0;
+        }
+    }
+
+    /**
+     * Sends the request through the client of the data context: mapping requests via {@code createMapping},
+     * everything else via {@code execute}. Failed items of a bulk response are logged only.
+     *
+     * @throws MetaModelException if the client fails with an {@link IOException}
+     */
+    private void executeBlocking(final ActionRequest action) {
+        final ElasticSearchRestClient client = getDataContext().getElasticSearchClient();
+        try {
+            if (action instanceof PutMappingRequest) {
+                client.createMapping((PutMappingRequest) action);
+                return;
+            }
+
+            final ActionResponse response = client.execute(action);
+            if (response instanceof BulkResponse) {
+                logBulkFailures((BulkResponse) response);
+            }
+        } catch (IOException e) {
+            logger.warn("Could not execute command {} ", action, e);
+            throw new MetaModelException("Could not execute " + action, e);
+        }
+    }
+
+    /** Writes one error log entry per failed item of the given bulk response. */
+    private static void logBulkFailures(final BulkResponse bulkResponse) {
+        if (!bulkResponse.hasFailures()) {
+            return;
+        }
+
+        final BulkItemResponse[] items = bulkResponse.getItems();
+        for (int index = 0; index < items.length; index++) {
+            final BulkItemResponse item = items[index];
+            if (item.isFailed()) {
+                logger.error("Bulk failed with item no. {} of {}: id={} op={} status={} error={}", index + 1,
+                        items.length, item.getId(), item.getOpType(), item.status(), item.getFailureMessage());
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java
deleted file mode 100644
index 1bb026d..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import io.searchbox.action.Action;
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestResult;
-import org.apache.metamodel.MetaModelException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-final class JestClientExecutor {
-    private static final Logger logger = LoggerFactory.getLogger(JestClientExecutor.class);
-
-    static <T extends JestResult> T execute(JestClient jestClient, Action<T> clientRequest) {
-        return execute(jestClient, clientRequest, true);
-    }
-
-    static <T extends JestResult> T execute(JestClient jestClient, Action<T> clientRequest, boolean doThrow) {
-        try {
-            final T result = jestClient.execute(clientRequest);
-            logger.debug("{} response: acknowledged={}", clientRequest, result.isSucceeded());
-            return result;
-        } catch (IOException e) {
-            logger.warn("Could not execute command {} ", clientRequest, e);
-            if (doThrow) {
-                throw new MetaModelException("Could not execute command " + clientRequest, e);
-            }
-        }
-
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java
deleted file mode 100644
index cc42b07..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import io.searchbox.action.GenericResultAbstractAction;
-
-public class JestDeleteScroll extends GenericResultAbstractAction {
-    private JestDeleteScroll(Builder builder) {
-        super(builder);
-        this.payload = builder.getScrollId();
-        setURI(buildURI());
-    }
-
-    @Override
-    public String getRestMethodName() {
-        return "DELETE";
-    }
-
-    @Override
-    protected String buildURI() {
-        return super.buildURI() + "/_search/scroll";
-    }
-
-    public static class Builder extends GenericResultAbstractAction.Builder<JestDeleteScroll, Builder> {
-        private final String scrollId;
-
-        public Builder(String scrollId) {
-            this.scrollId = scrollId;
-        }
-
-        @Override
-        public JestDeleteScroll build() {
-            return new JestDeleteScroll(this);
-        }
-
-        public String getScrollId() {
-            return scrollId;
-        }
-    }
-
-}


[4/4] metamodel git commit: METAMODEL-1179: Upgraded ElasticSearch REST module to new client.

Posted by ka...@apache.org.
METAMODEL-1179: Upgraded ElasticSearch REST module to new client.

Using the official elastic REST high level client for ElasticSearch.

Closes #177

Project: http://git-wip-us.apache.org/repos/asf/metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/metamodel/commit/bda8d764
Tree: http://git-wip-us.apache.org/repos/asf/metamodel/tree/bda8d764
Diff: http://git-wip-us.apache.org/repos/asf/metamodel/diff/bda8d764

Branch: refs/heads/master
Commit: bda8d764f65acdf3b1f520d21cb51a7396e23c7d
Parents: c57d508
Author: Arjan Seijkens <ar...@gmail.com>
Authored: Thu Jan 25 20:13:08 2018 -0800
Committer: Kasper Sørensen <i....@gmail.com>
Committed: Thu Jan 25 20:13:08 2018 -0800

----------------------------------------------------------------------
 .travis.yml                                     |   5 +-
 CHANGES.md                                      |   3 +-
 .../AbstractElasticSearchDataContext.java       | 140 +++++
 .../AbstractElasticSearchDataSet.java           | 123 ++++
 .../common/ElasticSearchDateConverter.java      |  17 +-
 .../common/ElasticSearchMetaDataParser.java     |  79 +++
 .../common/ElasticSearchUtils.java              |  80 +--
 .../common/ElasticSearchUtilsTest.java          |  63 ++
 elasticsearch/native/pom.xml                    |  36 ++
 .../ElasticSearchCreateTableBuilder.java        |   8 +-
 .../nativeclient/ElasticSearchDataContext.java  | 171 ++----
 .../ElasticSearchDataContextFactory.java        |  47 +-
 .../nativeclient/ElasticSearchDataSet.java      |  90 +--
 .../ElasticSearchDeleteBuilder.java             |  27 +-
 .../ElasticSearchDropTableBuilder.java          | 103 ----
 .../ElasticSearchInsertBuilder.java             |   8 +-
 .../ElasticSearchMetaDataParser.java            |  81 ---
 .../ElasticSearchUpdateBuilder.java             | 116 ++++
 .../ElasticSearchUpdateCallback.java            |  10 +-
 .../nativeclient/NativeElasticSearchUtils.java  |  67 --
 .../ElasticSearchDataContextTest.java           | 231 +++----
 .../ElasticSearchMetaDataParserTest.java        |   1 +
 .../nativeclient/ElasticSearchUtilsTest.java    |  63 --
 .../utils/EmbeddedElasticsearchServer.java      |  71 ---
 elasticsearch/pom.xml                           |   2 +-
 elasticsearch/rest/pom.xml                      | 114 +++-
 .../rest/ElasticSearchRestClient.java           | 134 ++++
 .../ElasticSearchRestCreateTableBuilder.java    |  55 ++
 .../rest/ElasticSearchRestDataContext.java      | 226 ++-----
 .../ElasticSearchRestDataContextFactory.java    |  46 +-
 .../rest/ElasticSearchRestDataSet.java          |  65 ++
 .../rest/ElasticSearchRestDeleteBuilder.java    |  96 +++
 .../rest/ElasticSearchRestInsertBuilder.java    |  72 +++
 .../rest/ElasticSearchRestUpdateCallback.java   | 167 +++++
 .../elasticsearch/rest/JestClientExecutor.java  |  51 --
 .../elasticsearch/rest/JestDeleteScroll.java    |  57 --
 .../JestElasticSearchCreateTableBuilder.java    |  56 --
 .../rest/JestElasticSearchDataSet.java          | 124 ----
 .../rest/JestElasticSearchDeleteBuilder.java    |  76 ---
 .../rest/JestElasticSearchDropTableBuilder.java |  62 --
 .../rest/JestElasticSearchInsertBuilder.java    |  74 ---
 .../rest/JestElasticSearchMetaDataParser.java   |  75 ---
 .../rest/JestElasticSearchUpdateCallback.java   | 164 -----
 .../rest/JestElasticSearchUtils.java            |  90 ---
 .../ElasticSearchRestDataContexFactoryIT.java   | 131 ++++
 .../rest/ElasticSearchRestDataContextIT.java    | 535 ++++++++++++++++
 .../rest/JestElasticSearchDataContextTest.java  | 615 -------------------
 .../JestElasticSearchMetaDataParserTest.java    |  70 ---
 .../rest/JestElasticSearchUtilsTest.java        | 188 ------
 .../rest/utils/EmbeddedElasticsearchServer.java |  72 ---
 .../rest/src/test/resources/Dockerfile          |   5 +
 .../rest/src/test/resources/elasticsearch.yml   |  13 +
 .../apache/metamodel/DataContextFactory.java    |   6 +-
 hbase/pom.xml                                   |   4 +
 pom.xml                                         |   1 +
 55 files changed, 2276 insertions(+), 2810 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 1b1e342..65df979 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -15,7 +15,10 @@ before_install:
 services:
   - couchdb
   - mongodb
-  
+  - docker
+
+script: "mvn clean verify -P integration-test"  
+
 after_success:
   - mvn test javadoc:javadoc
  

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/CHANGES.md
----------------------------------------------------------------------
diff --git a/CHANGES.md b/CHANGES.md
index 5af3fc4..7ab1d22 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,6 @@
-### WIP
+### Apache MetaModel 5.1.0 (WIP)
 
+ * [METAMODEL-1179] - Refactored ElasticSearch REST module to use new official REST based client from Elastic.
  * [METAMODEL-1177] - Made TableType.TABLE the default table type, replacing null values.
 
 ### Apache MetaModel 5.0.1

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java
new file mode 100644
index 0000000..c8ffae3
--- /dev/null
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataContext.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.metamodel.DataContext;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.QueryPostprocessDataContext;
+import org.apache.metamodel.UpdateableDataContext;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.MutableColumn;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.util.SimpleTableDef;
+import org.elasticsearch.common.unit.TimeValue;
+
+/**
+ * Common base class for ElasticSearch {@link DataContext} implementations. It
+ * builds the main schema from the table definitions passed to the constructor
+ * combined with the table definitions reported by {@link #detectSchema()}.
+ */
+public abstract class AbstractElasticSearchDataContext extends QueryPostprocessDataContext implements DataContext,
+        UpdateableDataContext {
+
+    public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60);
+
+    protected final String indexName;
+
+    // Table definitions handed in through the constructor; these are not
+    // supposed to be changed.
+    protected final List<SimpleTableDef> staticTableDefinitions;
+
+    // Table definitions found by detectSchema(); replaced every time the main
+    // schema is built.
+    protected final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
+
+    /**
+     * Constructs an {@link AbstractElasticSearchDataContext}. This constructor
+     * accepts a custom array of {@link SimpleTableDef}s which allows the user
+     * to define his own view on the indexes in the engine.
+     *
+     * @param indexName
+     *            the name of the ElasticSearch index to represent
+     * @param tableDefinitions
+     *            an array of {@link SimpleTableDef}s, which define the table
+     *            and column model of the ElasticSearch index. May be null or
+     *            empty, in which case no static table definitions are used.
+     * @throws IllegalArgumentException
+     *             if the index name is null or blank
+     */
+    public AbstractElasticSearchDataContext(String indexName, SimpleTableDef... tableDefinitions) {
+        super(false);
+        if (indexName == null || indexName.trim().isEmpty()) {
+            throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName);
+        }
+        this.indexName = indexName;
+        if (tableDefinitions == null || tableDefinitions.length == 0) {
+            this.staticTableDefinitions = Collections.<SimpleTableDef> emptyList();
+        } else {
+            this.staticTableDefinitions = Arrays.asList(tableDefinitions);
+        }
+    }
+
+    /**
+     * Detects the table definitions that are currently available for this
+     * data context. It is invoked every time {@link #getMainSchema()} builds
+     * the schema.
+     *
+     * @return the detected table definitions; a detected definition is only
+     *         added to the schema when no table with the same name has been
+     *         added already.
+     */
+    protected abstract SimpleTableDef[] detectSchema();
+
+    /**
+     * Builds the main schema: first all static table definitions are added,
+     * then the freshly detected ones whose names are not taken yet.
+     */
+    @Override
+    protected Schema getMainSchema() throws MetaModelException {
+        final MutableSchema schema = new MutableSchema(getMainSchemaName());
+        for (final SimpleTableDef staticDefinition : staticTableDefinitions) {
+            addTable(schema, staticDefinition);
+        }
+
+        final SimpleTableDef[] detectedDefinitions = detectSchema();
+        synchronized (this) {
+            dynamicTableDefinitions.clear();
+            dynamicTableDefinitions.addAll(Arrays.asList(detectedDefinitions));
+            for (final SimpleTableDef dynamicDefinition : dynamicTableDefinitions) {
+                // names are re-read on every iteration because the schema
+                // grows while we add tables
+                if (schema.getTableNames().contains(dynamicDefinition.getName())) {
+                    continue;
+                }
+                addTable(schema, dynamicDefinition);
+            }
+        }
+
+        return schema;
+    }
+
+    /**
+     * Converts the table definition into a table of the given schema and marks
+     * its document ID column (if present and mutable) as primary key.
+     */
+    private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
+        final MutableTable table = tableDef.toTable().setSchema(theSchema);
+        final Column idColumn = table.getColumnByName(ElasticSearchUtils.FIELD_ID);
+        // instanceof is false for null, so no separate null check is needed
+        if (idColumn instanceof MutableColumn) {
+            ((MutableColumn) idColumn).setPrimaryKey(true);
+        }
+        theSchema.addTable(table);
+    }
+
+    /**
+     * The main schema is named after the index.
+     */
+    @Override
+    protected String getMainSchemaName() throws MetaModelException {
+        return indexName;
+    }
+
+    /**
+     * Gets the name of the index that this {@link DataContext} is working on.
+     */
+    public String getIndexName() {
+        return indexName;
+    }
+
+    /**
+     * Tells whether a max rows limit has been set; -1 denotes "no limit".
+     */
+    protected boolean limitMaxRowsIsSet(int maxRows) {
+        return maxRows != -1;
+    }
+
+    /**
+     * Copies the table definitions into an array sorted by table name.
+     */
+    protected static SimpleTableDef[] sortTables(final List<SimpleTableDef> result) {
+        final SimpleTableDef[] sorted = result.toArray(new SimpleTableDef[result.size()]);
+        Arrays.sort(sorted, (left, right) -> left.getName().compareTo(right.getName()));
+        return sorted;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java
new file mode 100644
index 0000000..fea2190
--- /dev/null
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/AbstractElasticSearchDataSet.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.metamodel.data.AbstractDataSet;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.SelectItem;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base {@link DataSet} implementation for ElasticSearch. It iterates over the
+ * hits of a {@link SearchResponse} and, when the response carries a scroll
+ * id, asks the subclass for the next set of hits through
+ * {@link #scrollSearchResponse(String)}.
+ */
+public abstract class AbstractElasticSearchDataSet extends AbstractDataSet {
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractElasticSearchDataSet.class);
+
+    // false until close() has been invoked for the first time
+    protected final AtomicBoolean _closed;
+
+    protected SearchResponse _searchResponse;
+    protected SearchHit _currentHit;
+    protected int _hitIndex = 0;
+
+    /**
+     * @param searchResponse
+     *            the first search response to iterate over
+     * @param selectItems
+     *            the select items that make up the header of this data set
+     */
+    public AbstractElasticSearchDataSet(final SearchResponse searchResponse, final List<SelectItem> selectItems) {
+        super(selectItems);
+        _searchResponse = searchResponse;
+        _closed = new AtomicBoolean(false);
+    }
+
+    /**
+     * Closes the data set. {@link #closeNow()} is invoked on the first call
+     * only; later calls do not invoke it again.
+     */
+    @Override
+    public void close() {
+        super.close();
+        // Flip the flag from "open" (false) to "closed" (true) atomically. The
+        // swap only succeeds for the first caller, so closeNow() runs once.
+        final boolean closeNow = _closed.compareAndSet(false, true);
+        if (closeNow) {
+            closeNow();
+        }
+    }
+
+    /**
+     * Releases the resources held by the concrete data set. Invoked by
+     * {@link #close()} the first time the data set is closed.
+     */
+    protected abstract void closeNow();
+
+    /**
+     * Safety net: closes the data set (and logs a warning) when it is garbage
+     * collected without having been closed.
+     */
+    @Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+        if (!_closed.get()) {
+            logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this);
+            close();
+        }
+    }
+
+    /**
+     * Moves to the next hit, scrolling to the next search response when the
+     * current one is exhausted and a scroll id is available.
+     *
+     * @return true if a hit is available through {@link #getRow()}, false when
+     *         there are no more hits or scrolling failed
+     */
+    @Override
+    public boolean next() {
+        final SearchHit[] hits = _searchResponse.getHits().getHits();
+        if (hits.length == 0) {
+            // break condition for the scroll
+            _currentHit = null;
+            return false;
+        }
+
+        if (_hitIndex < hits.length) {
+            // pick the next hit within this search response
+            _currentHit = hits[_hitIndex];
+            _hitIndex++;
+            return true;
+        }
+
+        final String scrollId = _searchResponse.getScrollId();
+        if (scrollId == null) {
+            // this search response is not scrollable - then it's the end.
+            _currentHit = null;
+            return false;
+        }
+
+        // try to scroll to the next set of hits
+        try {
+            _searchResponse = scrollSearchResponse(scrollId);
+        } catch (IOException e) {
+            logger.warn("Failed to scroll to the next search response set.", e);
+            // The iteration ends here; drop the last hit so that getRow() does
+            // not keep returning a stale row, like the other end conditions.
+            _currentHit = null;
+            return false;
+        }
+
+        // start over (recursively)
+        _hitIndex = 0;
+        return next();
+    }
+
+    /**
+     * Fetches the next search response for the given scroll id. Invoked by
+     * {@link #next()} when all hits of the current response have been read.
+     *
+     * @param scrollId
+     *            the scroll id of the current search response
+     * @return the next search response
+     * @throws IOException
+     *             if the next response could not be retrieved
+     */
+    protected abstract SearchResponse scrollSearchResponse(final String scrollId) throws IOException;
+
+    /**
+     * @return the row for the current hit, or null when there is no current
+     *         hit
+     */
+    @Override
+    public Row getRow() {
+        if (_currentHit == null) {
+            return null;
+        }
+
+        final Map<String, Object> source = _currentHit.getSource();
+        final String documentId = _currentHit.getId();
+        return ElasticSearchUtils.createRow(source, documentId, getHeader());
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
index a6ce656..652fbe6 100644
--- a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchDateConverter.java
@@ -20,6 +20,7 @@ package org.apache.metamodel.elasticsearch.common;
 
 import org.apache.metamodel.util.TimeComparator;
 
+import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
@@ -30,12 +31,22 @@ import java.util.Date;
  */
 public final class ElasticSearchDateConverter {
 
+    // Only the patterns are shared. A SimpleDateFormat is created per call
+    // because SimpleDateFormat instances are not safe for concurrent use.
+    private static final String ZONED_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSX";
+    private static final String PLAIN_DATE_PATTERN = "yyyy-MM-dd'T'HH:mm:ss";
+
+    /**
+     * Tries to convert a date string into a {@link Date}. The pattern with
+     * milliseconds and zone offset is tried first, then the pattern without
+     * them, and finally {@link TimeComparator#toDate(Object)}.
+     *
+     * The most specific pattern has to come first: parsing does not require
+     * the whole string to be consumed, so the plain pattern would also accept
+     * a string carrying milliseconds and a zone offset and silently drop both.
+     *
+     * @param dateAsString
+     *            the string to convert, may be null
+     * @return the converted date, or null if the input is null (otherwise
+     *         whatever the last fallback yields)
+     */
     public static Date tryToConvert(String dateAsString) {
+        if (dateAsString == null) {
+            return null;
+        }
+
         try {
-            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX");
-            return dateFormat.parse(dateAsString);
+            return new SimpleDateFormat(ZONED_DATE_PATTERN).parse(dateAsString);
         } catch (ParseException e) {
-            return TimeComparator.toDate(dateAsString);
+            try {
+                return new SimpleDateFormat(PLAIN_DATE_PATTERN).parse(dateAsString);
+            } catch (ParseException e1) {
+                return TimeComparator.toDate(dateAsString);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java
new file mode 100644
index 0000000..32f07ff
--- /dev/null
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchMetaDataParser.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.common;
+
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.metamodel.schema.ColumnType;
+
+/**
+ * Parser that turns the ElasticSearch mapping metadata (a map of field name
+ * to field properties) into an {@link ElasticSearchMetaData} object.
+ */
+public class ElasticSearchMetaDataParser {
+
+    /**
+     * Parses the ElasticSearch meta data info into an ElasticSearchMetaData
+     * object. The document ID field is always placed first, followed by one
+     * entry per field of the given map, in the map's iteration order.
+     *
+     * @param metaDataInfo
+     *            ElasticSearch mapping metadata in Map format; every value is
+     *            expected to be a Map itself
+     * @return An ElasticSearchMetaData object
+     */
+    public static ElasticSearchMetaData parse(Map<String, ?> metaDataInfo) {
+        final String[] names = new String[metaDataInfo.size() + 1];
+        final ColumnType[] types = new ColumnType[metaDataInfo.size() + 1];
+
+        // slot 0 is reserved for the (fixed) document ID field
+        names[0] = ElasticSearchUtils.FIELD_ID;
+        types[0] = ColumnType.STRING;
+
+        int position = 0;
+        for (final Entry<String, ?> field : metaDataInfo.entrySet()) {
+            position++;
+            @SuppressWarnings("unchecked")
+            final Map<String, ?> fieldProperties = (Map<String, ?>) field.getValue();
+
+            names[position] = field.getKey();
+            types[position] = getColumnTypeFromMetadataField(fieldProperties);
+        }
+        return new ElasticSearchMetaData(names, types);
+    }
+
+    /**
+     * Resolves the column type of a field; fields without a "type" entry are
+     * treated as strings.
+     */
+    private static ColumnType getColumnTypeFromMetadataField(Map<String, ?> fieldMetadata) {
+        final String elasticSearchType = getMetaDataFieldTypeFromMetaDataField(fieldMetadata);
+        return elasticSearchType == null ? ColumnType.STRING
+                : ElasticSearchUtils.getColumnTypeFromElasticSearchType(elasticSearchType);
+    }
+
+    /**
+     * Reads the "type" entry of the field properties as a string, or null
+     * when it is absent.
+     */
+    private static String getMetaDataFieldTypeFromMetaDataField(Map<String, ?> metaDataField) {
+        final Object type = metaDataField.get("type");
+        return type == null ? null : type.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
index b298d11..9128182 100644
--- a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
@@ -18,72 +18,43 @@
  */
 package org.apache.metamodel.elasticsearch.common;
 
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.metamodel.data.DataSetHeader;
+import org.apache.metamodel.data.DefaultRow;
+import org.apache.metamodel.data.Row;
 import org.apache.metamodel.query.FilterItem;
 import org.apache.metamodel.query.LogicalOperator;
 import org.apache.metamodel.query.OperatorType;
+import org.apache.metamodel.query.SelectItem;
 import org.apache.metamodel.schema.Column;
 import org.apache.metamodel.schema.ColumnType;
 import org.apache.metamodel.schema.MutableColumn;
 import org.apache.metamodel.schema.MutableTable;
 import org.apache.metamodel.util.CollectionUtils;
-import org.elasticsearch.common.base.Strings;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.FilterBuilder;
+import org.elasticsearch.index.query.ExistsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class ElasticSearchUtils {
 
-    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUtils.class);
-
     public static final String FIELD_ID = "_id";
     public static final String SYSTEM_PROPERTY_STRIP_INVALID_FIELD_CHARS = "metamodel.elasticsearch.strip_invalid_field_chars";
 
-    /**
-     * Gets a "filter" query which is both 1.x and 2.x compatible.
-     */
-    private static QueryBuilder getFilteredQuery(String prefix, String fieldName) {
-        // 1.x: itemQueryBuilder = QueryBuilders.filteredQuery(null,
-        // FilterBuilders.missingFilter(fieldName));
-        // 2.x: itemQueryBuilder =
-        // QueryBuilders.boolQuery().must(QueryBuilders.missingQuery(fieldName));
-        try {
-            try {
-                Method method = QueryBuilders.class.getDeclaredMethod(prefix + "Query", String.class);
-                method.setAccessible(true);
-                return QueryBuilders.boolQuery().must((QueryBuilder) method.invoke(null, fieldName));
-            } catch (NoSuchMethodException e) {
-                Class<?> clazz = ElasticSearchUtils.class.getClassLoader().loadClass(
-                        "org.elasticsearch.index.query.FilterBuilders");
-                Method filterBuilderMethod = clazz.getDeclaredMethod(prefix + "Filter", String.class);
-                filterBuilderMethod.setAccessible(true);
-                Method queryBuildersFilteredQueryMethod = QueryBuilders.class.getDeclaredMethod("filteredQuery",
-                        QueryBuilder.class, FilterBuilder.class);
-                return (QueryBuilder) queryBuildersFilteredQueryMethod.invoke(null, null, filterBuilderMethod.invoke(
-                        null, fieldName));
-            }
-        } catch (Exception e) {
-            logger.error("Failed to resolve/invoke filtering method", e);
-            throw new IllegalStateException("Failed to resolve filtering method", e);
-        }
-    }
-
     public static QueryBuilder getMissingQuery(String fieldName) {
-        return getFilteredQuery("missing", fieldName);
+        return new BoolQueryBuilder().mustNot(new ExistsQueryBuilder(fieldName));
     }
 
     public static QueryBuilder getExistsQuery(String fieldName) {
-        return getFilteredQuery("exists", fieldName);
+        return new ExistsQueryBuilder(fieldName);
     }
 
     public static Map<String, ?> getMappingSource(final MutableTable table) {
@@ -170,7 +141,7 @@ public class ElasticSearchUtils {
         }
 
         if (type.isLiteral()) {
-            return "string";
+            return "text";
         } else if (type == ColumnType.FLOAT) {
             return "float";
         } else if (type == ColumnType.DOUBLE || type == ColumnType.NUMERIC || type == ColumnType.NUMBER) {
@@ -294,4 +265,35 @@ public class ElasticSearchUtils {
         }
         return columnType;
     }
+
+    /**
+     * Creates a {@link Row} for a document: primary key columns receive the
+     * document id, every other column the value stored under its name in the
+     * source map. String values of DATE columns are converted to {@link Date}
+     * when {@link ElasticSearchDateConverter} returns a non-null result; any
+     * other value (including non-String date values) is passed through as-is.
+     */
+    public static Row createRow(final Map<String, Object> sourceMap, final String documentId, final DataSetHeader header) {
+        final Object[] values = new Object[header.size()];
+        for (int i = 0; i < values.length; i++) {
+            final SelectItem selectItem = header.getSelectItem(i);
+            final Column column = selectItem.getColumn();
+
+            assert column != null;
+            assert selectItem.getAggregateFunction() == null;
+            assert selectItem.getScalarFunction() == null;
+
+            if (column.isPrimaryKey()) {
+                values[i] = documentId;
+                continue;
+            }
+            final Object value = sourceMap.get(column.getName());
+            // Only Strings can be parsed; blindly casting e.g. a Long would throw ClassCastException.
+            final Date date = column.getType() == ColumnType.DATE && value instanceof String
+                    ? ElasticSearchDateConverter.tryToConvert((String) value) : null;
+            values[i] = date == null ? value : date;
+        }
+
+        return new DefaultRow(header, values);
+    }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java b/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java
new file mode 100644
index 0000000..9fb7e03
--- /dev/null
+++ b/elasticsearch/common/src/test/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtilsTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.common;
+
+import junit.framework.TestCase;
+import org.apache.metamodel.data.DataSetHeader;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.data.SimpleDataSetHeader;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.ColumnType;
+import org.apache.metamodel.schema.MutableColumn;
+
+import java.util.*;
+
+/** Unit tests for {@link ElasticSearchUtils#createRow}. */
+public class ElasticSearchUtilsTest extends TestCase {
+
+    /** A primary key column must carry the document id, not the source value. */
+    public void testAssignDocumentIdForPrimaryKeys() throws Exception {
+        final String documentId = "doc1";
+        final SelectItem primaryKeyItem =
+                new SelectItem(new MutableColumn("value1", ColumnType.STRING).setPrimaryKey(true));
+        final DataSetHeader header = new SimpleDataSetHeader(Collections.singletonList(primaryKeyItem));
+        final Map<String, Object> source = new HashMap<>();
+        source.put("value1", "theValue");
+
+        final Row row = ElasticSearchUtils.createRow(source, documentId, header);
+
+        // JUnit convention: expected value first, actual value second.
+        assertEquals(documentId, row.getValue(primaryKeyItem));
+    }
+
+    /** A parsable date string in a DATE column is returned as a {@link Date}. */
+    public void testCreateRowWithParsableDates() throws Exception {
+        final SelectItem stringItem = new SelectItem(new MutableColumn("value1", ColumnType.STRING));
+        final SelectItem dateItem = new SelectItem(new MutableColumn("value2", ColumnType.DATE));
+        final DataSetHeader header = new SimpleDataSetHeader(Arrays.asList(stringItem, dateItem));
+        final Map<String, Object> source = new HashMap<>();
+        source.put("value1", "theValue");
+        source.put("value2", "2013-01-04T15:55:51.217+01:00");
+
+        final Row row = ElasticSearchUtils.createRow(source, "doc1", header);
+
+        assertTrue(row.getValue(stringItem) instanceof String);
+        assertTrue(row.getValue(dateItem) instanceof Date);
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/native/pom.xml b/elasticsearch/native/pom.xml
index 4c1abcf..6adca36 100644
--- a/elasticsearch/native/pom.xml
+++ b/elasticsearch/native/pom.xml
@@ -43,6 +43,17 @@
 			<artifactId>elasticsearch</artifactId>
 			<version>${elasticsearch.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>transport</artifactId>
+			<version>${elasticsearch.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>commons-logging</groupId>
+					<artifactId>commons-logging</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
 
 		<!-- test -->
 		<dependency>
@@ -53,7 +64,32 @@
 		<dependency>
 			<groupId>junit</groupId>
 			<artifactId>junit</artifactId>
+			<version>4.12</version>
 			<scope>test</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>org.hamcrest</groupId>
+					<artifactId>hamcrest-core</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+		    <groupId>org.elasticsearch.test</groupId>
+		    <artifactId>framework</artifactId>
+			<version>${elasticsearch.version}</version>
+		    <scope>test</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>commons-logging</groupId>
+					<artifactId>commons-logging</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+		    <groupId>org.apache.logging.log4j</groupId>
+		    <artifactId>log4j-core</artifactId>
+		    <version>2.9.1</version>
+		    <scope>test</scope>
 		</dependency>
 	</dependencies>
 

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
index f27e8ac..4e5873c 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchCreateTableBuilder.java
@@ -27,6 +27,7 @@ import org.apache.metamodel.schema.MutableSchema;
 import org.apache.metamodel.schema.MutableTable;
 import org.apache.metamodel.schema.Schema;
 import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
 import org.elasticsearch.client.IndicesAdminClient;
@@ -50,13 +51,16 @@ final class ElasticSearchCreateTableBuilder extends AbstractTableCreationBuilder
         final IndicesAdminClient indicesAdmin = dataContext.getElasticSearchClient().admin().indices();
         final String indexName = dataContext.getIndexName();
 
-        final PutMappingRequestBuilder requestBuilder = new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName)
-                .setType(table.getName());
+        final PutMappingRequestBuilder requestBuilder =
+                new PutMappingRequestBuilder(indicesAdmin, PutMappingAction.INSTANCE).setIndices(indexName)
+                        .setType(table.getName());
         requestBuilder.setSource(source);
         final PutMappingResponse result = requestBuilder.execute().actionGet();
 
         logger.debug("PutMapping response: acknowledged={}", result.isAcknowledged());
 
+        dataContext.getElasticSearchClient().admin().indices().prepareRefresh(indexName).get();
+
         final MutableSchema schema = (MutableSchema) getSchema();
         schema.addTable(table);
         return table;

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
index d2dfe4b..3df0ce1 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContext.java
@@ -18,39 +18,30 @@
  */
 package org.apache.metamodel.elasticsearch.nativeclient;
 
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.apache.metamodel.DataContext;
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.QueryPostprocessDataContext;
 import org.apache.metamodel.UpdateScript;
 import org.apache.metamodel.UpdateSummary;
-import org.apache.metamodel.UpdateableDataContext;
 import org.apache.metamodel.data.DataSet;
 import org.apache.metamodel.data.DataSetHeader;
 import org.apache.metamodel.data.Row;
 import org.apache.metamodel.data.SimpleDataSetHeader;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext;
 import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser;
 import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
 import org.apache.metamodel.query.FilterItem;
 import org.apache.metamodel.query.LogicalOperator;
 import org.apache.metamodel.query.SelectItem;
 import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.MutableColumn;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.MutableTable;
-import org.apache.metamodel.schema.Schema;
 import org.apache.metamodel.schema.Table;
 import org.apache.metamodel.util.SimpleTableDef;
-import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
-import org.elasticsearch.action.count.CountResponse;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
@@ -59,14 +50,16 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.common.hppc.ObjectLookupContainer;
-import org.elasticsearch.common.hppc.cursors.ObjectCursor;
-import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.carrotsearch.hppc.ObjectLookupContainer;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+
 /**
  * DataContext implementation for ElasticSearch analytics engine.
  *
@@ -84,20 +77,11 @@ import org.slf4j.LoggerFactory;
  * This implementation supports either automatic discovery of a schema or manual
  * specification of a schema, through the {@link SimpleTableDef} class.
  */
-public class ElasticSearchDataContext extends QueryPostprocessDataContext implements DataContext, UpdateableDataContext {
+public class ElasticSearchDataContext extends AbstractElasticSearchDataContext {
 
     private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContext.class);
 
-    public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60);
-
     private final Client elasticSearchClient;
-    private final String indexName;
-    // Table definitions that are set from the beginning, not supposed to be
-    // changed.
-    private final List<SimpleTableDef> staticTableDefinitions;
-
-    // Table definitions that are discovered, these can change
-    private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
 
     /**
      * Constructs a {@link ElasticSearchDataContext}. This constructor accepts a
@@ -113,16 +97,12 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
      *            and column model of the ElasticSearch index.
      */
     public ElasticSearchDataContext(Client client, String indexName, SimpleTableDef... tableDefinitions) {
-        super(false);
+        super(indexName, tableDefinitions);
+
         if (client == null) {
             throw new IllegalArgumentException("ElasticSearch Client cannot be null");
         }
-        if (indexName == null || indexName.trim().length() == 0) {
-            throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName);
-        }
         this.elasticSearchClient = client;
-        this.indexName = indexName;
-        this.staticTableDefinitions = Arrays.asList(tableDefinitions);
         this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
     }
 
@@ -140,40 +120,14 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
         this(client, indexName, new SimpleTableDef[0]);
     }
 
-    /**
-     * Performs an analysis of the available indexes in an ElasticSearch cluster
-     * {@link Client} instance and detects the elasticsearch types structure
-     * based on the metadata provided by the ElasticSearch java client.
-     *
-     * @see {@link #detectTable(ClusterState, String, String)}
-     * @return a mutable schema instance, useful for further fine tuning by the
-     *         user.
-     */
-    private SimpleTableDef[] detectSchema() {
+    @Override
+    protected SimpleTableDef[] detectSchema() {
         logger.info("Detecting schema for index '{}'", indexName);
 
-        final ClusterState cs;
         final ClusterStateRequestBuilder clusterStateRequestBuilder = getElasticSearchClient().admin().cluster()
-                .prepareState();
-
-        // different methods here to set the index name, so we have to use
-        // reflection :-/
-        try {
-            final byte majorVersion = Version.CURRENT.major;
-            final Object methodArgument = new String[] { indexName };
-            if (majorVersion == 0) {
-                final Method method = ClusterStateRequestBuilder.class.getMethod("setFilterIndices", String[].class);
-                method.invoke(clusterStateRequestBuilder, methodArgument);
-            } else {
-                final Method method = ClusterStateRequestBuilder.class.getMethod("setIndices", String[].class);
-                method.invoke(clusterStateRequestBuilder, methodArgument);
-            }
-        } catch (Exception e) {
-            logger.error("Failed to set index name on ClusterStateRequestBuilder, version {}", Version.CURRENT, e);
-            throw new MetaModelException("Failed to create request for index information needed to detect schema", e);
-        }
-        cs = clusterStateRequestBuilder.execute().actionGet().getState();
-
+                .prepareState().setIndices(indexName);
+        final ClusterState cs = clusterStateRequestBuilder.execute().actionGet().getState();
+        
         final List<SimpleTableDef> result = new ArrayList<>();
 
         final IndexMetaData imd = cs.getMetaData().index(indexName);
@@ -183,9 +137,8 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
         } else {
             final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings();
             final ObjectLookupContainer<String> documentTypes = mappings.keys();
-
-            for (final Object documentTypeCursor : documentTypes) {
-                final String documentType = ((ObjectCursor<?>) documentTypeCursor).value.toString();
+            for (final ObjectCursor<?> documentTypeCursor : documentTypes) {
+                final String documentType = documentTypeCursor.value.toString();
                 try {
                     final SimpleTableDef table = detectTable(cs, indexName, documentType);
                     result.add(table);
@@ -194,15 +147,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
                 }
             }
         }
-        final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]);
-        Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() {
-            @Override
-            public int compare(SimpleTableDef o1, SimpleTableDef o2) {
-                return o1.getName().compareTo(o2.getName());
-            }
-        });
-
-        return tableDefArray;
+        return sortTables(result);
     }
 
     /**
@@ -225,7 +170,8 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
             // index does not exist
             throw new IllegalArgumentException("No such index: " + indexName);
         }
-        final MappingMetaData mappingMetaData = imd.mapping(documentType);
+        final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings();
+        final MappingMetaData mappingMetaData = mappings.get(documentType);
         if (mappingMetaData == null) {
             throw new IllegalArgumentException("No such document type in index '" + indexName + "': " + documentType);
         }
@@ -244,44 +190,6 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
     }
 
     @Override
-    protected Schema getMainSchema() throws MetaModelException {
-        final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
-        for (final SimpleTableDef tableDef : staticTableDefinitions) {
-            addTable(theSchema, tableDef);
-        }
-
-        final SimpleTableDef[] tables = detectSchema();
-        synchronized (this) {
-            dynamicTableDefinitions.clear();
-            dynamicTableDefinitions.addAll(Arrays.asList(tables));
-            for (final SimpleTableDef tableDef : dynamicTableDefinitions) {
-                final List<String> tableNames = theSchema.getTableNames();
-
-                if (!tableNames.contains(tableDef.getName())) {
-                    addTable(theSchema, tableDef);
-                }
-            }
-        }
-
-        return theSchema;
-    }
-
-    private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
-        final MutableTable table = tableDef.toTable().setSchema(theSchema);
-        final Column idColumn = table.getColumnByName(ElasticSearchUtils.FIELD_ID);
-        if (idColumn != null && idColumn instanceof MutableColumn) {
-            final MutableColumn mutableColumn = (MutableColumn) idColumn;
-            mutableColumn.setPrimaryKey(true);
-        }
-        theSchema.addTable(table);
-    }
-
-    @Override
-    protected String getMainSchemaName() throws MetaModelException {
-        return indexName;
-    }
-
-    @Override
     protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems,
             List<FilterItem> whereItems, int firstRow, int maxRows) {
         final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
@@ -290,7 +198,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
             // where clause can be pushed down to an ElasticSearch query
             final SearchRequestBuilder searchRequest = createSearchRequest(table, firstRow, maxRows, queryBuilder);
             final SearchResponse response = searchRequest.execute().actionGet();
-            return new ElasticSearchDataSet(elasticSearchClient, response, selectItems, false);
+            return new ElasticSearchDataSet(getElasticSearchClient(), response, selectItems);
         }
         return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
     }
@@ -299,12 +207,13 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
     protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
         final SearchRequestBuilder searchRequest = createSearchRequest(table, 1, maxRows, null);
         final SearchResponse response = searchRequest.execute().actionGet();
-        return new ElasticSearchDataSet(elasticSearchClient, response, columns.stream().map(SelectItem::new).collect(Collectors.toList()), false);
+        return new ElasticSearchDataSet(getElasticSearchClient(), response, columns.stream().map(SelectItem::new)
+                .collect(Collectors.toList()));
     }
 
     private SearchRequestBuilder createSearchRequest(Table table, int firstRow, int maxRows, QueryBuilder queryBuilder) {
         final String documentType = table.getName();
-        final SearchRequestBuilder searchRequest = elasticSearchClient.prepareSearch(indexName).setTypes(documentType);
+        final SearchRequestBuilder searchRequest = getElasticSearchClient().prepareSearch(indexName).setTypes(documentType);
         if (firstRow > 1) {
             final int zeroBasedFrom = firstRow - 1;
             searchRequest.setFrom(zeroBasedFrom);
@@ -332,7 +241,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
         final String documentType = table.getName();
         final String id = keyValue.toString();
 
-        final GetResponse response = elasticSearchClient.prepareGet(indexName, documentType, id).execute().actionGet();
+        final GetResponse response = getElasticSearchClient().prepareGet(indexName, documentType, id).execute().actionGet();
 
         if (!response.isExists()) {
             return null;
@@ -343,7 +252,7 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
 
         final DataSetHeader header = new SimpleDataSetHeader(selectItems);
 
-        return NativeElasticSearchUtils.createRow(source, documentId, header);
+        return ElasticSearchUtils.createRow(source, documentId, header);
     }
 
     @Override
@@ -353,13 +262,11 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
             return null;
         }
         final String documentType = table.getName();
-        final CountResponse response = elasticSearchClient.prepareCount(indexName)
-                .setQuery(QueryBuilders.termQuery("_type", documentType)).execute().actionGet();
-        return response.getCount();
-    }
-
-    private boolean limitMaxRowsIsSet(int maxRows) {
-        return (maxRows != -1);
+        final TermQueryBuilder query = QueryBuilders.termQuery("_type", documentType);
+        final SearchResponse searchResponse =
+                getElasticSearchClient().prepareSearch(indexName).setSource(new SearchSourceBuilder().size(0).query(query))
+                        .execute().actionGet();
+        return searchResponse.getHits().getTotalHits();
     }
 
     @Override
@@ -370,6 +277,13 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
         return callback.getUpdateSummary();
     }
 
+    @Override
+    protected void onSchemaCacheRefreshed() {
+        getElasticSearchClient().admin().indices().prepareRefresh(indexName).get();
+        // NOTE(review): the SimpleTableDef[] returned below is discarded - confirm the call is still needed.
+        detectSchema();
+    }
+
     /**
      * Gets the {@link Client} that this {@link DataContext} is wrapping.
      *
@@ -378,13 +292,4 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
     public Client getElasticSearchClient() {
         return elasticSearchClient;
     }
-
-    /**
-     * Gets the name of the index that this {@link DataContext} is working on.
-     *
-     * @return
-     */
-    public String getIndexName() {
-        return indexName;
-    }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java
index 94359c4..a6f6953 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextFactory.java
@@ -18,6 +18,9 @@
  */
 package org.apache.metamodel.elasticsearch.nativeclient;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
 import org.apache.metamodel.ConnectionException;
 import org.apache.metamodel.DataContext;
 import org.apache.metamodel.factory.DataContextFactory;
@@ -27,12 +30,11 @@ import org.apache.metamodel.factory.UnsupportedDataContextPropertiesException;
 import org.apache.metamodel.util.SimpleTableDef;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.common.settings.ImmutableSettings.Builder;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.node.Node;
-import org.elasticsearch.node.NodeBuilder;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Factory for ElasticSearch data context of native type.
@@ -59,6 +61,7 @@ import org.elasticsearch.node.NodeBuilder;
  * </ul>
  */
 public class ElasticSearchDataContextFactory implements DataContextFactory {
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContextFactory.class);
 
     @Override
     public boolean accepts(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry) {
@@ -120,46 +123,22 @@ public class ElasticSearchDataContextFactory implements DataContextFactory {
     @Override
     public DataContext create(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry)
             throws UnsupportedDataContextPropertiesException, ConnectionException {
-        final String clientType = getClientType(properties);
         final Client client;
-        if ("node".equals(clientType)) {
-            client = createNodeClient(properties);
-        } else {
             client = createTransportClient(properties);
-        }
         final String indexName = getIndex(properties);
         final SimpleTableDef[] tableDefinitions = properties.getTableDefs();
         return new ElasticSearchDataContext(client, indexName, tableDefinitions);
     }
 
     private Client createTransportClient(DataContextProperties properties) {
-        final Builder settingsBuilder = ImmutableSettings.builder();
-        settingsBuilder.put("name", "MetaModel");
-        settingsBuilder.put("cluster.name", getCluster(properties));
-        if (properties.getUsername() != null && properties.getPassword() != null) {
-            settingsBuilder.put("shield.user", properties.getUsername() + ":" + properties.getPassword());
-            if ("true".equals(properties.toMap().get("ssl"))) {
-                if (properties.toMap().get("keystorePath") != null) {
-                    settingsBuilder.put("shield.ssl.keystore.path", properties.toMap().get("keystorePath"));
-                    settingsBuilder.put("shield.ssl.keystore.password", properties.toMap().get("keystorePassword"));
-                }
-                settingsBuilder.put("shield.transport.ssl", "true");
-            }
+        final Settings settings = Settings.builder().put("name", "MetaModel").put("cluster.name", getCluster(properties)).build();
+        final TransportClient client = new PreBuiltTransportClient(settings);
+        try {
+            client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(properties.getHostname()), properties.getPort()));
+        } catch (UnknownHostException e) { // best effort: the client is still returned, without a transport address
+            logger.warn("no IP address for the host with name \"{}\" could be found.", properties.getHostname(), e);
         }
-        final Settings settings = settingsBuilder.build();
-
-        final TransportClient client = new TransportClient(settings);
-        client.addTransportAddress(new InetSocketTransportAddress(properties.getHostname(), properties.getPort()));
         return client;
     }
 
-    private Client createNodeClient(DataContextProperties properties) {
-        final Builder settingsBuilder = ImmutableSettings.builder();
-        settingsBuilder.put("name", "MetaModel");
-        settingsBuilder.put("shield.enabled", false);
-        final Settings settings = settingsBuilder.build();
-        final Node node = NodeBuilder.nodeBuilder().clusterName(getCluster(properties)).client(true).settings(settings)
-                .node();
-        return node.client();
-    }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
index 4ced2c8..b616eb2 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataSet.java
@@ -19,104 +19,38 @@
 package org.apache.metamodel.elasticsearch.nativeclient;
 
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.metamodel.data.AbstractDataSet;
 import org.apache.metamodel.data.DataSet;
-import org.apache.metamodel.data.Row;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataSet;
 import org.apache.metamodel.query.SelectItem;
+import org.elasticsearch.action.search.ClearScrollAction;
 import org.elasticsearch.action.search.ClearScrollRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.search.SearchHit;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * {@link DataSet} implementation for ElasticSearch
  */
-final class ElasticSearchDataSet extends AbstractDataSet {
-
-    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataSet.class);
+final class ElasticSearchDataSet extends AbstractElasticSearchDataSet {
 
     private final Client _client;
-    private final AtomicBoolean _closed;
-
-    private SearchResponse _searchResponse;
-    private SearchHit _currentHit;
-    private int _hitIndex = 0;
 
-    public ElasticSearchDataSet(Client client, SearchResponse searchResponse, List<SelectItem> selectItems,
-            boolean queryPostProcessed) {
-        super(selectItems);
+    public ElasticSearchDataSet(final Client client, final SearchResponse searchResponse,
+            final List<SelectItem> selectItems) {
+        super(searchResponse, selectItems);
         _client = client;
-        _searchResponse = searchResponse;
-        _closed = new AtomicBoolean(false);
     }
 
-
     @Override
-    public void close() {
-        super.close();
-        boolean closeNow = _closed.compareAndSet(true, false);
-        if (closeNow) {
-            ClearScrollRequestBuilder scrollRequestBuilder = new ClearScrollRequestBuilder(_client)
-                    .addScrollId(_searchResponse.getScrollId());
-            scrollRequestBuilder.execute();
-        }
+    public void closeNow() {
+        ClearScrollRequestBuilder scrollRequestBuilder = new ClearScrollRequestBuilder(_client,
+                ClearScrollAction.INSTANCE).addScrollId(_searchResponse.getScrollId());
+        scrollRequestBuilder.execute();
     }
 
     @Override
-    protected void finalize() throws Throwable {
-        super.finalize();
-        if (!_closed.get()) {
-            logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this);
-            close();
-        }
-    }
-
-    @Override
-    public boolean next() {
-        final SearchHit[] hits = _searchResponse.getHits().hits();
-        if (hits.length == 0) {
-            // break condition for the scroll
-            _currentHit = null;
-            return false;
-        }
-
-        if (_hitIndex < hits.length) {
-            // pick the next hit within this search response
-            _currentHit = hits[_hitIndex];
-            _hitIndex++;
-            return true;
-        }
-
-        final String scrollId = _searchResponse.getScrollId();
-        if (scrollId == null) {
-            // this search response is not scrolleable - then it's the end.
-            _currentHit = null;
-            return false;
-        }
-
-        // try to scroll to the next set of hits
-        _searchResponse = _client.prepareSearchScroll(scrollId).setScroll(ElasticSearchDataContext.TIMEOUT_SCROLL)
+    protected SearchResponse scrollSearchResponse(final String scrollId) {
+        return _client.prepareSearchScroll(scrollId).setScroll(ElasticSearchDataContext.TIMEOUT_SCROLL)
                 .execute().actionGet();
-
-        // start over (recursively)
-        _hitIndex = 0;
-        return next();
-    }
-
-    @Override
-    public Row getRow() {
-        if (_currentHit == null) {
-            return null;
-        }
-
-        final Map<String, Object> source = _currentHit.getSource();
-        final String documentId = _currentHit.getId();
-        final Row row = NativeElasticSearchUtils.createRow(source, documentId, getHeader());
-        return row;
     }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
index 0de2a71..2db8e8c 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDeleteBuilder.java
@@ -18,6 +18,7 @@
  */
 package org.apache.metamodel.elasticsearch.nativeclient;
 
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.metamodel.MetaModelException;
@@ -27,9 +28,11 @@ import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
 import org.apache.metamodel.query.FilterItem;
 import org.apache.metamodel.query.LogicalOperator;
 import org.apache.metamodel.schema.Table;
-import org.elasticsearch.action.deletebyquery.DeleteByQueryRequestBuilder;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,10 +60,6 @@ final class ElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder {
         final Client client = dataContext.getElasticSearchClient();
         final String indexName = dataContext.getIndexName();
 
-        final DeleteByQueryRequestBuilder deleteByQueryRequestBuilder = new DeleteByQueryRequestBuilder(client);
-        deleteByQueryRequestBuilder.setIndices(indexName);
-        deleteByQueryRequestBuilder.setTypes(documentType);
-
         final List<FilterItem> whereItems = getWhereItems();
 
         // delete by query - note that creteQueryBuilderForSimpleWhere may
@@ -74,9 +73,21 @@ final class ElasticSearchDeleteBuilder extends AbstractRowDeletionBuilder {
             throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: "
                     + whereItems);
         }
-        deleteByQueryRequestBuilder.setQuery(queryBuilder);
-        deleteByQueryRequestBuilder.execute().actionGet();
 
-        logger.debug("Deleted documents by query.");
+        final SearchResponse response =
+                client.prepareSearch(indexName).setQuery(queryBuilder).setTypes(documentType).execute()
+                        .actionGet();
+
+        client.admin().indices().prepareRefresh(indexName).execute().actionGet();
+        final Iterator<SearchHit> iterator = response.getHits().iterator();
+        while (iterator.hasNext()) {
+            final SearchHit hit = iterator.next();
+            final String typeId = hit.getId();
+            final DeleteResponse deleteResponse =
+                    client.prepareDelete().setIndex(indexName).setType(documentType).setId(typeId).execute()
+                            .actionGet();
+            logger.debug("Deleted documents by query." + deleteResponse.getResult());
+        }
+        client.admin().indices().prepareRefresh(indexName).execute().actionGet();
     }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java
deleted file mode 100644
index d66b240..0000000
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDropTableBuilder.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient;
-
-import java.lang.reflect.Method;
-
-import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.drop.AbstractTableDropBuilder;
-import org.apache.metamodel.drop.TableDropBuilder;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.Table;
-import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder;
-import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.IndicesAdminClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link TableDropBuilder} for dropping tables (document types) in an
- * ElasticSearch index.
- */
-final class ElasticSearchDropTableBuilder extends AbstractTableDropBuilder {
-
-    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDropTableBuilder.class);
-
-    private final ElasticSearchUpdateCallback _updateCallback;
-
-    public ElasticSearchDropTableBuilder(ElasticSearchUpdateCallback updateCallback, Table table) {
-        super(table);
-        _updateCallback = updateCallback;
-    }
-
-    @Override
-    public void execute() throws MetaModelException {
-        final ElasticSearchDataContext dataContext = _updateCallback.getDataContext();
-        final Table table = getTable();
-        final String documentType = table.getName();
-        logger.info("Deleting mapping / document type: {}", documentType);
-        final Client client = dataContext.getElasticSearchClient();
-        final IndicesAdminClient indicesAdminClient = client.admin().indices();
-        final String indexName = dataContext.getIndexName();
-
-        final DeleteMappingRequestBuilder requestBuilder = new DeleteMappingRequestBuilder(indicesAdminClient)
-                .setIndices(indexName);
-        setType(requestBuilder, documentType);
-
-        final DeleteMappingResponse result = requestBuilder.execute().actionGet();
-        logger.debug("Delete mapping response: acknowledged={}", result.isAcknowledged());
-
-        final MutableSchema schema = (MutableSchema) table.getSchema();
-        schema.removeTable(table);
-    }
-
-    /**
-     * Invokes the {@link DeleteMappingRequestBuilder#setType(String...)} method
-     * using reflection. This is done because the API of ElasticSearch was
-     * changed and the method signature differes between different versions.
-     * 
-     * @param requestBuilder
-     * @param documentType
-     */
-    private void setType(DeleteMappingRequestBuilder requestBuilder, String documentType) {
-        Object argument;
-        Method method;
-        try {
-            try {
-                method = requestBuilder.getClass().getDeclaredMethod("setType", String[].class);
-                argument = new String[] {documentType};
-            } catch (NoSuchMethodException e) {
-                logger.debug("No setType(String[]) method found, trying with a single String instead", e);
-                method = requestBuilder.getClass().getDeclaredMethod("setType", String.class);
-                argument = documentType;
-            }
-        } catch (Exception e) {
-            logger.error("Failed to resolve DeleteMappingRequestBuilder.setType(...) method", e);
-            throw new IllegalStateException("Failed to resolve DeleteMappingRequestBuilder.setType(...) method", e);
-        }
-        try {
-            method.setAccessible(true);
-            method.invoke(requestBuilder, argument);
-        } catch (Exception e) {
-            logger.error("Failed to invoke {}", method, e);
-            throw new IllegalStateException("Failed to invoke " + method, e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
index 70d31b4..a1c7f69 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchInsertBuilder.java
@@ -26,6 +26,7 @@ import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
 import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
 import org.apache.metamodel.schema.Column;
 import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.index.IndexAction;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.Client;
@@ -46,7 +47,8 @@ final class ElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<Elast
         final Client client = dataContext.getElasticSearchClient();
         final String indexName = dataContext.getIndexName();
         final String documentType = getTable().getName();
-        final IndexRequestBuilder requestBuilder = new IndexRequestBuilder(client, indexName).setType(documentType);
+        final IndexRequestBuilder requestBuilder =
+                new IndexRequestBuilder(client, IndexAction.INSTANCE).setIndex(indexName).setType(documentType);
 
         final Map<String, Object> valueMap = new HashMap<>();
         final Column[] columns = getColumns();
@@ -68,11 +70,11 @@ final class ElasticSearchInsertBuilder extends AbstractRowInsertionBuilder<Elast
         assert !valueMap.isEmpty();
 
         requestBuilder.setSource(valueMap);
-        requestBuilder.setCreate(true);
 
         final IndexResponse result = requestBuilder.execute().actionGet();
         
         logger.debug("Inserted document: id={}", result.getId());
-    }
 
+        client.admin().indices().prepareRefresh(indexName).execute().actionGet();
+    }
 }

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java
deleted file mode 100644
index c0a1232..0000000
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParser.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient;
-
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.schema.ColumnType;
-
-/**
- * Parser that transforms the ElasticSearch metadata response (json-like format)
- * into an ElasticSearchMetaData object.
- */
-public class ElasticSearchMetaDataParser {
-
-    /**
-     * Parses the ElasticSearch meta data info into an ElasticSearchMetaData
-     * object. This method makes much easier to create the ElasticSearch schema.
-     *
-     * @param metaDataInfo
-     *            ElasticSearch mapping metadata in Map format
-     * @return An ElasticSearchMetaData object
-     */
-    public static ElasticSearchMetaData parse(Map<String, ?> metaDataInfo) {
-        final String[] fieldNames = new String[metaDataInfo.size() + 1];
-        final ColumnType[] columnTypes = new ColumnType[metaDataInfo.size() + 1];
-
-        // add the document ID field (fixed)
-        fieldNames[0] = ElasticSearchUtils.FIELD_ID;
-        columnTypes[0] = ColumnType.STRING;
-
-        int i = 1;
-        for (Entry<String, ?> metaDataField : metaDataInfo.entrySet()) {
-            @SuppressWarnings("unchecked")
-            final Map<String, ?> fieldMetadata = (Map<String, ?>) metaDataField.getValue();
-
-            fieldNames[i] = metaDataField.getKey();
-            columnTypes[i] = getColumnTypeFromMetadataField(fieldMetadata);
-            i++;
-
-        }
-        return new ElasticSearchMetaData(fieldNames, columnTypes);
-    }
-
-    private static ColumnType getColumnTypeFromMetadataField(Map<String, ?> fieldMetadata) {
-        final String metaDataFieldType = getMetaDataFieldTypeFromMetaDataField(fieldMetadata);
-
-        if (metaDataFieldType == null) {
-            return ColumnType.STRING;
-        }
-
-        return ElasticSearchUtils.getColumnTypeFromElasticSearchType(metaDataFieldType);
-    }
-
-    private static String getMetaDataFieldTypeFromMetaDataField(Map<String, ?> metaDataField) {
-        final Object type = metaDataField.get("type");
-        if (type == null) {
-            return null;
-        }
-        return type.toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java
new file mode 100644
index 0000000..eeee6fc
--- /dev/null
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateBuilder.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.nativeclient;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.query.LogicalOperator;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.Table;
+import org.apache.metamodel.update.AbstractRowUpdationBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.update.UpdateAction;
+import org.elasticsearch.action.update.UpdateRequestBuilder;
+import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link AbstractRowUpdationBuilder} for the native ElasticSearch client. Searches for the documents matching
+ * the WHERE items and sends one partial-document update request per search hit.
+ */
+public class ElasticSearchUpdateBuilder extends AbstractRowUpdationBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchUpdateBuilder.class);
+
+    private final ElasticSearchUpdateCallback _updateCallback;
+
+    public ElasticSearchUpdateBuilder(ElasticSearchUpdateCallback updateCallback, Table table) {
+        super(table);
+        _updateCallback = updateCallback;
+    }
+
+    /**
+     * Updates every search hit of the WHERE query with the values that were set on this builder.
+     *
+     * @throws UnsupportedOperationException if the WHERE items cannot be expressed as a single query
+     */
+    @Override
+    public void execute() throws MetaModelException {
+        final String documentType = getTable().getName();
+        final ElasticSearchDataContext dataContext = _updateCallback.getDataContext();
+        final Client client = dataContext.getElasticSearchClient();
+        final String indexName = dataContext.getIndexName();
+        final List<FilterItem> whereItems = getWhereItems();
+
+        // note that createQueryBuilderForSimpleWhere may return matchAllQuery() if no where items are present.
+        final QueryBuilder queryBuilder =
+                ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems, LogicalOperator.AND);
+        if (queryBuilder == null) {
+            // TODO: The where items could not be pushed down to a query. We could solve this by running a
+            // query first, gather all document IDs and then update by IDs.
+            throw new UnsupportedOperationException(
+                    "Could not push down WHERE items to update request: " + whereItems);
+        }
+
+        // The values to write do not depend on the hit, so they are collected once up front.
+        final Map<String, Object> valueMap = new HashMap<>();
+        String explicitId = null;
+        final Column[] columns = getColumns();
+        final Object[] values = getValues();
+        for (int i = 0; i < columns.length; i++) {
+            if (!isSet(columns[i])) {
+                continue;
+            }
+            final String name = columns[i].getName();
+            if (!ElasticSearchUtils.FIELD_ID.equals(name)) {
+                valueMap.put(name, values[i]);
+            } else if (values[i] != null) {
+                // NOTE(review): a value set for the ID column redirects the update to that document ID
+                // instead of the ID of the hit; kept as-is, confirm this is intended.
+                explicitId = values[i].toString();
+            }
+        }
+        assert !valueMap.isEmpty();
+
+        // Restrict the search to the document type that is updated, as the delete builder does.
+        // NOTE(review): no size or scroll is set, so presumably only the first page of hits is updated - confirm.
+        final SearchResponse response = client.prepareSearch(indexName).setTypes(documentType)
+                .setQuery(queryBuilder).execute().actionGet();
+        final Iterator<SearchHit> iterator = response.getHits().iterator();
+        while (iterator.hasNext()) {
+            final String hitId = iterator.next().getId();
+            final UpdateRequestBuilder requestBuilder = new UpdateRequestBuilder(client, UpdateAction.INSTANCE)
+                    .setIndex(indexName).setType(documentType).setId(explicitId == null ? hitId : explicitId);
+            final UpdateResponse updateResponse = requestBuilder.setDoc(valueMap).execute().actionGet();
+            logger.debug("Update document: id={}", updateResponse.getId());
+        }
+        // One refresh after all updates makes them searchable; refreshing per document is needlessly expensive.
+        client.admin().indices().prepareRefresh(indexName).execute().actionGet();
+    }
+}

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
index b81c9c7..c4cbbac 100644
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
+++ b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUpdateCallback.java
@@ -26,6 +26,7 @@ import org.apache.metamodel.drop.TableDropBuilder;
 import org.apache.metamodel.insert.RowInsertionBuilder;
 import org.apache.metamodel.schema.Schema;
 import org.apache.metamodel.schema.Table;
+import org.apache.metamodel.update.RowUpdationBuilder;
 import org.elasticsearch.client.Client;
 
 /**
@@ -50,13 +51,13 @@ final class ElasticSearchUpdateCallback extends AbstractUpdateCallback {
 
     @Override
     public boolean isDropTableSupported() {
-        return true;
+        return false;
     }
 
     @Override
     public TableDropBuilder dropTable(Table table) throws IllegalArgumentException, IllegalStateException,
             UnsupportedOperationException {
-        return new ElasticSearchDropTableBuilder(this, table);
+        throw new UnsupportedOperationException();
     }
 
     @Override
@@ -76,6 +77,11 @@ final class ElasticSearchUpdateCallback extends AbstractUpdateCallback {
         return new ElasticSearchDeleteBuilder(this, table);
     }
 
+    @Override
+    public RowUpdationBuilder update(final Table table) {
+        return new ElasticSearchUpdateBuilder(this, table);
+    }
+
     public void onExecuteUpdateFinished() {
         // force refresh of the index
         final ElasticSearchDataContext dataContext = getDataContext();

http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java b/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
deleted file mode 100644
index 822ef1b..0000000
--- a/elasticsearch/native/src/main/java/org/apache/metamodel/elasticsearch/nativeclient/NativeElasticSearchUtils.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient;
-
-import java.util.Date;
-import java.util.Map;
-
-import org.apache.metamodel.data.DataSetHeader;
-import org.apache.metamodel.data.DefaultRow;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.elasticsearch.common.ElasticSearchDateConverter;
-import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.ColumnType;
-
-/**
- * Shared/common util functions for the ElasticSearch MetaModel module.
- */
-final class NativeElasticSearchUtils {
-
-    public static Row createRow(Map<String, Object> sourceMap, String documentId, DataSetHeader header) {
-        final Object[] values = new Object[header.size()];
-        for (int i = 0; i < values.length; i++) {
-            final SelectItem selectItem = header.getSelectItem(i);
-            final Column column = selectItem.getColumn();
-
-            assert column != null;
-            assert selectItem.getAggregateFunction() == null;
-            assert selectItem.getScalarFunction() == null;
-
-            if (column.isPrimaryKey()) {
-                values[i] = documentId;
-            } else {
-                Object value = sourceMap.get(column.getName());
-
-                if (column.getType() == ColumnType.DATE) {
-                    Date valueToDate = ElasticSearchDateConverter.tryToConvert((String) value);
-                    if (valueToDate == null) {
-                        values[i] = value;
-                    } else {
-                        values[i] = valueToDate;
-                    }
-                } else {
-                    values[i] = value;
-                }
-            }
-        }
-
-        return new DefaultRow(header, values);
-    }
-}