You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/15 08:51:52 UTC

[08/50] [abbrv] ignite git commit: ignite-split2 - fixes

ignite-split2 - fixes


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/750f146b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/750f146b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/750f146b

Branch: refs/heads/ignite-1232
Commit: 750f146be9a032c3dbef0c4919592e57762b9f98
Parents: d86e0ae
Author: S.Vladykin <sv...@gridgain.com>
Authored: Mon Dec 7 03:01:25 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Mon Dec 7 03:01:25 2015 +0300

----------------------------------------------------------------------
 .../query/h2/opt/GridH2TreeIndex.java           | 158 ++++++++++++-------
 .../h2/twostep/GridReduceQueryExecutor.java     |  13 +-
 2 files changed, 106 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/750f146b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 633cdf4..04fd233 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -54,12 +54,14 @@ import org.apache.ignite.internal.util.GridEmptyIterator;
 import org.apache.ignite.internal.util.offheap.unsafe.GridOffHeapSnapTreeMap;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
 import org.apache.ignite.internal.util.snaptree.SnapTreeMap;
+import org.apache.ignite.internal.util.typedef.CIX2;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.h2.engine.Session;
 import org.h2.index.Cursor;
@@ -78,8 +80,12 @@ import org.h2.value.ValueNull;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.Collections.emptyIterator;
+import static java.util.Collections.singletonList;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK;
 import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds;
 import static org.h2.result.Row.MEMORY_CALCULATE;
 
@@ -106,6 +112,13 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
     /** */
     private final boolean snapshotEnabled;
 
+    /** */
+    private final CIX2<ClusterNode,Message> locNodeHandler = new CIX2<ClusterNode,Message>() {
+        @Override public void applyx(ClusterNode clusterNode, Message msg) throws IgniteCheckedException {
+            onMessage0(clusterNode.id(), msg);
+        }
+    };
+
     /**
      * Constructor with index initialization.
      *
@@ -195,23 +208,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
 
             msgLsnr = new GridMessageListener() {
                 @Override public void onMessage(UUID nodeId, Object msg) {
-                    ClusterNode node = kernalContext().discovery().node(nodeId);
-
-                    if (node == null)
-                        return;
-
-                    try {
-                        if (msg instanceof GridH2IndexRangeRequest)
-                            onIndexRangeRequest(node, (GridH2IndexRangeRequest)msg);
-                        else if (msg instanceof GridH2IndexRangeResponse)
-                            onIndexRangeResponse(node, (GridH2IndexRangeResponse)msg);
-                    }
-                    catch (Throwable th) {
-                        U.error(log, "Failed to handle message[nodeId=" + nodeId + ", msg=" + msg + "]", th);
-
-                        if (th instanceof Error)
-                            throw th;
-                    }
+                    onMessage0(nodeId, msg);
                 }
             };
 
@@ -225,6 +222,39 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
     }
 
     /**
+     * @param nodes Nodes.
+     * @param msg Message.
+     */
+    private void send(Collection<ClusterNode> nodes, Message msg) {
+        if (!getTable().rowDescriptor().indexing().send(msgTopic, nodes, msg, null, locNodeHandler))
+            throw new CacheException("Failed to send message to nodes: " + nodes + ".");
+    }
+
+    /**
+     * @param nodeId Source node ID.
+     * @param msg Message.
+     */
+    private void onMessage0(UUID nodeId, Object msg) {
+        ClusterNode node = kernalContext().discovery().node(nodeId);
+
+        if (node == null)
+            return;
+
+        try {
+            if (msg instanceof GridH2IndexRangeRequest)
+                onIndexRangeRequest(node, (GridH2IndexRangeRequest)msg);
+            else if (msg instanceof GridH2IndexRangeResponse)
+                onIndexRangeResponse(node, (GridH2IndexRangeResponse)msg);
+        }
+        catch (Throwable th) {
+            U.error(log, "Failed to handle message[nodeId=" + nodeId + ", msg=" + msg + "]", th);
+
+            if (th instanceof Error)
+                throw th;
+        }
+    }
+
+    /**
      * @return Kernal context.
      */
     private GridKernalContext kernalContext() {
@@ -239,60 +269,68 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
         GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(),
             msg.originNodeId(), msg.queryId(), MAP);
 
-        if (qctx == null) {
-            // TODO respond NOT_FOUND
+        GridH2IndexRangeResponse res = new GridH2IndexRangeResponse();
 
-            return;
-        }
+        res.originNodeId(msg.originNodeId());
+        res.queryId(msg.queryId());
+        res.batchLookupId(msg.batchLookupId());
 
-        RangeSource src;
+        if (qctx == null)
+            res.status(STATUS_NOT_FOUND);
+        else {
+            try {
+                RangeSource src;
 
-        if (msg.bounds() != null) {
-            // This is the first request containing all the search rows.
-            ConcurrentNavigableMap<GridSearchRowPointer,GridH2Row> snapshot0 = qctx.getSnapshot(idxId);
+                if (msg.bounds() != null) {
+                    // This is the first request containing all the search rows.
+                    ConcurrentNavigableMap<GridSearchRowPointer,GridH2Row> snapshot0 = qctx.getSnapshot(idxId);
 
-            src = new RangeSource(msg.bounds(), snapshot0);
-        }
-        else {
-            // This is request to fetch next portion of data.
-            src = qctx.getSource(node.id(), msg.batchLookupId());
+                    src = new RangeSource(msg.bounds(), snapshot0);
+                }
+                else {
+                    // This is request to fetch next portion of data.
+                    src = qctx.getSource(node.id(), msg.batchLookupId());
 
-            assert src != null;
-        }
+                    assert src != null;
+                }
 
-        List<GridH2RowRange> ranges = new ArrayList<>();
+                List<GridH2RowRange> ranges = new ArrayList<>();
 
-        int maxRows = qctx.pageSize();
+                int maxRows = qctx.pageSize();
 
-        while (maxRows > 0) {
-            GridH2RowRange range = src.next(maxRows);
+                while (maxRows > 0) {
+                    GridH2RowRange range = src.next(maxRows);
 
-            if (range == null)
-                break;
+                    if (range == null)
+                        break;
 
-            maxRows -= range.rows().size();
-        }
+                    maxRows -= range.rows().size();
+                }
 
-        if (src.hasMoreRows()) {
-            // Save source for future fetches.
-            if (msg.bounds() != null)
-                qctx.putSource(node.id(), msg.batchLookupId(), src);
-        }
-        else if (msg.bounds() == null) {
-            // Drop saved source.
-            qctx.putSource(node.id(), msg.batchLookupId(), null);
-        }
+                if (src.hasMoreRows()) {
+                    // Save source for future fetches.
+                    if (msg.bounds() != null)
+                        qctx.putSource(node.id(), msg.batchLookupId(), src);
+                }
+                else if (msg.bounds() == null) {
+                    // Drop saved source.
+                    qctx.putSource(node.id(), msg.batchLookupId(), null);
+                }
 
-        assert !ranges.isEmpty();
+                assert !ranges.isEmpty();
 
-        GridH2IndexRangeResponse res = new GridH2IndexRangeResponse();
+                res.ranges(ranges);
+                res.status(STATUS_OK);
+            }
+            catch (Throwable th) {
+                U.error(log, "Failed to process request: " + msg, th);
 
-        res.originNodeId(msg.originNodeId());
-        res.queryId(msg.queryId());
-        res.batchLookupId(msg.batchLookupId());
-        res.ranges(ranges);
+                res.error(th.getClass() + ": " + th.getMessage());
+                res.status(STATUS_ERROR);
+            }
+        }
 
-        // TODO send res
+        send(singletonList(node), res);
     }
 
     /**
@@ -1327,7 +1365,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
 
             assert remainingRanges > 0;
 
-            // TODO send req
+            send(singletonList(node), req);
         }
 
         /**
@@ -1367,14 +1405,14 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
                                 if (req.bounds() != null)
                                     req = createRequest(qctx, req.batchLookupId());
 
-                                // TODO request next by sending req
+                                send(singletonList(node), req);
                             }
                             else
                                 req = null;
 
                             return res;
 
-                        case GridH2IndexRangeResponse.STATUS_NOT_FOUND:
+                        case STATUS_NOT_FOUND:
                             if (req == null || req.bounds() == null) // We have already received the first response.
                                 throw new GridH2RetryException("Failure on remote node.");
 
@@ -1385,7 +1423,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
                                 throw new IgniteInterruptedException(e.getMessage());
                             }
 
-                            // TODO resend req
+                            send(singletonList(node), req);
 
                             break;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/750f146b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3a8b8cb..134631c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -159,6 +159,13 @@ public class GridReduceQueryExecutor {
     /** */
     private final GridSpinBusyLock busyLock;
 
+    /** */
+    private final CIX2<ClusterNode,Message> locNodeHandler = new CIX2<ClusterNode,Message>() {
+        @Override public void applyx(ClusterNode locNode, Message msg) {
+            h2.mapQueryExecutor().onMessage(locNode.id(), msg);
+        }
+    };
+
     /**
      * @param busyLock Busy lock.
      */
@@ -1119,11 +1126,7 @@ public class GridReduceQueryExecutor {
         Message msg,
         @Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize
     ) {
-        return h2.send(GridTopic.TOPIC_QUERY, nodes, msg, specialize, new CIX2<ClusterNode,Message>() {
-            @Override public void applyx(ClusterNode locNode, Message msg) {
-                h2.mapQueryExecutor().onMessage(locNode.id(), msg);
-            }
-        });
+        return h2.send(GridTopic.TOPIC_QUERY, nodes, msg, specialize, locNodeHandler);
     }
 
     /**