You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/05/27 18:39:27 UTC
[1/2] ignite git commit: IGNITE-5311: Added ability to get
CacheObject value without CacheObjectContext. This closes #2019.
Repository: ignite
Updated Branches:
refs/heads/master 858e5b729 -> aad3b0c53
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index a31263f..8d9d953 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.cache.CacheException;
@@ -100,7 +99,6 @@ import org.jsr166.ConcurrentHashMap8;
import static java.util.Collections.singletonList;
import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
-import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
import static org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery.EMPTY_PARAMS;
import static org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode.OFF;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REDUCE;
@@ -132,7 +130,7 @@ public class GridReduceQueryExecutor {
private final AtomicLong qryIdGen;
/** */
- private final ConcurrentMap<Long, QueryRun> runs = new ConcurrentHashMap8<>();
+ private final ConcurrentMap<Long, ReduceQueryRun> runs = new ConcurrentHashMap8<>();
/** */
private volatile List<GridThreadLocalTable> fakeTbls = Collections.emptyList();
@@ -191,8 +189,8 @@ public class GridReduceQueryExecutor {
@Override public void onEvent(final Event evt) {
UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
- for (QueryRun r : runs.values()) {
- for (GridMergeIndex idx : r.idxs) {
+ for (ReduceQueryRun r : runs.values()) {
+ for (GridMergeIndex idx : r.indexes()) {
if (idx.hasSource(nodeId)) {
handleNodeLeft(r, nodeId);
@@ -208,7 +206,7 @@ public class GridReduceQueryExecutor {
* @param r Query run.
* @param nodeId Left node ID.
*/
- private void handleNodeLeft(QueryRun r, UUID nodeId) {
+ private void handleNodeLeft(ReduceQueryRun r, UUID nodeId) {
// Will attempt to retry. If reduce query was started it will fail on next page fetching.
retry(r, h2.readyTopologyVersion(), nodeId);
}
@@ -248,7 +246,7 @@ public class GridReduceQueryExecutor {
* @param msg Message.
*/
private void onFail(ClusterNode node, GridQueryFailResponse msg) {
- QueryRun r = runs.get(msg.queryRequestId());
+ ReduceQueryRun r = runs.get(msg.queryRequestId());
fail(r, node.id(), msg.error(), msg.failCode());
}
@@ -258,7 +256,7 @@ public class GridReduceQueryExecutor {
* @param nodeId Failed node ID.
* @param msg Error message.
*/
- private void fail(QueryRun r, UUID nodeId, String msg, byte failCode) {
+ private void fail(ReduceQueryRun r, UUID nodeId, String msg, byte failCode) {
if (r != null) {
CacheException e = new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg);
@@ -278,21 +276,21 @@ public class GridReduceQueryExecutor {
final int qry = msg.query();
final int seg = msg.segmentId();
- final QueryRun r = runs.get(qryReqId);
+ final ReduceQueryRun r = runs.get(qryReqId);
if (r == null) // Already finished with error or canceled.
return;
- final int pageSize = r.pageSize;
+ final int pageSize = r.pageSize();
- GridMergeIndex idx = r.idxs.get(msg.query());
+ GridMergeIndex idx = r.indexes().get(msg.query());
GridResultPage page;
try {
page = new GridResultPage(ctx, node.id(), msg) {
@Override public void fetchNextPage() {
- Object errState = r.state.get();
+ Object errState = r.state();
if (errState != null) {
CacheException err0 = errState instanceof CacheException ? (CacheException)errState : null;
@@ -335,7 +333,7 @@ public class GridReduceQueryExecutor {
if (msg.retry() != null)
retry(r, msg.retry(), node.id());
else if (msg.page() == 0) // Do count down on each first page received.
- r.latch.countDown();
+ r.latch().countDown();
}
/**
@@ -343,7 +341,7 @@ public class GridReduceQueryExecutor {
* @param retryVer Retry version.
* @param nodeId Node ID.
*/
- private void retry(QueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) {
+ private void retry(ReduceQueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) {
r.state(retryVer, nodeId);
}
@@ -501,7 +499,7 @@ public class GridReduceQueryExecutor {
}
/**
- * @param cctx Cache context.
+ * @param schemaName Schema name.
* @param qry Query.
* @param keepPortable Keep portable.
* @param enforceJoinOrder Enforce join order of tables.
@@ -512,7 +510,7 @@ public class GridReduceQueryExecutor {
* @return Rows iterator.
*/
public Iterator<List<?>> query(
- GridCacheContext<?, ?> cctx,
+ String schemaName,
GridCacheTwoStepQuery qry,
boolean keepPortable,
boolean enforceJoinOrder,
@@ -541,10 +539,8 @@ public class GridReduceQueryExecutor {
final long qryReqId = qryIdGen.incrementAndGet();
- final String cacheName = cctx.name();
-
- final QueryRun r = new QueryRun(qryReqId, qry.originalSql(), cacheName,
- h2.connectionForCache(cacheName), qry.mapQueries().size(), qry.pageSize(),
+ final ReduceQueryRun r = new ReduceQueryRun(qryReqId, qry.originalSql(), schemaName,
+ h2.connectionForSchema(schemaName), qry.mapQueries().size(), qry.pageSize(),
U.currentTimeMillis(), cancel);
AffinityTopologyVersion topVer = h2.readyTopologyVersion();
@@ -633,7 +629,7 @@ public class GridReduceQueryExecutor {
GridMergeTable tbl;
try {
- tbl = createMergeTable(r.conn, mapQry, qry.explain());
+ tbl = createMergeTable(r.connection(), mapQry, qry.explain());
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -641,7 +637,7 @@ public class GridReduceQueryExecutor {
idx = tbl.getMergeIndex();
- fakeTable(r.conn, tblIdx++).innerTable(tbl);
+ fakeTable(r.connection(), tblIdx++).innerTable(tbl);
}
else
idx = GridMergeIndexUnsorted.createDummy(ctx);
@@ -659,13 +655,13 @@ public class GridReduceQueryExecutor {
else
idx.setSources(nodes, segmentsPerIndex);
- idx.setPageSize(r.pageSize);
+ idx.setPageSize(r.pageSize());
- r.idxs.add(idx);
+ r.indexes().add(idx);
}
- r.latch = new CountDownLatch(isReplicatedOnly ? 1 :
- (r.idxs.size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt);
+ r.latch(new CountDownLatch(isReplicatedOnly ? 1 :
+ (r.indexes().size() - replicatedQrysCnt) * nodes.size() * segmentsPerIndex + replicatedQrysCnt));
runs.put(qryReqId, r);
@@ -719,7 +715,7 @@ public class GridReduceQueryExecutor {
new GridH2QueryRequest()
.requestId(qryReqId)
.topologyVersion(topVer)
- .pageSize(r.pageSize)
+ .pageSize(r.pageSize())
.caches(qry.cacheIds())
.tables(distributedJoins ? qry.tables() : null)
.partitions(convert(partsMap))
@@ -731,7 +727,7 @@ public class GridReduceQueryExecutor {
false)) {
awaitAllReplies(r, nodes, cancel);
- Object state = r.state.get();
+ Object state = r.state();
if (state != null) {
if (state instanceof CacheException) {
@@ -764,7 +760,7 @@ public class GridReduceQueryExecutor {
List<List<?>> res = new ArrayList<>();
// Simple UNION ALL can have multiple indexes.
- for (GridMergeIndex idx : r.idxs) {
+ for (GridMergeIndex idx : r.indexes()) {
Cursor cur = idx.findInStream(null, null);
while (cur.next()) {
@@ -788,21 +784,19 @@ public class GridReduceQueryExecutor {
UUID locNodeId = ctx.localNodeId();
- H2Utils.setupConnection(r.conn, false, enforceJoinOrder);
+ H2Utils.setupConnection(r.connection(), false, enforceJoinOrder);
GridH2QueryContext.set(new GridH2QueryContext(locNodeId, locNodeId, qryReqId, REDUCE)
- .pageSize(r.pageSize).distributedJoinMode(OFF));
+ .pageSize(r.pageSize()).distributedJoinMode(OFF));
try {
- String schema = h2.schema(cacheName);
-
if (qry.explain())
- return explainPlan(r.conn, schema, qry, params);
+ return explainPlan(r.connection(), schemaName, qry, params);
GridCacheSqlQuery rdc = qry.reduceQuery();
- ResultSet res = h2.executeSqlQueryWithTimer(schema,
- r.conn,
+ ResultSet res = h2.executeSqlQueryWithTimer(schemaName,
+ r.connection(),
rdc.query(),
F.asList(rdc.parameters(params)),
false, // The statement will cache some extra thread local objects.
@@ -824,10 +818,10 @@ public class GridReduceQueryExecutor {
continue;
}
- return new GridQueryCacheObjectsIterator(resIter, cctx, keepPortable);
+ return new GridQueryCacheObjectsIterator(resIter, h2.valueContext(), keepPortable);
}
catch (IgniteCheckedException | RuntimeException e) {
- U.closeQuiet(r.conn);
+ U.closeQuiet(r.connection());
if (e instanceof CacheException) {
if (wasCancelled((CacheException)e))
@@ -898,7 +892,7 @@ public class GridReduceQueryExecutor {
* @param distributedJoins Distributed join flag.
*/
private void cancelRemoteQueriesIfNeeded(Collection<ClusterNode> nodes,
- QueryRun r,
+ ReduceQueryRun r,
long qryReqId,
boolean distributedJoins)
{
@@ -906,7 +900,7 @@ public class GridReduceQueryExecutor {
if (distributedJoins)
send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
else {
- for (GridMergeIndex idx : r.idxs) {
+ for (GridMergeIndex idx : r.indexes()) {
if (!idx.fetchedAll()) {
send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
@@ -922,9 +916,9 @@ public class GridReduceQueryExecutor {
* @param cancel Query cancel.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
- private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes, GridQueryCancel cancel)
+ private void awaitAllReplies(ReduceQueryRun r, Collection<ClusterNode> nodes, GridQueryCancel cancel)
throws IgniteInterruptedCheckedException, QueryCancelledException {
- while (!U.await(r.latch, 500, TimeUnit.MILLISECONDS)) {
+ while (!U.await(r.latch(), 500, TimeUnit.MILLISECONDS)) {
cancel.checkCancelled();
@@ -932,7 +926,7 @@ public class GridReduceQueryExecutor {
if (!ctx.discovery().alive(node)) {
handleNodeLeft(r, node.id());
- assert r.latch.getCount() == 0;
+ assert r.latch().getCount() == 0;
return;
}
@@ -1420,7 +1414,7 @@ public class GridReduceQueryExecutor {
CacheException err = new CacheException("Query was cancelled, client node disconnected.",
new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected."));
- for (Map.Entry<Long, QueryRun> e : runs.entrySet())
+ for (Map.Entry<Long, ReduceQueryRun> e : runs.entrySet())
e.getValue().disconnected(err);
}
@@ -1435,9 +1429,9 @@ public class GridReduceQueryExecutor {
long curTime = U.currentTimeMillis();
- for (QueryRun run : runs.values()) {
- if (run.qry.longQuery(curTime, duration))
- res.add(run.qry);
+ for (ReduceQueryRun run : runs.values()) {
+ if (run.queryInfo().longQuery(curTime, duration))
+ res.add(run.queryInfo());
}
return res;
@@ -1450,77 +1444,10 @@ public class GridReduceQueryExecutor {
*/
public void cancelQueries(Collection<Long> queries) {
for (Long qryId : queries) {
- QueryRun run = runs.get(qryId);
+ ReduceQueryRun run = runs.get(qryId);
if (run != null)
- run.qry.cancel();
- }
- }
-
- /**
- * Query run.
- */
- private static class QueryRun {
- /** */
- private final GridRunningQueryInfo qry;
-
- /** */
- private final List<GridMergeIndex> idxs;
-
- /** */
- private CountDownLatch latch;
-
- /** */
- private final JdbcConnection conn;
-
- /** */
- private final int pageSize;
-
- /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */
- private final AtomicReference<Object> state = new AtomicReference<>();
-
- /**
- * @param id Query ID.
- * @param qry Query text.
- * @param cache Cache where query was executed.
- * @param conn Connection.
- * @param idxsCnt Number of indexes.
- * @param pageSize Page size.
- * @param startTime Start time.
- * @param cancel Query cancel handler.
- */
- private QueryRun(Long id, String qry, String cache, Connection conn, int idxsCnt, int pageSize, long startTime, GridQueryCancel cancel) {
- this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, cache, startTime, cancel, false);
- this.conn = (JdbcConnection)conn;
- this.idxs = new ArrayList<>(idxsCnt);
- this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
- }
-
- /**
- * @param o Fail state object.
- * @param nodeId Node ID.
- */
- void state(Object o, @Nullable UUID nodeId) {
- assert o != null;
- assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass();
-
- if (!state.compareAndSet(null, o))
- return;
-
- while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
- latch.countDown();
-
- CacheException e = o instanceof CacheException ? (CacheException) o : null;
-
- for (GridMergeIndex idx : idxs) // Fail all merge indexes.
- idx.fail(nodeId, e);
- }
-
- /**
- * @param e Error.
- */
- void disconnected(CacheException e) {
- state(e, null);
+ run.queryInfo().cancel();
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
new file mode 100644
index 0000000..73bb002
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
+import org.h2.jdbc.JdbcConnection;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.CacheException;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.SQL_FIELDS;
+
+/**
+ * Query run.
+ */
+class ReduceQueryRun {
+ /** */
+ private final GridRunningQueryInfo qry;
+
+ /** */
+ private final List<GridMergeIndex> idxs;
+
+ /** */
+ private CountDownLatch latch;
+
+ /** */
+ private final JdbcConnection conn;
+
+ /** */
+ private final int pageSize;
+
+ /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */
+ private final AtomicReference<Object> state = new AtomicReference<>();
+
+ /**
+ * Constructor.
+ *
+ * @param id Query ID.
+ * @param qry Query text.
+ * @param schemaName Schema name.
+ * @param conn Connection.
+ * @param idxsCnt Number of indexes.
+ * @param pageSize Page size.
+ * @param startTime Start time.
+ * @param cancel Query cancel handler.
+ */
+ ReduceQueryRun(Long id, String qry, String schemaName, Connection conn, int idxsCnt, int pageSize, long startTime,
+ GridQueryCancel cancel) {
+ this.qry = new GridRunningQueryInfo(id, qry, SQL_FIELDS, schemaName, startTime, cancel, false);
+
+ this.conn = (JdbcConnection)conn;
+
+ this.idxs = new ArrayList<>(idxsCnt);
+
+ this.pageSize = pageSize > 0 ? pageSize : GridCacheTwoStepQuery.DFLT_PAGE_SIZE;
+ }
+
+ /**
+ * @param o Fail state object.
+ * @param nodeId Node ID.
+ */
+ void state(Object o, @Nullable UUID nodeId) {
+ assert o != null;
+ assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass();
+
+ if (!state.compareAndSet(null, o))
+ return;
+
+ while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
+ latch.countDown();
+
+ CacheException e = o instanceof CacheException ? (CacheException) o : null;
+
+ for (GridMergeIndex idx : idxs) // Fail all merge indexes.
+ idx.fail(nodeId, e);
+ }
+
+ /**
+ * @param e Error.
+ */
+ void disconnected(CacheException e) {
+ state(e, null);
+ }
+
+ /**
+ * @return Query info.
+ */
+ GridRunningQueryInfo queryInfo() {
+ return qry;
+ }
+
+ /**
+ * @return Page size.
+ */
+ int pageSize() {
+ return pageSize;
+ }
+
+ /**
+ * @return Connection.
+ */
+ JdbcConnection connection() {
+ return conn;
+ }
+
+ /**
+ * @return State.
+ */
+ Object state() {
+ return state.get();
+ }
+
+ /**
+ * @return Indexes.
+ */
+ List<GridMergeIndex> indexes() {
+ return idxs;
+ }
+
+ /**
+ * @return Latch.
+ */
+ CountDownLatch latch() {
+ return latch;
+ }
+
+ /**
+ * @param latch Latch.
+ */
+ void latch(CountDownLatch latch) {
+ this.latch = latch;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
index 5ac02a5..1f73dcb 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/GridIndexingSpiAbstractSelfTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
import org.apache.ignite.internal.processors.query.GridQueryIndexDescriptor;
@@ -726,7 +727,7 @@ public abstract class GridIndexingSpiAbstractSelfTest extends GridCommonAbstract
}
/** {@inheritDoc} */
- @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+ @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
return (T)val;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
index 5939b59..b66a343 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/sql/GridQueryParsingTest.java
@@ -870,7 +870,9 @@ public class GridQueryParsingTest extends GridCommonAbstractTest {
IgniteH2Indexing idx = U.field(qryProcessor, "idx");
- return (JdbcConnection)idx.connectionForCache(DEFAULT_CACHE_NAME);
+ String schemaName = idx.schema(DEFAULT_CACHE_NAME);
+
+ return (JdbcConnection)idx.connectionForSchema(schemaName);
}
/**
[2/2] ignite git commit: IGNITE-5311: Added ability to get
CacheObject value without CacheObjectContext. This closes #2019.
Posted by vo...@apache.org.
IGNITE-5311: Added ability to get CacheObject value without CacheObjectContext. This closes #2019.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aad3b0c5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aad3b0c5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aad3b0c5
Branch: refs/heads/master
Commit: aad3b0c536e3f7b0836d31daa63cd6d6137675d5
Parents: 858e5b7
Author: devozerov <vo...@gridgain.com>
Authored: Sat May 27 21:39:08 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Sat May 27 21:39:08 2017 +0300
----------------------------------------------------------------------
.../internal/binary/BinaryEnumObjectImpl.java | 5 +-
.../internal/binary/BinaryObjectImpl.java | 18 +-
.../binary/BinaryObjectOffheapImpl.java | 3 +-
.../internal/processors/cache/CacheObject.java | 2 +-
.../processors/cache/CacheObjectAdapter.java | 4 +-
.../cache/CacheObjectByteArrayImpl.java | 2 +-
.../processors/cache/CacheObjectContext.java | 197 ++-----------------
.../processors/cache/CacheObjectImpl.java | 25 ++-
.../processors/cache/CacheObjectUtils.java | 173 ++++++++++++++++
.../cache/CacheObjectValueContext.java | 50 +++++
.../processors/cache/GridCacheContext.java | 4 +-
.../processors/cache/GridCacheEventManager.java | 2 +-
.../processors/cache/GridCacheMapEntry.java | 4 +-
.../processors/cache/KeyCacheObjectImpl.java | 4 +-
.../cache/binary/CacheObjectBinaryContext.java | 6 +-
.../binary/CacheObjectBinaryProcessorImpl.java | 5 +-
.../cache/database/CacheDataRowAdapter.java | 8 +-
.../cache/distributed/near/GridNearTxLocal.java | 3 +-
.../cache/transactions/TxDeadlock.java | 7 +-
.../cacheobject/IgniteCacheObjectProcessor.java | 6 +-
.../IgniteCacheObjectProcessorImpl.java | 39 ++--
.../query/CacheQueryObjectValueContext.java | 64 ++++++
.../query/GridQueryCacheObjectsIterator.java | 16 +-
.../processors/query/GridQueryProcessor.java | 14 +-
.../processors/query/GridRunningQueryInfo.java | 16 +-
.../query/VisorRunningQueriesCollectorTask.java | 2 +-
.../internal/GridAffinityNoCacheSelfTest.java | 3 +-
.../IgniteIncompleteCacheObjectSelfTest.java | 2 +-
.../database/FreeListImplSelfTest.java | 3 +-
.../query/h2/DmlStatementsProcessor.java | 4 +-
.../processors/query/h2/IgniteH2Indexing.java | 44 +++--
.../query/h2/opt/GridLuceneIndex.java | 2 +-
.../query/h2/twostep/GridMapQueryExecutor.java | 4 +-
.../h2/twostep/GridReduceQueryExecutor.java | 159 ++++-----------
.../query/h2/twostep/ReduceQueryRun.java | 157 +++++++++++++++
.../h2/GridIndexingSpiAbstractSelfTest.java | 3 +-
.../query/h2/sql/GridQueryParsingTest.java | 4 +-
37 files changed, 655 insertions(+), 409 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
index f889e45..6a1ad6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectAdapter;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
@@ -280,7 +281,7 @@ public class BinaryEnumObjectImpl implements BinaryObjectEx, Externalizable, Cac
}
/** {@inheritDoc} */
- @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+ @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
return deserialize();
}
@@ -335,7 +336,7 @@ public class BinaryEnumObjectImpl implements BinaryObjectEx, Externalizable, Cac
/** {@inheritDoc} */
@Override public void finishUnmarshal(CacheObjectContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- this.ctx = ((CacheObjectBinaryProcessorImpl)ctx.processor()).binaryContext();
+ this.ctx = ((CacheObjectBinaryProcessorImpl)ctx.kernalContext().cacheObjects()).binaryContext();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
index 16e5ccd..d0d0699 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectAdapter;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -135,7 +136,7 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+ @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
Object obj0 = obj;
if (obj0 == null || (cpy && needCopy(ctx)))
@@ -188,7 +189,7 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
/** {@inheritDoc} */
@Override public void finishUnmarshal(CacheObjectContext ctx, ClassLoader ldr) throws IgniteCheckedException {
- this.ctx = ((CacheObjectBinaryProcessorImpl)ctx.processor()).binaryContext();
+ this.ctx = ((CacheObjectBinaryProcessorImpl)ctx.kernalContext().cacheObjects()).binaryContext();
}
/** {@inheritDoc} */
@@ -787,9 +788,9 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
* @param coCtx CacheObjectContext.
* @return Object.
*/
- private Object deserializeValue(@Nullable CacheObjectContext coCtx) {
- BinaryReaderExImpl reader = reader(null,
- coCtx != null ? coCtx.kernalContext().config().getClassLoader() : ctx.configuration().getClassLoader(), true);
+ private Object deserializeValue(@Nullable CacheObjectValueContext coCtx) {
+ BinaryReaderExImpl reader = reader(null, coCtx != null ?
+ coCtx.kernalContext().config().getClassLoader() : ctx.configuration().getClassLoader(), true);
Object obj0 = reader.deserialize();
@@ -807,8 +808,8 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
* @param ctx Context.
* @return {@code True} need to copy value returned to user.
*/
- private boolean needCopy(CacheObjectContext ctx) {
- return ctx.copyOnGet() && obj != null && !ctx.processor().immutable(obj);
+ private boolean needCopy(CacheObjectValueContext ctx) {
+ return ctx.copyOnGet() && obj != null && !ctx.kernalContext().cacheObjects().immutable(obj);
}
/**
@@ -819,7 +820,8 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
* @param forUnmarshal {@code True} if reader is need to unmarshal object.
* @return Reader.
*/
- private BinaryReaderExImpl reader(@Nullable BinaryReaderHandles rCtx, @Nullable ClassLoader ldr, boolean forUnmarshal) {
+ private BinaryReaderExImpl reader(@Nullable BinaryReaderHandles rCtx, @Nullable ClassLoader ldr,
+ boolean forUnmarshal) {
if (ldr == null)
ldr = ctx.configuration().getClassLoader();
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
index bdf0ce1..0a0a7b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
import org.apache.ignite.internal.binary.streams.BinaryOffheapInputStream;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -444,7 +445,7 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+ @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
return (T)deserializeValue();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
index c226ba2..8faaa03 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
@@ -43,7 +43,7 @@ public interface CacheObject extends Message {
* @param cpy If {@code true} need to copy value.
* @return Value.
*/
- @Nullable public <T> T value(CacheObjectContext ctx, boolean cpy);
+ @Nullable public <T> T value(CacheObjectValueContext ctx, boolean cpy);
/**
* @param ctx Context.
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
index 6af38ac..e2a15ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
@@ -50,8 +50,8 @@ public abstract class CacheObjectAdapter implements CacheObject, Externalizable
* @param ctx Context.
* @return {@code True} need to copy value returned to user.
*/
- protected boolean needCopy(CacheObjectContext ctx) {
- return ctx.copyOnGet() && val != null && !ctx.processor().immutable(val);
+ protected boolean needCopy(CacheObjectValueContext ctx) {
+ return ctx.copyOnGet() && val != null && !ctx.kernalContext().cacheObjects().immutable(val);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
index fe284ae..6a13f8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
@@ -62,7 +62,7 @@ public class CacheObjectByteArrayImpl implements CacheObject, Externalizable {
}
/** {@inheritDoc} */
- @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+ @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
if (cpy)
return (T)Arrays.copyOf(val, val.length);
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
index a777ab6..655a3e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectContext.java
@@ -27,21 +27,17 @@ import java.util.Map;
import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.binary.BinaryUtils;
-import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.util.typedef.F;
/**
*
*/
@SuppressWarnings("TypeMayBeWeakened")
-public class CacheObjectContext {
+public class CacheObjectContext implements CacheObjectValueContext {
/** */
private GridKernalContext kernalCtx;
/** */
- private IgniteCacheObjectProcessor proc;
-
- /** */
private String cacheName;
/** */
@@ -54,9 +50,6 @@ public class CacheObjectContext {
private boolean storeVal;
/** */
- private boolean p2pEnabled;
-
- /** */
private boolean addDepInfo;
/**
@@ -78,9 +71,6 @@ public class CacheObjectContext {
this.cpyOnGet = cpyOnGet;
this.storeVal = storeVal;
this.addDepInfo = addDepInfo;
-
- p2pEnabled = kernalCtx.config().isPeerClassLoadingEnabled();
- proc = kernalCtx.cacheObjects();
}
/**
@@ -90,31 +80,18 @@ public class CacheObjectContext {
return cacheName;
}
- /**
- * @return {@code True} if peer class loading is enabled.
- */
- public boolean p2pEnabled() {
- return p2pEnabled;
- }
-
- /**
- * @return {@code True} if deployment info should be associated with the objects of this cache.
- */
- public boolean addDeploymentInfo() {
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
return addDepInfo;
}
- /**
- * @return Copy on get flag.
- */
- public boolean copyOnGet() {
+ /** {@inheritDoc} */
+ @Override public boolean copyOnGet() {
return cpyOnGet;
}
- /**
- * @return {@code True} if should store unmarshalled value in cache.
- */
- public boolean storeValue() {
+ /** {@inheritDoc} */
+ @Override public boolean storeValue() {
return storeVal;
}
@@ -125,27 +102,14 @@ public class CacheObjectContext {
return dfltAffMapper;
}
- /**
- * @return Kernal context.
- */
- public GridKernalContext kernalContext() {
+ /** {@inheritDoc} */
+ @Override public GridKernalContext kernalContext() {
return kernalCtx;
}
- /**
- * @return Processor.
- */
- public IgniteCacheObjectProcessor processor() {
- return proc;
- }
-
- /**
- * @param o Object to unwrap.
- * @param keepBinary Keep binary flag.
- * @return Unwrapped object.
- */
- public Object unwrapBinaryIfNeeded(Object o, boolean keepBinary) {
- return unwrapBinaryIfNeeded(o, keepBinary, true);
+ /** {@inheritDoc} */
+ @Override public boolean binaryEnabled() {
+ return false;
}
/**
@@ -158,141 +122,6 @@ public class CacheObjectContext {
if (o == null)
return null;
- return unwrapBinary(o, keepBinary, cpy);
- }
-
- /**
- * @param col Collection of objects to unwrap.
- * @param keepBinary Keep binary flag.
- * @return Unwrapped collection.
- */
- public Collection<Object> unwrapBinariesIfNeeded(Collection<Object> col, boolean keepBinary) {
- return unwrapBinariesIfNeeded(col, keepBinary, true);
- }
-
- /**
- * @param col Collection to unwrap.
- * @param keepBinary Keep binary flag.
- * @param cpy Copy value flag.
- * @return Unwrapped collection.
- */
- public Collection<Object> unwrapBinariesIfNeeded(Collection<Object> col, boolean keepBinary, boolean cpy) {
- Collection<Object> col0 = BinaryUtils.newKnownCollection(col);
-
- if (col0 == null)
- col0 = new ArrayList<>(col.size());
-
- for (Object obj : col)
- col0.add(unwrapBinary(obj, keepBinary, cpy));
-
- return col0;
- }
-
- /**
- * @param col Collection to unwrap.
- * @param keepBinary Keep binary flag.
- * @param cpy Copy flag.
- * @return Unwrapped collection.
- */
- private Collection<Object> unwrapKnownCollection(Collection<Object> col, boolean keepBinary, boolean cpy) {
- Collection<Object> col0 = BinaryUtils.newKnownCollection(col);
-
- for (Object obj : col)
- col0.add(unwrapBinary(obj, keepBinary, cpy));
-
- return col0;
- }
-
- /**
- * Unwrap array of binaries if needed.
- *
- * @param arr Array.
- * @param keepBinary Keep binary flag.
- * @param cpy Copy.
- * @return Result.
- */
- public Object[] unwrapBinariesInArrayIfNeeded(Object[] arr, boolean keepBinary, boolean cpy) {
- if (BinaryUtils.knownArray(arr))
- return arr;
-
- Object[] res = new Object[arr.length];
-
- for (int i = 0; i < arr.length; i++)
- res[i] = unwrapBinary(arr[i], keepBinary, cpy);
-
- return res;
- }
-
- /**
- * Unwraps map.
- *
- * @param map Map to unwrap.
- * @param keepBinary Keep binary flag.
- * @return Unwrapped collection.
- */
- private Map<Object, Object> unwrapBinariesIfNeeded(Map<Object, Object> map, boolean keepBinary, boolean cpy) {
- if (keepBinary)
- return map;
-
- Map<Object, Object> map0 = BinaryUtils.newMap(map);
-
- for (Map.Entry<Object, Object> e : map.entrySet())
- map0.put(unwrapBinary(e.getKey(), keepBinary, cpy), unwrapBinary(e.getValue(), keepBinary, cpy));
-
- return map0;
- }
-
- /**
- * @param o Object to unwrap.
- * @return Unwrapped object.
- */
- private Object unwrapBinary(Object o, boolean keepBinary, boolean cpy) {
- if (o instanceof Map.Entry) {
- Map.Entry entry = (Map.Entry)o;
-
- Object key = entry.getKey();
-
- Object uKey = unwrapBinary(key, keepBinary, cpy);
-
- Object val = entry.getValue();
-
- Object uVal = unwrapBinary(val, keepBinary, cpy);
-
- return (key != uKey || val != uVal) ? F.t(uKey, uVal) : o;
- }
- else if (BinaryUtils.knownCollection(o))
- return unwrapKnownCollection((Collection<Object>)o, keepBinary, cpy);
- else if (BinaryUtils.knownMap(o))
- return unwrapBinariesIfNeeded((Map<Object, Object>)o, keepBinary, cpy);
- else if (o instanceof Object[])
- return unwrapBinariesInArrayIfNeeded((Object[])o, keepBinary, cpy);
- else if (o instanceof CacheObject) {
- CacheObject co = (CacheObject)o;
-
- if (!keepBinary || co.isPlatformType())
- return unwrapBinary(co.value(this, cpy), keepBinary, cpy);
- }
-
- return o;
- }
-
- /**
- * @param o Object to test.
- * @return True if collection should be recursively unwrapped.
- */
- private boolean knownCollection(Object o) {
- Class<?> cls = o == null ? null : o.getClass();
-
- return cls == ArrayList.class || cls == LinkedList.class || cls == HashSet.class;
- }
-
- /**
- * @param o Object to test.
- * @return True if map should be recursively unwrapped.
- */
- private boolean knownMap(Object o) {
- Class<?> cls = o == null ? null : o.getClass();
-
- return cls == HashMap.class || cls == LinkedHashMap.class;
+ return CacheObjectUtils.unwrapBinaryIfNeeded(this, o, keepBinary, cpy);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
index 7fe4297..76f354a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.jetbrains.annotations.Nullable;
/**
@@ -53,27 +55,31 @@ public class CacheObjectImpl extends CacheObjectAdapter {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+ @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
cpy = cpy && needCopy(ctx);
try {
+ GridKernalContext kernalCtx = ctx.kernalContext();
+
+ IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects();
+
if (cpy) {
if (valBytes == null) {
assert val != null;
- valBytes = ctx.processor().marshal(ctx, val);
+ valBytes = proc.marshal(ctx, val);
}
ClassLoader clsLdr;
if (val != null)
clsLdr = val.getClass().getClassLoader();
- else if (ctx.kernalContext().config().isPeerClassLoadingEnabled())
- clsLdr = ctx.kernalContext().cache().context().deploy().globalLoader();
+ else if (kernalCtx.config().isPeerClassLoadingEnabled())
+ clsLdr = kernalCtx.cache().context().deploy().globalLoader();
else
clsLdr = null;
- return (T)ctx.processor().unmarshal(ctx, valBytes, clsLdr);
+ return (T)proc.unmarshal(ctx, valBytes, clsLdr);
}
if (val != null)
@@ -81,9 +87,8 @@ public class CacheObjectImpl extends CacheObjectAdapter {
assert valBytes != null;
- Object val = ctx.processor().unmarshal(ctx, valBytes,
- ctx.kernalContext().config().isPeerClassLoadingEnabled() ?
- ctx.kernalContext().cache().context().deploy().globalLoader() : null);
+ Object val = proc.unmarshal(ctx, valBytes, kernalCtx.config().isPeerClassLoadingEnabled() ?
+ kernalCtx.cache().context().deploy().globalLoader() : null);
if (ctx.storeValue())
this.val = val;
@@ -98,7 +103,7 @@ public class CacheObjectImpl extends CacheObjectAdapter {
/** {@inheritDoc} */
@Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException {
if (valBytes == null)
- valBytes = ctx.processor().marshal(ctx, val);
+ valBytes = ctx.kernalContext().cacheObjects().marshal(ctx, val);
return valBytes;
}
@@ -116,7 +121,7 @@ public class CacheObjectImpl extends CacheObjectAdapter {
assert val != null || valBytes != null;
if (val == null && ctx.storeValue())
- val = ctx.processor().unmarshal(ctx, valBytes, ldr);
+ val = ctx.kernalContext().cacheObjects().unmarshal(ctx, valBytes, ldr);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
new file mode 100644
index 0000000..f9c76df
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectUtils.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.util.typedef.F;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Cache object utility methods.
+ */
+public class CacheObjectUtils {
+ /**
+ * @param o Object to unwrap.
+ * @param keepBinary Keep binary flag.
+ * @param cpy Copy value flag.
+ * @return Unwrapped object.
+ */
+ public static Object unwrapBinaryIfNeeded(CacheObjectValueContext ctx, Object o, boolean keepBinary, boolean cpy) {
+ if (o == null)
+ return null;
+
+ return unwrapBinary(ctx, o, keepBinary, cpy);
+ }
+
+ /**
+ * @param col Collection of objects to unwrap.
+ * @param keepBinary Keep binary flag.
+ * @return Unwrapped collection.
+ */
+ public static Collection<Object> unwrapBinariesIfNeeded(CacheObjectValueContext ctx, Collection<Object> col,
+ boolean keepBinary) {
+ return unwrapBinariesIfNeeded(ctx, col, keepBinary, true);
+ }
+
+ /**
+ * @param col Collection to unwrap.
+ * @param keepBinary Keep binary flag.
+ * @param cpy Copy flag.
+ * @return Unwrapped collection.
+ */
+ private static Collection<Object> unwrapKnownCollection(CacheObjectValueContext ctx, Collection<Object> col,
+ boolean keepBinary, boolean cpy) {
+ Collection<Object> col0 = BinaryUtils.newKnownCollection(col);
+
+ assert col0 != null;
+
+ for (Object obj : col)
+ col0.add(unwrapBinary(ctx, obj, keepBinary, cpy));
+
+ return col0;
+ }
+
+ /**
+ * Unwraps map.
+ *
+ * @param map Map to unwrap.
+ * @param keepBinary Keep binary flag.
+ * @return Unwrapped collection.
+ */
+ private static Map<Object, Object> unwrapBinariesIfNeeded(CacheObjectValueContext ctx, Map<Object, Object> map,
+ boolean keepBinary, boolean cpy) {
+ if (keepBinary)
+ return map;
+
+ Map<Object, Object> map0 = BinaryUtils.newMap(map);
+
+ for (Map.Entry<Object, Object> e : map.entrySet())
+ map0.put(unwrapBinary(ctx, e.getKey(), false, cpy), unwrapBinary(ctx, e.getValue(), false, cpy));
+
+ return map0;
+ }
+
+ /**
+ * @param col Collection to unwrap.
+ * @param keepBinary Keep binary flag.
+ * @param cpy Copy value flag.
+ * @return Unwrapped collection.
+ */
+ private static Collection<Object> unwrapBinariesIfNeeded(CacheObjectValueContext ctx, Collection<Object> col,
+ boolean keepBinary, boolean cpy) {
+ Collection<Object> col0 = BinaryUtils.newKnownCollection(col);
+
+ if (col0 == null)
+ col0 = new ArrayList<>(col.size());
+
+ for (Object obj : col)
+ col0.add(unwrapBinary(ctx, obj, keepBinary, cpy));
+
+ return col0;
+ }
+
+ /**
+ * Unwrap array of binaries if needed.
+ *
+ * @param arr Array.
+ * @param keepBinary Keep binary flag.
+ * @param cpy Copy.
+ * @return Result.
+ */
+ private static Object[] unwrapBinariesInArrayIfNeeded(CacheObjectValueContext ctx, Object[] arr, boolean keepBinary,
+ boolean cpy) {
+ if (BinaryUtils.knownArray(arr))
+ return arr;
+
+ Object[] res = new Object[arr.length];
+
+ for (int i = 0; i < arr.length; i++)
+ res[i] = unwrapBinary(ctx, arr[i], keepBinary, cpy);
+
+ return res;
+ }
+
+ /**
+ * @param o Object to unwrap.
+ * @return Unwrapped object.
+ */
+ @SuppressWarnings("unchecked")
+ private static Object unwrapBinary(CacheObjectValueContext ctx, Object o, boolean keepBinary, boolean cpy) {
+ if (o instanceof Map.Entry) {
+ Map.Entry entry = (Map.Entry)o;
+
+ Object key = entry.getKey();
+
+ Object uKey = unwrapBinary(ctx, key, keepBinary, cpy);
+
+ Object val = entry.getValue();
+
+ Object uVal = unwrapBinary(ctx, val, keepBinary, cpy);
+
+ return (key != uKey || val != uVal) ? F.t(uKey, uVal) : o;
+ }
+ else if (BinaryUtils.knownCollection(o))
+ return unwrapKnownCollection(ctx, (Collection<Object>)o, keepBinary, cpy);
+ else if (BinaryUtils.knownMap(o))
+ return unwrapBinariesIfNeeded(ctx, (Map<Object, Object>)o, keepBinary, cpy);
+ else if (o instanceof Object[])
+ return unwrapBinariesInArrayIfNeeded(ctx, (Object[])o, keepBinary, cpy);
+ else if (o instanceof CacheObject) {
+ CacheObject co = (CacheObject)o;
+
+ if (!keepBinary || co.isPlatformType())
+ return unwrapBinary(ctx, co.value(ctx, cpy), keepBinary, cpy);
+ }
+
+ return o;
+ }
+
+ /**
+ * Private constructor.
+ */
+ private CacheObjectUtils() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectValueContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectValueContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectValueContext.java
new file mode 100644
index 0000000..49b2873
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectValueContext.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.GridKernalContext;
+
+/**
+ * Context to get value of cache object.
+ */
+public interface CacheObjectValueContext {
+ /**
+ * @return Kernal context.
+ */
+ public GridKernalContext kernalContext();
+
+ /**
+ * @return Copy on get flag.
+ */
+ public boolean copyOnGet();
+
+ /**
+ * @return {@code True} if should store unmarshalled value in cache.
+ */
+ public boolean storeValue();
+
+ /**
+ * @return {@code True} if deployment info should be associated with the objects of this cache.
+ */
+ public boolean addDeploymentInfo();
+
+ /**
+ * @return Binary enabled flag.
+ */
+ public boolean binaryEnabled();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index a3e70dd..e637122 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -1713,7 +1713,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* @return Unwrapped collection.
*/
public Collection<Object> unwrapBinariesIfNeeded(Collection<Object> col, boolean keepBinary) {
- return cacheObjCtx.unwrapBinariesIfNeeded(col, keepBinary);
+ return CacheObjectUtils.unwrapBinariesIfNeeded(cacheObjCtx, col, keepBinary);
}
/**
@@ -1724,7 +1724,7 @@ public class GridCacheContext<K, V> implements Externalizable {
* @return Unwrapped object.
*/
public Object unwrapBinaryIfNeeded(Object o, boolean keepBinary) {
- return cacheObjCtx.unwrapBinaryIfNeeded(o, keepBinary);
+ return unwrapBinaryIfNeeded(o, keepBinary, true);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index 687b132..93c5950 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -309,7 +309,7 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
oldVal0 = cctx.cacheObjectContext().unwrapBinaryIfNeeded(oldVal, keepBinary, false);
}
catch (Exception e) {
- if (!cctx.cacheObjectContext().processor().isBinaryEnabled(cctx.config()))
+ if (!cctx.cacheObjectContext().kernalContext().cacheObjects().isBinaryEnabled(cctx.config()))
throw e;
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 15e4469..4f87658 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -3840,13 +3840,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
/** {@inheritDoc} */
@Override public K getKey() {
- return (K)cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary);
+ return (K)cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, true);
}
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public V getValue() {
- return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(peekVisibleValue(), keepBinary);
+ return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(peekVisibleValue(), keepBinary, true);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
index b9efab1..c50672b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
@@ -75,7 +75,7 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
/** {@inheritDoc} */
@Override public byte[] valueBytes(CacheObjectContext ctx) throws IgniteCheckedException {
if (valBytes == null)
- valBytes = ctx.processor().marshal(ctx, val);
+ valBytes = ctx.kernalContext().cacheObjects().marshal(ctx, val);
return valBytes;
}
@@ -94,7 +94,7 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
- @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+ @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
assert val != null;
return (T)val;
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java
index 26c713c..3b3cf67 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryContext.java
@@ -53,10 +53,8 @@ public class CacheObjectBinaryContext extends CacheObjectContext {
this.binaryEnabled = binaryEnabled;
}
- /**
- * @return Binary enabled flag.
- */
- public boolean binaryEnabled() {
+ /** {@inheritDoc} */
+ @Override public boolean binaryEnabled() {
return binaryEnabled;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 14947e9..5567809 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.binary.streams.BinaryOffheapInputStream;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -735,7 +736,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
}
/** {@inheritDoc} */
- @Override public byte[] marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException {
+ @Override public byte[] marshal(CacheObjectValueContext ctx, Object val) throws IgniteCheckedException {
if (!((CacheObjectBinaryContext)ctx).binaryEnabled() || binaryMarsh == null)
return super.marshal(ctx, val);
@@ -747,7 +748,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
}
/** {@inheritDoc} */
- @Override public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr)
+ @Override public Object unmarshal(CacheObjectValueContext ctx, byte[] bytes, ClassLoader clsLdr)
throws IgniteCheckedException {
if (!((CacheObjectBinaryContext)ctx).binaryEnabled() || binaryMarsh == null)
return super.unmarshal(ctx, bytes, clsLdr);
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index afeada5..955ca69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -283,7 +283,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
byte[] bytes = PageUtils.getBytes(addr, off, len);
off += len;
- key = coctx.processor().toKeyCacheObject(coctx, type, bytes);
+ key = coctx.kernalContext().cacheObjects().toKeyCacheObject(coctx, type, bytes);
if (rowData == RowData.KEY_ONLY)
return;
@@ -300,7 +300,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
byte[] bytes = PageUtils.getBytes(addr, off, len);
off += len;
- val = coctx.processor().toCacheObject(coctx, type, bytes);
+ val = coctx.kernalContext().cacheObjects().toCacheObject(coctx, type, bytes);
ver = CacheVersionIO.read(addr + off, false);
@@ -359,7 +359,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
ByteBuffer buf,
IncompleteCacheObject incomplete
) throws IgniteCheckedException {
- incomplete = coctx.processor().toKeyCacheObject(coctx, buf, incomplete);
+ incomplete = coctx.kernalContext().cacheObjects().toKeyCacheObject(coctx, buf, incomplete);
if (incomplete.isReady()) {
key = (KeyCacheObject)incomplete.object();
@@ -384,7 +384,7 @@ public class CacheDataRowAdapter implements CacheDataRow {
ByteBuffer buf,
IncompleteCacheObject incomplete
) throws IgniteCheckedException {
- incomplete = coctx.processor().toCacheObject(coctx, buf, incomplete);
+ incomplete = coctx.kernalContext().cacheObjects().toCacheObject(coctx, buf, incomplete);
if (incomplete.isReady()) {
val = incomplete.object();
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 9ad084e..8c10e53 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -1728,7 +1728,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
for (KeyCacheObject cacheKey : lockKeys) {
K keyVal = (K)
(keepCacheObjects ? cacheKey :
- cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(cacheKey, !deserializeBinary));
+ cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(cacheKey, !deserializeBinary,
+ true));
if (retMap.containsKey(keyVal))
// We already have a return value.
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java
index 97db698..a613184 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java
@@ -22,6 +22,7 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.T2;
@@ -133,15 +134,15 @@ public class TxDeadlock {
IgniteTxKey txKey = e.getKey();
try {
- CacheObjectContext objCtx = ctx.cacheObjectContext(txKey.cacheId());
+ GridCacheContext cctx = ctx.cacheContext(txKey.cacheId());
- Object val = txKey.key().value(objCtx, true);
+ Object val = txKey.key().value(cctx.cacheObjectContext(), true);
sb.append(e.getValue())
.append(" [key=")
.append(val)
.append(", cache=")
- .append(objCtx.cacheName())
+ .append(cctx.name())
.append("]\n");
}
catch (Exception ex) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index 9beb296..ee2d1f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessor;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -116,7 +117,7 @@ public interface IgniteCacheObjectProcessor extends GridProcessor {
* @return Value bytes.
* @throws IgniteCheckedException If failed.
*/
- public byte[] marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException;
+ public byte[] marshal(CacheObjectValueContext ctx, Object val) throws IgniteCheckedException;
/**
* @param ctx Context.
@@ -125,7 +126,8 @@ public interface IgniteCacheObjectProcessor extends GridProcessor {
* @return Unmarshalled object.
* @throws IgniteCheckedException If failed.
*/
- public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr) throws IgniteCheckedException;
+ public Object unmarshal(CacheObjectValueContext ctx, byte[] bytes, ClassLoader clsLdr)
+ throws IgniteCheckedException;
/**
* @param ccfg Cache configuration.
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
index a8595fb..de9256c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessorImpl.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
import org.apache.ignite.internal.processors.cache.IncompleteCacheObject;
@@ -55,8 +56,8 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
/** */
private IgniteBinary noOpBinary = new NoOpBinary();
- /**
- *
+ /*
+ * Static initializer
*/
static {
IMMUTABLE_CLS.add(String.class);
@@ -99,12 +100,12 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
}
/** {@inheritDoc} */
- @Override public byte[] marshal(CacheObjectContext ctx, Object val) throws IgniteCheckedException {
+ @Override public byte[] marshal(CacheObjectValueContext ctx, Object val) throws IgniteCheckedException {
return CU.marshal(ctx.kernalContext().cache().context(), ctx.addDeploymentInfo(), val);
}
/** {@inheritDoc} */
- @Override public Object unmarshal(CacheObjectContext ctx, byte[] bytes, ClassLoader clsLdr)
+ @Override public Object unmarshal(CacheObjectValueContext ctx, byte[] bytes, ClassLoader clsLdr)
throws IgniteCheckedException {
return U.unmarshal(ctx.kernalContext(), bytes, U.resolveClassLoader(clsLdr, ctx.kernalContext().config()));
}
@@ -166,7 +167,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
throw new IllegalArgumentException("Byte arrays cannot be used as cache keys.");
case CacheObject.TYPE_REGULAR:
- return new KeyCacheObjectImpl(ctx.processor().unmarshal(ctx, bytes, null), bytes, -1);
+ return new KeyCacheObjectImpl(ctx.kernalContext().cacheObjects().unmarshal(ctx, bytes, null), bytes, -1);
}
throw new IllegalArgumentException("Invalid object type: " + type);
@@ -392,14 +393,18 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
/** {@inheritDoc} */
@Override public CacheObject prepareForCache(CacheObjectContext ctx) {
try {
- if (!ctx.processor().immutable(val)) {
+ IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects();
+
+ if (!proc.immutable(val)) {
if (valBytes == null)
- valBytes = ctx.processor().marshal(ctx, val);
+ valBytes = proc.marshal(ctx, val);
+
+ boolean p2pEnabled = ctx.kernalContext().config().isPeerClassLoadingEnabled();
- ClassLoader ldr = ctx.p2pEnabled() ?
+ ClassLoader ldr = p2pEnabled ?
IgniteUtils.detectClassLoader(IgniteUtils.detectClass(this.val)) : U.gridClassLoader();
- Object val = ctx.processor().unmarshal(ctx, valBytes, ldr);
+ Object val = proc.unmarshal(ctx, valBytes, ldr);
KeyCacheObject key = new KeyCacheObjectImpl(val, valBytes, partition());
@@ -443,22 +448,26 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
}
/** {@inheritDoc} */
- @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+ @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
return super.value(ctx, false); // Do not need copy since user value is not in cache.
}
/** {@inheritDoc} */
@Override public CacheObject prepareForCache(CacheObjectContext ctx) {
try {
+ IgniteCacheObjectProcessor proc = ctx.kernalContext().cacheObjects();
+
if (valBytes == null)
- valBytes = ctx.processor().marshal(ctx, val);
+ valBytes = proc.marshal(ctx, val);
if (ctx.storeValue()) {
- ClassLoader ldr = ctx.p2pEnabled() ?
+ boolean p2pEnabled = ctx.kernalContext().config().isPeerClassLoadingEnabled();
+
+ ClassLoader ldr = p2pEnabled ?
IgniteUtils.detectClass(this.val).getClassLoader() : val.getClass().getClassLoader();
- Object val = this.val != null && ctx.processor().immutable(this.val) ? this.val :
- ctx.processor().unmarshal(ctx, valBytes, ldr);
+ Object val = this.val != null && proc.immutable(this.val) ? this.val :
+ proc.unmarshal(ctx, valBytes, ldr);
return new CacheObjectImpl(val, valBytes);
}
@@ -493,7 +502,7 @@ public class IgniteCacheObjectProcessorImpl extends GridProcessorAdapter impleme
}
/** {@inheritDoc} */
- @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+ @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
return super.value(ctx, false); // Do not need copy since user value is not in cache.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/query/CacheQueryObjectValueContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/CacheQueryObjectValueContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/CacheQueryObjectValueContext.java
new file mode 100644
index 0000000..08f71c8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/CacheQueryObjectValueContext.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+
+/**
+ * Cache object value context for queries.
+ */
+public class CacheQueryObjectValueContext implements CacheObjectValueContext {
+ /** Kernal context. */
+ private final GridKernalContext ctx;
+
+ /**
+ * Constructor.
+ *
+ * @param ctx Kernal context.
+ */
+ public CacheQueryObjectValueContext(GridKernalContext ctx) {
+ this.ctx = ctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridKernalContext kernalContext() {
+ return ctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean copyOnGet() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean storeValue() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return ctx.config().isPeerClassLoadingEnabled() && !binaryEnabled();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean binaryEnabled() {
+ return ctx.config().getMarshaller() instanceof BinaryMarshaller;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCacheObjectsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCacheObjectsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCacheObjectsIterator.java
index b006c75..b0e1562 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCacheObjectsIterator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryCacheObjectsIterator.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.processors.query;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
+
+import org.apache.ignite.internal.processors.cache.CacheObjectUtils;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
@@ -31,19 +33,20 @@ public class GridQueryCacheObjectsIterator implements Iterator<List<?>>, AutoClo
private final Iterator<List<?>> iter;
/** */
- private final GridCacheContext<?,?> cctx;
+ private final CacheObjectValueContext cacheObjValCtx;
/** */
private final boolean keepBinary;
/**
* @param iter Iterator.
- * @param cctx Cache context.
+ * @param cacheObjValCtx Cache object context.
* @param keepBinary Keep binary.
*/
- public GridQueryCacheObjectsIterator(Iterator<List<?>> iter, GridCacheContext<?,?> cctx, boolean keepBinary) {
+ public GridQueryCacheObjectsIterator(Iterator<List<?>> iter, CacheObjectValueContext cacheObjValCtx,
+ boolean keepBinary) {
this.iter = iter;
- this.cctx = cctx;
+ this.cacheObjValCtx = cacheObjValCtx;
this.keepBinary = keepBinary;
}
@@ -61,7 +64,8 @@ public class GridQueryCacheObjectsIterator implements Iterator<List<?>>, AutoClo
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public List<?> next() {
- return (List<?>)cctx.unwrapBinariesIfNeeded((Collection<Object>)iter.next(), keepBinary);
+ return ((List<?>)CacheObjectUtils.unwrapBinariesIfNeeded(
+ cacheObjValCtx, (Collection<Object>)iter.next(), keepBinary));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 76cde17..65151c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -1587,10 +1587,10 @@ public class GridQueryProcessor extends GridProcessorAdapter {
try {
CacheObjectContext coctx = cacheObjectContext(cacheName);
- QueryTypeDescriptorImpl desc = typeByValue(coctx, key, val, true);
+ QueryTypeDescriptorImpl desc = typeByValue(cacheName, coctx, key, val, true);
if (prevVal != null) {
- QueryTypeDescriptorImpl prevValDesc = typeByValue(coctx, key, prevVal, false);
+ QueryTypeDescriptorImpl prevValDesc = typeByValue(cacheName, coctx, key, prevVal, false);
if (prevValDesc != null && prevValDesc != desc)
idx.remove(cacheName, prevValDesc, key, partId, prevVal, prevVer);
@@ -1607,6 +1607,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
}
/**
+ * @param cacheName Cache name.
* @param coctx Cache context.
* @param key Key.
* @param val Value.
@@ -1615,7 +1616,8 @@ public class GridQueryProcessor extends GridProcessorAdapter {
* @throws IgniteCheckedException If type check failed.
*/
@SuppressWarnings("ConstantConditions")
- @Nullable private QueryTypeDescriptorImpl typeByValue(CacheObjectContext coctx,
+ @Nullable private QueryTypeDescriptorImpl typeByValue(String cacheName,
+ CacheObjectContext coctx,
KeyCacheObject key,
CacheObject val,
boolean checkType)
@@ -1629,12 +1631,12 @@ public class GridQueryProcessor extends GridProcessorAdapter {
if (binaryVal) {
int typeId = ctx.cacheObjects().typeId(val);
- id = new QueryTypeIdKey(coctx.cacheName(), typeId);
+ id = new QueryTypeIdKey(cacheName, typeId);
}
else {
valCls = val.value(coctx, false).getClass();
- id = new QueryTypeIdKey(coctx.cacheName(), valCls);
+ id = new QueryTypeIdKey(cacheName, valCls);
}
QueryTypeDescriptorImpl desc = types.get(id);
@@ -2078,7 +2080,7 @@ public class GridQueryProcessor extends GridProcessorAdapter {
try {
CacheObjectContext coctx = cacheObjectContext(cacheName);
- QueryTypeDescriptorImpl desc = typeByValue(coctx, key, val, false);
+ QueryTypeDescriptorImpl desc = typeByValue(cacheName, coctx, key, val, false);
if (desc == null)
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
index d77c8c0..9e11cdf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridRunningQueryInfo.java
@@ -32,8 +32,8 @@ public class GridRunningQueryInfo {
/** Query type. */
private final GridCacheQueryType qryType;
- /** */
- private final String cache;
+ /** Schema name. */
+ private final String schemaName;
/** */
private final long startTime;
@@ -48,17 +48,17 @@ public class GridRunningQueryInfo {
* @param id Query ID.
* @param qry Query text.
* @param qryType Query type.
- * @param cache Cache where query was executed.
+ * @param schemaName Schema name.
* @param startTime Query start time.
* @param cancel Query cancel.
* @param loc Local query flag.
*/
- public GridRunningQueryInfo(Long id, String qry, GridCacheQueryType qryType, String cache, long startTime,
+ public GridRunningQueryInfo(Long id, String qry, GridCacheQueryType qryType, String schemaName, long startTime,
GridQueryCancel cancel, boolean loc) {
this.id = id;
this.qry = qry;
this.qryType = qryType;
- this.cache = cache;
+ this.schemaName = schemaName;
this.startTime = startTime;
this.cancel = cancel;
this.loc = loc;
@@ -86,10 +86,10 @@ public class GridRunningQueryInfo {
}
/**
- * @return Cache where query was executed.
+ * @return Schema name.
*/
- public String cache() {
- return cache;
+ public String schemaName() {
+ return schemaName;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQueriesCollectorTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQueriesCollectorTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQueriesCollectorTask.java
index 8d00dd6..9d1da00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQueriesCollectorTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorRunningQueriesCollectorTask.java
@@ -90,7 +90,7 @@ public class VisorRunningQueriesCollectorTask extends VisorMultiNodeTask<VisorRu
long curTime = U.currentTimeMillis();
for (GridRunningQueryInfo qry : queries)
- res.add(new VisorRunningQuery(qry.id(), qry.query(), qry.queryType(), qry.cache(),
+ res.add(new VisorRunningQuery(qry.id(), qry.query(), qry.queryType(), qry.schemaName(),
qry.startTime(), curTime - qry.startTime(),
qry.cancelable(), qry.local()));
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
index a49e095..e144325 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
@@ -26,6 +26,7 @@ import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -235,7 +236,7 @@ public class GridAffinityNoCacheSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+ @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
A.notNull(ctx, "ctx");
return (T)val;
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java
index 5be277a..77b6836 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java
@@ -102,7 +102,7 @@ public class IgniteIncompleteCacheObjectSelfTest extends GridCommonAbstractTest
}
/** {@inheritDoc} */
- @Nullable @Override public <T> T value(final CacheObjectContext ctx, final boolean cpy) {
+ @Nullable @Override public <T> T value(final CacheObjectValueContext ctx, final boolean cpy) {
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
index 5f61bd6..a487218 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/FreeListImplSelfTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.database.MemoryMetricsImpl;
@@ -463,7 +464,7 @@ public class FreeListImplSelfTest extends GridCommonAbstractTest {
}
/** {@inheritDoc} */
- @Nullable @Override public <T> T value(CacheObjectContext ctx, boolean cpy) {
+ @Nullable @Override public <T> T value(CacheObjectValueContext ctx, boolean cpy) {
return (T)data;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 0474aeb..352fe85 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -282,7 +282,7 @@ public class DmlStatementsProcessor {
QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
try {
- return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepBinary());
+ return new GridQueryCacheObjectsIterator(res.iterator(), idx.valueContext(), cctx.keepBinary());
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -372,7 +372,7 @@ public class DmlStatementsProcessor {
cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
try {
- return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepBinary());
+ return new GridQueryCacheObjectsIterator(res.iterator(), idx.valueContext(), cctx.keepBinary());
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index c94c215..508c8be 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -78,6 +79,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.query.QueryTable;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.query.CacheQueryObjectValueContext;
import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
import org.apache.ignite.internal.processors.query.GridQueryCancel;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
@@ -303,6 +305,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** */
protected volatile GridKernalContext ctx;
+ /** Cache object value context. */
+ protected CacheQueryObjectValueContext valCtx;
+
/** */
private DmlStatementsProcessor dmlProc;
@@ -339,14 +344,6 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * @param cacheName Cache name.
- * @return Connection.
- */
- public Connection connectionForCache(String cacheName) {
- return connectionForSchema(schema(cacheName));
- }
-
- /**
* @param schema Schema.
* @return Connection.
*/
@@ -403,7 +400,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** {@inheritDoc} */
@Override public PreparedStatement prepareNativeStatement(String cacheName, String sql) throws SQLException {
- return prepareStatement(connectionForCache(cacheName), sql, true);
+ String schemaName = schema(cacheName);
+
+ return prepareStatement(connectionForSchema(schemaName), sql, true);
}
/** {@inheritDoc} */
@@ -918,7 +917,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** {@inheritDoc} */
@Override public long streamUpdateQuery(String cacheName, String qry,
@Nullable Object[] params, IgniteDataStreamer<?, ?> streamer) throws IgniteCheckedException {
- final Connection conn = connectionForCache(cacheName);
+ String schemaName = schema(cacheName);
+
+ final Connection conn = connectionForSchema(schemaName);
final PreparedStatement stmt;
@@ -1117,7 +1118,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
try {
- return new GridQueryCacheObjectsIterator(res.iterator(), cctx, keepBinary);
+ return new GridQueryCacheObjectsIterator(res.iterator(), valueContext(), keepBinary);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
@@ -1231,7 +1232,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
- * @param cctx Cache context.
+ * @param schemaName Schema name.
* @param qry Query.
* @param keepCacheObj Flag to keep cache object.
* @param enforceJoinOrder Enforce join order of tables.
@@ -1239,7 +1240,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @return Iterable result.
*/
private Iterable<List<?>> runQueryTwoStep(
- final GridCacheContext<?,?> cctx,
+ final String schemaName,
final GridCacheTwoStepQuery qry,
final boolean keepCacheObj,
final boolean enforceJoinOrder,
@@ -1250,7 +1251,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
) {
return new Iterable<List<?>>() {
@Override public Iterator<List<?>> iterator() {
- return rdcQryExec.query(cctx, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params,
+ return rdcQryExec.query(schemaName, qry, keepCacheObj, enforceJoinOrder, timeoutMillis, cancel, params,
parts);
}
};
@@ -1484,7 +1485,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
cancel = new GridQueryCancel();
QueryCursorImpl<List<?>> cursor = new QueryCursorImpl<>(
- runQueryTwoStep(cctx, twoStepQry, keepBinary, enforceJoinOrder, qry.getTimeout(), cancel,
+ runQueryTwoStep(schemaName, twoStepQry, keepBinary, enforceJoinOrder, qry.getTimeout(), cancel,
qry.getArgs(), qry.getPartitions()), cancel);
cursor.fieldsMeta(meta);
@@ -1900,12 +1901,14 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @throws IgniteCheckedException If failed or {@code -1} if the type is unknown.
*/
long size(String cacheName, String typeName) throws IgniteCheckedException {
+ String schemaName = schema(cacheName);
+
H2TableDescriptor tbl = tableDescriptor(typeName, cacheName);
if (tbl == null)
return -1;
- Connection conn = connectionForCache(cacheName);
+ Connection conn = connectionForSchema(schemaName);
H2Utils.setupConnection(conn, false, false);
@@ -1999,6 +2002,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
else {
this.ctx = ctx;
+ valCtx = new CacheQueryObjectValueContext(ctx);
+
nodeId = ctx.localNodeId();
marshaller = ctx.config().getMarshaller();
@@ -2031,6 +2036,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * @return Value context.
+ */
+ public CacheObjectValueContext valueContext() {
+ return valCtx;
+ }
+
+ /**
* @param topic Topic.
* @param topicOrd Topic ordinal for {@link GridTopic}.
* @param nodes Nodes.
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
index 48c0cb9..93ebc71 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridLuceneIndex.java
@@ -356,7 +356,7 @@ public class GridLuceneIndex implements AutoCloseable {
if (coctx == null) // For tests.
return (Z)JdbcUtils.deserialize(bytes, null);
- return (Z)coctx.processor().unmarshal(coctx, bytes, ldr);
+ return (Z)coctx.kernalContext().cacheObjects().unmarshal(coctx, bytes, ldr);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/aad3b0c5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 6fff8de..b7bdde5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -579,7 +579,9 @@ public class GridMapQueryExecutor {
}
}
- Connection conn = h2.connectionForCache(mainCctx.name());
+ String schemaName = h2.schema(mainCctx.name());
+
+ Connection conn = h2.connectionForSchema(schemaName);
H2Utils.setupConnection(conn, distributedJoinMode != OFF, enforceJoinOrder);