You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metamodel.apache.org by ka...@apache.org on 2014/09/30 20:25:27 UTC

[1/2] git commit: METAMODEL-80: Fixed

Repository: incubator-metamodel
Updated Branches:
  refs/heads/master d6e8d5422 -> 560dd6600


METAMODEL-80: Fixed

Project: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/commit/f825b29c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/tree/f825b29c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/diff/f825b29c

Branch: refs/heads/master
Commit: f825b29c6ce28d6ecffd7b565ab87fb745a7b1a4
Parents: cdd037a
Author: Kasper Sørensen <i....@gmail.com>
Authored: Mon Sep 29 22:28:26 2014 +0200
Committer: Kasper Sørensen <i....@gmail.com>
Committed: Mon Sep 29 22:28:26 2014 +0200

----------------------------------------------------------------------
 .../elasticsearch/ElasticSearchDataContext.java |  8 ++-
 .../elasticsearch/ElasticSearchDataSet.java     | 75 +++++++++++++-------
 .../ElasticSearchDataContextTest.java           |  1 -
 3 files changed, 56 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/f825b29c/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java
index 9db94fd..a7b00c9 100644
--- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java
+++ b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContext.java
@@ -49,6 +49,7 @@ import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.hppc.ObjectLookupContainer;
 import org.elasticsearch.common.hppc.cursors.ObjectCursor;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,6 +75,8 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
 
     private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataContext.class);
 
+    public static final TimeValue TIMEOUT_SCROLL = TimeValue.timeValueSeconds(60);
+
     private final Client elasticSearchClient;
     private final SimpleTableDef[] tableDefs;
     private final String indexName;
@@ -215,9 +218,12 @@ public class ElasticSearchDataContext extends QueryPostprocessDataContext implem
         final SearchRequestBuilder requestBuilder = elasticSearchClient.prepareSearch(indexName).setTypes(documentType);
         if (limitMaxRowsIsSet(maxRows)) {
             requestBuilder.setSize(maxRows);
+        } else {
+            requestBuilder.setScroll(TIMEOUT_SCROLL);
         }
+
         final SearchResponse response = requestBuilder.execute().actionGet();
