You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/08/04 20:05:22 UTC

incubator-ignite git commit: master - query restart tests fix2

Repository: incubator-ignite
Updated Branches:
  refs/heads/master 90adeae9d -> 246b94a8b


master - query restart tests fix2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/246b94a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/246b94a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/246b94a8

Branch: refs/heads/master
Commit: 246b94a8bdc9901935db1865a0607a9fe48f5b23
Parents: 90adeae
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Aug 4 21:05:13 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Aug 4 21:05:13 2015 +0300

----------------------------------------------------------------------
 .../query/h2/twostep/GridMergeIndex.java        |  7 +++
 .../h2/twostep/GridMergeIndexUnsorted.java      | 23 +++++++--
 .../query/h2/twostep/GridMergeTable.java        | 51 ++++++++------------
 .../h2/twostep/GridReduceQueryExecutor.java     | 28 +----------
 4 files changed, 45 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/246b94a8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
index 2b2996d..71b207d 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndex.java
@@ -68,6 +68,13 @@ public abstract class GridMergeIndex extends BaseIndex {
     }
 
     /**
+     * @return Return source nodes for this merge index.
+     */
+    public Set<UUID> sources() {
+        return remainingRows.keySet();
+    }
+
+    /**
      * @param nodeId Node ID.
      * @return {@code true} If this index needs data from the given source node.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/246b94a8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
index e0a07ec..276d25b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeIndexUnsorted.java
@@ -64,11 +64,24 @@ public class GridMergeIndexUnsorted extends GridMergeIndex {
                 while (!iter.hasNext()) {
                     GridResultPage page;
 
-                    try {
-                        page = queue.take();
-                    }
-                    catch (InterruptedException e) {
-                        throw new CacheException("Query execution was interrupted.", e);
+                    for (;;) {
+                        try {
+                            page = queue.poll(500, TimeUnit.MILLISECONDS);
+                        }
+                        catch (InterruptedException e) {
+                            throw new CacheException("Query execution was interrupted.", e);
+                        }
+
+                        if (page != null)
+                            break;
+
+                        UUID nodeId = ((GridMergeTable)table).checkSourceNodesAlive();
+
+                        if (nodeId != null) {
+                            fail(nodeId);
+
+                            assert !queue.isEmpty();
+                        }
                     }
 
                     if (page.isLast())

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/246b94a8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
index c9cdff2..fd9eec3 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMergeTable.java
@@ -17,7 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
-import org.h2.api.*;
+import org.apache.ignite.internal.*;
 import org.h2.command.ddl.*;
 import org.h2.engine.*;
 import org.h2.index.*;
@@ -32,6 +32,9 @@ import java.util.*;
  */
 public class GridMergeTable extends TableBase {
     /** */
+    private final GridKernalContext ctx;
+
+    /** */
     private final ArrayList<Index> idxs = new ArrayList<>(1);
 
     /** */
@@ -39,15 +42,29 @@ public class GridMergeTable extends TableBase {
 
     /**
      * @param data Data.
+     * @param ctx Kernal context.
      */
-    public GridMergeTable(CreateTableData data) {
+    public GridMergeTable(CreateTableData data, GridKernalContext ctx) {
         super(data);
 
+        this.ctx = ctx;
         idx = new GridMergeIndexUnsorted(this, "merge_scan");
 
         idxs.add(idx);
     }
 
+    /**
+     * @return Failed node or {@code null} if all alive.
+     */
+    public UUID checkSourceNodesAlive() {
+        for (UUID nodeId : idx.sources()) {
+            if (!ctx.discovery().alive(nodeId))
+                return nodeId;
+        }
+
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Override public void lock(Session session, boolean exclusive, boolean force) {
         // No-op.
@@ -153,34 +170,4 @@ public class GridMergeTable extends TableBase {
     @Override public void checkRename() {
         throw DbException.getUnsupportedException("rename");
     }
-
-    /**
-     * Engine.
-     */
-    public static class Engine implements TableEngine {
-        /** */
-        private static ThreadLocal<GridMergeTable> createdTbl = new ThreadLocal<>();
-
-        /**
-         * @return Created table.
-         */
-        public static GridMergeTable getCreated() {
-            GridMergeTable tbl = createdTbl.get();
-
-            assert tbl != null;
-
-            createdTbl.remove();
-
-            return tbl;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Table createTable(CreateTableData data) {
-            GridMergeTable tbl = new GridMergeTable(data);
-
-            createdTbl.set(tbl);
-
-            return tbl;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/246b94a8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index ac269db..ad8ab34 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -1096,7 +1096,7 @@ public class GridReduceQueryExecutor {
             else
                 data.columns = planColumns();
 
-            return new GridMergeTable(data);
+            return new GridMergeTable(data, ctx);
         }
         catch (Exception e) {
             U.closeQuiet(conn);
@@ -1117,32 +1117,6 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param conn Connection.
-     * @param qry Query.
-     * @return Table.
-     * @throws IgniteCheckedException If failed.
-     */
-    private GridMergeTable createTable(Connection conn, GridCacheSqlQuery qry) throws IgniteCheckedException {
-        try {
-            try (PreparedStatement s = conn.prepareStatement(
-                "CREATE LOCAL TEMPORARY TABLE " + qry.alias() +
-                " ENGINE \"" + GridMergeTable.Engine.class.getName() + "\" " +
-                " AS SELECT * FROM (" + qry.query() + ") WHERE FALSE")) {
-                h2.bindParameters(s, F.asList(qry.parameters()));
-
-                s.execute();
-            }
-
-            return GridMergeTable.Engine.getCreated();
-        }
-        catch (SQLException e) {
-            U.closeQuiet(conn);
-
-            throw new IgniteCheckedException(e);
-        }
-    }
-
-    /**
      * @param reconnectFut Reconnect future.
      */
     public void onDisconnected(IgniteFuture<?> reconnectFut) {