You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/02/02 04:28:28 UTC
[36/52] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1'
of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-61
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
index 0000000,79937f9..7c53dba
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareResponse.java
@@@ -1,0 -1,511 +1,505 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed.near;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.distributed.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * Near cache prepare response.
+ */
+ public class GridNearTxPrepareResponse<K, V> extends GridDistributedTxPrepareResponse<K, V> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Collection of versions that are pending and less than lock version. */
+ @GridToStringInclude
+ @GridDirectCollection(GridCacheVersion.class)
+ private Collection<GridCacheVersion> pending;
+
+ /** Future ID. */
+ private IgniteUuid futId;
+
+ /** Mini future ID. */
+ private IgniteUuid miniId;
+
+ /** DHT version. */
+ private GridCacheVersion dhtVer;
+
+ /** Invalid partitions. */
+ @GridToStringInclude
+ @GridDirectCollection(int.class)
+ private Collection<Integer> invalidParts;
+
+ /** Map of owned values to set on near node: key to (version, value, value bytes) tuple. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Map<IgniteTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> ownedVals;
+
+ /** Marshalled owned values, one byte array per {@link #ownedVals} entry (built in {@code prepareMarshal}). */
+ @GridToStringExclude
+ @GridDirectCollection(byte[].class)
+ private Collection<byte[]> ownedValsBytes;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridNearTxPrepareResponse() {
+ // No-op.
+ }
+
+ /**
+ * Creates a prepare response. The future ID, mini future ID and DHT version
+ * are checked with assertions and must not be {@code null}.
+ *
+ * @param xid Xid version.
+ * @param futId Future ID (asserted non-null).
+ * @param miniId Mini future ID (asserted non-null).
+ * @param dhtVer DHT version (asserted non-null).
+ * @param invalidParts Invalid partitions (stored as is, not checked).
+ * @param err Error.
+ */
+ public GridNearTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, GridCacheVersion dhtVer,
+ Collection<Integer> invalidParts, Throwable err) {
+ super(xid, err);
+
+ assert futId != null;
+ assert miniId != null;
+ assert dhtVer != null;
+
+ this.futId = futId;
+ this.miniId = miniId;
+ this.dhtVer = dhtVer;
+ this.invalidParts = invalidParts;
+ }
+
+ /**
+ * Gets pending versions that are less than {@link #version()}.
+ *
+ * @return Pending versions, or an empty list if none were set (never {@code null}).
+ */
+ public Collection<GridCacheVersion> pending() {
+ return pending == null ? Collections.<GridCacheVersion>emptyList() : pending;
+ }
+
+ /**
+ * Sets pending versions that are less than {@link #version()}.
+ *
+ * @param pending Pending versions.
+ */
+ public void pending(Collection<GridCacheVersion> pending) {
+ this.pending = pending;
+ }
+
+ /**
+ * @return Mini future ID.
+ */
+ public IgniteUuid miniId() {
+ return miniId;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return DHT version.
+ */
+ public GridCacheVersion dhtVersion() {
+ return dhtVer;
+ }
+
+ /**
+ * Adds owned value. The backing map is created lazily on the first call;
+ * a later call with the same key replaces the previous entry.
+ *
+ * @param key Key.
+ * @param ver DHT version.
+ * @param val Value.
+ * @param valBytes Value bytes; when {@code null}, {@code prepareMarshal} derives them from {@code val}.
+ */
+ public void addOwnedValue(IgniteTxKey<K> key, GridCacheVersion ver, V val, byte[] valBytes) {
+ if (ownedVals == null)
+ ownedVals = new HashMap<>();
+
+ ownedVals.put(key, F.t(ver, val, valBytes));
+ }
+
+ /**
+ * @return Unmodifiable view of the owned values map, or an empty map if no
+ * owned values are present (never {@code null}).
+ */
+ public Map<IgniteTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> ownedValues() {
+ return ownedVals == null ? Collections.<IgniteTxKey<K>, GridTuple3<GridCacheVersion,V,byte[]>>emptyMap() :
+ Collections.unmodifiableMap(ownedVals);
+ }
+
+ /**
+ * @param key Key.
+ * @return {@code True} if response has owned value for given key.
+ */
+ public boolean hasOwnedValue(IgniteTxKey<K> key) {
+ return ownedVals != null && ownedVals.containsKey(key);
+ }
+
+ /**
+ * @return Invalid partitions; returned as stored, so the result may be {@code null}.
+ */
+ public Collection<Integer> invalidPartitions() {
+ return invalidParts;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Additionally marshals every owned value into {@code ownedValsBytes} as a
+ * (key, version, value bytes, raw-bytes flag) tuple. Nothing is done when there
+ * are no owned values or when the bytes have already been built.
+ *
+ * @param ctx Shared cache context that provides the marshaller.
+ */
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (ownedVals != null && ownedValsBytes == null) {
+ ownedValsBytes = new ArrayList<>(ownedVals.size());
+
+ for (Map.Entry<IgniteTxKey<K>, GridTuple3<GridCacheVersion, V, byte[]>> entry : ownedVals.entrySet()) {
+ GridTuple3<GridCacheVersion, V, byte[]> tup = entry.getValue();
+
+ // Set when the value itself is a byte array and is sent as is, without marshalling.
+ boolean rawBytes = false;
+
+ byte[] valBytes = tup.get3();
+
+ // No pre-computed value bytes: derive them from the value.
+ if (valBytes == null) {
+ if (tup.get2() != null && tup.get2() instanceof byte[]) {
+ rawBytes = true;
+
+ valBytes = (byte[])tup.get2();
+ }
+ else
+ valBytes = ctx.marshaller().marshal(tup.get2());
+ }
+
+ ownedValsBytes.add(ctx.marshaller().marshal(F.t(entry.getKey(), tup.get1(), valBytes, rawBytes)));
+ }
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Additionally rebuilds the owned values map from {@code ownedValsBytes} when
+ * the bytes are present and the map has not been built yet.
+ */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (ownedValsBytes != null && ownedVals == null) {
+ ownedVals = new HashMap<>();
+
+ for (byte[] bytes : ownedValsBytes) {
+ // Tuple layout written by prepareMarshal: (key, version, value bytes, raw-bytes flag).
+ GridTuple4<IgniteTxKey<K>, GridCacheVersion, byte[], Boolean> tup = ctx.marshaller().unmarshal(bytes, ldr);
+
+ // Raw flag set: the payload is the value itself; otherwise it must be unmarshalled.
+ V val = tup.get4() ? (V)tup.get3() : ctx.marshaller().<V>unmarshal(tup.get3(), ldr);
+
+ // For raw values no separate value bytes are kept.
+ ownedVals.put(tup.get1(), F.t(tup.get2(), val, tup.get4() ? null : tup.get3()));
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridNearTxPrepareResponse _clone = new GridNearTxPrepareResponse();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Copies the fields declared by this class by reference, so the collections and
+ * the owned values map are shared between this message and the clone.
+ */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridNearTxPrepareResponse _clone = (GridNearTxPrepareResponse)_msg;
+
+ _clone.pending = pending;
+ _clone.futId = futId;
+ _clone.miniId = miniId;
+ _clone.dhtVer = dhtVer;
+ _clone.invalidParts = invalidParts;
+ _clone.ownedVals = ownedVals;
+ _clone.ownedValsBytes = ownedValsBytes;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 10:
- if (!commState.putCacheVersion(dhtVer))
++ if (!commState.putCacheVersion("dhtVer", dhtVer))
+ return false;
+
+ commState.idx++;
+
+ case 11:
- if (!commState.putGridUuid(futId))
++ if (!commState.putGridUuid("futId", futId))
+ return false;
+
+ commState.idx++;
+
+ case 12:
+ if (invalidParts != null) {
+ if (commState.it == null) {
- if (!commState.putInt(invalidParts.size()))
++ if (!commState.putInt(null, invalidParts.size()))
+ return false;
+
+ commState.it = invalidParts.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putInt((int)commState.cur))
++ if (!commState.putInt(null, (int)commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 13:
- if (!commState.putGridUuid(miniId))
++ if (!commState.putGridUuid("miniId", miniId))
+ return false;
+
+ commState.idx++;
+
+ case 14:
+ if (ownedValsBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(ownedValsBytes.size()))
++ if (!commState.putInt(null, ownedValsBytes.size()))
+ return false;
+
+ commState.it = ownedValsBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putByteArray((byte[])commState.cur))
++ if (!commState.putByteArray(null, (byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 15:
+ if (pending != null) {
+ if (commState.it == null) {
- if (!commState.putInt(pending.size()))
++ if (!commState.putInt(null, pending.size()))
+ return false;
+
+ commState.it = pending.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putCacheVersion((GridCacheVersion)commState.cur))
++ if (!commState.putCacheVersion(null, (GridCacheVersion)commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 10:
- GridCacheVersion dhtVer0 = commState.getCacheVersion();
++ dhtVer = commState.getCacheVersion("dhtVer");
+
- if (dhtVer0 == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- dhtVer = dhtVer0;
-
+ commState.idx++;
+
+ case 11:
- IgniteUuid futId0 = commState.getGridUuid();
++ futId = commState.getGridUuid("futId");
+
- if (futId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- futId = futId0;
-
+ commState.idx++;
+
+ case 12:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (invalidParts == null)
+ invalidParts = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- if (buf.remaining() < 4)
- return false;
++ int _val = commState.getInt(null);
+
- int _val = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+
+ invalidParts.add((Integer)_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 13:
- IgniteUuid miniId0 = commState.getGridUuid();
++ miniId = commState.getGridUuid("miniId");
+
- if (miniId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- miniId = miniId0;
-
+ commState.idx++;
+
+ case 14:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (ownedValsBytes == null)
+ ownedValsBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- byte[] _val = commState.getByteArray();
++ byte[] _val = commState.getByteArray(null);
+
- if (_val == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ ownedValsBytes.add((byte[])_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 15:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (pending == null)
+ pending = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- GridCacheVersion _val = commState.getCacheVersion();
++ GridCacheVersion _val = commState.getCacheVersion(null);
+
- if (_val == CACHE_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ pending.add((GridCacheVersion)_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 55;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearTxPrepareResponse.class, this, "super", super.toString());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java
index 0000000,06ce651..40d3506
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearUnlockRequest.java
@@@ -1,0 -1,103 +1,103 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.distributed.near;
+
+ import org.apache.ignite.internal.processors.cache.distributed.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.io.*;
+ import java.nio.*;
+
+ /**
+ * Near cache unlock request.
+ */
+ public class GridNearUnlockRequest<K, V> extends GridDistributedUnlockRequest<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridNearUnlockRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param keyCnt Key count.
+ */
+ public GridNearUnlockRequest(int cacheId, int keyCnt) {
+ super(cacheId, keyCnt);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridNearUnlockRequest _clone = new GridNearUnlockRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 56;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridNearUnlockRequest.class, this, super.toString());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 0000000,e3f2c94..7794028
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@@ -1,0 -1,818 +1,800 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.query;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.marshaller.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryType.*;
+
+ /**
+ * Query request.
+ */
+ public class GridCacheQueryRequest<K, V> extends GridCacheMessage<K, V> implements GridCacheDeployable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Request ID. */
+ private long id;
+
+ /** Cache name. */
+ private String cacheName;
+
+ /** Query type. */
+ private GridCacheQueryType type;
+
+ /** Fields query flag. */
+ private boolean fields;
+
+ /** Query clause. */
+ private String clause;
+
+ /** Query class name. */
+ private String clsName;
+
+ /** Key-value filter. */
+ @GridDirectTransient
+ private IgniteBiPredicate<Object, Object> keyValFilter;
+
+ /** Marshalled key-value filter. */
+ private byte[] keyValFilterBytes;
+
+ /** Projection filter. */
+ @GridDirectTransient
+ private IgnitePredicate<CacheEntry<Object, Object>> prjFilter;
+
+ /** Marshalled projection filter. */
+ private byte[] prjFilterBytes;
+
+ /** Reducer. */
+ @GridDirectTransient
+ private IgniteReducer<Object, Object> rdc;
+
+ /** Marshalled reducer. */
+ private byte[] rdcBytes;
+
+ /** Transformer. */
+ @GridDirectTransient
+ private IgniteClosure<Object, Object> trans;
+
+ /** Marshalled transformer. */
+ private byte[] transBytes;
+
+ /** Query arguments. */
+ @GridDirectTransient
+ private Object[] args;
+
+ /** Marshalled query arguments. */
+ private byte[] argsBytes;
+
+ /** Page size. */
+ private int pageSize;
+
+ /** Flag indicating whether to include backups. */
+ private boolean incBackups;
+
+ /** Flag indicating that this is a cancel request. */
+ private boolean cancel;
+
+ /** Flag indicating whether to include meta data. */
+ private boolean incMeta;
+
+ /** Whether to load all pages. */
+ private boolean all;
+
+ /** Whether to keep portables. */
+ @GridDirectVersion(1)
+ private boolean keepPortable;
+
+ /** Security subject ID. */
+ @GridDirectVersion(2)
+ private UUID subjId;
+
+ /** Task hash. */
+ @GridDirectVersion(2)
+ private int taskHash;
+
+ /**
+ * Required by {@link Externalizable}
+ */
+ public GridCacheQueryRequest() {
+ // No-op.
+ }
+
+ /**
+ * Creates a cancel request: the {@code cancel} flag is set to {@code true}.
+ *
+ * @param cacheId Cache ID.
+ * @param id Request to cancel.
+ * @param fields Fields query flag.
+ */
+ public GridCacheQueryRequest(int cacheId, long id, boolean fields) {
+ this.cacheId = cacheId;
+ this.id = id;
+ this.fields = fields;
+
+ cancel = true;
+ }
+
+ /**
+ * Request to load page.
+ *
+ * @param cacheId Cache ID.
+ * @param id Request ID.
+ * @param cacheName Cache name.
+ * @param pageSize Page size.
+ * @param incBackups {@code true} if need to include backups.
+ * @param fields Fields query flag.
+ * @param all Whether to load all pages.
+ * @param keepPortable Whether to keep portables.
+ * @param subjId Security subject ID.
+ * @param taskHash Task hash.
+ */
+ public GridCacheQueryRequest(
+ int cacheId,
+ long id,
+ String cacheName,
+ int pageSize,
+ boolean incBackups,
+ boolean fields,
+ boolean all,
+ boolean keepPortable,
+ UUID subjId,
+ int taskHash
+ ) {
+ this.cacheId = cacheId;
+ this.id = id;
+ this.cacheName = cacheName;
+ this.pageSize = pageSize;
+ this.incBackups = incBackups;
+ this.fields = fields;
+ this.all = all;
+ this.keepPortable = keepPortable;
+ this.subjId = subjId;
+ this.taskHash = taskHash;
+ }
+
+ /**
+ * Creates a query execution request. Assertions check that {@code type} is set
+ * unless this is a fields query, that {@code clause} is set unless the type is
+ * {@code SCAN}, {@code SET} or {@code SPI}, and that {@code clsName} is set unless
+ * this is a fields query or the type is {@code SCAN}, {@code SET} or {@code SPI}.
+ *
+ * @param cacheId Cache ID.
+ * @param id Request id.
+ * @param cacheName Cache name.
+ * @param type Query type.
+ * @param fields {@code true} if query returns fields.
+ * @param clause Query clause.
+ * @param clsName Query class name.
+ * @param keyValFilter Key-value filter.
+ * @param prjFilter Projection filter.
+ * @param rdc Reducer.
+ * @param trans Transformer.
+ * @param pageSize Page size.
+ * @param incBackups {@code true} if need to include backups.
+ * @param args Query arguments.
+ * @param incMeta Include meta data or not.
+ * @param keepPortable Whether to keep portables.
+ * @param subjId Security subject ID.
+ * @param taskHash Task hash.
+ */
+ public GridCacheQueryRequest(
+ int cacheId,
+ long id,
+ String cacheName,
+ GridCacheQueryType type,
+ boolean fields,
+ String clause,
+ String clsName,
+ IgniteBiPredicate<Object, Object> keyValFilter,
+ IgnitePredicate<CacheEntry<Object, Object>> prjFilter,
+ IgniteReducer<Object, Object> rdc,
+ IgniteClosure<Object, Object> trans,
+ int pageSize,
+ boolean incBackups,
+ Object[] args,
+ boolean incMeta,
+ boolean keepPortable,
+ UUID subjId,
+ int taskHash
+ ) {
+ assert type != null || fields;
+ assert clause != null || (type == SCAN || type == SET || type == SPI);
+ assert clsName != null || fields || type == SCAN || type == SET || type == SPI;
+
+ this.cacheId = cacheId;
+ this.id = id;
+ this.cacheName = cacheName;
+ this.type = type;
+ this.fields = fields;
+ this.clause = clause;
+ this.clsName = clsName;
+ this.keyValFilter = keyValFilter;
+ this.prjFilter = prjFilter;
+ this.rdc = rdc;
+ this.trans = trans;
+ this.pageSize = pageSize;
+ this.incBackups = incBackups;
+ this.args = args;
+ this.incMeta = incMeta;
+ this.keepPortable = keepPortable;
+ this.subjId = subjId;
+ this.taskHash = taskHash;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Additionally marshals the key-value filter, projection filter, reducer,
+ * transformer and query arguments into their corresponding {@code *Bytes} fields.
+ * When deployment is enabled on the context, each object is first passed to
+ * {@code prepareObject}.
+ *
+ * @param ctx Shared cache context.
+ */
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (keyValFilter != null) {
+ if (ctx.deploymentEnabled())
+ prepareObject(keyValFilter, ctx);
+
+ keyValFilterBytes = CU.marshal(ctx, keyValFilter);
+ }
+
+ if (prjFilter != null) {
+ if (ctx.deploymentEnabled())
+ prepareObject(prjFilter, ctx);
+
+ prjFilterBytes = CU.marshal(ctx, prjFilter);
+ }
+
+ if (rdc != null) {
+ if (ctx.deploymentEnabled())
+ prepareObject(rdc, ctx);
+
+ rdcBytes = CU.marshal(ctx, rdc);
+ }
+
+ if (trans != null) {
+ if (ctx.deploymentEnabled())
+ prepareObject(trans, ctx);
+
+ transBytes = CU.marshal(ctx, trans);
+ }
+
+ // Arguments are prepared one by one but marshalled as a single array.
+ if (!F.isEmpty(args)) {
+ if (ctx.deploymentEnabled()) {
+ for (Object arg : args)
+ prepareObject(arg, ctx);
+ }
+
+ argsBytes = CU.marshal(ctx, args);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Additionally restores the key-value filter, projection filter, reducer,
+ * transformer and query arguments from their byte arrays (for each array that is
+ * not {@code null}), using the context marshaller and the given class loader.
+ */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ IgniteMarshaller mrsh = ctx.marshaller();
+
+ if (keyValFilterBytes != null)
+ keyValFilter = mrsh.unmarshal(keyValFilterBytes, ldr);
+
+ if (prjFilterBytes != null)
+ prjFilter = mrsh.unmarshal(prjFilterBytes, ldr);
+
+ if (rdcBytes != null)
+ rdc = mrsh.unmarshal(rdcBytes, ldr);
+
+ if (transBytes != null)
+ trans = mrsh.unmarshal(transBytes, ldr);
+
+ if (argsBytes != null)
+ args = mrsh.unmarshal(argsBytes, ldr);
+ }
+
+ /**
+ * Replaces the reducer and the transformer (each only when not {@code null}) with
+ * copies obtained by marshalling them and unmarshalling the result with a
+ * {@code null} class loader.
+ * NOTE(review): presumably this gives local execution its own instances instead of
+ * sharing the caller's objects - confirm against the caller.
+ *
+ * @param ctx Context.
+ * @throws IgniteCheckedException In case of error.
+ */
+ void beforeLocalExecution(GridCacheContext<K, V> ctx) throws IgniteCheckedException {
+ IgniteMarshaller marsh = ctx.marshaller();
+
+ rdc = rdc != null ? marsh.<IgniteReducer<Object, Object>>unmarshal(marsh.marshal(rdc), null) : null;
+ trans = trans != null ? marsh.<IgniteClosure<Object, Object>>unmarshal(marsh.marshal(trans), null) : null;
+ }
+
+ /**
+ * @return Request id.
+ */
+ public long id() {
+ return id;
+ }
+
+ /**
+ * @return Cache name.
+ */
+ public String cacheName() {
+ return cacheName;
+ }
+
+ /**
+ * @return Query type.
+ */
+ public GridCacheQueryType type() {
+ return type;
+ }
+
+ /**
+ * @return {@code true} if query returns fields.
+ */
+ public boolean fields() {
+ return fields;
+ }
+
+ /**
+ * @return Query clause.
+ */
+ public String clause() {
+ return clause;
+ }
+
+ /**
+ * @return Class name.
+ */
+ public String className() {
+ return clsName;
+ }
+
+ /**
+ * @return Flag indicating whether to include backups.
+ */
+ public boolean includeBackups() {
+ return incBackups;
+ }
+
+ /**
+ * @return Flag indicating that this is cancel request.
+ */
+ public boolean cancel() {
+ return cancel;
+ }
+
+ /**
+ * @return Key-value filter.
+ */
+ public IgniteBiPredicate<Object, Object> keyValueFilter() {
+ return keyValFilter;
+ }
+
+ /**
+ * @return Projection filter.
+ */
+ public IgnitePredicate<CacheEntry<Object, Object>> projectionFilter() {
+ return prjFilter;
+ }
+
+ /**
+ * @return Reducer.
+ */
+ public IgniteReducer<Object, Object> reducer() {
+ return rdc;
+ }
+
+ /**
+ * @return Transformer.
+ */
+ public IgniteClosure<Object, Object> transformer() {
+ return trans;
+ }
+
+ /**
+ * @return Page size.
+ */
+ public int pageSize() {
+ return pageSize;
+ }
+
+ /**
+ * @return Arguments.
+ */
+ public Object[] arguments() {
+ return args;
+ }
+
+ /**
+ * @return Include meta data or not.
+ */
+ public boolean includeMetaData() {
+ return incMeta;
+ }
+
+ /**
+ * @return Whether to load all pages.
+ */
+ public boolean allPages() {
+ return all;
+ }
+
+ /**
+ * @return Whether to keep portables.
+ */
+ public boolean keepPortable() {
+ return keepPortable;
+ }
+
+ /**
+ * @return Security subject ID.
+ */
+ public UUID subjectId() {
+ return subjId;
+ }
+
+ /**
+ * @return Task hash.
+ */
+ public int taskHash() {
+ return taskHash;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridCacheQueryRequest _clone = new GridCacheQueryRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Copies the fields declared by this class by reference, so the filters, reducer,
+ * transformer, arguments and byte arrays are shared between this message and the
+ * clone.
+ */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridCacheQueryRequest _clone = (GridCacheQueryRequest)_msg;
+
+ _clone.id = id;
+ _clone.cacheName = cacheName;
+ _clone.type = type;
+ _clone.fields = fields;
+ _clone.clause = clause;
+ _clone.clsName = clsName;
+ _clone.keyValFilter = keyValFilter;
+ _clone.keyValFilterBytes = keyValFilterBytes;
+ _clone.prjFilter = prjFilter;
+ _clone.prjFilterBytes = prjFilterBytes;
+ _clone.rdc = rdc;
+ _clone.rdcBytes = rdcBytes;
+ _clone.trans = trans;
+ _clone.transBytes = transBytes;
+ _clone.args = args;
+ _clone.argsBytes = argsBytes;
+ _clone.pageSize = pageSize;
+ _clone.incBackups = incBackups;
+ _clone.cancel = cancel;
+ _clone.incMeta = incMeta;
+ _clone.all = all;
+ _clone.keepPortable = keepPortable;
+ _clone.subjId = subjId;
+ _clone.taskHash = taskHash;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 3:
- if (!commState.putBoolean(all))
++ if (!commState.putBoolean("all", all))
+ return false;
+
+ commState.idx++;
+
+ case 4:
- if (!commState.putByteArray(argsBytes))
++ if (!commState.putByteArray("argsBytes", argsBytes))
+ return false;
+
+ commState.idx++;
+
+ case 5:
- if (!commState.putString(cacheName))
++ if (!commState.putString("cacheName", cacheName))
+ return false;
+
+ commState.idx++;
+
+ case 6:
- if (!commState.putBoolean(cancel))
++ if (!commState.putBoolean("cancel", cancel))
+ return false;
+
+ commState.idx++;
+
+ case 7:
- if (!commState.putString(clause))
++ if (!commState.putString("clause", clause))
+ return false;
+
+ commState.idx++;
+
+ case 8:
- if (!commState.putString(clsName))
++ if (!commState.putString("clsName", clsName))
+ return false;
+
+ commState.idx++;
+
+ case 9:
- if (!commState.putBoolean(fields))
++ if (!commState.putBoolean("fields", fields))
+ return false;
+
+ commState.idx++;
+
+ case 10:
- if (!commState.putLong(id))
++ if (!commState.putLong("id", id))
+ return false;
+
+ commState.idx++;
+
+ case 11:
- if (!commState.putBoolean(incBackups))
++ if (!commState.putBoolean("incBackups", incBackups))
+ return false;
+
+ commState.idx++;
+
+ case 12:
- if (!commState.putBoolean(incMeta))
++ if (!commState.putBoolean("incMeta", incMeta))
+ return false;
+
+ commState.idx++;
+
+ case 13:
- if (!commState.putByteArray(keyValFilterBytes))
++ if (!commState.putByteArray("keyValFilterBytes", keyValFilterBytes))
+ return false;
+
+ commState.idx++;
+
+ case 14:
- if (!commState.putInt(pageSize))
++ if (!commState.putInt("pageSize", pageSize))
+ return false;
+
+ commState.idx++;
+
+ case 15:
- if (!commState.putByteArray(prjFilterBytes))
++ if (!commState.putByteArray("prjFilterBytes", prjFilterBytes))
+ return false;
+
+ commState.idx++;
+
+ case 16:
- if (!commState.putByteArray(rdcBytes))
++ if (!commState.putByteArray("rdcBytes", rdcBytes))
+ return false;
+
+ commState.idx++;
+
+ case 17:
- if (!commState.putByteArray(transBytes))
++ if (!commState.putByteArray("transBytes", transBytes))
+ return false;
+
+ commState.idx++;
+
+ case 18:
- if (!commState.putEnum(type))
++ if (!commState.putEnum("type", type))
+ return false;
+
+ commState.idx++;
+
+ case 19:
- if (!commState.putBoolean(keepPortable))
++ if (!commState.putBoolean("keepPortable", keepPortable))
+ return false;
+
+ commState.idx++;
+
+ case 20:
- if (!commState.putUuid(subjId))
++ if (!commState.putUuid("subjId", subjId))
+ return false;
+
+ commState.idx++;
+
+ case 21:
- if (!commState.putInt(taskHash))
++ if (!commState.putInt("taskHash", taskHash))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 3:
- if (buf.remaining() < 1)
- return false;
++ all = commState.getBoolean("all");
+
- all = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 4:
- byte[] argsBytes0 = commState.getByteArray();
++ argsBytes = commState.getByteArray("argsBytes");
+
- if (argsBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- argsBytes = argsBytes0;
-
+ commState.idx++;
+
+ case 5:
- String cacheName0 = commState.getString();
++ cacheName = commState.getString("cacheName");
+
- if (cacheName0 == STR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- cacheName = cacheName0;
-
+ commState.idx++;
+
+ case 6:
- if (buf.remaining() < 1)
- return false;
++ cancel = commState.getBoolean("cancel");
+
- cancel = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 7:
- String clause0 = commState.getString();
++ clause = commState.getString("clause");
+
- if (clause0 == STR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- clause = clause0;
-
+ commState.idx++;
+
+ case 8:
- String clsName0 = commState.getString();
++ clsName = commState.getString("clsName");
+
- if (clsName0 == STR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- clsName = clsName0;
-
+ commState.idx++;
+
+ case 9:
- if (buf.remaining() < 1)
- return false;
++ fields = commState.getBoolean("fields");
+
- fields = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 10:
- if (buf.remaining() < 8)
- return false;
++ id = commState.getLong("id");
+
- id = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 11:
- if (buf.remaining() < 1)
- return false;
++ incBackups = commState.getBoolean("incBackups");
+
- incBackups = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 12:
- if (buf.remaining() < 1)
- return false;
++ incMeta = commState.getBoolean("incMeta");
+
- incMeta = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 13:
- byte[] keyValFilterBytes0 = commState.getByteArray();
++ keyValFilterBytes = commState.getByteArray("keyValFilterBytes");
+
- if (keyValFilterBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- keyValFilterBytes = keyValFilterBytes0;
-
+ commState.idx++;
+
+ case 14:
- if (buf.remaining() < 4)
- return false;
++ pageSize = commState.getInt("pageSize");
+
- pageSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 15:
- byte[] prjFilterBytes0 = commState.getByteArray();
++ prjFilterBytes = commState.getByteArray("prjFilterBytes");
+
- if (prjFilterBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- prjFilterBytes = prjFilterBytes0;
-
+ commState.idx++;
+
+ case 16:
- byte[] rdcBytes0 = commState.getByteArray();
++ rdcBytes = commState.getByteArray("rdcBytes");
+
- if (rdcBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- rdcBytes = rdcBytes0;
-
+ commState.idx++;
+
+ case 17:
- byte[] transBytes0 = commState.getByteArray();
++ transBytes = commState.getByteArray("transBytes");
+
- if (transBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- transBytes = transBytes0;
-
+ commState.idx++;
+
+ case 18:
- if (buf.remaining() < 1)
- return false;
++ byte type0 = commState.getByte("type");
+
- byte type0 = commState.getByte();
++ if (!commState.lastRead())
++ return false;
+
+ type = GridCacheQueryType.fromOrdinal(type0);
+
+ commState.idx++;
+
+ case 19:
- if (buf.remaining() < 1)
- return false;
++ keepPortable = commState.getBoolean("keepPortable");
+
- keepPortable = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 20:
- UUID subjId0 = commState.getUuid();
++ subjId = commState.getUuid("subjId");
+
- if (subjId0 == UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- subjId = subjId0;
-
+ commState.idx++;
+
+ case 21:
- if (buf.remaining() < 4)
- return false;
++ taskHash = commState.getInt("taskHash");
+
- taskHash = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 57;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheQueryRequest.class, this, super.toString());
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
index 0000000,7a7b9b9..01569cd
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryResponse.java
@@@ -1,0 -1,445 +1,443 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.cache.query;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.query.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * Query response.
+ */
+ public class GridCacheQueryResponse<K, V> extends GridCacheMessage<K, V> implements GridCacheDeployable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Whether this is the last response for the request. */
+ private boolean finished;
+
+ /** Request ID. */
+ private long reqId;
+
+ /** Error; not sent directly, it is marshalled into {@code errBytes} by {@code prepareMarshal}. */
+ @GridDirectTransient
+ private Throwable err;
+
+ /** Marshalled {@code err}; stays {@code null} when {@code prepareMarshal} sees no error. */
+ private byte[] errBytes;
+
+ /** Whether this is a fields query. */
+ private boolean fields;
+
+ /** Marshalled {@code metadata}, produced by {@code prepareMarshal}. */
+ @GridDirectCollection(byte[].class)
+ private Collection<byte[]> metaDataBytes;
+
+ /** Metadata; restored from {@code metaDataBytes} by {@code finishUnmarshal}. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private List<GridQueryFieldMetadata> metadata;
+
+ /** Marshalled {@code data}, produced by {@code prepareMarshal}. */
+ @GridDirectCollection(byte[].class)
+ private Collection<byte[]> dataBytes;
+
+ /** Query data; restored from {@code dataBytes} by {@code finishUnmarshal}. */
+ @GridDirectTransient
+ private Collection<Object> data;
+
+ /**
+ * Empty constructor for {@link Externalizable}
+ */
+ public GridCacheQueryResponse() {
+ //No-op.
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param reqId Request id.
+ * @param finished Last response or not.
+ * @param fields Fields query or not.
+ */
+ public GridCacheQueryResponse(int cacheId, long reqId, boolean finished, boolean fields) {
+ this.cacheId = cacheId;
+ this.reqId = reqId;
+ this.finished = finished;
+ this.fields = fields;
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param reqId Request id.
+ * @param err Error.
+ */
+ public GridCacheQueryResponse(int cacheId, long reqId, Throwable err) {
+ this.cacheId = cacheId;
+ this.reqId = reqId;
+ this.err = err;
+ finished = true;
+ }
+
+ /** {@inheritDoc}
+ * @param ctx Shared cache context. */
+ @Override public void prepareMarshal(GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
+ super.prepareMarshal(ctx);
+
+ if (err != null)
+ errBytes = ctx.marshaller().marshal(err);
+
+ metaDataBytes = marshalCollection(metadata, ctx);
+ dataBytes = marshalCollection(data, ctx);
+
+ if (ctx.deploymentEnabled() && !F.isEmpty(data)) {
+ for (Object o : data) {
+ if (o instanceof Map.Entry) {
+ Map.Entry e = (Map.Entry)o;
+
+ prepareObject(e.getKey(), ctx);
+ prepareObject(e.getValue(), ctx);
+ }
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
+ super.finishUnmarshal(ctx, ldr);
+
+ if (errBytes != null)
+ err = ctx.marshaller().unmarshal(errBytes, ldr);
+
+ metadata = unmarshalCollection(metaDataBytes, ctx, ldr);
+ data = unmarshalCollection(dataBytes, ctx, ldr);
+ }
+
+ /**
+ * @return Metadata.
+ */
+ public List<GridQueryFieldMetadata> metadata() {
+ return metadata;
+ }
+
+ /**
+ * @param metadata Metadata.
+ */
+ public void metadata(@Nullable List<GridQueryFieldMetadata> metadata) {
+ this.metadata = metadata;
+ }
+
+ /**
+ * @return Query data.
+ */
+ public Collection<Object> data() {
+ return data;
+ }
+
+ /**
+ * @param data Query data.
+ */
+ @SuppressWarnings("unchecked")
+ public void data(Collection<?> data) {
+ this.data = (Collection<Object>)data;
+ }
+
+ /**
+ * @return If this is last response for this request or not.
+ */
+ public boolean isFinished() {
+ return finished;
+ }
+
+ /**
+ * @param finished If this is last response for this request or not.
+ */
+ public void finished(boolean finished) {
+ this.finished = finished;
+ }
+
+ /**
+ * @return Request id.
+ */
+ public long requestId() {
+ return reqId;
+ }
+
+ /**
+ * @return Error.
+ */
+ public Throwable error() {
+ return err;
+ }
+
+ /**
+ * @return If fields query.
+ */
+ public boolean fields() {
+ return fields;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridCacheQueryResponse _clone = new GridCacheQueryResponse();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridCacheQueryResponse _clone = (GridCacheQueryResponse)_msg;
+
+ _clone.finished = finished;
+ _clone.reqId = reqId;
+ _clone.err = err;
+ _clone.errBytes = errBytes;
+ _clone.fields = fields;
+ _clone.metaDataBytes = metaDataBytes;
+ _clone.metadata = metadata;
+ _clone.dataBytes = dataBytes;
+ _clone.data = data;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 3:
+ if (dataBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(dataBytes.size()))
++ if (!commState.putInt(null, dataBytes.size()))
+ return false;
+
+ commState.it = dataBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putByteArray((byte[])commState.cur))
++ if (!commState.putByteArray(null, (byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 4:
- if (!commState.putByteArray(errBytes))
++ if (!commState.putByteArray("errBytes", errBytes))
+ return false;
+
+ commState.idx++;
+
+ case 5:
- if (!commState.putBoolean(fields))
++ if (!commState.putBoolean("fields", fields))
+ return false;
+
+ commState.idx++;
+
+ case 6:
- if (!commState.putBoolean(finished))
++ if (!commState.putBoolean("finished", finished))
+ return false;
+
+ commState.idx++;
+
+ case 7:
+ if (metaDataBytes != null) {
+ if (commState.it == null) {
- if (!commState.putInt(metaDataBytes.size()))
++ if (!commState.putInt(null, metaDataBytes.size()))
+ return false;
+
+ commState.it = metaDataBytes.iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
- if (!commState.putByteArray((byte[])commState.cur))
++ if (!commState.putByteArray(null, (byte[])commState.cur))
+ return false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 8:
- if (!commState.putLong(reqId))
++ if (!commState.putLong("reqId", reqId))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 3:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (dataBytes == null)
+ dataBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- byte[] _val = commState.getByteArray();
++ byte[] _val = commState.getByteArray(null);
+
- if (_val == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ dataBytes.add((byte[])_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 4:
- byte[] errBytes0 = commState.getByteArray();
++ errBytes = commState.getByteArray("errBytes");
+
- if (errBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- errBytes = errBytes0;
-
+ commState.idx++;
+
+ case 5:
- if (buf.remaining() < 1)
- return false;
++ fields = commState.getBoolean("fields");
+
- fields = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 6:
- if (buf.remaining() < 1)
- return false;
++ finished = commState.getBoolean("finished");
+
- finished = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 7:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (metaDataBytes == null)
+ metaDataBytes = new ArrayList<>(commState.readSize);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
- byte[] _val = commState.getByteArray();
++ byte[] _val = commState.getByteArray(null);
+
- if (_val == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ metaDataBytes.add((byte[])_val);
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+
+ commState.idx++;
+
+ case 8:
- if (buf.remaining() < 8)
- return false;
++ reqId = commState.getLong("reqId");
+
- reqId = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 58;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheQueryResponse.class, this);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java
index 0000000,3164c46..4f9345c
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/clock/GridClockDeltaSnapshotMessage.java
@@@ -1,0 -1,226 +1,224 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.clock;
+
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.tostring.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * Message containing time delta map for all nodes.
+ */
+ public class GridClockDeltaSnapshotMessage extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Snapshot version. */
+ private GridClockDeltaVersion snapVer;
+
+ /** Grid time deltas. */
+ @GridToStringInclude
+ @GridDirectMap(keyType = UUID.class, valueType = long.class)
+ private Map<UUID, Long> deltas;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridClockDeltaSnapshotMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param snapVer Snapshot version.
+ * @param deltas Deltas map.
+ */
+ public GridClockDeltaSnapshotMessage(GridClockDeltaVersion snapVer, Map<UUID, Long> deltas) {
+ this.snapVer = snapVer;
+ this.deltas = deltas;
+ }
+
+ /**
+ * @return Snapshot version.
+ */
+ public GridClockDeltaVersion snapshotVersion() {
+ return snapVer;
+ }
+
+ /**
+ * @return Time deltas map.
+ */
+ public Map<UUID, Long> deltas() {
+ return deltas;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridClockDeltaSnapshotMessage _clone = new GridClockDeltaSnapshotMessage();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridClockDeltaSnapshotMessage _clone = (GridClockDeltaSnapshotMessage)_msg;
+
+ _clone.snapVer = snapVer;
+ _clone.deltas = deltas;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
+ if (deltas != null) {
+ if (commState.it == null) {
- if (!commState.putInt(deltas.size()))
++ if (!commState.putInt(null, deltas.size()))
+ return false;
+
+ commState.it = deltas.entrySet().iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
+ Map.Entry<UUID, Long> e = (Map.Entry<UUID, Long>)commState.cur;
+
+ if (!commState.keyDone) {
- if (!commState.putUuid(e.getKey()))
++ if (!commState.putUuid(null, e.getKey()))
+ return false;
+
+ commState.keyDone = true;
+ }
+
- if (!commState.putLong(e.getValue()))
++ if (!commState.putLong(null, e.getValue()))
+ return false;
+
+ commState.keyDone = false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 1:
- if (!commState.putClockDeltaVersion(snapVer))
++ if (!commState.putClockDeltaVersion("snapVer", snapVer))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ switch (commState.idx) {
+ case 0:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (deltas == null)
- deltas = U.newHashMap(commState.readSize);
++ deltas = new HashMap<>(commState.readSize, 1.0f);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
+ if (!commState.keyDone) {
- UUID _val = commState.getUuid();
++ UUID _val = commState.getUuid(null);
+
- if (_val == UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ commState.cur = _val;
+ commState.keyDone = true;
+ }
+
- if (buf.remaining() < 8)
- return false;
++ long _val = commState.getLong(null);
+
- long _val = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ deltas.put((UUID)commState.cur, _val);
+
+ commState.keyDone = false;
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+ commState.cur = null;
+
+ commState.idx++;
+
+ case 1:
- GridClockDeltaVersion snapVer0 = commState.getClockDeltaVersion();
++ snapVer = commState.getClockDeltaVersion("snapVer");
+
- if (snapVer0 == CLOCK_DELTA_VER_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- snapVer = snapVer0;
-
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 59;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridClockDeltaSnapshotMessage.class, this);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
index 0000000,1c63a8a..11339c6
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousMessage.java
@@@ -1,0 -1,256 +1,252 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.continuous;
+
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ import static org.apache.ignite.internal.processors.continuous.GridContinuousMessageType.*;
+
+ /**
+ * Continuous processor message.
+ */
+ public class GridContinuousMessage extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Message type. */
+ private GridContinuousMessageType type;
+
+ /** Routine ID. */
+ private UUID routineId;
+
+ /** Optional message data. */
+ @GridToStringInclude
+ @GridDirectTransient
+ private Object data;
+
+ /** Serialized message data. */
+ private byte[] dataBytes;
+
+ /** Future ID for synchronous event notifications. */
+ private IgniteUuid futId;
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public GridContinuousMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param type Message type.
+ * @param routineId Consume ID.
+ * @param futId Future ID.
+ * @param data Optional message data.
+ */
+ GridContinuousMessage(GridContinuousMessageType type,
+ @Nullable UUID routineId,
+ @Nullable IgniteUuid futId,
+ @Nullable Object data) {
+ assert type != null;
+ assert routineId != null || type == MSG_EVT_ACK;
+
+ this.type = type;
+ this.routineId = routineId;
+ this.futId = futId;
+ this.data = data;
+ }
+
+ /**
+ * @return Message type.
+ */
+ public GridContinuousMessageType type() {
+ return type;
+ }
+
+ /**
+ * @return Consume ID.
+ */
+ public UUID routineId() {
+ return routineId;
+ }
+
+ /**
+ * @return Message data.
+ */
+ @SuppressWarnings("unchecked")
+ public <T> T data() {
+ return (T)data;
+ }
+
+ /**
+ * @param data Message data.
+ */
+ public void data(Object data) {
+ this.data = data;
+ }
+
+ /**
+ * @return Serialized message data.
+ */
+ public byte[] dataBytes() {
+ return dataBytes;
+ }
+
+ /**
+ * @param dataBytes Serialized message data.
+ */
+ public void dataBytes(byte[] dataBytes) {
+ this.dataBytes = dataBytes;
+ }
+
+ /**
+ * @return Future ID for synchronous event notification.
+ */
+ @Nullable public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridContinuousMessage _clone = new GridContinuousMessage();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter msg) {
+ GridContinuousMessage clone = (GridContinuousMessage)msg;
+
+ clone.type = type;
+ clone.routineId = routineId;
+ clone.futId = futId;
+ clone.data = data;
+ clone.dataBytes = dataBytes;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("fallthrough")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putByteArray(dataBytes))
++ if (!commState.putByteArray("dataBytes", dataBytes))
+ return false;
+
+ commState.idx++;
+
+ case 1:
- if (!commState.putGridUuid(futId))
++ if (!commState.putGridUuid(null, futId))
+ return false;
+
+ commState.idx++;
+
+ case 2:
- if (!commState.putUuid(routineId))
++ if (!commState.putUuid(null, routineId))
+ return false;
+
+ commState.idx++;
+
+ case 3:
- if (!commState.putEnum(type))
++ if (!commState.putEnum(null, type))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("fallthrough")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
++ // Fields are read in the order writeTo emits them: dataBytes, futId, routineId, type.
++ // commState.idx is the next field to read; cases fall through on purpose so a call
++ // that returned false earlier resumes at the field it stopped on.
+ switch (commState.idx) {
+ case 0:
- byte[] dataBytes0 = commState.getByteArray();
++ dataBytes = commState.getByteArray("dataBytes");
+
- if (dataBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- dataBytes = dataBytes0;
-
+ commState.idx++;
+
+ case 1:
- IgniteUuid futId0 = commState.getGridUuid();
++ futId = commState.getGridUuid(null);
+
++ // NOTE(review): switched from the GRID_UUID_NOT_READ sentinel to lastRead() to match
++ // every other getter call in this change - confirm getGridUuid(String) reports via lastRead().
- if (futId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- futId = futId0;
-
+ commState.idx++;
+
+ case 2:
- UUID routineId0 = commState.getUuid();
++ // Assign the field directly: the merge kept the read but dropped the assignment.
++ routineId = commState.getUuid(null);
+
- if (routineId0 == UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- routineId = routineId0;
-
+ commState.idx++;
+
+ case 3:
- if (buf.remaining() < 1)
- return false;
++ // writeTo emits the type with putEnum; read it back as a byte and map it to the enum.
++ byte type0 = commState.getByte(null);
+
- byte type0 = commState.getByte();
++ if (!commState.lastRead())
++ return false;
+
+ type = fromOrdinal(type0);
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 60;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridContinuousMessage.class, this);
+ }
+ }