-        return new ElasticSearchDataSet(response, columns, false);
+        return new ElasticSearchDataSet(elasticSearchClient, response, columns, false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/f825b29c/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java
index 5187b3c..c7e5fe9 100644
--- a/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java
+++ b/elasticsearch/src/main/java/org/apache/metamodel/elasticsearch/ElasticSearchDataSet.java
@@ -18,52 +18,55 @@
  */
 package org.apache.metamodel.elasticsearch;
 
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.metamodel.data.AbstractDataSet;
 import org.apache.metamodel.data.DefaultRow;
 import org.apache.metamodel.data.Row;
 import org.apache.metamodel.query.SelectItem;
 import org.apache.metamodel.schema.Column;
+import org.elasticsearch.action.search.ClearScrollRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.search.SearchHit;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-
 final class ElasticSearchDataSet extends AbstractDataSet {
 
     private static final Logger logger = LoggerFactory.getLogger(ElasticSearchDataSet.class);
 
-    private int readCount = 0;
+    private final Client _client;
+    private final AtomicBoolean _closed;
 
-    private final SearchHit[] _cursor;
-    private final boolean _queryPostProcessed;
+    private SearchResponse _searchResponse;
+    private SearchHit _currentHit;
+    private int _hitIndex = 0;
 
-    private boolean _closed;
-    private volatile SearchHit _dbObject;
-
-    public ElasticSearchDataSet(SearchResponse cursor, Column[] columns, boolean queryPostProcessed) {
+    public ElasticSearchDataSet(Client client, SearchResponse searchResponse, Column[] columns,
+            boolean queryPostProcessed) {
         super(columns);
-        _cursor = cursor.getHits().hits();
-        _queryPostProcessed = queryPostProcessed;
-        _closed = false;
-    }
-
-    public boolean isQueryPostProcessed() {
-        return _queryPostProcessed;
+        _client = client;
+        _searchResponse = searchResponse;
+        _closed = new AtomicBoolean(false);
     }
 
     @Override
     public void close() {
         super.close();
-        // _cursor.close();
-        _closed = true;
+        boolean closeNow = _closed.compareAndSet(true, false);
+        if (closeNow) {
+            ClearScrollRequestBuilder scrollRequestBuilder = new ClearScrollRequestBuilder(_client)
+                    .addScrollId(_searchResponse.getScrollId());
+            scrollRequestBuilder.execute();
+        }
     }
 
     @Override
     protected void finalize() throws Throwable {
         super.finalize();
-        if (!_closed) {
+        if (!_closed.get()) {
             logger.warn("finalize() invoked, but DataSet is not closed. Invoking close() on {}", this);
             close();
         }
@@ -71,19 +74,39 @@ final class ElasticSearchDataSet extends AbstractDataSet {
 
     @Override
     public boolean next() {
-        if (readCount < _cursor.length) {
-            _dbObject = _cursor[readCount];
-            readCount++;
+        final SearchHit[] hits = _searchResponse.getHits().hits();
+        if (hits.length == 0) {
+            // break condition for the scroll
+            _currentHit = null;
+            return false;
+        }
+
+        if (_hitIndex < hits.length) {
+            // pick the next hit within this search response
+            _currentHit = hits[_hitIndex];
+            _hitIndex++;
             return true;
-        } else {
-            _dbObject = null;
+        }
+
+        final String scrollId = _searchResponse.getScrollId();
+        if (scrollId == null) {
+            // this search response is not scrolleable - then it's the end.
+            _currentHit = null;
             return false;
         }
+
+        // try to scroll to the next set of hits
+        _searchResponse = _client.prepareSearchScroll(scrollId).setScroll(ElasticSearchDataContext.TIMEOUT_SCROLL)
+                .execute().actionGet();
+
+        // start over (recursively)
+        _hitIndex = 0;
+        return next();
     }
 
     @Override
     public Row getRow() {
-        if (_dbObject == null) {
+        if (_currentHit == null) {
             return null;
         }
 
@@ -91,7 +114,7 @@ final class ElasticSearchDataSet extends AbstractDataSet {
         final Object[] values = new Object[size];
         for (int i = 0; i < values.length; i++) {
             final SelectItem selectItem = getHeader().getSelectItem(i);
-            final Map<String, Object> element = _dbObject.getSource();
+            final Map<String, Object> element = _currentHit.getSource();
             final String key = selectItem.getColumn().getName();
             values[i] = element.get(key);
         }

http://git-wip-us.apache.org/repos/asf/incubator-metamodel/blob/f825b29c/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java
----------------------------------------------------------------------
diff --git a/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java b/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java
index 5435f58..7d3941e 100644
--- a/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java
+++ b/elasticsearch/src/test/java/org/apache/metamodel/elasticsearch/ElasticSearchDataContextTest.java
@@ -102,7 +102,6 @@ public class ElasticSearchDataContextTest {
 
         DataSet ds = dataContext.query().from(indexType1).select("user").and("message").execute();
         assertEquals(ElasticSearchDataSet.class, ds.getClass());
-        assertFalse(((ElasticSearchDataSet) ds).isQueryPostProcessed());
 
         try {
             assertTrue(ds.next());


[2/2] git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-metamodel.git

Posted by ka...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-metamodel.git

Project: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/commit/560dd660
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/tree/560dd660
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metamodel/diff/560dd660

Branch: refs/heads/master
Commit: 560dd6600b7b97872e685a05e3c41eaef4d8a31e
Parents: f825b29 d6e8d54
Author: Kasper Sørensen <i....@gmail.com>
Authored: Tue Sep 30 20:23:50 2014 +0200
Committer: Kasper Sørensen <i....@gmail.com>
Committed: Tue Sep 30 20:23:50 2014 +0200

----------------------------------------------------------------------
 CHANGES.md                                      |  1 +
 .../org/apache/metamodel/query/FilterItem.java  | 13 ++++-----
 .../apache/metamodel/query/OperatorType.java    | 30 ++++++++++----------
 .../java/org/apache/metamodel/query/Query.java  | 10 +++++--
 .../QueryPostprocessDataContextTest.java        | 15 ++++++++++
 .../metamodel/mongodb/MongoDbDataContext.java   |  3 --
 6 files changed, 44 insertions(+), 28 deletions(-)
----------------------------------------------------------------------