You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metamodel.apache.org by ka...@apache.org on 2014/09/30 20:25:27 UTC
[1/2] git commit: METAMODEL-80: Fixed
Repository: incubator-metamodel
Updated Branches:
refs/heads/master d6e8d5422 -> 560dd6600
METAMODEL-80: Fixed
Project: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/commit/f825b29c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/tree/f825b29c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/diff/f825b29c
Branch: refs/heads/master
Commit: f825b29c6ce28d6ecffd7b565ab87fb745a7b1a4
Parents: cdd037a
Author: Kasper Sørensen <i....@gmail.com>
Authored: Mon Sep 29 22:28:26 2014 +0200
Committer: Kasper Sørensen <i....@gmail.com>
Committed: Mon Sep 29 22:28:26 2014 +0200
----------------------------------------------------------------------
.../elasticsearch/ElasticSearchDataContext.java | 8 ++-
.../elasticsearch/ElasticSearchDataSet.java | 75 +++++++++++++-------
.../ElasticSearchDataContextTest.java | 1 -
3 files changed, 56 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/f825b29c/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java
index 9db94fd..a7b00c9 100644
--- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java
+++ b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java
@@ -49,6 +49,7 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.hppc.ObjectLookupContainer;
import org.elasticsearch.common.hppc.cursors.ObjectCursor;
+import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,6 +75,8 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContext.class);
+ public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60);
+
private final Client elasticSearchClient;
private final SimpleTableDef[] tableDefs;
private final String indexName;
@@ -215,9 +218,12 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
final SearchRequestBuilder requestBuilder = elasticSearchClient.prepareSearch(indexName).setTypes(documentType);
if (limitMaxRowsIsSet(maxRows)) {
requestBuilder.setSize(maxRows);
+ } else {
+ requestBuilder.setScroll(TIMEOUT_SCROLL);
}
+
final SearchResponse response = requestBuilder.execute().actionGet();
- return new ElasticSearchDataSet(response, columns, false);
+ return new ElasticSearchDataSet(elasticSearchClient, response, columns, false);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/f825b29c/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java
index 5187b3c..c7e5fe9 100644
--- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java
+++ b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java
@@ -18,52 +18,55 @@
*/
package org.apache.metamodel.elasticsearch;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.metamodel.data.AbstractDataSet;
import org.apache.metamodel.data.DefaultRow;
import org.apache.metamodel.data.Row;
import org.apache.metamodel.query.SelectItem;
import org.apache.metamodel.schema.Column;
+import org.elasticsearch.action.search.ClearScrollRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
import org.elasticsearch.search.SearchHit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
final class ElasticSearchDataSet extends AbstractDataSet {
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataSet.class);
- private int readCount = 0;
+ private final Client _client;
+ private final AtomicBoolean _closed;
- private final SearchHit[] _cursor;
- private final boolean _queryPostProcessed;
+ private SearchResponse _searchResponse;
+ private SearchHit _currentHit;
+ private int _hitIndex = 0;
- private boolean _closed;
- private volatile SearchHit _dbObject;
-
- public ElasticSearchDataSet(SearchResponse cursor, Column[] columns, boolean queryPostProcessed) {
+ public ElasticSearchDataSet(Client client, SearchResponse searchResponse, Column[] columns,
+ boolean queryPostProcessed) {
super(columns);
- _cursor = cursor.getHits().hits();
- _queryPostProcessed = queryPostProcessed;
- _closed = false;
- }
-
- public boolean isQueryPostProcessed() {
- return _queryPostProcessed;
+ _client = client;
+ _searchResponse = searchResponse;
+ _closed = new AtomicBoolean(false);
}
@Override
public void close() {
super.close();
- // _cursor.close();
- _closed = true;
+ boolean closeNow = _closed.compareAndSet(true, false);
+ if (closeNow) {
+ ClearScrollRequestBuilder scrollRequestBuilder = new ClearScrollRequestBuilder(_client)
+ .addScrollId(_searchResponse.getScrollId());
+ scrollRequestBuilder.execute();
+ }
}
@Override
protected void finalize() throws Throwable {
super.finalize();
- if (!_closed) {
+ if (!_closed.get()) {
logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this);
close();
}
@@ -71,19 +74,39 @@ final class ElasticSearchDataSet extends AbstractDataSet {
@Override
public boolean next() {
- if (readCount < _cursor.length) {
- _dbObject = _cursor[readCount];
- readCount++;
+ final SearchHit[] hits = _searchResponse.getHits().hits();
+ if (hits.length == 0) {
+ // break condition for the scroll
+ _currentHit = null;
+ return false;
+ }
+
+ if (_hitIndex < hits.length) {
+ // pick the next hit within this search response
+ _currentHit = hits[_hitIndex];
+ _hitIndex++;
return true;
- } else {
- _dbObject = null;
+ }
+
+ final String scrollId = _searchResponse.getScrollId();
+ if (scrollId == null) {
+ // this search response is not scrolleable - then it's the end.
+ _currentHit = null;
return false;
}
+
+ // try to scroll to the next set of hits
+ _searchResponse = _client.prepareSearchScroll(scrollId).setScroll(ElasticSearchDataContext.TIMEOUT_SCROLL)
+ .execute().actionGet();
+
+ // start over (recursively)
+ _hitIndex = 0;
+ return next();
}
@Override
public Row getRow() {
- if (_dbObject == null) {
+ if (_currentHit == null) {
return null;
}
@@ -91,7 +114,7 @@ final class ElasticSearchDataSet extends AbstractDataSet {
final Object[] values = new Object[size];
for (int i = 0; i < values.length; i++) {
final SelectItem selectItem = getHeader().getSelectItem(i);
- final Map<String, Object> element = _dbObject.getSource();
+ final Map<String, Object> element = _currentHit.getSource();
final String key = selectItem.getColumn().getName();
values[i] = element.get(key);
}
http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/f825b29c/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java b/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java
index 5435f58..7d3941e 100644
--- a/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java
+++ b/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java
@@ -102,7 +102,6 @@ public class ElasticSearchDataContextTest {
DataSet ds = dataContext.query().from(indexType1).select("user").and("message").execute();
assertEquals(ElasticSearchDataSet.class, ds.getClass());
- assertFalse(((ElasticSearchDataSet) ds).isQueryPostProcessed());
try {
assertTrue(ds.next());
[2/2] git commit: Merge branch 'master' of
https://git-wip-us.apache.org/repos/asf/incubator-metamodel.git
Posted by ka...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-metamodel.git
Project: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/commit/560dd660
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/tree/560dd660
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/diff/560dd660
Branch: refs/heads/master
Commit: 560dd6600b7b97872e685a05e3c41eaef4d8a31e
Parents: f825b29 d6e8d54
Author: Kasper Sørensen <i....@gmail.com>
Authored: Tue Sep 30 20:23:50 2014 +0200
Committer: Kasper Sørensen <i....@gmail.com>
Committed: Tue Sep 30 20:23:50 2014 +0200
----------------------------------------------------------------------
CHANGES.md | 1 +
.../org/apache/metamodel/query/FilterItem.java | 13 ++++-----
.../apache/metamodel/query/OperatorType.java | 30 ++++++++++----------
.../java/org/apache/metamodel/query/Query.java | 10 +++++--
.../QueryPostprocessDataContextTest.java | 15 ++++++++++
.../metamodel/mongodb/MongoDbDataContext.java | 3 --
6 files changed, 44 insertions(+), 28 deletions(-)
----------------------------------------------------------------------