You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metamodel.apache.org by ka...@apache.org on 2014/09/20 12:26:41 UTC
[1/2] git commit: METAMODEL-77: Introduction of Alberto Rodriguez'
ElasticSearch patch.
Repository: incubator-metamodel
Updated Branches:
refs/heads/master 7391a4410 -> 55a752feb
METAMODEL-77: Introduction of Alberto Rodriguez' ElasticSearch patch.
Project: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/commit/b65f2e12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/tree/b65f2e12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/diff/b65f2e12
Branch: refs/heads/master
Commit: b65f2e121be9d1214fdf46d4803109c1eff04dc4
Parents: 7391a44
Author: Kasper Sørensen <i....@gmail.com>
Authored: Sat Sep 20 12:20:56 2014 +0200
Committer: Kasper Sørensen <i....@gmail.com>
Committed: Sat Sep 20 12:20:56 2014 +0200
----------------------------------------------------------------------
.gitignore | 4 +-
CHANGES.txt | 3 +-
elasticsearch/.gitignore | 4 +
elasticsearch/pom.xml | 65 ++++
.../elasticsearch/ElasticSearchDataContext.java | 306 +++++++++++++++++++
.../elasticsearch/ElasticSearchDataSet.java | 104 +++++++
.../elasticsearch/ElasticSearchMetaData.java | 53 ++++
.../ElasticSearchMetaDataParser.java | 94 ++++++
.../ElasticSearchDataContextTest.java | 274 +++++++++++++++++
.../ElasticSearchMetaDataParserTest.java | 44 +++
.../utils/EmbeddedElasticsearchServer.java | 71 +++++
full/pom.xml | 6 +-
.../apache/metamodel/DataContextFactory.java | 12 +
pom.xml | 4 +
14 files changed, 1041 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/b65f2e12/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index b920004..f4edda6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,4 +2,6 @@
/.settings
/target
/.idea
-*.iml
\ No newline at end of file
+*.iml
+*.ipr
+*.iws
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/b65f2e12/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e9ba987..9ecdc7b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
-Apache MetaModel 4.2.1-incubating
+Apache MetaModel 4.3.0-incubating
+ * [METAMODEL-77] - New module 'elasticsearch' for connecting and modeling ElasticSearch indexes through MetaModel.
* [METAMODEL-74] - Fixed bug related to skipping blank values when applying an aggregate function (SUM, AVG etc.)
Apache MetaModel 4.2.0-incubating
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/b65f2e12/elasticsearch/.gitignore
----------------------------------------------------------------------
diff --git a/elasticsearch/.gitignore b/elasticsearch/.gitignore
new file mode 100644
index 0000000..4e247ee
--- /dev/null
+++ b/elasticsearch/.gitignore
@@ -0,0 +1,4 @@
+/.settings
+/target
+/.classpath
+/.project
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/b65f2e12/elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/elasticsearch/pom.xml b/elasticsearch/pom.xml
new file mode 100644
index 0000000..4471f1f
--- /dev/null
+++ b/elasticsearch/pom.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <parent>
+ <artifactId>MetaModel</artifactId>
+ <groupId>org.apache.metamodel</groupId>
+ <version>4.3-incubating-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>MetaModel-elasticsearch</artifactId>
+ <name>MetaModel module for Elasticsearch analytics engine</name>
+
+ <properties>
+ <elasticsearch.version>1.3.2</elasticsearch.version>
+ <commons-io.version>2.4</commons-io.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.metamodel</groupId>
+ <artifactId>MetaModel-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- elasticsearch -->
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${elasticsearch.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>${commons-io.version}</version>
+ </dependency>
+ <!-- test -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/b65f2e12/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java
new file mode 100644
index 0000000..06ea651
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch;
+
+import org.apache.metamodel.DataContext;
+import org.apache.metamodel.MetaModelException;
+import org.apache.metamodel.QueryPostprocessDataContext;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.query.FilterItem;
+import org.apache.metamodel.schema.*;
+import org.apache.metamodel.util.SimpleTableDef;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.count.CountResponse;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.hppc.ObjectLookupContainer;
+import org.elasticsearch.common.hppc.cursors.ObjectCursor;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * DataContext implementation for the ElasticSearch analytics engine.
+ *
+ * Since ElasticSearch organizes documents in indexes and types, this DataContext exposes a
+ * single virtual schema (named after the cluster) in which every index type becomes a table.
+ * The index that owns a given type is looked up on demand and cached. The table model can
+ * either be detected automatically or be specified manually through {@link SimpleTableDef}s.
+ *
+ * @author Alberto Rodriguez
+ */
+public class ElasticSearchDataContext extends QueryPostprocessDataContext implements DataContext {
+
+    private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContext.class);
+    private static final String ES_CLUSTER_NAME = "cluster.name";
+
+    private final Client elasticSearchClient;
+    private final SimpleTableDef[] tableDefs;
+
+    /** Cache of type name to owning index name, filled lazily by {@link #fetchIndexNameFromES(String)}. */
+    private final Map<String, String> typeAndIndexes = new HashMap<String, String>();
+
+    /**
+     * Constructs a {@link ElasticSearchDataContext}. This constructor accepts a
+     * custom array of {@link SimpleTableDef}s which allows the user to define
+     * his own view on the indexes in the engine.
+     *
+     * @param client
+     *            the ElasticSearch client
+     * @param tableDefs
+     *            an array of {@link SimpleTableDef}s, which define the table
+     *            and column model of the elasticsearch indexes. Each table name
+     *            is used as the ElasticSearch type name when querying.
+     */
+    public ElasticSearchDataContext(Client client, SimpleTableDef... tableDefs) {
+        this.elasticSearchClient = client;
+        this.tableDefs = tableDefs;
+    }
+
+    /**
+     * Constructs a {@link ElasticSearchDataContext} and automatically detects the
+     * schema structure/view on all indexes (see {@link #detectSchema(Client)}).
+     *
+     * @param client
+     *            the ElasticSearch client
+     */
+    public ElasticSearchDataContext(Client client) {
+        this(client, detectSchema(client));
+    }
+
+    /**
+     * Performs an analysis of the available indexes in an ElasticSearch cluster {@link Client}
+     * instance and detects the elasticsearch types structure based on the metadata provided by
+     * the ElasticSearch java client.
+     *
+     * @see #detectTable(ClusterState, String, String)
+     *
+     * @param client
+     *            the client to inspect
+     * @return one table definition per index type found in the cluster. Types whose
+     *         mapping cannot be analyzed are logged and skipped.
+     */
+    public static SimpleTableDef[] detectSchema(Client client) {
+        final List<SimpleTableDef> result = new ArrayList<SimpleTableDef>();
+        for (String indexName : getIndexNames(client)) {
+            final ClusterState cs = client.admin().cluster().prepareState().setIndices(indexName).execute()
+                    .actionGet().getState();
+            for (String typeName : getTypeNames(cs, indexName)) {
+                try {
+                    result.add(detectTable(cs, indexName, typeName));
+                } catch (Exception e) {
+                    // one unparseable mapping should not prevent the rest of the schema from being detected
+                    logger.error("Unexpected error during detectSchema for table: " + typeName, e);
+                }
+            }
+        }
+        return result.toArray(new SimpleTableDef[result.size()]);
+    }
+
+    /**
+     * Performs an analysis of an available index type in an ElasticSearch {@link Client}
+     * client and tries to detect the index structure based on the metadata provided
+     * by the java client.
+     *
+     * @param cs
+     *            the ElasticSearch cluster state
+     * @param indexName
+     *            the name of the index
+     * @param typeName
+     *            the name of the index type
+     * @return a table definition for ElasticSearch.
+     * @throws Exception
+     *             if the mapping cannot be read or contains no field definitions
+     */
+    public static SimpleTableDef detectTable(ClusterState cs, String indexName, String typeName) throws Exception {
+        final IndexMetaData imd = cs.getMetaData().index(indexName);
+        final MappingMetaData mappingMetaData = imd.mapping(typeName);
+        final Map<String, Object> mappingSource = mappingMetaData.getSourceAsMap();
+        // The field definitions of a mapping live under its "properties" key.
+        Object fieldDefinitions = mappingSource.get("properties");
+        if (fieldDefinitions == null) {
+            if (mappingSource.isEmpty()) {
+                throw new IllegalArgumentException("No field mapping found for type '" + typeName + "' in index '"
+                        + indexName + "'");
+            }
+            // fall back to the first entry, which is what earlier versions always used
+            fieldDefinitions = mappingSource.values().iterator().next();
+        }
+        final ElasticSearchMetaData metaData = ElasticSearchMetaDataParser.parse(fieldDefinitions);
+        return new SimpleTableDef(typeName, metaData.getColumnNames(), metaData.getColumnTypes());
+    }
+
+    @Override
+    protected Schema getMainSchema() throws MetaModelException {
+        final MutableSchema theSchema = new MutableSchema(getMainSchemaName());
+        for (SimpleTableDef tableDef : tableDefs) {
+            final MutableTable table = tableDef.toTable().setSchema(theSchema);
+            theSchema.addTable(table);
+        }
+        return theSchema;
+    }
+
+    /**
+     * Uses the "cluster.name" setting of the client as the name of the virtual schema.
+     */
+    @Override
+    protected String getMainSchemaName() throws MetaModelException {
+        return elasticSearchClient.settings().get(ES_CLUSTER_NAME);
+    }
+
+    @Override
+    protected DataSet materializeMainSchemaTable(Table table, Column[] columns, int maxRows) {
+        final String typeName = table.getName();
+        final String indexName = getIndexNameForIndexType(typeName);
+        final SearchRequestBuilder requestBuilder = elasticSearchClient.prepareSearch(indexName).setTypes(typeName);
+        if (limitMaxRowsIsSet(maxRows)) {
+            requestBuilder.setSize(maxRows);
+        }
+        SearchResponse response = requestBuilder.execute().actionGet();
+        if (!limitMaxRowsIsSet(maxRows)) {
+            // Without an explicit size ElasticSearch only returns its default page of hits, which
+            // would silently truncate the table. Re-issue the search sized to the total hit count.
+            // TODO: use the scroll API to avoid loading very large result sets in one response.
+            final long totalHits = response.getHits().getTotalHits();
+            if (totalHits > response.getHits().hits().length) {
+                final int size = (int) Math.min(totalHits, Integer.MAX_VALUE);
+                response = elasticSearchClient.prepareSearch(indexName).setTypes(typeName).setSize(size).execute()
+                        .actionGet();
+            }
+        }
+        return new ElasticSearchDataSet(response, columns, false);
+    }
+
+    @Override
+    protected Number executeCountQuery(Table table, List<FilterItem> whereItems, boolean functionApproximationAllowed) {
+        if (whereItems != null && !whereItems.isEmpty()) {
+            // WHERE items are not translated into ElasticSearch queries yet. Returning null tells
+            // QueryPostprocessDataContext that no optimized count is available, so it counts the
+            // filtered rows itself instead of getting the unfiltered total.
+            return null;
+        }
+        final CountResponse response = elasticSearchClient.prepareCount(getIndexNameForIndexType(table.getName()))
+                .setQuery(QueryBuilders.termQuery("_type", table.getName())).execute().actionGet();
+        return response.getCount();
+    }
+
+    private boolean limitMaxRowsIsSet(int maxRows) {
+        return (maxRows != -1);
+    }
+
+    /**
+     * Resolves the index owning the given type, consulting the cache first.
+     *
+     * @return the index name, or an empty string if no index contains the type
+     */
+    private String getIndexNameForIndexType(String indexType) {
+        final String indexName = typeAndIndexes.get(indexType);
+        if (indexName != null) {
+            return indexName;
+        }
+        return fetchIndexNameFromES(indexType);
+    }
+
+    /**
+     * Searches the cluster for the first index that has a mapping for the given type and
+     * caches the result.
+     *
+     * @return the index name, or an empty string if no index contains the type
+     */
+    private String fetchIndexNameFromES(String indexType) {
+        for (String indexName : getIndexNames(elasticSearchClient)) {
+            final ClusterState cs = elasticSearchClient.admin().cluster().prepareState().setIndices(indexName)
+                    .execute().actionGet().getState();
+            if (getTypeNames(cs, indexName).contains(indexType)) {
+                typeAndIndexes.put(indexType, indexName);
+                return indexName;
+            }
+        }
+        return "";
+    }
+
+    /** Lists the names of all indexes in the cluster the client is connected to. */
+    private static List<String> getIndexNames(Client client) {
+        final ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().execute()
+                .actionGet();
+        final ImmutableOpenMap<String, IndexMetaData> indexes = clusterStateResponse.getState().getMetaData()
+                .getIndices();
+        final List<String> indexNames = new ArrayList<String>();
+        for (ObjectCursor<String> indexCursor : indexes.keys()) {
+            indexNames.add(indexCursor.value);
+        }
+        return indexNames;
+    }
+
+    /** Lists the type names mapped in the given index, or none if the index is not in the cluster state. */
+    private static List<String> getTypeNames(ClusterState cs, String indexName) {
+        final List<String> typeNames = new ArrayList<String>();
+        final IndexMetaData imd = cs.getMetaData().index(indexName);
+        if (imd == null) {
+            // the index may have been removed after its name was listed
+            return typeNames;
+        }
+        final ImmutableOpenMap<String, MappingMetaData> mappings = imd.getMappings();
+        for (ObjectCursor<String> typeCursor : mappings.keys()) {
+            typeNames.add(typeCursor.value);
+        }
+        return typeNames;
+    }
+
+    // TODO: override executeQuery(Query) to push simple SELECT ... WHERE queries down to
+    // ElasticSearch (e.g. as term queries) instead of relying on client-side post-processing.
+}
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/b65f2e12/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java
new file mode 100644
index 0000000..d68b4f0
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch;
+
+import org.apache.metamodel.data.AbstractDataSet;
+import org.apache.metamodel.data.DefaultRow;
+import org.apache.metamodel.data.Row;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.Column;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * DataSet over the hits of a single ElasticSearch {@link SearchResponse}. All hits are
+ * already held in memory, so there is no server-side resource to release on close.
+ */
+final class ElasticSearchDataSet extends AbstractDataSet {
+
+    private final SearchHit[] _hits;
+    private final boolean _queryPostProcessed;
+
+    /** Number of hits consumed so far by {@link #next()}. */
+    private int _readCount = 0;
+    /** The hit the data set is currently positioned on, or null before start / after the end. */
+    private SearchHit _currentHit;
+
+    public ElasticSearchDataSet(SearchResponse cursor, Column[] columns, boolean queryPostProcessed) {
+        super(columns);
+        _hits = cursor.getHits().hits();
+        _queryPostProcessed = queryPostProcessed;
+    }
+
+    public boolean isQueryPostProcessed() {
+        return _queryPostProcessed;
+    }
+
+    @Override
+    public boolean next() {
+        if (_readCount < _hits.length) {
+            _currentHit = _hits[_readCount];
+            _readCount++;
+            return true;
+        }
+        _currentHit = null;
+        return false;
+    }
+
+    /**
+     * Builds the current row by looking up each selected column's name in the hit's source.
+     * Missing fields become null values.
+     */
+    @Override
+    public Row getRow() {
+        if (_currentHit == null) {
+            return null;
+        }
+        final int size = getHeader().size();
+        final Object[] values = new Object[size];
+        // the source map is the same for every column, so read it once
+        final Map<String, Object> source = _currentHit.getSource();
+        if (source != null) {
+            for (int i = 0; i < size; i++) {
+                final SelectItem selectItem = getHeader().getSelectItem(i);
+                values[i] = source.get(selectItem.getColumn().getName());
+            }
+        }
+        // NOTE: a hit without a source (e.g. _source not stored) yields a row of nulls
+        return new DefaultRow(getHeader(), values);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/b65f2e12/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchMetaData.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchMetaData.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchMetaData.java
new file mode 100644
index 0000000..328430d
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchMetaData.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch;
+
+import org.apache.metamodel.schema.ColumnType;
+
+/**
+ * MetaData representation of an ElasticSearch index type.
+ *
+ * The elasticsearch fields are mapped to columns and their
+ * types to {@link ColumnType}s. Column names and column types are
+ * parallel arrays: index i of both describes the same field.
+ *
+ * @author Alberto Rodriguez
+ */
+public class ElasticSearchMetaData {
+    private final String[] columnNames;
+    private final ColumnType[] columnTypes;
+
+    /**
+     * Constructs a {@link ElasticSearchMetaData}.
+     *
+     * @param names
+     *            the field (column) names of the index type
+     * @param types
+     *            the column type of each field, in the same order as {@code names}
+     */
+    public ElasticSearchMetaData(String[] names, ColumnType[] types) {
+        this.columnNames = names;
+        this.columnTypes = types;
+    }
+
+    /**
+     * @return the field (column) names of the index type
+     */
+    public String[] getColumnNames() {
+        return columnNames;
+    }
+
+    /**
+     * @return the column type of each field, aligned with {@link #getColumnNames()}
+     */
+    public ColumnType[] getColumnTypes() {
+        return columnTypes;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/b65f2e12/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchMetaDataParser.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchMetaDataParser.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchMetaDataParser.java
new file mode 100644
index 0000000..8d9589d
--- /dev/null
+++ b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchMetaDataParser.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch;
+
+
+import org.apache.metamodel.schema.ColumnType;
+
+import java.util.Map;
+
+/**
+ * Parser that transforms the ElasticSearch field mapping of an index type
+ * into an ElasticSearchMetaData object.
+ *
+ * @author Alberto Rodriguez
+ */
+public class ElasticSearchMetaDataParser {
+
+    /**
+     * Parses the ElasticSearch meta data info into an ElasticSearchMetaData object. This
+     * makes it much easier to create the ElasticSearch schema.
+     *
+     * @param metaDataInfo
+     *            the field definitions of an index type, either as a {@link Map} of field
+     *            name to field definition, or in the json-like {@code toString()} format
+     *            of such a map, e.g. {@code {message={type=long}, user={type=string}}}
+     * @return
+     *            An ElasticSearchMetaData object with one column per field
+     */
+    public static ElasticSearchMetaData parse(Object metaDataInfo) {
+        if (metaDataInfo instanceof Map) {
+            // structured input: no string splitting, so field options such as
+            // {type=string, index=not_analyzed} cannot be mistaken for extra fields
+            return parseFieldMap((Map<?, ?>) metaDataInfo);
+        }
+        return parseFieldString(metaDataInfo.toString());
+    }
+
+    /** Reads field names and types directly from a map of field name to field definition. */
+    private static ElasticSearchMetaData parseFieldMap(Map<?, ?> fields) {
+        final String[] fieldNames = new String[fields.size()];
+        final ColumnType[] columnTypes = new ColumnType[fields.size()];
+        int i = 0;
+        for (Map.Entry<?, ?> field : fields.entrySet()) {
+            fieldNames[i] = String.valueOf(field.getKey());
+            columnTypes[i] = toColumnType(getTypeFromFieldDefinition(field.getValue()));
+            i++;
+        }
+        return new ElasticSearchMetaData(fieldNames, columnTypes);
+    }
+
+    /** Returns the "type" entry of a field definition map, or null if there is none. */
+    private static String getTypeFromFieldDefinition(Object fieldDefinition) {
+        if (fieldDefinition instanceof Map) {
+            final Object type = ((Map<?, ?>) fieldDefinition).get("type");
+            if (type != null) {
+                return type.toString();
+            }
+        }
+        return null;
+    }
+
+    /** Fallback parser for the json-like string format; only handles flat {type=...} definitions. */
+    private static ElasticSearchMetaData parseFieldString(String metaDataInfo) {
+        final String plainMetaDataInfo = removeFirstAndLastCharacter(metaDataInfo);
+        if (plainMetaDataInfo.trim().length() == 0) {
+            // "{}" - a type without fields
+            return new ElasticSearchMetaData(new String[0], new ColumnType[0]);
+        }
+        final String metaDataWithoutDateFormats = removeDateFormats(plainMetaDataInfo);
+        final String[] metaDataFields = metaDataWithoutDateFormats.split(",");
+        final String[] fieldNames = new String[metaDataFields.length];
+        final ColumnType[] columnTypes = new ColumnType[metaDataFields.length];
+        for (int i = 0; i < metaDataFields.length; i++) {
+            // each entry looks like: message={type=long}
+            fieldNames[i] = getNameFromMetaDataField(metaDataFields[i]);
+            columnTypes[i] = toColumnType(getMetaDataFieldTypeFromMetaDataField(metaDataFields[i]));
+        }
+        return new ElasticSearchMetaData(fieldNames, columnTypes);
+    }
+
+    private static String removeFirstAndLastCharacter(String metaDataInfo) {
+        if (metaDataInfo.length() < 2) {
+            return "";
+        }
+        return metaDataInfo.substring(1, metaDataInfo.length() - 1);
+    }
+
+    /** Drops the extra attributes of date fields (e.g. format=...) so they do not split into bogus fields. */
+    private static String removeDateFormats(String metaDataInfo) {
+        return metaDataInfo.replaceAll("type=date.*?}", "type=date}");
+    }
+
+    private static String getNameFromMetaDataField(String metaDataField) {
+        return metaDataField.substring(0, metaDataField.indexOf("=")).trim();
+    }
+
+    private static String getMetaDataFieldTypeFromMetaDataField(String metaDataField) {
+        final String metaDataFieldWithoutName = metaDataField.substring(metaDataField.indexOf("=") + 1);
+        return metaDataFieldWithoutName.substring(metaDataFieldWithoutName.indexOf("=") + 1,
+                metaDataFieldWithoutName.length() - 1);
+    }
+
+    /**
+     * Maps an ElasticSearch field type to a MetaModel {@link ColumnType}. Unknown, missing or
+     * complex (object/nested) types are exposed as strings.
+     */
+    private static ColumnType toColumnType(String metaDataFieldType) {
+        if ("long".equals(metaDataFieldType)) {
+            return ColumnType.BIGINT;
+        } else if ("integer".equals(metaDataFieldType)) {
+            return ColumnType.INTEGER;
+        } else if ("date".equals(metaDataFieldType)) {
+            return ColumnType.DATE;
+        } else if ("string".equals(metaDataFieldType)) {
+            return ColumnType.STRING;
+        } else if ("float".equals(metaDataFieldType)) {
+            return ColumnType.FLOAT;
+        } else if ("double".equals(metaDataFieldType)) {
+            return ColumnType.DOUBLE;
+        } else if ("boolean".equals(metaDataFieldType)) {
+            return ColumnType.BOOLEAN;
+        }
+        return ColumnType.STRING;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/b65f2e12/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java b/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java
new file mode 100644
index 0000000..cd98d08
--- /dev/null
+++ b/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch;
+
+import junit.framework.TestCase;
+import org.apache.metamodel.DataContext;
+import org.apache.metamodel.data.DataSet;
+import org.apache.metamodel.data.DataSetTableModel;
+import org.apache.metamodel.data.FilteredDataSet;
+import org.apache.metamodel.elasticsearch.utils.EmbeddedElasticsearchServer;
+import org.apache.metamodel.query.FunctionType;
+import org.apache.metamodel.query.Query;
+import org.apache.metamodel.query.SelectItem;
+import org.apache.metamodel.schema.ColumnType;
+import org.apache.metamodel.schema.Table;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+
+import javax.swing.table.TableModel;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+
+/**
+ * Integration tests for {@link ElasticSearchDataContext}. Each test starts an
+ * embedded Elasticsearch node, indexes a small fixed data set into it and
+ * queries that data through MetaModel.
+ */
+public class ElasticSearchDataContextTest extends TestCase {
+
+    private static final String INDEX_NAME = "twitter";
+    private static final String INDEX_TYPE_1 = "tweet1";
+    private static final String INDEX_TYPE_2 = "tweet2";
+    private static final String BULK_INDEX_NAME = "bulktwitter";
+    private static final String BULK_INDEX_TYPE = "bulktype";
+    private static final String PEOPLE_INDEX_NAME = "peopleindex";
+    private static final String PEOPLE_INDEX_TYPE = "peopletype";
+
+    private EmbeddedElasticsearchServer embeddedElasticsearchServer;
+    private Client client;
+    private DataContext dataContext;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        embeddedElasticsearchServer = new EmbeddedElasticsearchServer();
+        try {
+            client = embeddedElasticsearchServer.getClient();
+            indexTestDocuments();
+            // Waiting for indexing the data....
+            // NOTE(review): a fixed sleep is timing dependent; an explicit index
+            // refresh would likely be more reliable.
+            Thread.sleep(2000);
+            dataContext = new ElasticSearchDataContext(client);
+        } catch (Exception e) {
+            // JUnit 3 does not call tearDown() when setUp() fails, so release the node here.
+            embeddedElasticsearchServer.shutdown();
+            throw e;
+        }
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            embeddedElasticsearchServer.shutdown();
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /** Indexes the tweet, people and bulk documents that every test relies on. */
+    private void indexTestDocuments() throws Exception {
+        indexOneTweeterDocumentPerIndex(INDEX_TYPE_1, 1);
+        indexOneTweeterDocumentPerIndex(INDEX_TYPE_2, 1);
+        indexOnePeopleDocument("female", 20, 5);
+        indexOnePeopleDocument("female", 17, 8);
+        indexOnePeopleDocument("female", 18, 9);
+        indexOnePeopleDocument("female", 19, 10);
+        indexOnePeopleDocument("female", 20, 11);
+        indexOnePeopleDocument("male", 19, 1);
+        indexOnePeopleDocument("male", 17, 2);
+        indexOnePeopleDocument("male", 18, 3);
+        indexOnePeopleDocument("male", 18, 4);
+        // NOTE(review): tweet2 receives a second, identical document here - confirm this is intended.
+        indexOneTweeterDocumentPerIndex(INDEX_TYPE_2, 1);
+        indexBulkDocuments(BULK_INDEX_NAME, BULK_INDEX_TYPE, 10);
+    }
+
+    public void testSimpleQuery() throws Exception {
+        assertEquals("[peopletype, bulktype, tweet1, tweet2]",
+                Arrays.toString(dataContext.getDefaultSchema().getTableNames()));
+
+        Table table = dataContext.getDefaultSchema().getTableByName("tweet1");
+
+        assertEquals(ColumnType.STRING, table.getColumnByName("user").getType());
+        assertEquals(ColumnType.DATE, table.getColumnByName("postDate").getType());
+        assertEquals(ColumnType.BIGINT, table.getColumnByName("message").getType());
+
+        DataSet ds = dataContext.query().from(INDEX_TYPE_1).select("user").and("message").execute();
+        try {
+            assertEquals(ElasticSearchDataSet.class, ds.getClass());
+            assertFalse(((ElasticSearchDataSet) ds).isQueryPostProcessed());
+
+            assertTrue(ds.next());
+            assertEquals("Row[values=[user1, 1]]", ds.getRow().toString());
+        } finally {
+            ds.close();
+        }
+    }
+
+    public void testWhereColumnEqualsValues() throws Exception {
+        DataSet ds = dataContext.query().from(BULK_INDEX_TYPE).select("user").and("message").where("user")
+                .isEquals("user4").execute();
+        assertEquals(FilteredDataSet.class, ds.getClass());
+
+        try {
+            assertTrue(ds.next());
+            assertEquals("Row[values=[user4, 4]]", ds.getRow().toString());
+            assertFalse(ds.next());
+        } finally {
+            ds.close();
+        }
+    }
+
+    public void testWhereColumnInValues() throws Exception {
+        DataSet ds = dataContext.query().from(BULK_INDEX_TYPE).select("user").and("message").where("user")
+                .in("user4", "user5").execute();
+        assertEquals(FilteredDataSet.class, ds.getClass());
+
+        try {
+            assertTrue(ds.next());
+            String firstRow = ds.getRow().toString();
+            assertTrue(ds.next());
+            String secondRow = ds.getRow().toString();
+            assertFalse(ds.next());
+
+            // Elasticsearch does not guarantee the hit order, so compare the rows sorted
+            String[] actualRows = new String[] { firstRow, secondRow };
+            Arrays.sort(actualRows);
+            assertEquals("[Row[values=[user4, 4]], Row[values=[user5, 5]]]", Arrays.toString(actualRows));
+        } finally {
+            ds.close();
+        }
+    }
+
+    public void testGroupByQuery() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName(PEOPLE_INDEX_TYPE);
+
+        Query q = new Query();
+        q.from(table);
+        q.groupBy(table.getColumnByName("gender"));
+        q.select(new SelectItem(table.getColumnByName("gender")),
+                new SelectItem(FunctionType.MAX, table.getColumnByName("age")),
+                new SelectItem(FunctionType.MIN, table.getColumnByName("age")),
+                new SelectItem(FunctionType.COUNT, "*", "total"),
+                new SelectItem(FunctionType.MIN, table.getColumnByName("id")).setAlias("firstId"));
+        DataSet data = dataContext.executeQuery(q);
+        try {
+            assertEquals(
+                    "[peopletype.gender, MAX(peopletype.age), MIN(peopletype.age), COUNT(*) AS total, MIN(peopletype.id) AS firstId]",
+                    Arrays.toString(data.getSelectItems()));
+
+            List<String> expectations = Arrays.asList("Row[values=[female, 20, 17, 5, 5]]",
+                    "Row[values=[male, 19, 17, 4, 1]]");
+
+            assertTrue(data.next());
+            assertTrue(expectations.contains(data.getRow().toString()));
+            assertTrue(data.next());
+            assertTrue(expectations.contains(data.getRow().toString()));
+            assertFalse(data.next());
+        } finally {
+            data.close();
+        }
+    }
+
+    public void testFilterOnNumberColumn() {
+        Table table = dataContext.getDefaultSchema().getTableByName(BULK_INDEX_TYPE);
+        Query q = dataContext.query().from(table).select("user").where("message").greaterThan(7).toQuery();
+        DataSet data = dataContext.executeQuery(q);
+        try {
+            List<String> expectations = Arrays.asList("Row[values=[user8]]", "Row[values=[user9]]");
+
+            assertTrue(data.next());
+            assertTrue(expectations.contains(data.getRow().toString()));
+            assertTrue(data.next());
+            assertTrue(expectations.contains(data.getRow().toString()));
+            assertFalse(data.next());
+        } finally {
+            data.close();
+        }
+    }
+
+    public void testMaxRows() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName(PEOPLE_INDEX_TYPE);
+        Query query = new Query().from(table).select(table.getColumns()).setMaxRows(5);
+        DataSet dataSet = dataContext.executeQuery(query);
+
+        TableModel tableModel = new DataSetTableModel(dataSet);
+        assertEquals(5, tableModel.getRowCount());
+    }
+
+    public void testCountQuery() throws Exception {
+        Table table = dataContext.getDefaultSchema().getTableByName(BULK_INDEX_TYPE);
+        Query q = new Query().selectCount().from(table);
+
+        List<Object[]> data = dataContext.executeQuery(q).toObjectArrays();
+        assertEquals(1, data.size());
+        Object[] row = data.get(0);
+        assertEquals(1, row.length);
+        assertEquals("[10]", Arrays.toString(row));
+    }
+
+    public void testQueryForANonExistingTable() throws Exception {
+        try {
+            dataContext.query().from("nonExistingTable").select("user").and("message").execute();
+            fail("Expected an IllegalArgumentException for a non-existing table");
+        } catch (IllegalArgumentException expected) {
+            // expected: the table is unknown to the schema
+        }
+    }
+
+    public void testQueryForAnExistingTableAndNonExistingField() throws Exception {
+        indexOneTweeterDocumentPerIndex(INDEX_TYPE_1, 1);
+        try {
+            dataContext.query().from(INDEX_TYPE_1).select("nonExistingField").execute();
+            fail("Expected an IllegalArgumentException for a non-existing field");
+        } catch (IllegalArgumentException expected) {
+            // expected: the column is unknown to the table
+        }
+    }
+
+    /**
+     * Indexes {@code numberOfDocuments} tweet documents with ids 0..n-1 in one
+     * bulk request and fails the test if any item of the bulk was rejected.
+     */
+    private void indexBulkDocuments(String targetIndexName, String indexType, int numberOfDocuments)
+            throws Exception {
+        BulkRequestBuilder bulkRequest = client.prepareBulk();
+        for (int i = 0; i < numberOfDocuments; i++) {
+            bulkRequest.add(client.prepareIndex(targetIndexName, indexType, String.valueOf(i)).setSource(
+                    buildTweeterJson(i)));
+        }
+        // Bulk item failures are reported in the response rather than thrown
+        if (bulkRequest.execute().actionGet().hasFailures()) {
+            fail("Bulk indexing of documents into " + targetIndexName + "/" + indexType + " failed");
+        }
+    }
+
+    /** Indexes a single tweet document (auto-generated id) into the tweet index under the given type. */
+    private void indexOneTweeterDocumentPerIndex(String indexType, int id) throws Exception {
+        client.prepareIndex(INDEX_NAME, indexType).setSource(buildTweeterJson(id)).execute().actionGet();
+    }
+
+    /** Indexes a single person document (auto-generated id) into the people index. */
+    private void indexOnePeopleDocument(String gender, int age, int id) throws Exception {
+        client.prepareIndex(PEOPLE_INDEX_NAME, PEOPLE_INDEX_TYPE).setSource(buildPeopleJson(gender, age, id))
+                .execute().actionGet();
+    }
+
+    private XContentBuilder buildTweeterJson(int elementId) throws Exception {
+        return jsonBuilder().startObject().field("user", "user" + elementId).field("postDate", new Date())
+                .field("message", elementId).endObject();
+    }
+
+    private XContentBuilder buildPeopleJson(String gender, int age, int elementId) throws Exception {
+        return jsonBuilder().startObject().field("gender", gender).field("age", age).field("id", elementId)
+                .endObject();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/b65f2e12/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchMetaDataParserTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchMetaDataParserTest.java b/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchMetaDataParserTest.java
new file mode 100644
index 0000000..d7e62e6
--- /dev/null
+++ b/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchMetaDataParserTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch;
+
+import junit.framework.TestCase;
+import org.apache.metamodel.schema.ColumnType;
+
+/**
+ * Unit tests for {@link ElasticSearchMetaDataParser}.
+ */
+public class ElasticSearchMetaDataParserTest extends TestCase {
+
+    /**
+     * Parses a mapping description with long, date (carrying a format
+     * attribute) and string fields, and checks that names and types come back
+     * in declaration order.
+     */
+    public void testParseMetadataInfo() throws Exception {
+        String metaDataInfo = "{message={type=long}, postDate={type=date, format=dateOptionalTime}, anotherDate={type=date, format=dateOptionalTime}, user={type=string}}";
+
+        ElasticSearchMetaData metaData = ElasticSearchMetaDataParser.parse(metaDataInfo);
+        String[] columnNames = metaData.getColumnNames();
+        ColumnType[] columnTypes = metaData.getColumnTypes();
+
+        // expected value first, so failure messages report "expected X but was Y" correctly
+        assertEquals(4, columnNames.length);
+        assertEquals("message", columnNames[0]);
+        assertEquals("postDate", columnNames[1]);
+        assertEquals("anotherDate", columnNames[2]);
+        assertEquals("user", columnNames[3]);
+
+        assertEquals(4, columnTypes.length);
+        assertEquals(ColumnType.BIGINT, columnTypes[0]);
+        assertEquals(ColumnType.DATE, columnTypes[1]);
+        assertEquals(ColumnType.DATE, columnTypes[2]);
+        assertEquals(ColumnType.STRING, columnTypes[3]);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/b65f2e12/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/utils/EmbeddedElasticsearchServer.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/utils/EmbeddedElasticsearchServer.java b/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/utils/EmbeddedElasticsearchServer.java
new file mode 100644
index 0000000..5e6fcff
--- /dev/null
+++ b/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/utils/EmbeddedElasticsearchServer.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.metamodel.elasticsearch.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.node.Node;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
+
+/**
+ * Test helper that runs a local (in-JVM) Elasticsearch node. The node keeps its
+ * data in a dedicated directory. That directory is wiped before the node starts
+ * and again on {@link #shutdown()}, so every instance begins with an empty node.
+ */
+public class EmbeddedElasticsearchServer {
+
+    private static final String DEFAULT_DATA_DIRECTORY = "target/elasticsearch-data";
+
+    private final Node node;
+    private final String dataDirectory;
+
+    /**
+     * Creates and starts a node that stores its data in
+     * {@code target/elasticsearch-data}.
+     */
+    public EmbeddedElasticsearchServer() {
+        this(DEFAULT_DATA_DIRECTORY);
+    }
+
+    /**
+     * Creates and starts a node that stores its data in the given directory.
+     *
+     * @param dataDirectory
+     *            path used for {@code path.data}; its previous contents are deleted
+     * @throws IllegalArgumentException
+     *             if {@code dataDirectory} is null
+     */
+    public EmbeddedElasticsearchServer(String dataDirectory) {
+        if (dataDirectory == null) {
+            throw new IllegalArgumentException("dataDirectory cannot be null");
+        }
+        this.dataDirectory = dataDirectory;
+
+        // Remove leftovers of a previous run that did not shut down cleanly;
+        // otherwise stale documents would leak into the new node.
+        deleteDataDirectory();
+
+        ImmutableSettings.Builder elasticsearchSettings = ImmutableSettings.settingsBuilder()
+                .put("http.enabled", "true")
+                .put("path.data", dataDirectory);
+
+        node = nodeBuilder()
+                .local(true)
+                .settings(elasticsearchSettings.build())
+                .node();
+    }
+
+    /**
+     * @return a client connected to the embedded node
+     */
+    public Client getClient() {
+        return node.client();
+    }
+
+    /**
+     * Closes the node and deletes its data directory. The directory is removed
+     * even if closing the node fails.
+     */
+    public void shutdown() {
+        try {
+            node.close();
+        } finally {
+            deleteDataDirectory();
+        }
+    }
+
+    private void deleteDataDirectory() {
+        try {
+            FileUtils.deleteDirectory(new File(dataDirectory));
+        } catch (IOException e) {
+            throw new RuntimeException("Could not delete data directory of embedded elasticsearch server", e);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/b65f2e12/full/pom.xml
----------------------------------------------------------------------
diff --git a/full/pom.xml b/full/pom.xml
index 8337866..670a39a 100644
--- a/full/pom.xml
+++ b/full/pom.xml
@@ -145,7 +145,11 @@ under the License.
<artifactId>MetaModel-xml</artifactId>
<version>${project.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.metamodel</groupId>
+ <artifactId>MetaModel-elasticsearch</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- Test dependencies -->
<dependency>
<groupId>junit</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/b65f2e12/full/src/main/java/org/apache/metamodel/DataContextFactory.java
----------------------------------------------------------------------
diff --git a/full/src/main/java/org/apache/metamodel/DataContextFactory.java b/full/src/main/java/org/apache/metamodel/DataContextFactory.java
index 8905bef..6db6b72 100644
--- a/full/src/main/java/org/apache/metamodel/DataContextFactory.java
+++ b/full/src/main/java/org/apache/metamodel/DataContextFactory.java
@@ -26,6 +26,7 @@ import java.util.Collection;
import javax.sql.DataSource;
+import org.apache.metamodel.elasticsearch.ElasticSearchDataContext;
import org.ektorp.http.StdHttpClient.Builder;
import org.apache.metamodel.couchdb.CouchDbDataContext;
import org.apache.metamodel.csv.CsvConfiguration;
@@ -44,6 +45,7 @@ import org.apache.metamodel.sugarcrm.SugarCrmDataContext;
import org.apache.metamodel.util.FileHelper;
import org.apache.metamodel.util.SimpleTableDef;
import org.apache.metamodel.xml.XmlDomDataContext;
+import org.elasticsearch.client.Client;
import org.xml.sax.InputSource;
import com.mongodb.DB;
@@ -635,4 +637,14 @@ public class DataContextFactory {
}
return new CouchDbDataContext(httpClientBuilder, tableDefs);
}
+
+    /**
+     * Creates a new ElasticSearch datacontext on top of an already configured
+     * client. This method does not close the client.
+     *
+     * @param client
+     *            the ElasticSearch client to read indices and documents through
+     * @return a DataContext object that matches the request
+     */
+    public static QueryPostprocessDataContext createElasticSearchDataContext(Client client) {
+        final QueryPostprocessDataContext elasticSearchDataContext = new ElasticSearchDataContext(client);
+        return elasticSearchDataContext;
+    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/b65f2e12/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 879ceb8..b4618f8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,7 @@
<module>json</module>
<module>xml</module>
<module>jdbc</module>
+ <module>elasticsearch</module>
<module>hbase</module>
<module>mongodb</module>
<module>couchdb</module>
@@ -348,6 +349,9 @@
<exclude>**/.settings/**</exclude>
<exclude>**/.travis.yml</exclude>
<exclude>**/target/**</exclude>
+ <exclude>**/*.iml/**</exclude>
+ <exclude>**/*.iws/**</exclude>
+ <exclude>**/*.ipr/**</exclude>
<exclude>DEPENDENCIES</exclude>
<exclude>DISCLAIMER</exclude>
</excludes>
[2/2] git commit: Fixed unittest (failed on Java 8 because of
non-deterministic ES sorting)
Posted by ka...@apache.org.
Fixed unittest (failed on Java 8 because of non-deterministic ES
sorting)
Project: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/commit/55a752fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/tree/55a752fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/diff/55a752fe
Branch: refs/heads/master
Commit: 55a752feb90d45754848aea8ecef518bfe83e3ca
Parents: b65f2e1
Author: Kasper Sørensen <i....@gmail.com>
Authored: Sat Sep 20 12:25:18 2014 +0200
Committer: Kasper Sørensen <i....@gmail.com>
Committed: Sat Sep 20 12:25:18 2014 +0200
----------------------------------------------------------------------
.../ElasticSearchDataContextTest.java | 76 +++++++++-----------
1 file changed, 34 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/55a752fe/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java b/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java
index cd98d08..6fcc942 100644
--- a/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java
+++ b/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java
@@ -19,6 +19,7 @@
package org.apache.metamodel.elasticsearch;
import junit.framework.TestCase;
+
import org.apache.metamodel.DataContext;
import org.apache.metamodel.data.DataSet;
import org.apache.metamodel.data.DataSetTableModel;
@@ -34,6 +35,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentBuilder;
import javax.swing.table.TableModel;
+
import java.util.Arrays;
import java.util.Date;
import java.util.List;
@@ -83,7 +85,8 @@ public class ElasticSearchDataContextTest extends TestCase {
}
public void testSimpleQuery() throws Exception {
- assertEquals("[peopletype, bulktype, tweet1, tweet2]", Arrays.toString(dataContext.getDefaultSchema().getTableNames()));
+ assertEquals("[peopletype, bulktype, tweet1, tweet2]",
+ Arrays.toString(dataContext.getDefaultSchema().getTableNames()));
Table table = dataContext.getDefaultSchema().getTableByName("tweet1");
@@ -97,21 +100,20 @@ public class ElasticSearchDataContextTest extends TestCase {
try {
assertTrue(ds.next());
- assertEquals("Row[values=[user1, 1]]",
- ds.getRow().toString());
+ assertEquals("Row[values=[user1, 1]]", ds.getRow().toString());
} finally {
- //ds.close();
+ // ds.close();
}
}
public void testWhereColumnEqualsValues() throws Exception {
- DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user").isEquals("user4").execute();
+ DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user")
+ .isEquals("user4").execute();
assertEquals(FilteredDataSet.class, ds.getClass());
try {
assertTrue(ds.next());
- assertEquals("Row[values=[user4, 4]]",
- ds.getRow().toString());
+ assertEquals("Row[values=[user4, 4]]", ds.getRow().toString());
assertFalse(ds.next());
} finally {
ds.close();
@@ -119,16 +121,19 @@ public class ElasticSearchDataContextTest extends TestCase {
}
public void testWhereColumnInValues() throws Exception {
- DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user").in("user4", "user5").execute();
- assertEquals(FilteredDataSet.class, ds.getClass());
+ DataSet ds = dataContext.query().from(bulkIndexType).select("user").and("message").where("user")
+ .in("user4", "user5").orderBy("message").execute();
try {
assertTrue(ds.next());
- assertEquals("Row[values=[user4, 4]]",
- ds.getRow().toString());
+
+ String row1 = ds.getRow().toString();
+ assertEquals("Row[values=[user4, 4]]", row1);
assertTrue(ds.next());
- assertEquals("Row[values=[user5, 5]]",
- ds.getRow().toString());
+
+ String row2 = ds.getRow().toString();
+ assertEquals("Row[values=[user5, 5]]", row2);
+
assertFalse(ds.next());
} finally {
ds.close();
@@ -144,7 +149,7 @@ public class ElasticSearchDataContextTest extends TestCase {
q.select(new SelectItem(table.getColumnByName("gender")),
new SelectItem(FunctionType.MAX, table.getColumnByName("age")),
new SelectItem(FunctionType.MIN, table.getColumnByName("age")), new SelectItem(FunctionType.COUNT, "*",
- "total"), new SelectItem(FunctionType.MIN, table.getColumnByName("id")).setAlias("firstId"));
+ "total"), new SelectItem(FunctionType.MIN, table.getColumnByName("id")).setAlias("firstId"));
DataSet data = dataContext.executeQuery(q);
assertEquals(
"[peopletype.gender, MAX(peopletype.age), MIN(peopletype.age), COUNT(*) AS total, MIN(peopletype.id) AS firstId]",
@@ -196,10 +201,10 @@ public class ElasticSearchDataContextTest extends TestCase {
boolean thrown = false;
try {
dataContext.query().from("nonExistingTable").select("user").and("message").execute();
- } catch(IllegalArgumentException IAex) {
+ } catch (IllegalArgumentException IAex) {
thrown = true;
} finally {
- //ds.close();
+ // ds.close();
}
assertTrue(thrown);
}
@@ -209,37 +214,32 @@ public class ElasticSearchDataContextTest extends TestCase {
boolean thrown = false;
try {
dataContext.query().from(indexType1).select("nonExistingField").execute();
- } catch(IllegalArgumentException IAex) {
+ } catch (IllegalArgumentException IAex) {
thrown = true;
} finally {
- //ds.close();
+ // ds.close();
}
assertTrue(thrown);
}
-
-
private void indexBulkDocuments(String indexName, String indexType, int numberOfDocuments) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
try {
- for (int i = 0; i < numberOfDocuments; i++) {
- bulkRequest.add(client.prepareIndex(indexName, indexType, new Integer(i).toString())
- .setSource(buildTweeterJson(i)));
- }
- bulkRequest.execute().actionGet();
+ for (int i = 0; i < numberOfDocuments; i++) {
+ bulkRequest.add(client.prepareIndex(indexName, indexType, new Integer(i).toString()).setSource(
+ buildTweeterJson(i)));
+ }
+ bulkRequest.execute().actionGet();
} catch (Exception ex) {
- System.out.println("Exception indexing documents!!!!!");
+ System.out.println("Exception indexing documents!!!!!");
}
}
private void indexOneTweeterDocumentPerIndex(String indexType, int id) {
try {
- client.prepareIndex(indexName, indexType)
- .setSource(buildTweeterJson(id))
- .execute()
- .actionGet();
+ client.prepareIndex(indexName, indexType).setSource(buildTweeterJson(id)).execute().actionGet();
} catch (Exception ex) {
System.out.println("Exception indexing documents!!!!!");
}
@@ -247,9 +247,7 @@ public class ElasticSearchDataContextTest extends TestCase {
private void indexOnePeopleDocument(String gender, int age, int id) {
try {
- client.prepareIndex(peopleIndexName, peopleIndexType)
- .setSource(buildPeopleJson(gender, age, id))
- .execute()
+ client.prepareIndex(peopleIndexName, peopleIndexType).setSource(buildPeopleJson(gender, age, id)).execute()
.actionGet();
} catch (Exception ex) {
System.out.println("Exception indexing documents!!!!!");
@@ -257,18 +255,12 @@ public class ElasticSearchDataContextTest extends TestCase {
}
private XContentBuilder buildTweeterJson(int elementId) throws Exception {
- return jsonBuilder().startObject().field("user", "user" + elementId)
- .field("postDate", new Date())
- .field("message", elementId)
- .endObject();
+ return jsonBuilder().startObject().field("user", "user" + elementId).field("postDate", new Date())
+ .field("message", elementId).endObject();
}
private XContentBuilder buildPeopleJson(String gender, int age, int elementId) throws Exception {
- return jsonBuilder().startObject().field("gender", gender)
- .field("age", age)
- .field("id", elementId)
- .endObject();
+ return jsonBuilder().startObject().field("gender", gender).field("age", age).field("id", elementId).endObject();
}
-
}
\ No newline at end of file