You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/08/14 10:30:12 UTC

[02/50] ignite git commit: Consuming results into own target.

Consuming results into own target.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3a4c2d75
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3a4c2d75
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3a4c2d75

Branch: refs/heads/ignite-5991-6019
Commit: 3a4c2d75c61a64accd9059c5cecf46267cff8b4e
Parents: 8efa036
Author: devozerov <vo...@gridgain.com>
Authored: Wed Aug 9 12:38:04 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Aug 9 12:38:04 2017 +0300

----------------------------------------------------------------------
 .../ignite/examples/datagrid/TestExample.java   |  2 +-
 .../processors/query/h2/IgniteH2Indexing.java   | 44 ++++++++++++++++++++
 .../query/h2/twostep/GridMapQueryExecutor.java  |  6 +++
 3 files changed, 51 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3a4c2d75/examples/src/main/java/org/apache/ignite/examples/datagrid/TestExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/TestExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/TestExample.java
index b1df756..3645d43 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/TestExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/TestExample.java
@@ -25,7 +25,7 @@ public class TestExample {
                  Ignition.start(new IgniteConfiguration().setIgniteInstanceName("client").setClientMode(true))) {
                 IgniteCache<Long, Person> cliCache = cli.cache(CACHE_NAME);
 
-                SqlFieldsQuery qry = new SqlFieldsQuery("SELECT firstName FROM Person");
+                SqlFieldsQuery qry = new SqlFieldsQuery("SELECT firstName FROM Person").setStreaming(true);
 
                 int cnt = 0;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a4c2d75/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 3e8993b..68551f4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -137,16 +137,21 @@ import org.apache.ignite.resources.LoggerResource;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.h2.api.ErrorCode;
 import org.h2.api.JavaObjectSerializer;
+import org.h2.command.CommandContainer;
+import org.h2.command.CommandInterface;
 import org.h2.command.Prepared;
 import org.h2.command.dml.Insert;
+import org.h2.command.dml.Select;
 import org.h2.engine.SysProperties;
 import org.h2.index.Index;
 import org.h2.jdbc.JdbcPreparedStatement;
 import org.h2.jdbc.JdbcStatement;
+import org.h2.result.ResultTarget;
 import org.h2.server.web.WebServer;
 import org.h2.table.IndexColumn;
 import org.h2.tools.Server;
 import org.h2.util.JdbcUtils;
+import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -958,6 +963,45 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * Executes SQL in streaming mode, feeding a custom {@link ResultTarget} to avoid loading everything to memory.
+     *
+     * @param conn Connection the statement is prepared on.
+     * @param sql SQL statement; its prepared command is assigned to {@link Select}, so presumably it must be a SELECT.
+     * @param params Parameters passed to statement preparation, may be {@code null}.
+     * @throws IgniteCheckedException If failed.
+     */
+    public void executeSqlStreaming(Connection conn, String sql, @Nullable Collection<Object> params)
+        throws IgniteCheckedException {
+        JdbcPreparedStatement stmt = (JdbcPreparedStatement)preparedStatementWithParams(conn, sql, params, false);
+
+        CommandContainer cmd = U.field(stmt, "command");
+
+        Select select = U.field(cmd, "prepared");
+
+        select.query(0, new IgniteResultTarget());
+    }
+
+    /**
+     * Result target that prints each added row to standard output and counts the rows.
+     */
+    private static class IgniteResultTarget implements ResultTarget {
+        /** Number of rows passed to {@link #addRow(Value[])} so far. */
+        private int rowCnt;
+
+        /** {@inheritDoc} */
+        @Override public void addRow(Value[] values) {
+            System.out.println("ADD ROW: " + Arrays.toString(values));
+
+            rowCnt++;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int getRowCount() {
+            return rowCnt;
+        }
+    }
+
+    /**
      * Executes sql query and prints warning if query is too slow.
      *
      * @param stmt Prepared statement for query.

http://git-wip-us.apache.org/repos/asf/ignite/blob/3a4c2d75/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 46d403e..b6227f8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -567,6 +567,12 @@ public class GridMapQueryExecutor {
                 }
 
                 // Run queries.
+                if (streaming) {
+                    GridCacheSqlQuery qry = qrys.iterator().next();
+
+                    h2.executeSqlStreaming(conn, qry.query(), F.asList(qry.parameters(params)));
+                }
+
                 int qryIdx = 0;
 
                 boolean evt = mainCctx != null && ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED);