You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/15 08:51:52 UTC
[08/50] [abbrv] ignite git commit: ignite-split2 - fixes
ignite-split2 - fixes
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/750f146b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/750f146b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/750f146b
Branch: refs/heads/ignite-1232
Commit: 750f146be9a032c3dbef0c4919592e57762b9f98
Parents: d86e0ae
Author: S.Vladykin <sv...@gridgain.com>
Authored: Mon Dec 7 03:01:25 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Mon Dec 7 03:01:25 2015 +0300
----------------------------------------------------------------------
.../query/h2/opt/GridH2TreeIndex.java | 158 ++++++++++++-------
.../h2/twostep/GridReduceQueryExecutor.java | 13 +-
2 files changed, 106 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/750f146b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index 633cdf4..04fd233 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -54,12 +54,14 @@ import org.apache.ignite.internal.util.GridEmptyIterator;
import org.apache.ignite.internal.util.offheap.unsafe.GridOffHeapSnapTreeMap;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
import org.apache.ignite.internal.util.snaptree.SnapTreeMap;
+import org.apache.ignite.internal.util.typedef.CIX2;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.h2.engine.Session;
import org.h2.index.Cursor;
@@ -78,8 +80,12 @@ import org.h2.value.ValueNull;
import org.jetbrains.annotations.Nullable;
import static java.util.Collections.emptyIterator;
+import static java.util.Collections.singletonList;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.KEY_COL;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_ERROR;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_NOT_FOUND;
+import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeResponse.STATUS_OK;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRangeBounds.rangeBounds;
import static org.h2.result.Row.MEMORY_CALCULATE;
@@ -106,6 +112,13 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
/** */
private final boolean snapshotEnabled;
+ /**
+ * Handler handed to {@code indexing().send(...)} by {@code send(Collection, Message)}: it forwards the
+ * message to {@code onMessage0}, using the supplied node's ID as the sender ID.
+ * NOTE(review): the field name suggests it is invoked only for the local node - confirm against the
+ * {@code send} implementation it is passed to.
+ */
+ private final CIX2<ClusterNode,Message> locNodeHandler = new CIX2<ClusterNode,Message>() {
+ @Override public void applyx(ClusterNode clusterNode, Message msg) throws IgniteCheckedException {
+ onMessage0(clusterNode.id(), msg);
+ }
+ };
+
/**
* Constructor with index initialization.
*
@@ -195,23 +208,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
msgLsnr = new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg) {
- ClusterNode node = kernalContext().discovery().node(nodeId);
-
- if (node == null)
- return;
-
- try {
- if (msg instanceof GridH2IndexRangeRequest)
- onIndexRangeRequest(node, (GridH2IndexRangeRequest)msg);
- else if (msg instanceof GridH2IndexRangeResponse)
- onIndexRangeResponse(node, (GridH2IndexRangeResponse)msg);
- }
- catch (Throwable th) {
- U.error(log, "Failed to handle message[nodeId=" + nodeId + ", msg=" + msg + "]", th);
-
- if (th instanceof Error)
- throw th;
- }
+ onMessage0(nodeId, msg);
}
};
@@ -225,6 +222,39 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
}
/**
+ * Sends a message to the given nodes on {@code msgTopic} through the table's indexing, passing
+ * {@code null} as the fourth argument and {@code locNodeHandler} as the handler argument.
+ *
+ * @param nodes Nodes.
+ * @param msg Message.
+ * @throws CacheException If the underlying {@code send(...)} call returns {@code false}.
+ */
+ private void send(Collection<ClusterNode> nodes, Message msg) {
+ if (!getTable().rowDescriptor().indexing().send(msgTopic, nodes, msg, null, locNodeHandler))
+ throw new CacheException("Failed to send message to nodes: " + nodes + ".");
+ }
+
+ /**
+ * Shared entry point for index range messages, called both by the message listener and by
+ * {@code locNodeHandler}. Resolves the sender through discovery and dispatches
+ * {@code GridH2IndexRangeRequest} and {@code GridH2IndexRangeResponse} messages to their handlers;
+ * messages of any other type are ignored. Any failure is logged, and only {@code Error}s are rethrown.
+ *
+ * @param nodeId Source node ID.
+ * @param msg Message.
+ */
+ private void onMessage0(UUID nodeId, Object msg) {
+ ClusterNode node = kernalContext().discovery().node(nodeId);
+
+ if (node == null) // Discovery does not know the sender: drop the message without logging.
+ return;
+
+ try {
+ if (msg instanceof GridH2IndexRangeRequest)
+ onIndexRangeRequest(node, (GridH2IndexRangeRequest)msg);
+ else if (msg instanceof GridH2IndexRangeResponse)
+ onIndexRangeResponse(node, (GridH2IndexRangeResponse)msg);
+ }
+ catch (Throwable th) {
+ U.error(log, "Failed to handle message[nodeId=" + nodeId + ", msg=" + msg + "]", th);
+
+ if (th instanceof Error) // Exceptions stop here after logging; errors still propagate to the caller.
+ throw th;
+ }
+ }
+
+ /**
* @return Kernal context.
*/
private GridKernalContext kernalContext() {
@@ -239,60 +269,68 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
GridH2QueryContext qctx = GridH2QueryContext.get(kernalContext().localNodeId(),
msg.originNodeId(), msg.queryId(), MAP);
- if (qctx == null) {
- // TODO respond NOT_FOUND
+ GridH2IndexRangeResponse res = new GridH2IndexRangeResponse();
- return;
- }
+ res.originNodeId(msg.originNodeId());
+ res.queryId(msg.queryId());
+ res.batchLookupId(msg.batchLookupId());
- RangeSource src;
+ if (qctx == null)
+ res.status(STATUS_NOT_FOUND);
+ else {
+ try {
+ RangeSource src;
- if (msg.bounds() != null) {
- // This is the first request containing all the search rows.
- ConcurrentNavigableMap<GridSearchRowPointer,GridH2Row> snapshot0 = qctx.getSnapshot(idxId);
+ if (msg.bounds() != null) {
+ // This is the first request containing all the search rows.
+ ConcurrentNavigableMap<GridSearchRowPointer,GridH2Row> snapshot0 = qctx.getSnapshot(idxId);
- src = new RangeSource(msg.bounds(), snapshot0);
- }
- else {
- // This is request to fetch next portion of data.
- src = qctx.getSource(node.id(), msg.batchLookupId());
+ src = new RangeSource(msg.bounds(), snapshot0);
+ }
+ else {
+ // This is request to fetch next portion of data.
+ src = qctx.getSource(node.id(), msg.batchLookupId());
- assert src != null;
- }
+ assert src != null;
+ }
- List<GridH2RowRange> ranges = new ArrayList<>();
+ List<GridH2RowRange> ranges = new ArrayList<>();
- int maxRows = qctx.pageSize();
+ int maxRows = qctx.pageSize();
- while (maxRows > 0) {
- GridH2RowRange range = src.next(maxRows);
+ while (maxRows > 0) {
+ GridH2RowRange range = src.next(maxRows);
- if (range == null)
- break;
+ if (range == null)
+ break;
- maxRows -= range.rows().size();
- }
+ maxRows -= range.rows().size();
+ }
- if (src.hasMoreRows()) {
- // Save source for future fetches.
- if (msg.bounds() != null)
- qctx.putSource(node.id(), msg.batchLookupId(), src);
- }
- else if (msg.bounds() == null) {
- // Drop saved source.
- qctx.putSource(node.id(), msg.batchLookupId(), null);
- }
+ if (src.hasMoreRows()) {
+ // Save source for future fetches.
+ if (msg.bounds() != null)
+ qctx.putSource(node.id(), msg.batchLookupId(), src);
+ }
+ else if (msg.bounds() == null) {
+ // Drop saved source.
+ qctx.putSource(node.id(), msg.batchLookupId(), null);
+ }
- assert !ranges.isEmpty();
+ assert !ranges.isEmpty();
- GridH2IndexRangeResponse res = new GridH2IndexRangeResponse();
+ res.ranges(ranges);
+ res.status(STATUS_OK);
+ }
+ catch (Throwable th) {
+ U.error(log, "Failed to process request: " + msg, th);
- res.originNodeId(msg.originNodeId());
- res.queryId(msg.queryId());
- res.batchLookupId(msg.batchLookupId());
- res.ranges(ranges);
+ res.error(th.getClass() + ": " + th.getMessage());
+ res.status(STATUS_ERROR);
+ }
+ }
- // TODO send res
+ send(singletonList(node), res);
}
/**
@@ -1327,7 +1365,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
assert remainingRanges > 0;
- // TODO send req
+ send(singletonList(node), req);
}
/**
@@ -1367,14 +1405,14 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
if (req.bounds() != null)
req = createRequest(qctx, req.batchLookupId());
- // TODO request next by sending req
+ send(singletonList(node), req);
}
else
req = null;
return res;
- case GridH2IndexRangeResponse.STATUS_NOT_FOUND:
+ case STATUS_NOT_FOUND:
if (req == null || req.bounds() == null) // We have already received the first response.
throw new GridH2RetryException("Failure on remote node.");
@@ -1385,7 +1423,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
throw new IgniteInterruptedException(e.getMessage());
}
- // TODO resend req
+ send(singletonList(node), req);
break;
http://git-wip-us.apache.org/repos/asf/ignite/blob/750f146b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 3a8b8cb..134631c 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -159,6 +159,13 @@ public class GridReduceQueryExecutor {
/** */
private final GridSpinBusyLock busyLock;
+ /**
+ * Handler passed as the last argument to {@code h2.send(GridTopic.TOPIC_QUERY, ...)}: it delivers the
+ * message to the map query executor's {@code onMessage}, using the supplied node's ID as the sender ID.
+ * Kept in a field so the same instance is reused instead of allocating a new closure on every send.
+ * NOTE(review): the parameter name suggests it is called for the local node only - confirm against
+ * {@code h2.send}.
+ */
+ private final CIX2<ClusterNode,Message> locNodeHandler = new CIX2<ClusterNode,Message>() {
+ @Override public void applyx(ClusterNode locNode, Message msg) {
+ h2.mapQueryExecutor().onMessage(locNode.id(), msg);
+ }
+ };
+
/**
* @param busyLock Busy lock.
*/
@@ -1119,11 +1126,7 @@ public class GridReduceQueryExecutor {
Message msg,
@Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize
) {
- return h2.send(GridTopic.TOPIC_QUERY, nodes, msg, specialize, new CIX2<ClusterNode,Message>() {
- @Override public void applyx(ClusterNode locNode, Message msg) {
- h2.mapQueryExecutor().onMessage(locNode.id(), msg);
- }
- });
+ return h2.send(GridTopic.TOPIC_QUERY, nodes, msg, specialize, locNodeHandler);
}
/**