You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metamodel.apache.org by ka...@apache.org on 2018/01/26 04:13:31 UTC
[3/4] metamodel git commit: METAMODEL-1179: Upgraded ElasticSearch
REST module to new client.
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
index ec5ecba..983ba5e 100644
--- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
+++ b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchDataContextTest.java
@@ -19,12 +19,6 @@
package org.apache.metamodel.elasticsearch.nativeclient;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Arrays;
@@ -45,9 +39,7 @@ import org.apache.metamodel.data.DataSetTableModel;
import org.apache.metamodel.data.InMemoryDataSet;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.delete.DeleteFrom;
-import org.apache.metamodel.drop.DropTable;
import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
-import org.apache.metamodel.elasticsearch.nativeclient.utils.EmbeddedElasticsearchServer;
import org.apache.metamodel.query.FunctionType;
import org.apache.metamodel.query.Query;
import org.apache.metamodel.query.SelectItem;
@@ -58,20 +50,18 @@ import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.update.Update;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingRequestBuilder;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.client.Client;
-import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.test.ESSingleNodeTestCase;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-public class ElasticSearchDataContextTest {
+public class ElasticSearchDataContextTest extends ESSingleNodeTestCase {
private static final String indexName = "twitter";
private static final String indexType1 = "tweet1";
@@ -81,14 +71,15 @@ public class ElasticSearchDataContextTest {
private static final String bulkIndexType = "bulktype";
private static final String peopleIndexType = "peopletype";
private static final String mapping = "{\"date_detection\":\"false\",\"properties\":{\"message\":{\"type\":\"string\",\"index\":\"not_analyzed\",\"doc_values\":\"true\"}}}";
- private static EmbeddedElasticsearchServer embeddedElasticsearchServer;
- private static Client client;
- private static UpdateableDataContext dataContext;
-
- @BeforeClass
- public static void beforeTests() throws Exception {
- embeddedElasticsearchServer = new EmbeddedElasticsearchServer();
- client = embeddedElasticsearchServer.getClient();
+ private Client client;
+ private UpdateableDataContext dataContext;
+
+ @Before
+ public void beforeTests() throws Exception {
+ client = client();
+
+ dataContext = new ElasticSearchDataContext(client, indexName);
+
indexTweeterDocument(indexType1, 1);
indexTweeterDocument(indexType2, 1);
indexTweeterDocument(indexType2, 2, null);
@@ -96,15 +87,10 @@ public class ElasticSearchDataContextTest {
indexTweeterDocument(indexType2, 1);
indexBulkDocuments(indexName, bulkIndexType, 10);
- // The refresh API allows to explicitly refresh one or more index,
- // making all operations performed since the last refresh available for
- // search
- embeddedElasticsearchServer.getClient().admin().indices().prepareRefresh().execute().actionGet();
- dataContext = new ElasticSearchDataContext(client, indexName);
- System.out.println("Embedded ElasticSearch server created!");
+ dataContext.refreshSchemas();
}
- private static void insertPeopleDocuments() throws IOException {
+ private void insertPeopleDocuments() throws IOException {
indexOnePeopleDocument("female", 20, 5);
indexOnePeopleDocument("female", 17, 8);
indexOnePeopleDocument("female", 18, 9);
@@ -116,10 +102,9 @@ public class ElasticSearchDataContextTest {
indexOnePeopleDocument("male", 18, 4);
}
- @AfterClass
- public static void afterTests() {
- embeddedElasticsearchServer.shutdown();
- System.out.println("Embedded ElasticSearch server shut down!");
+ @After
+ public void afterTests() {
+ client.admin().indices().delete(new DeleteIndexRequest("_all")).actionGet();
}
@Test
@@ -128,9 +113,15 @@ public class ElasticSearchDataContextTest {
Arrays.toString(dataContext.getDefaultSchema().getTableNames().toArray()));
Table table = dataContext.getDefaultSchema().getTableByName("tweet1");
+ try (DataSet ds = dataContext.query().from(indexType1).select("_id").execute()) {
+ assertEquals(ElasticSearchDataSet.class, ds.getClass());
+ assertTrue(ds.next());
+ assertEquals("Row[values=[tweet_tweet1_1]]",ds.getRow().toString());
+ }
assertEquals("[_id, message, postDate, user]", Arrays.toString(table.getColumnNames().toArray()));
+ assertEquals(ColumnType.STRING, table.getColumnByName("_id").getType());
assertEquals(ColumnType.STRING, table.getColumnByName("user").getType());
assertEquals(ColumnType.DATE, table.getColumnByName("postDate").getType());
assertEquals(ColumnType.BIGINT, table.getColumnByName("message").getType());
@@ -201,7 +192,7 @@ public class ElasticSearchDataContextTest {
}
@Test
- public void testCreateTableInsertQueryAndDrop() throws Exception {
+ public void testCreateTableAndInsertQuery() throws Exception {
final Schema schema = dataContext.getDefaultSchema();
final CreateTable createTable = new CreateTable(schema, "testCreateTable");
createTable.withColumn("foo").ofType(ColumnType.STRING);
@@ -235,42 +226,42 @@ public class ElasticSearchDataContextTest {
assertNotNull(ds.getRow().getValue(idColumn));
assertFalse(ds.next());
}
-
- dataContext.executeUpdate(new DropTable(table));
-
- dataContext.refreshSchemas();
-
- assertNull(dataContext.getTableByQualifiedLabel(table.getName()));
}
@Test
- public void testDetectOutsideChanges() throws Exception {
- ElasticSearchDataContext elasticSearchDataContext = (ElasticSearchDataContext) dataContext;
+ public void testDeleteFromWithWhere() throws Exception {
+ final Schema schema = dataContext.getDefaultSchema();
+ final String tableName = "testCreateTableDelete";
+ final CreateTable createTable = new CreateTable(schema, tableName);
+ createTable.withColumn("foo").ofType(ColumnType.STRING);
+ createTable.withColumn("bar").ofType(ColumnType.DOUBLE);
+ dataContext.executeUpdate(createTable);
- // Create the type in ES
- final IndicesAdminClient indicesAdmin = elasticSearchDataContext.getElasticSearchClient().admin().indices();
- final String tableType = "outsideTable";
+ final Table table = schema.getTableByName(tableName);
- Object[] sourceProperties = { "testA", "type=string, store=true", "testB", "type=string, store=true" };
+ dataContext.executeUpdate(new UpdateScript() {
+ @Override
+ public void run(UpdateCallback callback) {
+ callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+ callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+ }
+ });
- new PutMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).setSource(sourceProperties)
- .execute().actionGet();
+ dataContext.executeUpdate(new DeleteFrom(table).where("bar").eq(42));
- dataContext.refreshSchemas();
+ final Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).selectCount()
+ .toQuery());
- assertNotNull(dataContext.getDefaultSchema().getTableByName(tableType));
+ assertEquals("Row[values=[1]]", row.toString());
- new DeleteMappingRequestBuilder(indicesAdmin).setIndices(indexName).setType(tableType).execute().actionGet();
- dataContext.refreshSchemas();
- assertNull(dataContext.getTableByQualifiedLabel(tableType));
}
@Test
- public void testDeleteAll() throws Exception {
+ public void testDeleteNoWhere() throws Exception {
final Schema schema = dataContext.getDefaultSchema();
final CreateTable createTable = new CreateTable(schema, "testCreateTable");
createTable.withColumn("foo").ofType(ColumnType.STRING);
- createTable.withColumn("bar").ofType(ColumnType.NUMBER);
+ createTable.withColumn("bar").ofType(ColumnType.DOUBLE);
dataContext.executeUpdate(createTable);
final Table table = schema.getTableByName("testCreateTable");
@@ -288,8 +279,6 @@ public class ElasticSearchDataContextTest {
Row row = MetaModelHelper.executeSingleRowQuery(dataContext, dataContext.query().from(table).selectCount()
.toQuery());
assertEquals("Row[values=[0]]", row.toString());
-
- dataContext.executeUpdate(new DropTable(table));
}
@Test
@@ -315,8 +304,6 @@ public class ElasticSearchDataContextTest {
Row row = MetaModelHelper.executeSingleRowQuery(dataContext,
dataContext.query().from(table).select("foo", "bar").toQuery());
assertEquals("Row[values=[world, 43]]", row.toString());
-
- dataContext.executeUpdate(new DropTable(table));
}
@Test
@@ -328,27 +315,22 @@ public class ElasticSearchDataContextTest {
dataContext.executeUpdate(createTable);
final Table table = schema.getTableByName("testCreateTable");
- try {
- dataContext.executeUpdate(new UpdateScript() {
- @Override
- public void run(UpdateCallback callback) {
- callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
- callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
- }
- });
-
- // greater than is not yet supported
- try {
- dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40));
- fail("Exception expected");
- } catch (UnsupportedOperationException e) {
- assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]",
- e.getMessage());
+ dataContext.executeUpdate(new UpdateScript() {
+ @Override
+ public void run(UpdateCallback callback) {
+ callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+ callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
}
+ });
- } finally {
- dataContext.executeUpdate(new DropTable(table));
+ // greater than is not yet supported
+ try {
+ dataContext.executeUpdate(new DeleteFrom(table).where("bar").gt(40));
+ fail("Exception expected");
+ } catch (UnsupportedOperationException e) {
+ assertEquals("Could not push down WHERE items to delete by query request: [testCreateTable.bar > 40]",
+ e.getMessage());
}
}
@@ -361,54 +343,24 @@ public class ElasticSearchDataContextTest {
dataContext.executeUpdate(createTable);
final Table table = schema.getTableByName("testCreateTable");
- try {
- dataContext.executeUpdate(new UpdateScript() {
- @Override
- public void run(UpdateCallback callback) {
- callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
- callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
- }
- });
-
- dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42));
-
- DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute();
- assertTrue(dataSet.next());
- assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString());
- assertTrue(dataSet.next());
- assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString());
- assertFalse(dataSet.next());
- dataSet.close();
- } finally {
- dataContext.executeUpdate(new DropTable(table));
- }
- }
-
- @Test
- public void testDropTable() throws Exception {
- Table table = dataContext.getDefaultSchema().getTableByName(peopleIndexType);
+ dataContext.executeUpdate(new UpdateScript() {
+ @Override
+ public void run(UpdateCallback callback) {
+ callback.insertInto(table).value("foo", "hello").value("bar", 42).execute();
+ callback.insertInto(table).value("foo", "world").value("bar", 43).execute();
+ }
+ });
- // assert that the table was there to begin with
- {
- DataSet ds = dataContext.query().from(table).selectCount().execute();
- ds.next();
- assertEquals("Row[values=[9]]", ds.getRow().toString());
- ds.close();
- }
+ dataContext.executeUpdate(new Update(table).value("foo", "howdy").where("bar").eq(42));
- dataContext.executeUpdate(new DropTable(table));
- try {
- DataSet ds = dataContext.query().from(table).selectCount().execute();
- ds.next();
- assertEquals("Row[values=[0]]", ds.getRow().toString());
- ds.close();
- } finally {
- // restore the people documents for the next tests
- insertPeopleDocuments();
- client.admin().indices().prepareRefresh().execute().actionGet();
- dataContext = new ElasticSearchDataContext(client, indexName);
- }
+ DataSet dataSet = dataContext.query().from(table).select("foo", "bar").orderBy("bar").execute();
+ assertTrue(dataSet.next());
+ assertEquals("Row[values=[howdy, 42]]", dataSet.getRow().toString());
+ assertTrue(dataSet.next());
+ assertEquals("Row[values=[world, 43]]", dataSet.getRow().toString());
+ assertFalse(dataSet.next());
+ dataSet.close();
}
@Test
@@ -549,26 +501,19 @@ public class ElasticSearchDataContextTest {
@Test
public void testNonDynamicMapingTableNames() throws Exception {
- createIndex();
+ CreateIndexRequest cir = new CreateIndexRequest(indexName2);
+ client.admin().indices().create(cir).actionGet();
+
+ PutMappingRequest pmr = new PutMappingRequest(indexName2).type(indexType3).source(mapping, XContentType.JSON);
+
+ client.admin().indices().putMapping(pmr).actionGet();
ElasticSearchDataContext dataContext2 = new ElasticSearchDataContext(client, indexName2);
assertEquals("[tweet3]", Arrays.toString(dataContext2.getDefaultSchema().getTableNames().toArray()));
}
- private static void createIndex() {
- CreateIndexRequest cir = new CreateIndexRequest(indexName2);
- CreateIndexResponse response = client.admin().indices().create(cir).actionGet();
-
- System.out.println("create index: " + response.isAcknowledged());
-
- PutMappingRequest pmr = new PutMappingRequest(indexName2).type(indexType3).source(mapping);
-
- PutMappingResponse response2 = client.admin().indices().putMapping(pmr).actionGet();
- System.out.println("put mapping: " + response2.isAcknowledged());
- }
-
- private static void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) {
+ private void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (int i = 0; i < numberOfDocuments; i++) {
@@ -578,17 +523,17 @@ public class ElasticSearchDataContextTest {
bulkRequest.execute().actionGet();
}
- private static void indexTweeterDocument(String indexType, int id, Date date) {
- client.prepareIndex(indexName, indexType).setSource(buildTweeterJson(id, date))
- .setId("tweet_" + indexType + "_" + id).execute().actionGet();
+ private void indexTweeterDocument(String indexType, int id, Date date) {
+ final String id1 = "tweet_" + indexType + "_" + id;
+ client.prepareIndex(indexName, indexType, id1).setSource(buildTweeterJson(id, date)).execute().actionGet();
}
- private static void indexTweeterDocument(String indexType, int id) {
+ private void indexTweeterDocument(String indexType, int id) {
client.prepareIndex(indexName, indexType).setSource(buildTweeterJson(id))
.setId("tweet_" + indexType + "_" + id).execute().actionGet();
}
- private static void indexOnePeopleDocument(String gender, int age, int id) throws IOException {
+ private void indexOnePeopleDocument(String gender, int age, int id) throws IOException {
client.prepareIndex(indexName, peopleIndexType).setSource(buildPeopleJson(gender, age, id)).execute()
.actionGet();
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
index 8b5eb50..e08f715 100644
--- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
+++ b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchMetaDataParserTest.java
@@ -24,6 +24,7 @@ import java.util.Map;
import junit.framework.TestCase;
import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser;
import org.apache.metamodel.schema.ColumnType;
import org.elasticsearch.common.collect.MapBuilder;
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java
deleted file mode 100644
index 9ffc6b8..0000000
--- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/ElasticSearchUtilsTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient;
-
-import junit.framework.TestCase;
-import org.apache.metamodel.data.DataSetHeader;
-import org.apache.metamodel.data.Row;
-import org.apache.metamodel.data.SimpleDataSetHeader;
-import org.apache.metamodel.query.SelectItem;
-import org.apache.metamodel.schema.ColumnType;
-import org.apache.metamodel.schema.MutableColumn;
-
-import java.util.*;
-
-public class ElasticSearchUtilsTest extends TestCase {
-
- public void testAssignDocumentIdForPrimaryKeys() throws Exception {
- MutableColumn primaryKeyColumn = new MutableColumn("value1", ColumnType.STRING).setPrimaryKey(true);
- SelectItem primaryKeyItem = new SelectItem(primaryKeyColumn);
- List<SelectItem> selectItems1 = Collections.singletonList(primaryKeyItem);
- String documentId = "doc1";
- DataSetHeader header = new SimpleDataSetHeader(selectItems1);
- Map<String, Object> values = new HashMap<>();
- values.put("value1", "theValue");
- Row row = NativeElasticSearchUtils.createRow(values, documentId, header);
- String primaryKeyValue = (String) row.getValue(primaryKeyItem);
-
- assertEquals(primaryKeyValue, documentId);
- }
-
- public void testCreateRowWithParsableDates() throws Exception {
- SelectItem item1 = new SelectItem(new MutableColumn("value1", ColumnType.STRING));
- SelectItem item2 = new SelectItem(new MutableColumn("value2", ColumnType.DATE));
- List<SelectItem> selectItems1 = Arrays.asList(item1, item2);
- String documentId = "doc1";
- DataSetHeader header = new SimpleDataSetHeader(selectItems1);
- Map<String, Object> values = new HashMap<>();
- values.put("value1", "theValue");
- values.put("value2", "2013-01-04T15:55:51.217+01:00");
- Row row = NativeElasticSearchUtils.createRow(values, documentId, header);
- Object stringValue = row.getValue(item1);
- Object dateValue = row.getValue(item2);
-
- assertTrue(stringValue instanceof String);
- assertTrue(dateValue instanceof Date);
- }
-}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java
----------------------------------------------------------------------
diff --git a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java b/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java
deleted file mode 100644
index b94d0ab..0000000
--- a/elasticsearch/native/src/test/java/org/apache/metamodel/elasticsearch/nativeclient/utils/EmbeddedElasticsearchServer.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.nativeclient.utils;
-
-import org.apache.commons.io.FileUtils;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.common.settings.ImmutableSettings;
-import org.elasticsearch.node.Node;
-
-import java.io.File;
-import java.io.IOException;
-
-import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
-
-public class EmbeddedElasticsearchServer {
-
- private static final String DEFAULT_DATA_DIRECTORY = "target/elasticsearch-data";
-
- private final Node node;
- private final String dataDirectory;
-
- public EmbeddedElasticsearchServer() {
- this(DEFAULT_DATA_DIRECTORY);
- }
-
- public EmbeddedElasticsearchServer(String dataDirectory) {
- this.dataDirectory = dataDirectory;
-
- ImmutableSettings.Builder elasticsearchSettings = ImmutableSettings.settingsBuilder()
- .put("http.enabled", "true")
- .put("path.data", dataDirectory);
-
- node = nodeBuilder()
- .local(true)
- .settings(elasticsearchSettings.build())
- .node();
- }
-
- public Client getClient() {
- return node.client();
- }
-
- public void shutdown() {
- node.close();
- deleteDataDirectory();
- }
-
- private void deleteDataDirectory() {
- try {
- FileUtils.deleteDirectory(new File(dataDirectory));
- } catch (IOException e) {
- throw new RuntimeException("Could not delete data directory of embedded elasticsearch server", e);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
index 9a3b1d8..46f930c 100644
--- a/elasticsearch/pom.xml
+++ b/elasticsearch/pom.xml
@@ -21,7 +21,7 @@
<name>MetaModel module for Elasticsearch</name>
<properties>
- <elasticsearch.version>1.4.4</elasticsearch.version>
+ <elasticsearch.version>5.6.3</elasticsearch.version>
</properties>
<modules>
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/pom.xml b/elasticsearch/rest/pom.xml
index 936cf4c..8f556e6 100644
--- a/elasticsearch/rest/pom.xml
+++ b/elasticsearch/rest/pom.xml
@@ -26,11 +26,6 @@ under the License.
<modelVersion>4.0.0</modelVersion>
- <properties>
- <jest.version>2.0.2</jest.version>
- <elasticsearch.version>1.4.4</elasticsearch.version>
- </properties>
-
<artifactId>MetaModel-elasticsearch-rest</artifactId>
<name>MetaModel module for ElasticSearch via REST client</name>
@@ -52,34 +47,28 @@ under the License.
<artifactId>commons-io</artifactId>
</dependency>
- <!-- Jest -->
- <dependency>
- <groupId>io.searchbox</groupId>
- <artifactId>jest</artifactId>
- <version>${jest.version}</version>
- <exclusions>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<!-- elasticsearch -->
<dependency>
- <groupId>org.elasticsearch</groupId>
- <artifactId>elasticsearch</artifactId>
+ <groupId>org.elasticsearch.client</groupId>
+ <artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- test -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
+ <dependency><!-- required by elasticsearch -->
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
+ <version>2.9.1</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -87,5 +76,84 @@ under the License.
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
+ <profiles>
+ <profile>
+ <id>integration-test</id>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>2.19.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>${docker-maven-plugin.version}</version>
+ <configuration>
+ <logDate>default</logDate>
+ <autoPull>true</autoPull>
+ <images>
+ <image>
+ <name>elasticsearch-metamodel</name>
+ <build>
+ <dockerFileDir>${project.build.directory}/test-classes</dockerFileDir>
+ </build>
+ <run>
+ <ports>
+ <port>9200:9200</port>
+ <port>9300:9300</port>
+ </ports>
+ <env>
+ <ES_JAVA_OPTS>-Xms1g -Xmx1g</ES_JAVA_OPTS>
+ <cluster.name>docker-cluster</cluster.name>
+ <bootstrap.memory_lock>true</bootstrap.memory_lock>
+ <xpack.security.enabled>false</xpack.security.enabled>
+ </env>
+ <wait>
+ <url>http://${docker.host.address}:9200</url>
+ <time>300000</time>
+ </wait>
+ </run>
+ </image>
+ </images>
+ </configuration>
+ <executions>
+ <execution>
+ <id>start</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>build</goal>
+ <goal>start</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>stop</id>
+ <phase>post-integration-test</phase>
+ <goals>
+ <goal>stop</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java
new file mode 100644
index 0000000..ddd7e17
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestClient.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import static java.util.Collections.emptySet;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.http.Header;
+import org.apache.http.client.methods.HttpDelete;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.main.MainRequest;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link RestHighLevelClient} extension that adds index-level operations (refresh, delete index, get mappings,
+ * put mapping) by sending hand-built low-level {@link Request}s.
+ */
+public class ElasticSearchRestClient extends RestHighLevelClient {
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestClient.class);
+
+    public ElasticSearchRestClient(final RestClient restClient) {
+        super(restClient);
+    }
+
+    /**
+     * Refreshes an index by sending {@code POST /<indexName>/_refresh}.
+     *
+     * @param indexName name of the index to refresh
+     * @param headers optional HTTP headers to send with the request
+     * @return {@code true} if the response status was 200; {@code false} otherwise, including when an
+     *         {@link IOException} occurred (it is logged here and not rethrown)
+     */
+    public final boolean refresh(final String indexName, final Header... headers) {
+        try {
+            // The MainRequest is only a placeholder argument; the converter lambda ignores it.
+            return performRequest(new MainRequest(), request -> refresh(indexName),
+                    ElasticSearchRestClient::convertResponse, emptySet(), headers);
+        } catch (IOException e) {
+            logger.info("Failed to refresh index \"{}\"", indexName, e);
+        }
+        return false;
+    }
+
+    private static Request refresh(final String indexName) {
+        return new Request(HttpPost.METHOD_NAME, "/" + indexName + "/_refresh", Collections.emptyMap(), null);
+    }
+
+    /**
+     * Deletes an index by sending {@code DELETE /<indexName>}.
+     *
+     * @param indexName name of the index to delete
+     * @param headers optional HTTP headers to send with the request
+     * @return {@code true} if the response status was 200
+     * @throws IOException if performing the request fails
+     */
+    public final boolean delete(final String indexName, final Header... headers) throws IOException {
+        return performRequest(new MainRequest(), request -> delete(indexName),
+                ElasticSearchRestClient::convertResponse, emptySet(), headers);
+    }
+
+    private static Request delete(final String indexName) {
+        return new Request(HttpDelete.METHOD_NAME, "/" + indexName, Collections.emptyMap(), null);
+    }
+
+    /**
+     * Fetches the index description via {@code GET /<indexName>} and returns the entries of its
+     * {@code "mappings"} object.
+     *
+     * @param indexName name of the index to inspect
+     * @param headers optional HTTP headers to send with the request
+     * @return the entries of the index's {@code "mappings"} object
+     * @throws IOException if performing the request or parsing the response fails
+     */
+    public Set<Entry<String, Object>> getMappings(final String indexName, final Header... headers) throws IOException {
+        return performRequestAndParseEntity(new GetIndexRequest(), request -> getMappings(indexName),
+                response -> parseMappings(response, indexName), emptySet(), headers);
+    }
+
+    private static Request getMappings(final String indexName) {
+        return new Request(HttpGet.METHOD_NAME, "/" + indexName, Collections.emptyMap(), null);
+    }
+
+    /**
+     * Applies a type mapping by sending {@code PUT /<index>/_mapping/<type>} with the request's source as body.
+     * Only the first index of the request is addressed.
+     *
+     * @param putMappingRequest mapping request providing the index, the type and the JSON source
+     * @param headers optional HTTP headers to send with the request
+     * @return {@code true} if the response status was 200
+     * @throws IOException if performing the request fails
+     */
+    public final boolean createMapping(final PutMappingRequest putMappingRequest, final Header... headers)
+            throws IOException {
+        return performRequest(putMappingRequest, request -> putMapping(putMappingRequest),
+                ElasticSearchRestClient::convertResponse, emptySet(), headers);
+    }
+
+    private static Request putMapping(final PutMappingRequest putMappingRequest) {
+        final String endpoint = "/" + putMappingRequest.indices()[0] + "/_mapping/" + putMappingRequest.type();
+        // Encode explicitly as UTF-8: the no-argument getBytes() uses the platform default charset, which can
+        // corrupt non-ASCII characters in a body that is declared as application/json.
+        final ByteArrayEntity entity = new ByteArrayEntity(putMappingRequest.source().getBytes(
+                StandardCharsets.UTF_8), ContentType.APPLICATION_JSON);
+        return new Request(HttpPut.METHOD_NAME, endpoint, Collections.emptyMap(), entity);
+    }
+
+    // Carbon copy of RestHighLevelClient#convertExistsResponse(Response) method, which is inaccessible from this class.
+    private static boolean convertResponse(final Response response) {
+        return response.getStatusLine().getStatusCode() == 200;
+    }
+
+    /**
+     * Extracts the entries of the {@code "mappings"} object stored under {@code indexName} in a parsed response.
+     * NOTE(review): a response without an entry for {@code indexName}, or without {@code "mappings"}, ends in a
+     * {@link NullPointerException} here - confirm callers never hit that case.
+     *
+     * @param response parser over the response body
+     * @param indexName key of the index entry to read
+     * @return the entries of the {@code "mappings"} map
+     * @throws IOException if reading the response fails
+     */
+    @SuppressWarnings("unchecked")
+    static Set<Entry<String, Object>> parseMappings(final XContentParser response, final String indexName) throws IOException {
+        Map<String, Object> schema = (Map<String, Object>) response.map().get(indexName);
+        Map<String, Object> tables = (Map<String, Object>) schema.get("mappings");
+
+        return tables.entrySet();
+    }
+
+    /**
+     * Dispatches a request to the matching high-level client method based on its runtime type.
+     *
+     * @param request bulk, index, delete, clear-scroll or search-scroll request
+     * @return the response of the matching call, or {@code null} if the request type is none of the above
+     * @throws IOException if the underlying call fails
+     */
+    ActionResponse execute(final ActionRequest request) throws IOException {
+        if (request instanceof BulkRequest) {
+            return bulk((BulkRequest) request);
+        } else if (request instanceof IndexRequest) {
+            return index((IndexRequest) request);
+        } else if (request instanceof DeleteRequest) {
+            return delete((DeleteRequest) request);
+        } else if (request instanceof ClearScrollRequest) {
+            return clearScroll((ClearScrollRequest) request);
+        } else if (request instanceof SearchScrollRequest) {
+            return searchScroll((SearchScrollRequest) request);
+        }
+
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java
new file mode 100644
index 0000000..91842f5
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestCreateTableBuilder.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.util.Map;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.create.AbstractTableCreationBuilder;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.schema.MutableSchema;
+import org.apache.metamodel.schema.MutableTable;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+
+ /**
+ * Table creation builder that registers a new table as a mapping on the index of an
+ * {@link ElasticSearchRestDataContext}.
+ */
+ final class ElasticSearchRestCreateTableBuilder extends AbstractTableCreationBuilder<ElasticSearchRestUpdateCallback> {
+
+     public ElasticSearchRestCreateTableBuilder(final ElasticSearchRestUpdateCallback updateCallback,
+             final Schema schema, final String name) {
+         super(updateCallback, schema, name);
+     }
+
+     /**
+      * Sends a put-mapping request for the table being built and then adds the table to the schema.
+      *
+      * @return the table that was added to the schema
+      */
+     @Override
+     public Table execute() throws MetaModelException {
+         final MutableTable newTable = getTable();
+         final Map<String, ?> mappingSource = ElasticSearchUtils.getMappingSource(newTable);
+
+         final String index = getUpdateCallback().getDataContext().getIndexName();
+
+         // The document type of the mapping is the table name.
+         final PutMappingRequest request = new PutMappingRequest(index);
+         request.type(newTable.getName());
+         request.source(mappingSource);
+         getUpdateCallback().execute(request);
+
+         ((MutableSchema) getSchema()).addTable(newTable);
+         return newTable;
+     }
+
+ }
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
index c5a5696..5b32d14 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContext.java
@@ -18,57 +18,43 @@
*/
package org.apache.metamodel.elasticsearch.rest;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.metamodel.BatchUpdateScript;
import org.apache.metamodel.DataContext;
import org.apache.metamodel.MetaModelException;
-import org.apache.metamodel.QueryPostprocessDataContext;
import org.apache.metamodel.UpdateScript;
import org.apache.metamodel.UpdateSummary;
-import org.apache.metamodel.UpdateableDataContext;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.DataSetHeader;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.data.SimpleDataSetHeader;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext;
import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaData;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchMetaDataParser;
import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
import org.apache.metamodel.query.FilterItem;
import org.apache.metamodel.query.LogicalOperator;
import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
-import org.apache.metamodel.schema.MutableColumn;
-import org.apache.metamodel.schema.MutableSchema;
-import org.apache.metamodel.schema.MutableTable;
-import org.apache.metamodel.schema.Schema;
import org.apache.metamodel.schema.Table;
import org.apache.metamodel.util.SimpleTableDef;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestResult;
-import io.searchbox.core.Count;
-import io.searchbox.core.CountResult;
-import io.searchbox.core.Get;
-import io.searchbox.core.Search;
-import io.searchbox.core.SearchResult;
-import io.searchbox.indices.mapping.GetMapping;
-import io.searchbox.params.Parameters;
-
/**
* DataContext implementation for ElasticSearch analytics engine.
*
@@ -86,28 +72,14 @@ import io.searchbox.params.Parameters;
* This implementation supports either automatic discovery of a schema or manual
* specification of a schema, through the {@link SimpleTableDef} class.
*/
-public class ElasticSearchRestDataContext extends QueryPostprocessDataContext implements DataContext,
- UpdateableDataContext {
+public class ElasticSearchRestDataContext extends AbstractElasticSearchDataContext {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDataContext.class);
- public static final String FIELD_ID = "_id";
-
- // 1 minute timeout
- public static final String TIMEOUT_SCROLL = "1m";
-
// we scroll when more than 400 rows are expected
private static final int SCROLL_THRESHOLD = 400;
- private final JestClient elasticSearchClient;
-
- private final String indexName;
- // Table definitions that are set from the beginning, not supposed to be
- // changed.
- private final List<SimpleTableDef> staticTableDefinitions;
-
- // Table definitions that are discovered, these can change
- private final List<SimpleTableDef> dynamicTableDefinitions = new ArrayList<>();
+ private final ElasticSearchRestClient elasticSearchClient;
/**
* Constructs a {@link ElasticSearchRestDataContext}. This constructor
@@ -122,18 +94,14 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
* an array of {@link SimpleTableDef}s, which define the table
* and column model of the ElasticSearch index.
*/
- public ElasticSearchRestDataContext(JestClient client, String indexName, SimpleTableDef... tableDefinitions) {
- super(false);
+ public ElasticSearchRestDataContext(final ElasticSearchRestClient client, final String indexName,
+ final SimpleTableDef... tableDefinitions) {
+ super(indexName, tableDefinitions);
+
if (client == null) {
throw new IllegalArgumentException("ElasticSearch Client cannot be null");
}
- if (indexName == null || indexName.trim().length() == 0) {
- throw new IllegalArgumentException("Invalid ElasticSearch Index name: " + indexName);
- }
this.elasticSearchClient = client;
- this.indexName = indexName;
- this.staticTableDefinitions = (tableDefinitions == null || tableDefinitions.length == 0 ? Collections
- .<SimpleTableDef> emptyList() : Arrays.asList(tableDefinitions));
this.dynamicTableDefinitions.addAll(Arrays.asList(detectSchema()));
}
@@ -147,65 +115,51 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
* @param indexName
* the name of the ElasticSearch index to represent
*/
- public ElasticSearchRestDataContext(JestClient client, String indexName) {
+ public ElasticSearchRestDataContext(final ElasticSearchRestClient client, String indexName) {
this(client, indexName, new SimpleTableDef[0]);
}
- /**
- * Performs an analysis of the available indexes in an ElasticSearch cluster
- * {@link JestClient} instance and detects the elasticsearch types structure
- * based on the metadata provided by the ElasticSearch java client.
- *
- * @see {@link #detectTable(JsonObject, String)}
- * @return a mutable schema instance, useful for further fine tuning by the
- * user.
- */
- private SimpleTableDef[] detectSchema() {
+ /**
+ * Fetches the mappings of the index through the client and builds one
+ * {@link SimpleTableDef} per document type found in them. Document types
+ * whose detection fails are logged and left out of the result.
+ *
+ * @return the detected table definitions, as returned by {@code sortTables}
+ * @throws MetaModelException
+ * if the mappings request fails with an {@link IOException}
+ */
+ @Override
+ protected SimpleTableDef[] detectSchema() {
logger.info("Detecting schema for index '{}'", indexName);
- final JestResult jestResult;
+ final Set<Entry<String, Object>> mappings;
try {
- final GetMapping getMapping = new GetMapping.Builder().addIndex(indexName).build();
- jestResult = elasticSearchClient.execute(getMapping);
- } catch (Exception e) {
+ mappings = getElasticSearchClient().getMappings(indexName);
+ } catch (IOException e) {
logger.error("Failed to retrieve mappings", e);
throw new MetaModelException("Failed to execute request for index information needed to detect schema", e);
}
- if (!jestResult.isSucceeded()) {
- logger.error("Failed to retrieve mappings; {}", jestResult.getErrorMessage());
- throw new MetaModelException("Failed to retrieve mappings; " + jestResult.getErrorMessage());
- }
-
final List<SimpleTableDef> result = new ArrayList<>();
- final Set<Map.Entry<String, JsonElement>> mappings = jestResult.getJsonObject().getAsJsonObject(indexName)
- .getAsJsonObject("mappings").entrySet();
- if (mappings.size() == 0) {
+ if (mappings.isEmpty()) {
- logger.warn("No metadata returned for index name '{}' - no tables will be detected.");
+ // Pass the index name so the '{}' placeholder in the message is actually filled in.
+ logger.warn("No metadata returned for index name '{}' - no tables will be detected.", indexName);
} else {
+ for (Entry<String, Object> mapping : mappings) {
+ final String documentType = mapping.getKey();
- for (Map.Entry<String, JsonElement> entry : mappings) {
- final String documentType = entry.getKey();
+ @SuppressWarnings("unchecked")
+ Map<String, Object> mappingConfiguration = (Map<String, Object>) mapping.getValue();
+ @SuppressWarnings("unchecked")
+ Map<String, Object> properties = (Map<String, Object>) mappingConfiguration.get("properties");
try {
- final SimpleTableDef table = detectTable(entry.getValue().getAsJsonObject().get("properties")
- .getAsJsonObject(), documentType);
+ final SimpleTableDef table = detectTable(properties, documentType);
result.add(table);
} catch (Exception e) {
logger.error("Unexpected error during detectTable for document type '{}'", documentType, e);
}
}
}
- final SimpleTableDef[] tableDefArray = result.toArray(new SimpleTableDef[result.size()]);
- Arrays.sort(tableDefArray, new Comparator<SimpleTableDef>() {
- @Override
- public int compare(SimpleTableDef o1, SimpleTableDef o2) {
- return o1.getName().compareTo(o2.getName());
- }
- });
+ return sortTables(result);
+ }
+
+ @Override
+ protected void onSchemaCacheRefreshed() {
+ getElasticSearchClient().refresh(indexName);
- return tableDefArray;
+ detectSchema();
}
/**
@@ -219,60 +173,22 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
* the name of the index type
* @return a table definition for ElasticSearch.
*/
- private static SimpleTableDef detectTable(JsonObject metadataProperties, String documentType) {
- final ElasticSearchMetaData metaData = JestElasticSearchMetaDataParser.parse(metadataProperties);
+ private static SimpleTableDef detectTable(final Map<String, Object> metadataProperties, final String documentType) {
+ final ElasticSearchMetaData metaData = ElasticSearchMetaDataParser.parse(metadataProperties);
return new SimpleTableDef(documentType, metaData.getColumnNames(), metaData.getColumnTypes());
}
@Override
- protected Schema getMainSchema() throws MetaModelException {
- final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
- for (final SimpleTableDef tableDef : staticTableDefinitions) {
- addTable(theSchema, tableDef);
- }
-
- final SimpleTableDef[] tables = detectSchema();
- synchronized (this) {
- dynamicTableDefinitions.clear();
- dynamicTableDefinitions.addAll(Arrays.asList(tables));
- for (final SimpleTableDef tableDef : dynamicTableDefinitions) {
- final List<String> tableNames = theSchema.getTableNames();
-
- if (!tableNames.contains(tableDef.getName())) {
- addTable(theSchema, tableDef);
- }
- }
- }
-
- return theSchema;
- }
-
- private void addTable(final MutableSchema theSchema, final SimpleTableDef tableDef) {
- final MutableTable table = tableDef.toTable().setSchema(theSchema);
- final Column idColumn = table.getColumnByName(FIELD_ID);
- if (idColumn != null && idColumn instanceof MutableColumn) {
- final MutableColumn mutableColumn = (MutableColumn) idColumn;
- mutableColumn.setPrimaryKey(true);
- }
- theSchema.addTable(table);
- }
-
- @Override
- protected String getMainSchemaName() throws MetaModelException {
- return indexName;
- }
-
- @Override
- protected DataSet materializeMainSchemaTable(Table table, List<SelectItem> selectItems,
- List<FilterItem> whereItems, int firstRow, int maxRows) {
+ protected DataSet materializeMainSchemaTable(final Table table, final List<SelectItem> selectItems,
+ final List<FilterItem> whereItems, final int firstRow, final int maxRows) {
final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
LogicalOperator.AND);
if (queryBuilder != null) {
// where clause can be pushed down to an ElasticSearch query
SearchSourceBuilder searchSourceBuilder = createSearchRequest(firstRow, maxRows, queryBuilder);
- SearchResult result = executeSearch(table, searchSourceBuilder, scrollNeeded(maxRows));
+ SearchResponse result = executeSearch(table, searchSourceBuilder, scrollNeeded(maxRows));
- return new JestElasticSearchDataSet(elasticSearchClient, result, selectItems);
+ return new ElasticSearchRestDataSet(getElasticSearchClient(), result, selectItems);
}
return super.materializeMainSchemaTable(table, selectItems, whereItems, firstRow, maxRows);
}
@@ -282,30 +198,30 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
return !limitMaxRowsIsSet(maxRows) || maxRows > SCROLL_THRESHOLD;
}
- private SearchResult executeSearch(Table table, SearchSourceBuilder searchSourceBuilder, boolean scroll) {
- Search.Builder builder = new Search.Builder(searchSourceBuilder.toString()).addIndex(getIndexName()).addType(
- table.getName());
+ private SearchResponse executeSearch(final Table table, final SearchSourceBuilder searchSourceBuilder,
+ final boolean scroll) {
+ final SearchRequest searchRequest = new SearchRequest(new String[] { getIndexName() }, searchSourceBuilder)
+ .types(table.getName());
+
if (scroll) {
- builder.setParameter(Parameters.SCROLL, TIMEOUT_SCROLL);
+ searchRequest.scroll(TIMEOUT_SCROLL);
}
- Search search = builder.build();
- SearchResult result;
try {
- result = elasticSearchClient.execute(search);
- } catch (Exception e) {
+ return getElasticSearchClient().search(searchRequest);
+ } catch (IOException e) {
logger.warn("Could not execute ElasticSearch query", e);
throw new MetaModelException("Could not execute ElasticSearch query", e);
}
- return result;
}
@Override
protected DataSet materializeMainSchemaTable(Table table, List<Column> columns, int maxRows) {
- SearchResult searchResult = executeSearch(table, createSearchRequest(1, maxRows, null), scrollNeeded(
+ SearchResponse searchResult = executeSearch(table, createSearchRequest(1, maxRows, null), scrollNeeded(
maxRows));
- return new JestElasticSearchDataSet(elasticSearchClient, searchResult, columns.stream().map(SelectItem::new).collect(Collectors.toList()));
+ return new ElasticSearchRestDataSet(getElasticSearchClient(), searchResult, columns.stream()
+ .map(SelectItem::new).collect(Collectors.toList()));
}
private SearchSourceBuilder createSearchRequest(int firstRow, int maxRows, QueryBuilder queryBuilder) {
@@ -317,7 +233,7 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
if (limitMaxRowsIsSet(maxRows)) {
searchRequest.size(maxRows);
} else {
- searchRequest.size(Integer.MAX_VALUE);
+ searchRequest.size(SCROLL_THRESHOLD);
}
if (queryBuilder != null) {
@@ -337,12 +253,16 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
final String documentType = table.getName();
final String id = keyValue.toString();
- final Get get = new Get.Builder(indexName, id).type(documentType).build();
- final JestResult getResult = JestClientExecutor.execute(elasticSearchClient, get);
-
final DataSetHeader header = new SimpleDataSetHeader(selectItems);
- return JestElasticSearchUtils.createRow(getResult.getJsonObject().get("_source").getAsJsonObject(), id, header);
+ try {
+ return ElasticSearchUtils.createRow(getElasticSearchClient()
+ .get(new GetRequest(getIndexName(), documentType, id))
+ .getSource(), id, header);
+ } catch (IOException e) {
+ logger.warn("Could not execute ElasticSearch query", e);
+ throw new MetaModelException("Could not execute ElasticSearch query", e);
+ }
}
@Override
@@ -352,30 +272,23 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
return null;
}
final String documentType = table.getName();
- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
+ final SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termQuery("_type", documentType));
+ sourceBuilder.size(0);
- Count count = new Count.Builder().addIndex(indexName).query(sourceBuilder.toString()).build();
-
- CountResult countResult;
try {
- countResult = elasticSearchClient.execute(count);
+ return getElasticSearchClient().search(new SearchRequest(new String[] { getIndexName() }, sourceBuilder))
+ .getHits().getTotalHits();
} catch (Exception e) {
logger.warn("Could not execute ElasticSearch get query", e);
throw new MetaModelException("Could not execute ElasticSearch get query", e);
}
-
- return countResult.getCount();
- }
-
- private boolean limitMaxRowsIsSet(int maxRows) {
- return (maxRows != -1);
}
@Override
public UpdateSummary executeUpdate(UpdateScript update) {
final boolean isBatch = update instanceof BatchUpdateScript;
- final JestElasticSearchUpdateCallback callback = new JestElasticSearchUpdateCallback(this, isBatch);
+ final ElasticSearchRestUpdateCallback callback = new ElasticSearchRestUpdateCallback(this, isBatch);
update.run(callback);
callback.onExecuteUpdateFinished();
return callback.getUpdateSummary();
@@ -384,14 +297,7 @@ public class ElasticSearchRestDataContext extends QueryPostprocessDataContext im
/**
- * Gets the {@link JestClient} that this {@link DataContext} is wrapping.
+ * Gets the {@link ElasticSearchRestClient} that this {@link DataContext} is wrapping.
*/
- public JestClient getElasticSearchClient() {
+ public ElasticSearchRestClient getElasticSearchClient() {
return elasticSearchClient;
}
-
- /**
- * Gets the name of the index that this {@link DataContext} is working on.
- */
- public String getIndexName() {
- return indexName;
- }
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java
index b2dc4c3..b1756b7 100644
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataContextFactory.java
@@ -18,6 +18,14 @@
*/
package org.apache.metamodel.elasticsearch.rest;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.metamodel.ConnectionException;
import org.apache.metamodel.DataContext;
import org.apache.metamodel.factory.DataContextFactory;
@@ -25,10 +33,8 @@ import org.apache.metamodel.factory.DataContextProperties;
import org.apache.metamodel.factory.ResourceFactoryRegistry;
import org.apache.metamodel.factory.UnsupportedDataContextPropertiesException;
import org.apache.metamodel.util.SimpleTableDef;
-
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestClientFactory;
-import io.searchbox.client.config.HttpClientConfig;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
/**
* Factory for ElasticSearch data context of REST type.
@@ -72,18 +78,20 @@ public class ElasticSearchRestDataContextFactory implements DataContextFactory {
return true;
}
- private JestClient createClient(DataContextProperties properties) {
- final String serverUri = properties.getUrl();
- final HttpClientConfig.Builder builder = new HttpClientConfig.Builder(serverUri);
+ /**
+ * Creates the REST client for the URL given in the properties. Basic
+ * credentials are configured only when a username is present.
+ *
+ * @param properties
+ * the properties holding the URL and the optional username/password
+ * @return a new {@link ElasticSearchRestClient}
+ * @throws MalformedURLException
+ * if the URL in the properties cannot be parsed
+ */
+ private ElasticSearchRestClient createClient(final DataContextProperties properties) throws MalformedURLException {
+ final URL url = new URL(properties.getUrl());
+ // Pass the scheme along; the two-argument HttpHost constructor would always use plain http,
+ // even for an https URL.
+ final RestClientBuilder builder = RestClient.builder(new HttpHost(url.getHost(), url.getPort(),
+ url.getProtocol()));
+
if (properties.getUsername() != null) {
- builder.defaultCredentials(properties.getUsername(), properties.getPassword());
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(properties.getUsername(),
+ properties.getPassword()));
+
+ builder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(
+ credentialsProvider));
}
- final JestClientFactory clientFactory = new JestClientFactory();
- final HttpClientConfig httpClientConfig = new HttpClientConfig(builder);
- clientFactory.setHttpClientConfig(httpClientConfig);
- final JestClient client = clientFactory.getObject();
- return client;
+ return new ElasticSearchRestClient(builder.build());
}
private String getIndex(DataContextProperties properties) {
@@ -97,10 +105,14 @@ public class ElasticSearchRestDataContextFactory implements DataContextFactory {
@Override
public DataContext create(DataContextProperties properties, ResourceFactoryRegistry resourceFactoryRegistry)
throws UnsupportedDataContextPropertiesException, ConnectionException {
- final JestClient client = createClient(properties);
- final String indexName = getIndex(properties);
- final SimpleTableDef[] tableDefinitions = properties.getTableDefs();
- return new ElasticSearchRestDataContext(client, indexName, tableDefinitions);
+ try {
+ ElasticSearchRestClient client = createClient(properties);
+ final String indexName = getIndex(properties);
+ final SimpleTableDef[] tableDefinitions = properties.getTableDefs();
+ return new ElasticSearchRestDataContext(client, indexName, tableDefinitions);
+ } catch (MalformedURLException e) {
+ throw new UnsupportedDataContextPropertiesException(e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java
new file mode 100644
index 0000000..d79b271
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDataSet.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataContext;
+import org.apache.metamodel.elasticsearch.AbstractElasticSearchDataSet;
+import org.apache.metamodel.query.SelectItem;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link DataSet} implementation for ElasticSearch
+ */
+final class ElasticSearchRestDataSet extends AbstractElasticSearchDataSet {
+
+ private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestDataSet.class);
+
+ private final ElasticSearchRestClient _client;
+
+ public ElasticSearchRestDataSet(final ElasticSearchRestClient client, final SearchResponse searchResponse, final List<SelectItem> selectItems) {
+ super(searchResponse, selectItems);
+ _client = client;
+ }
+
+ @Override
+ public void closeNow() {
+ final String scrollId = _searchResponse.getScrollId();
+ final ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+ clearScrollRequest.addScrollId(scrollId);
+ try {
+ _client.execute(clearScrollRequest);
+ } catch (IOException e) {
+ logger.warn("Could not clear scroll.", e);
+ }
+ }
+
+ @Override
+ protected SearchResponse scrollSearchResponse(final String scrollId) throws IOException {
+ return _client.searchScroll(new SearchScrollRequest(scrollId).scroll(
+ AbstractElasticSearchDataContext.TIMEOUT_SCROLL));
+ }
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java
new file mode 100644
index 0000000..f8caa2d
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestDeleteBuilder.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.delete.AbstractRowDeletionBuilder;
+import org.apache.metamodel.delete.RowDeletionBuilder;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.query.LogicalOperator;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+/**
+ * {@link RowDeletionBuilder} implementation for
+ * {@link ElasticSearchRestDataContext}.
+ */
+final class ElasticSearchRestDeleteBuilder extends AbstractRowDeletionBuilder {
+ private final ElasticSearchRestUpdateCallback _updateCallback;
+
+ public ElasticSearchRestDeleteBuilder(final ElasticSearchRestUpdateCallback updateCallback, final Table table) {
+ super(table);
+ _updateCallback = updateCallback;
+ }
+
+ @Override
+ public void execute() throws MetaModelException {
+ final Table table = getTable();
+ final String documentType = table.getName();
+
+ final ElasticSearchRestDataContext dataContext = _updateCallback.getDataContext();
+ final String indexName = dataContext.getIndexName();
+
+ final List<FilterItem> whereItems = getWhereItems();
+
+ // delete by query - note that creteQueryBuilderForSimpleWhere may
+ // return matchAllQuery() if no where items are present.
+ final QueryBuilder queryBuilder = ElasticSearchUtils.createQueryBuilderForSimpleWhere(whereItems,
+ LogicalOperator.AND);
+ if (queryBuilder == null) {
+ // TODO: The where items could not be pushed down to a query. We
+ // could solve this by running a query first, gather all
+ // document IDs and then delete by IDs.
+ throw new UnsupportedOperationException("Could not push down WHERE items to delete by query request: "
+ + whereItems);
+ }
+
+ final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+ searchSourceBuilder.query(queryBuilder);
+
+ SearchRequest searchRequest = new SearchRequest(indexName);
+ searchRequest.types(documentType);
+ searchRequest.source(searchSourceBuilder);
+
+ try {
+ final SearchResponse response = dataContext.getElasticSearchClient().search(searchRequest);
+
+ final Iterator<SearchHit> iterator = response.getHits().iterator();
+ while (iterator.hasNext()) {
+ final SearchHit hit = iterator.next();
+ final String typeId = hit.getId();
+
+ DeleteRequest deleteRequest = new DeleteRequest(indexName, documentType, typeId);
+
+ _updateCallback.execute(deleteRequest);
+ }
+ } catch (IOException e) {
+ throw new MetaModelException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java
new file mode 100644
index 0000000..0ba4f66
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestInsertBuilder.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.elasticsearch.common.ElasticSearchUtils;
+import org.apache.metamodel.insert.AbstractRowInsertionBuilder;
+import org.apache.metamodel.schema.Column;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.index.IndexRequest;
+
+/**
+ * Row insertion builder that turns the values set on it into an ElasticSearch {@link IndexRequest}
+ * and hands that request to the {@link ElasticSearchRestUpdateCallback}.
+ */
+final class ElasticSearchRestInsertBuilder extends AbstractRowInsertionBuilder<ElasticSearchRestUpdateCallback> {
+
+    public ElasticSearchRestInsertBuilder(final ElasticSearchRestUpdateCallback updateCallback, final Table table) {
+        super(updateCallback, table);
+    }
+
+    /**
+     * Collects every column that has been set into the document source and submits it as one
+     * index request. The column named {@link ElasticSearchUtils#FIELD_ID} is never written to the
+     * source; its value, when non-null, is used as the document ID instead.
+     */
+    @Override
+    public void execute() throws MetaModelException {
+        final ElasticSearchRestUpdateCallback callback = getUpdateCallback();
+        final Column[] columns = getColumns();
+        final Object[] values = getValues();
+
+        final Map<String, Object> source = new HashMap<>();
+        String id = null;
+        for (int i = 0; i < columns.length; i++) {
+            final Column column = columns[i];
+            if (!isSet(column)) {
+                continue;
+            }
+            final String columnName = column.getName();
+            if (!ElasticSearchUtils.FIELD_ID.equals(columnName)) {
+                source.put(ElasticSearchUtils.getValidatedFieldName(columnName), values[i]);
+            } else if (values[i] != null) {
+                id = values[i].toString();
+            }
+        }
+        assert !source.isEmpty();
+        final String indexName = callback.getDataContext().getIndexName();
+        final IndexRequest indexRequest = new IndexRequest(indexName, getTable().getName(), id);
+        indexRequest.source(source);
+        callback.execute(indexRequest);
+    }
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java
new file mode 100644
index 0000000..defd18f
--- /dev/null
+++ b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestUpdateCallback.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.rest;
+
+import java.io.IOException;
+
+import org.apache.metamodel.AbstractUpdateCallback;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.UpdateCallback;
+import org.apache.metamodel.create.TableCreationBuilder;
+import org.apache.metamodel.delete.RowDeletionBuilder;
+import org.apache.metamodel.drop.TableDropBuilder;
+import org.apache.metamodel.insert.RowInsertionBuilder;
+import org.apache.metamodel.schema.Schema;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequest;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link UpdateCallback} implementation for {@link ElasticSearchRestDataContext}. In batch mode,
+ * document write requests are buffered in a {@link BulkRequest} that is sent each time it reaches
+ * {@link #BULK_BUFFER_SIZE} actions and on {@link #onExecuteUpdateFinished()}; others are sent at once.
+ */
+final class ElasticSearchRestUpdateCallback extends AbstractUpdateCallback {
+
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestUpdateCallback.class);
+
+    private static final int BULK_BUFFER_SIZE = 1000;
+
+    private final boolean isBatch;
+    private BulkRequest bulkRequest;
+    private int bulkActionCount = 0;
+
+    public ElasticSearchRestUpdateCallback(final ElasticSearchRestDataContext dataContext, final boolean isBatch) {
+        super(dataContext);
+        this.isBatch = isBatch;
+    }
+
+    @Override
+    public ElasticSearchRestDataContext getDataContext() {
+        return (ElasticSearchRestDataContext) super.getDataContext();
+    }
+
+    @Override
+    public TableCreationBuilder createTable(final Schema schema, final String name) throws IllegalArgumentException,
+            IllegalStateException {
+        return new ElasticSearchRestCreateTableBuilder(this, schema, name);
+    }
+
+    @Override
+    public boolean isDropTableSupported() {
+        return false;
+    }
+
+    @Override
+    public TableDropBuilder dropTable(final Table table) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public RowInsertionBuilder insertInto(final Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        return new ElasticSearchRestInsertBuilder(this, table);
+    }
+
+    @Override
+    public boolean isDeleteSupported() {
+        return true;
+    }
+
+    @Override
+    public RowDeletionBuilder deleteFrom(final Table table) throws IllegalArgumentException, IllegalStateException,
+            UnsupportedOperationException {
+        return new ElasticSearchRestDeleteBuilder(this, table);
+    }
+
+    /** Flushes buffered bulk actions when in batch mode, then refreshes the data context's schemas. */
+    public void onExecuteUpdateFinished() {
+        if (isBatch) {
+            flushBulkActions();
+        }
+        getDataContext().refreshSchemas();
+    }
+
+    /** Sends the buffered bulk request, if it holds any actions, and then resets the buffer. */
+    private void flushBulkActions() {
+        if (bulkRequest != null && bulkActionCount != 0) {
+            logger.info("Flushing {} actions to ElasticSearch index {}", bulkActionCount,
+                    getDataContext().getIndexName());
+            executeBlocking(bulkRequest);
+            bulkActionCount = 0;
+            bulkRequest = null;
+        }
+    }
+
+    /** Executes the request immediately, or in batch mode buffers a {@link DocWriteRequest} for bulk sending. */
+    public void execute(final ActionRequest action) {
+        if (!isBatch || !(action instanceof DocWriteRequest<?>)) {
+            executeBlocking(action);
+            return;
+        }
+        if (bulkRequest == null) {
+            bulkRequest = new BulkRequest();
+        }
+        bulkRequest.add((DocWriteRequest<?>) action);
+        bulkActionCount++;
+        if (bulkActionCount == BULK_BUFFER_SIZE) {
+            flushBulkActions();
+        }
+    }
+
+    /**
+     * Sends a request through the client: mapping requests via {@code createMapping}, all others
+     * via {@code execute}. Failed items of a bulk response are logged, not thrown.
+     *
+     * @throws MetaModelException if the client call fails with an {@link IOException}
+     */
+    private void executeBlocking(final ActionRequest action) {
+        try {
+            if (action instanceof PutMappingRequest) {
+                getDataContext().getElasticSearchClient().createMapping((PutMappingRequest) action);
+                return;
+            }
+            final ActionResponse response = getDataContext().getElasticSearchClient().execute(action);
+            if (response instanceof BulkResponse && ((BulkResponse) response).hasFailures()) {
+                logBulkFailures(((BulkResponse) response).getItems());
+            }
+        } catch (IOException e) {
+            logger.warn("Could not execute command {} ", action, e);
+            throw new MetaModelException("Could not execute " + action, e);
+        }
+    }
+
+    /** Logs each failed item of a bulk response along with its 1-based position. */
+    private void logBulkFailures(final BulkItemResponse[] items) {
+        for (int i = 0; i < items.length; i++) {
+            final BulkItemResponse item = items[i];
+            if (item.isFailed()) {
+                logger.error("Bulk failed with item no. {} of {}: id={} op={} status={} error={}", i + 1,
+                        items.length, item.getId(), item.getOpType(), item.status(), item.getFailureMessage());
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java
deleted file mode 100644
index 1bb026d..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestClientExecutor.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import io.searchbox.action.Action;
-import io.searchbox.client.JestClient;
-import io.searchbox.client.JestResult;
-import org.apache.metamodel.MetaModelException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-final class JestClientExecutor {
- private static final Logger logger = LoggerFactory.getLogger(JestClientExecutor.class);
-
- static <T extends JestResult> T execute(JestClient jestClient, Action<T> clientRequest) {
- return execute(jestClient, clientRequest, true);
- }
-
- static <T extends JestResult> T execute(JestClient jestClient, Action<T> clientRequest, boolean doThrow) {
- try {
- final T result = jestClient.execute(clientRequest);
- logger.debug("{} response: acknowledged={}", clientRequest, result.isSucceeded());
- return result;
- } catch (IOException e) {
- logger.warn("Could not execute command {} ", clientRequest, e);
- if (doThrow) {
- throw new MetaModelException("Could not execute command " + clientRequest, e);
- }
- }
-
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/metamodel/blob/bda8d764/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java
----------------------------------------------------------------------
diff --git a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java b/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java
deleted file mode 100644
index cc42b07..0000000
--- a/elasticsearch/rest/src/main/java/org/apache/metamodel/elasticsearch/rest/JestDeleteScroll.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.metamodel.elasticsearch.rest;
-
-import io.searchbox.action.GenericResultAbstractAction;
-
-public class JestDeleteScroll extends GenericResultAbstractAction {
- private JestDeleteScroll(Builder builder) {
- super(builder);
- this.payload = builder.getScrollId();
- setURI(buildURI());
- }
-
- @Override
- public String getRestMethodName() {
- return "DELETE";
- }
-
- @Override
- protected String buildURI() {
- return super.buildURI() + "/_search/scroll";
- }
-
- public static class Builder extends GenericResultAbstractAction.Builder<JestDeleteScroll, Builder> {
- private final String scrollId;
-
- public Builder(String scrollId) {
- this.scrollId = scrollId;
- }
-
- @Override
- public JestDeleteScroll build() {
- return new JestDeleteScroll(this);
- }
-
- public String getScrollId() {
- return scrollId;
- }
- }
-
-}