You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/02/02 04:28:23 UTC
[31/52] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1'
of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-61
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageAdapter.java
index 0000000,05f7ffb..822c0bc
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageAdapter.java
@@@ -1,0 -1,218 +1,206 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.util.direct;
+
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+ import org.apache.ignite.internal.processors.clock.*;
-import org.apache.ignite.internal.util.nio.*;
-import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+
+ import static org.apache.ignite.events.IgniteEventType.*;
+
+ /**
+ * Communication message adapter.
+ */
+ public abstract class GridTcpCommunicationMessageAdapter implements Serializable, Cloneable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ public static final byte[] BYTE_ARR_NOT_READ = new byte[0];
+
+ /** */
+ public static final short[] SHORT_ARR_NOT_READ = new short[0];
+
+ /** */
+ public static final int[] INT_ARR_NOT_READ = new int[0];
+
+ /** */
+ public static final long[] LONG_ARR_NOT_READ = new long[0];
+
+ /** */
+ public static final float[] FLOAT_ARR_NOT_READ = new float[0];
+
+ /** */
+ public static final double[] DOUBLE_ARR_NOT_READ = new double[0];
+
+ /** */
+ public static final char[] CHAR_ARR_NOT_READ = new char[0];
+
+ /** */
+ public static final boolean[] BOOLEAN_ARR_NOT_READ = new boolean[0];
+
+ /** */
+ public static final UUID UUID_NOT_READ = new UUID(0, 0);
+
+ /** */
++ public static final ByteBuffer BYTE_BUF_NOT_READ = ByteBuffer.allocate(0);
++
++ /** */
+ public static final IgniteUuid GRID_UUID_NOT_READ = new IgniteUuid(new UUID(0, 0), 0);
+
+ /** */
+ public static final GridClockDeltaVersion CLOCK_DELTA_VER_NOT_READ = new GridClockDeltaVersion(0, 0);
+
+ /** */
+ public static final GridByteArrayList BYTE_ARR_LIST_NOT_READ = new GridByteArrayList(new byte[0]);
+
+ /** */
+ public static final GridLongList LONG_LIST_NOT_READ = new GridLongList(0);
+
+ /** */
+ public static final GridCacheVersion CACHE_VER_NOT_READ = new GridCacheVersion(0, 0, 0, 0);
+
+ /** */
+ public static final GridDhtPartitionExchangeId DHT_PART_EXCHANGE_ID_NOT_READ =
+ new GridDhtPartitionExchangeId(new UUID(0, 0), EVT_NODE_LEFT, 1);
+
+ /** */
+ public static final GridCacheValueBytes VAL_BYTES_NOT_READ = new GridCacheValueBytes();
+
+ /** */
+ @SuppressWarnings("RedundantStringConstructorCall")
+ public static final String STR_NOT_READ = new String();
+
+ /** */
+ public static final BitSet BIT_SET_NOT_READ = new BitSet();
+
+ /** Placeholder message; every method it overrides throws {@link UnsupportedOperationException}. */
- public static final Enum<?> ENUM_NOT_READ = DummyEnum.DUMMY;
-
- /** */
+ public static final GridTcpCommunicationMessageAdapter MSG_NOT_READ = new GridTcpCommunicationMessageAdapter() {
+ @SuppressWarnings("CloneDoesntCallSuperClone")
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public boolean writeTo(ByteBuffer buf) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public boolean readFrom(ByteBuffer buf) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override public byte directType() {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ /** */
+ protected static final Object NULL = new Object();
+
+ /** Communication state of this message; {@code setWriter} and {@code setReader} delegate to it. */
+ protected final GridTcpCommunicationMessageState commState = new GridTcpCommunicationMessageState();
+
+ /**
- * @param msgWriter Message writer.
- * @param nodeId Node ID (provided only if versions are different).
++ * @param writer Writer.
+ */
- public void messageWriter(GridNioMessageWriter msgWriter, @Nullable UUID nodeId) {
- assert msgWriter != null;
++ public final void setWriter(MessageWriter writer) {
++ assert writer != null;
+
- commState.messageWriter(msgWriter, nodeId);
++ commState.setWriter(writer);
+ }
+
+ /**
- * @param msgReader Message reader.
- * @param nodeId Node ID (provided only if versions are different).
++ * @param reader Reader.
+ */
- public void messageReader(GridNioMessageReader msgReader, @Nullable UUID nodeId) {
- assert msgReader != null;
++ public final void setReader(MessageReader reader) {
++ assert reader != null;
+
- commState.messageReader(msgReader, nodeId);
++ commState.setReader(reader);
+ }
+
+ /**
+ * @param buf Byte buffer.
+ * @return Whether message was fully written.
+ */
+ public abstract boolean writeTo(ByteBuffer buf);
+
+ /**
+ * @param buf Byte buffer.
+ * @return Whether message was fully read.
+ */
+ public abstract boolean readFrom(ByteBuffer buf);
+
+ /**
+ * @return Message type.
+ */
+ public abstract byte directType();
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
+ @Override public abstract GridTcpCommunicationMessageAdapter clone();
+
+ /**
+ * Clones all fields of the provided message to {@code this}.
+ *
+ * @param _msg Message to clone from.
+ */
+ protected abstract void clone0(GridTcpCommunicationMessageAdapter _msg);
+
+ /**
+ * @return {@code True} if recovery should be skipped for this message.
+ */
+ public boolean skipRecovery() {
+ return false;
+ }
+
+ /**
+ * @param arr Array.
+ * @return Array iterator.
+ */
+ protected final Iterator<?> arrayIterator(final Object[] arr) {
+ return new Iterator<Object>() {
+ private int idx;
+
+ @Override public boolean hasNext() {
+ return idx < arr.length;
+ }
+
+ @Override public Object next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ return arr[idx++];
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
-
- /**
- * Dummy enum.
- */
- private enum DummyEnum {
- /** */
- DUMMY
- }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java
index 0000000,9ef1b10..7007493
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageFactory.java
@@@ -1,0 -1,359 +1,359 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.util.direct;
+
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.managers.checkpoint.*;
+ import org.apache.ignite.internal.managers.communication.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.distributed.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+ import org.apache.ignite.internal.processors.cache.distributed.near.*;
+ import org.apache.ignite.internal.processors.cache.query.*;
+ import org.apache.ignite.internal.processors.clock.*;
+ import org.apache.ignite.internal.processors.continuous.*;
+ import org.apache.ignite.internal.processors.dataload.*;
+ import org.apache.ignite.internal.processors.rest.handlers.task.*;
+ import org.apache.ignite.internal.processors.streamer.*;
+ import org.apache.ignite.spi.collision.jobstealing.*;
+ import org.apache.ignite.spi.communication.tcp.*;
+ import org.jdk8.backport.*;
+
+ import java.util.*;
+
+ /**
+ * Communication message factory.
+ */
+ public class GridTcpCommunicationMessageFactory {
+ /** Common message producers. */
+ private static final GridTcpCommunicationMessageProducer[] COMMON = new GridTcpCommunicationMessageProducer[83];
+
+ /**
+ * Custom messages registry. Used for test purposes.
+ */
+ private static final Map<Byte, GridTcpCommunicationMessageProducer> CUSTOM = new ConcurrentHashMap8<>();
+
+ /** Highest common message type; also the last valid index of the 83-element {@code COMMON} array. */
+ public static final int MAX_COMMON_TYPE = 82;
+
+ static {
+ registerCommon(new GridTcpCommunicationMessageProducer() {
+ @Override public GridTcpCommunicationMessageAdapter create(byte type) {
+ switch (type) {
+ case 0:
+ return new GridJobCancelRequest();
+
+ case 2:
+ return new GridJobExecuteResponse();
+
+ case 3:
+ return new GridJobSiblingsRequest();
+
+ case 4:
+ return new GridJobSiblingsResponse();
+
+ case 5:
+ return new GridTaskCancelRequest();
+
+ case 6:
+ return new GridTaskSessionRequest();
+
+ case 7:
+ return new GridCheckpointRequest();
+
+ case 8:
+ return new GridIoMessage();
+
+ case 9:
+ return new GridIoUserMessage();
+
+ case 10:
+ return new GridDeploymentInfoBean();
+
+ case 11:
+ return new GridDeploymentRequest();
+
+ case 12:
+ return new GridDeploymentResponse();
+
+ case 13:
+ return new GridEventStorageMessage();
+
+ case 16:
+ return new GridCacheEvictionRequest();
+
+ case 17:
+ return new GridCacheEvictionResponse();
+
+ case 18:
+ return new GridCacheOptimisticCheckPreparedTxRequest();
+
+ case 19:
+ return new GridCacheOptimisticCheckPreparedTxResponse();
+
+ case 20:
+ return new GridCachePessimisticCheckCommittedTxRequest();
+
+ case 21:
+ return new GridCachePessimisticCheckCommittedTxResponse();
+
+ case 22:
+ return new GridDistributedLockRequest();
+
+ case 23:
+ return new GridDistributedLockResponse();
+
+ case 24:
+ return new GridDistributedTxFinishRequest();
+
+ case 25:
+ return new GridDistributedTxFinishResponse();
+
+ case 26:
+ return new GridDistributedTxPrepareRequest();
+
+ case 27:
+ return new GridDistributedTxPrepareResponse();
+
+ case 28:
+ return new GridDistributedUnlockRequest();
+
+ case 29:
+ return new GridDhtLockRequest();
+
+ case 30:
+ return new GridDhtLockResponse();
+
+ case 31:
+ return new GridDhtTxFinishRequest();
+
+ case 32:
+ return new GridDhtTxFinishResponse();
+
+ case 33:
+ return new GridDhtTxPrepareRequest();
+
+ case 34:
+ return new GridDhtTxPrepareResponse();
+
+ case 35:
+ return new GridDhtUnlockRequest();
+
+ case 36:
+ return new GridDhtAtomicDeferredUpdateResponse();
+
+ case 37:
+ return new GridDhtAtomicUpdateRequest();
+
+ case 38:
+ return new GridDhtAtomicUpdateResponse();
+
+ case 39:
+ return new GridNearAtomicUpdateRequest();
+
+ case 40:
+ return new GridNearAtomicUpdateResponse();
+
+ case 41:
+ return new GridDhtForceKeysRequest();
+
+ case 42:
+ return new GridDhtForceKeysResponse();
+
+ case 43:
+ return new GridDhtPartitionDemandMessage();
+
+ case 44:
+ return new GridDhtPartitionSupplyMessage();
+
+ case 45:
+ return new GridDhtPartitionsFullMessage();
+
+ case 46:
+ return new GridDhtPartitionsSingleMessage();
+
+ case 47:
+ return new GridDhtPartitionsSingleRequest();
+
+ case 48:
+ return new GridNearGetRequest();
+
+ case 49:
+ return new GridNearGetResponse();
+
+ case 50:
+ return new GridNearLockRequest();
+
+ case 51:
+ return new GridNearLockResponse();
+
+ case 52:
+ return new GridNearTxFinishRequest();
+
+ case 53:
+ return new GridNearTxFinishResponse();
+
+ case 54:
+ return new GridNearTxPrepareRequest();
+
+ case 55:
+ return new GridNearTxPrepareResponse();
+
+ case 56:
+ return new GridNearUnlockRequest();
+
+ case 57:
+ return new GridCacheQueryRequest();
+
+ case 58:
+ return new GridCacheQueryResponse();
+
+ case 59:
+ return new GridClockDeltaSnapshotMessage();
+
+ case 60:
+ return new GridContinuousMessage();
+
+ case 61:
+ return new GridDataLoadRequest();
+
+ case 62:
+ return new GridDataLoadResponse();
+
+ // 65-72 are GGFS messages (see GridGgfsOpProcessor).
+
+ case 73:
+ return new GridTaskResultRequest();
+
+ case 74:
+ return new GridTaskResultResponse();
+
+ // TODO: Register from streamer processor.
+ case 75:
+ return new GridStreamerCancelRequest();
+
+ case 76:
+ return new GridStreamerExecutionRequest();
+
+ case 77:
+ return new GridStreamerResponse();
+
+ case 78:
+ return new JobStealingRequest();
+
+ case 79:
+ return new GridDhtAffinityAssignmentRequest();
+
+ case 80:
+ return new GridDhtAffinityAssignmentResponse();
+
+ case 81:
+ return new GridJobExecuteRequest();
+
+ case 82:
+ return new GridCacheTtlUpdateRequest();
+
+ default:
+ assert false : "Invalid message type.";
+
+ return null;
+ }
+ }
+ }, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
+ 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39,
+ 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59,
+ 60, 61, 62, 63, 64, /* 65-72 - GGFS messages. */ 73, 74, 75, 76, 77, 78, 79,
+ 80, 81, 82);
+ }
+
+ /**
+ * @param type Message type.
+ * @return New message.
+ */
+ public static GridTcpCommunicationMessageAdapter create(byte type) {
+ if (type == TcpCommunicationSpi.NODE_ID_MSG_TYPE)
+ return new TcpCommunicationSpi.NodeIdMessage();
+ else if (type == TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE)
+ return new TcpCommunicationSpi.RecoveryLastReceivedMessage();
+ else if (type == TcpCommunicationSpi.HANDSHAKE_MSG_TYPE)
+ return new TcpCommunicationSpi.HandshakeMessage();
+ else
+ return create0(type);
+ }
+
+ /**
+ * @param type Message type.
+ * @return New message.
+ */
+ private static GridTcpCommunicationMessageAdapter create0(byte type) {
+ if (type >= 0 && type < COMMON.length) {
+ GridTcpCommunicationMessageProducer producer = COMMON[type];
+
+ if (producer != null)
+ return producer.create(type);
+ else
+ throw new IllegalStateException("Common message type producer is not registered: " + type);
+ }
+ else {
+ GridTcpCommunicationMessageProducer c = CUSTOM.get(type);
+
+ if (c != null)
+ return c.create(type);
+ else
+ throw new IllegalStateException("Custom message type producer is not registered: " + type);
+ }
+ }
+
+ /**
+ * Register message producer for common message type.
+ *
+ * @param producer Producer.
+ * @param types Types applicable for this producer.
+ */
+ public static void registerCommon(GridTcpCommunicationMessageProducer producer, int... types) {
+ for (int type : types) {
- assert type >= 0 && type < COMMON.length : "Commmon type being registered is out of common messages " +
++ assert type >= 0 && type < COMMON.length : "Common type being registered is out of common messages " +
+ "array length: " + type;
+
+ COMMON[type] = producer;
+ }
+ }
+
+ /**
+ * Registers factory for custom message. Used for test purposes.
+ *
+ * @param producer Message producer.
+ * @param type Message type.
+ */
+ public static void registerCustom(GridTcpCommunicationMessageProducer producer, byte type) {
+ assert producer != null;
+
+ CUSTOM.put(type, producer);
+ }
+
+ /**
+ * @return Common message producers.
+ */
+ public static GridTcpCommunicationMessageProducer[] commonProducers() {
+ return COMMON;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageState.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageState.java
index 0000000,9b3863c..85717e4
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/direct/GridTcpCommunicationMessageState.java
@@@ -1,0 -1,1599 +1,774 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.util.direct;
+
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.processors.cache.version.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*;
+ import org.apache.ignite.internal.processors.clock.*;
-import org.apache.ignite.internal.util.nio.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+ import sun.misc.*;
-import sun.nio.ch.*;
+
+ import java.nio.*;
+ import java.util.*;
+
+ import static org.apache.ignite.internal.util.direct.GridTcpCommunicationMessageAdapter.*;
+
+ /**
+ * Communication message state.
+ */
+ @SuppressWarnings("PublicField")
+ public class GridTcpCommunicationMessageState {
+ /** */
+ private static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+ /** */
+ private static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+ /** */
- private static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
++ private MessageWriter writer;
+
+ /** */
- private static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
-
- /** */
- private static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
-
- /** */
- private static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
-
- /** */
- private static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
-
- /** */
- private static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
-
- /** */
- private static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
-
- /** */
- private static final byte[] BYTE_ARR_EMPTY = new byte[0];
-
- /** */
- private static final short[] SHORT_ARR_EMPTY = new short[0];
-
- /** */
- private static final int[] INT_ARR_EMPTY = new int[0];
-
- /** */
- private static final long[] LONG_ARR_EMPTY = new long[0];
-
- /** */
- private static final float[] FLOAT_ARR_EMPTY = new float[0];
-
- /** */
- private static final double[] DOUBLE_ARR_EMPTY = new double[0];
-
- /** */
- private static final char[] CHAR_ARR_EMPTY = new char[0];
-
- /** */
- private static final boolean[] BOOLEAN_ARR_EMPTY = new boolean[0];
-
- /** */
- private static final byte[] EMPTY_UUID_BYTES = new byte[16];
-
- /** */
- private static final ArrayCreator<byte[]> BYTE_ARR_CREATOR = new ArrayCreator<byte[]>() {
- @Override public byte[] create(int len) {
- switch (len) {
- case -1:
- return BYTE_ARR_NOT_READ;
-
- case 0:
- return BYTE_ARR_EMPTY;
-
- default:
- return new byte[len];
- }
- }
- };
-
- /** */
- private static final ArrayCreator<short[]> SHORT_ARR_CREATOR = new ArrayCreator<short[]>() {
- @Override public short[] create(int len) {
- switch (len) {
- case -1:
- return SHORT_ARR_NOT_READ;
-
- case 0:
- return SHORT_ARR_EMPTY;
-
- default:
- return new short[len];
- }
- }
- };
-
- /** */
- private static final ArrayCreator<int[]> INT_ARR_CREATOR = new ArrayCreator<int[]>() {
- @Override public int[] create(int len) {
- switch (len) {
- case -1:
- return INT_ARR_NOT_READ;
-
- case 0:
- return INT_ARR_EMPTY;
-
- default:
- return new int[len];
- }
- }
- };
-
- /** */
- private static final ArrayCreator<long[]> LONG_ARR_CREATOR = new ArrayCreator<long[]>() {
- @Override public long[] create(int len) {
- switch (len) {
- case -1:
- return LONG_ARR_NOT_READ;
-
- case 0:
- return LONG_ARR_EMPTY;
-
- default:
- return new long[len];
- }
- }
- };
-
- /** */
- private static final ArrayCreator<float[]> FLOAT_ARR_CREATOR = new ArrayCreator<float[]>() {
- @Override public float[] create(int len) {
- switch (len) {
- case -1:
- return FLOAT_ARR_NOT_READ;
-
- case 0:
- return FLOAT_ARR_EMPTY;
-
- default:
- return new float[len];
- }
- }
- };
-
- /** */
- private static final ArrayCreator<double[]> DOUBLE_ARR_CREATOR = new ArrayCreator<double[]>() {
- @Override public double[] create(int len) {
- switch (len) {
- case -1:
- return DOUBLE_ARR_NOT_READ;
-
- case 0:
- return DOUBLE_ARR_EMPTY;
-
- default:
- return new double[len];
- }
- }
- };
-
- /** */
- private static final ArrayCreator<char[]> CHAR_ARR_CREATOR = new ArrayCreator<char[]>() {
- @Override public char[] create(int len) {
- switch (len) {
- case -1:
- return CHAR_ARR_NOT_READ;
-
- case 0:
- return CHAR_ARR_EMPTY;
-
- default:
- return new char[len];
- }
- }
- };
-
- /** */
- private static final ArrayCreator<boolean[]> BOOLEAN_ARR_CREATOR = new ArrayCreator<boolean[]>() {
- @Override public boolean[] create(int len) {
- switch (len) {
- case -1:
- return BOOLEAN_ARR_NOT_READ;
-
- case 0:
- return BOOLEAN_ARR_EMPTY;
-
- default:
- return new boolean[len];
- }
- }
- };
-
- /** */
- private GridNioMessageWriter msgWriter;
-
- /** */
- private GridNioMessageReader msgReader;
-
- /** */
- private UUID nodeId;
-
- /** */
- private ByteBuffer buf;
-
- /** */
- private byte[] heapArr;
-
- /** */
- private long baseOff;
-
- /** */
- private boolean arrHdrDone;
-
- /** */
- private int arrOff;
-
- /** */
- private Object tmpArr;
-
- /** */
- private int tmpArrOff;
-
- /** */
- private int tmpArrBytes;
-
- /** */
- private boolean msgNotNull;
-
- /** */
- private boolean msgNotNullDone;
-
- /** */
- private boolean msgTypeDone;
-
- /** */
- private GridTcpCommunicationMessageAdapter msg;
++ private MessageReader reader;
+
+ /** */
+ public int idx;
+
+ /** */
+ public boolean typeWritten;
+
+ /** */
+ public Iterator<?> it;
+
+ /** */
+ public Object cur = NULL;
+
+ /** */
+ public boolean keyDone;
+
+ /** */
+ public int readSize = -1;
+
+ /** */
+ public int readItems;
+
+ /**
- * @param msgWriter Message writer.
- * @param nodeId Node ID (provided only if versions are different).
++ * @param writer Writer.
+ */
- public void messageWriter(GridNioMessageWriter msgWriter, @Nullable UUID nodeId) {
- assert msgWriter != null;
-
- this.msgWriter = msgWriter;
- this.nodeId = nodeId;
++ public final void setWriter(MessageWriter writer) {
++ if (this.writer == null)
++ this.writer = writer;
+ }
+
+ /**
- * @param msgReader Message reader.
- * @param nodeId Node ID (provided only if versions are different).
++ * @param reader Reader.
+ */
- public void messageReader(GridNioMessageReader msgReader, @Nullable UUID nodeId) {
- assert msgReader != null;
-
- this.msgReader = msgReader;
- this.nodeId = nodeId;
++ public final void setReader(MessageReader reader) {
++ if (this.reader == null)
++ this.reader = reader;
+ }
+
+ /**
+ * @param buf Buffer.
+ */
+ public final void setBuffer(ByteBuffer buf) {
- assert buf != null;
-
- if (this.buf != buf) {
- this.buf = buf;
++ if (writer != null)
++ writer.setBuffer(buf);
+
- heapArr = buf.isDirect() ? null : buf.array();
- baseOff = buf.isDirect() ? ((DirectBuffer)buf).address() : BYTE_ARR_OFF;
- }
++ if (reader != null)
++ reader.setBuffer(buf);
+ }
+
+ /**
++ * @param name Field name.
+ * @param b Byte value.
+ * @return Whether value was written.
+ */
- public final boolean putByte(byte b) {
- assert buf != null;
-
- if (!buf.hasRemaining())
- return false;
-
- int pos = buf.position();
-
- UNSAFE.putByte(heapArr, baseOff + pos, b);
-
- buf.position(pos + 1);
-
- return true;
++ public final boolean putByte(String name, byte b) {
++ return writer.writeByte(name, b);
+ }
+
+ /**
++ * @param name Field name.
+ * @return Byte value.
+ */
- public final byte getByte() {
- assert buf != null;
- assert buf.hasRemaining();
-
- int pos = buf.position();
-
- buf.position(pos + 1);
-
- return UNSAFE.getByte(heapArr, baseOff + pos);
++ public final byte getByte(String name) {
++ return reader.readByte(name);
+ }
+
+ /**
++ * @param name Field name.
+ * @param s Short value.
+ * @return Whether value was written.
+ */
- public final boolean putShort(short s) {
- assert buf != null;
-
- if (buf.remaining() < 2)
- return false;
-
- int pos = buf.position();
-
- UNSAFE.putShort(heapArr, baseOff + pos, s);
-
- buf.position(pos + 2);
-
- return true;
++ public final boolean putShort(String name, short s) {
++ return writer.writeShort(name, s);
+ }
+
+ /**
++ * @param name Field name.
+ * @return Short value.
+ */
- public final short getShort() {
- assert buf != null;
- assert buf.remaining() >= 2;
-
- int pos = buf.position();
-
- buf.position(pos + 2);
-
- return UNSAFE.getShort(heapArr, baseOff + pos);
++ public final short getShort(String name) {
++ return reader.readShort(name);
+ }
+
+ /**
++ * @param name Field name.
+ * @param i Integer value.
+ * @return Whether value was written.
+ */
- public final boolean putInt(int i) {
- assert buf != null;
-
- if (buf.remaining() < 4)
- return false;
-
- int pos = buf.position();
-
- UNSAFE.putInt(heapArr, baseOff + pos, i);
-
- buf.position(pos + 4);
-
- return true;
++ public final boolean putInt(String name, int i) {
++ return writer.writeInt(name, i);
+ }
+
+ /**
++ * @param name Field name.
+ * @return Integer value.
+ */
- public final int getInt() {
- assert buf != null;
- assert buf.remaining() >= 4;
-
- int pos = buf.position();
-
- buf.position(pos + 4);
-
- return UNSAFE.getInt(heapArr, baseOff + pos);
++ public final int getInt(String name) {
++ return reader.readInt(name);
+ }
+
+ /**
++ * @param name Field name.
+ * @param l Long value.
+ * @return Whether value was written.
+ */
- public final boolean putLong(long l) {
- assert buf != null;
-
- if (buf.remaining() < 8)
- return false;
-
- int pos = buf.position();
-
- UNSAFE.putLong(heapArr, baseOff + pos, l);
-
- buf.position(pos + 8);
-
- return true;
++ public final boolean putLong(String name, long l) {
++ return writer.writeLong(name, l);
+ }
+
+ /**
++ * @param name Field name.
+ * @return Long value.
+ */
- public final long getLong() {
- assert buf != null;
- assert buf.remaining() >= 8;
-
- int pos = buf.position();
-
- buf.position(pos + 8);
-
- return UNSAFE.getLong(heapArr, baseOff + pos);
++ public final long getLong(String name) {
++ return reader.readLong(name);
+ }
+
+ /**
++ * @param name Field name.
+ * @param f Float value.
+ * @return Whether value was written.
+ */
- public final boolean putFloat(float f) {
- assert buf != null;
-
- if (buf.remaining() < 4)
- return false;
-
- int pos = buf.position();
-
- UNSAFE.putFloat(heapArr, baseOff + pos, f);
-
- buf.position(pos + 4);
-
- return true;
++ public final boolean putFloat(String name, float f) {
++ return writer.writeFloat(name, f);
+ }
+
+ /**
++ * @param name Field name.
+ * @return Float value.
+ */
- public final float getFloat() {
- assert buf != null;
- assert buf.remaining() >= 4;
-
- int pos = buf.position();
-
- buf.position(pos + 4);
-
- return UNSAFE.getFloat(heapArr, baseOff + pos);
++ public final float getFloat(String name) {
++ return reader.readFloat(name);
+ }
+
+ /**
++ * @param name Field name.
+ * @param d Double value.
+ * @return Whether value was written.
+ */
- public final boolean putDouble(double d) {
- assert buf != null;
-
- if (buf.remaining() < 8)
- return false;
-
- int pos = buf.position();
-
- UNSAFE.putDouble(heapArr, baseOff + pos, d);
-
- buf.position(pos + 8);
-
- return true;
++ public final boolean putDouble(String name, double d) {
++ return writer.writeDouble(name, d);
+ }
+
+ /**
++ * @param name Field name.
+ * @return Double value.
+ */
- public final double getDouble() {
- assert buf != null;
- assert buf.remaining() >= 8;
-
- int pos = buf.position();
-
- buf.position(pos + 8);
-
- return UNSAFE.getDouble(heapArr, baseOff + pos);
++ public final double getDouble(String name) {
++ return reader.readDouble(name);
+ }
+
+ /**
++ * @param name Field name.
+ * @param c Char value.
+ * @return Whether value was written.
+ */
- public final boolean putChar(char c) {
- assert buf != null;
-
- if (buf.remaining() < 2)
- return false;
-
- int pos = buf.position();
-
- UNSAFE.putChar(heapArr, baseOff + pos, c);
-
- buf.position(pos + 2);
-
- return true;
++ public final boolean putChar(String name, char c) {
++ return writer.writeChar(name, c);
+ }
+
+ /**
++ * @param name Field name.
+ * @return Char value.
+ */
- public final char getChar() {
- assert buf != null;
- assert buf.remaining() >= 2;
-
- int pos = buf.position();
-
- buf.position(pos + 2);
-
- return UNSAFE.getChar(heapArr, baseOff + pos);
++ public final char getChar(String name) {
++ return reader.readChar(name);
+ }
+
+ /**
++ * @param name Field name.
+ * @param b Boolean value.
+ * @return Whether value was written.
+ */
- public final boolean putBoolean(boolean b) {
- assert buf != null;
-
- if (buf.remaining() < 1)
- return false;
-
- int pos = buf.position();
-
- UNSAFE.putBoolean(heapArr, baseOff + pos, b);
-
- buf.position(pos + 1);
-
- return true;
++ public final boolean putBoolean(String name, boolean b) {
++ return writer.writeBoolean(name, b);
+ }
+
+ /**
++ * @param name Field name.
+ * @return Boolean value.
+ */
- public final boolean getBoolean() {
- assert buf != null;
- assert buf.hasRemaining();
-
- int pos = buf.position();
-
- buf.position(pos + 1);
-
- return UNSAFE.getBoolean(heapArr, baseOff + pos);
++ public final boolean getBoolean(String name) {
++ return reader.readBoolean(name);
+ }
+
+ /**
++ * @param name Field name.
+ * @param arr Byte array.
+ * @return Whether array was fully written.
+ */
- public final boolean putByteArray(@Nullable byte[] arr) {
- assert buf != null;
-
- int len = arr != null ? arr.length : 0;
-
- return putArray(arr, BYTE_ARR_OFF, len, len);
++ public final boolean putByteArray(String name, @Nullable byte[] arr) {
++ return writer.writeByteArray(name, arr);
+ }
+
+ /**
- * @return Byte array or special
- * {@link GridTcpCommunicationMessageAdapter#BYTE_ARR_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return Byte array.
+ */
- public final byte[] getByteArray() {
- assert buf != null;
-
- return getArray(BYTE_ARR_CREATOR, 0, BYTE_ARR_OFF);
++ public final byte[] getByteArray(String name) {
++ return reader.readByteArray(name);
+ }
+
+ /**
++ * @param name Field name.
+ * @param arr Short array.
+ * @return Whether array was fully written.
+ */
- public final boolean putShortArray(short[] arr) {
- assert buf != null;
-
- int len = arr != null ? arr.length : 0;
-
- return putArray(arr, SHORT_ARR_OFF, len, len << 1);
++ public final boolean putShortArray(String name, short[] arr) {
++ return writer.writeShortArray(name, arr);
+ }
+
+ /**
- * @return Short array or special
- * {@link GridTcpCommunicationMessageAdapter#SHORT_ARR_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return Short array.
+ */
- public final short[] getShortArray() {
- assert buf != null;
-
- return getArray(SHORT_ARR_CREATOR, 1, SHORT_ARR_OFF);
++ public final short[] getShortArray(String name) {
++ return reader.readShortArray(name);
+ }
+
+ /**
++ * @param name Field name.
+ * @param arr Integer array.
+ * @return Whether array was fully written.
+ */
- public final boolean putIntArray(int[] arr) {
- assert buf != null;
-
- int len = arr != null ? arr.length : 0;
-
- return putArray(arr, INT_ARR_OFF, len, len << 2);
++ public final boolean putIntArray(String name, int[] arr) {
++ return writer.writeIntArray(name, arr);
+ }
+
+ /**
- * @return Integer array or special
- * {@link GridTcpCommunicationMessageAdapter#INT_ARR_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return Integer array.
+ */
- public final int[] getIntArray() {
- assert buf != null;
-
- return getArray(INT_ARR_CREATOR, 2, INT_ARR_OFF);
++ public final int[] getIntArray(String name) {
++ return reader.readIntArray(name);
+ }
+
+ /**
++ * @param name Field name.
+ * @param arr Long array.
+ * @return Whether array was fully written.
+ */
- public final boolean putLongArray(long[] arr) {
- assert buf != null;
-
- int len = arr != null ? arr.length : 0;
-
- return putArray(arr, LONG_ARR_OFF, len, len << 3);
++ public final boolean putLongArray(String name, long[] arr) {
++ return writer.writeLongArray(name, arr);
+ }
+
+ /**
- * @return Long array or special
- * {@link GridTcpCommunicationMessageAdapter#LONG_ARR_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return Long array.
+ */
- public final long[] getLongArray() {
- assert buf != null;
-
- return getArray(LONG_ARR_CREATOR, 3, LONG_ARR_OFF);
++ public final long[] getLongArray(String name) {
++ return reader.readLongArray(name);
+ }
+
+ /**
++ * @param name Field name.
+ * @param arr Float array.
+ * @return Whether array was fully written.
+ */
- public final boolean putFloatArray(float[] arr) {
- assert buf != null;
-
- int len = arr != null ? arr.length : 0;
-
- return putArray(arr, FLOAT_ARR_OFF, len, len << 2);
++ public final boolean putFloatArray(String name, float[] arr) {
++ return writer.writeFloatArray(name, arr);
+ }
+
+ /**
- * @return Float array or special
- * {@link GridTcpCommunicationMessageAdapter#FLOAT_ARR_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return Float array.
+ */
- public final float[] getFloatArray() {
- assert buf != null;
-
- return getArray(FLOAT_ARR_CREATOR, 2, FLOAT_ARR_OFF);
++ public final float[] getFloatArray(String name) {
++ return reader.readFloatArray(name);
+ }
+
+ /**
++ * @param name Field name.
+ * @param arr Double array.
+ * @return Whether array was fully written.
+ */
- public final boolean putDoubleArray(double[] arr) {
- assert buf != null;
-
- int len = arr != null ? arr.length : 0;
-
- return putArray(arr, DOUBLE_ARR_OFF, len, len << 3);
++ public final boolean putDoubleArray(String name, double[] arr) {
++ return writer.writeDoubleArray(name, arr);
+ }
+
+ /**
- * @return Double array or special
- * {@link GridTcpCommunicationMessageAdapter#DOUBLE_ARR_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return Double array.
+ */
- public final double[] getDoubleArray() {
- assert buf != null;
-
- return getArray(DOUBLE_ARR_CREATOR, 3, DOUBLE_ARR_OFF);
++ public final double[] getDoubleArray(String name) {
++ return reader.readDoubleArray(name);
+ }
+
+ /**
++ * @param name Field name.
+ * @param arr Char array.
+ * @return Whether array was fully written.
+ */
- public final boolean putCharArray(char[] arr) {
- assert buf != null;
-
- int len = arr != null ? arr.length : 0;
-
- return putArray(arr, CHAR_ARR_OFF, len, len << 1);
++ public final boolean putCharArray(String name, char[] arr) {
++ return writer.writeCharArray(name, arr);
+ }
+
+ /**
- * @return Char array or special
- * {@link GridTcpCommunicationMessageAdapter#CHAR_ARR_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return Char array.
+ */
- public final char[] getCharArray() {
- assert buf != null;
-
- return getArray(CHAR_ARR_CREATOR, 1, CHAR_ARR_OFF);
++ public final char[] getCharArray(String name) {
++ return reader.readCharArray(name);
+ }
+
+ /**
++ * @param name Field name.
+ * @param arr Boolean array.
+ * @return Whether array was fully written.
+ */
- public final boolean putBooleanArray(boolean[] arr) {
- assert buf != null;
++ public final boolean putBooleanArray(String name, boolean[] arr) {
++ return writer.writeBooleanArray(name, arr);
++ }
++
++ /**
++ * @param name Field name.
++ * @return Boolean array.
++ */
++ public final boolean[] getBooleanArray(String name) {
++ return reader.readBooleanArray(name);
++ }
++
++ /**
++ * @param name Field name.
++ * @param buf Buffer.
++ * @return Whether value was fully written.
++ */
++ public final boolean putByteBuffer(String name, @Nullable ByteBuffer buf) {
++ byte[] arr = null;
++
++ if (buf != null) {
++ ByteBuffer buf0 = buf.duplicate();
+
- int len = arr != null ? arr.length : 0;
++ buf0.flip();
+
- return putArray(arr, BOOLEAN_ARR_OFF, len, len);
++ arr = new byte[buf0.remaining()];
++
++ buf0.get(arr);
++ }
++
++ return putByteArray(name, arr);
+ }
+
+ /**
- * @return Boolean array or special
- * {@link GridTcpCommunicationMessageAdapter#BOOLEAN_ARR_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return {@link ByteBuffer}.
+ */
- public final boolean[] getBooleanArray() {
- assert buf != null;
++ public final ByteBuffer getByteBuffer(String name) {
++ byte[] arr = getByteArray(name);
+
- return getArray(BOOLEAN_ARR_CREATOR, 0, BOOLEAN_ARR_OFF);
++ if (arr == null)
++ return null;
++ else
++ return ByteBuffer.wrap(arr);
+ }
+
+ /**
++ * @param name Field name.
+ * @param uuid {@link UUID}.
+ * @return Whether value was fully written.
+ */
- public final boolean putUuid(@Nullable UUID uuid) {
++ public final boolean putUuid(String name, @Nullable UUID uuid) {
+ byte[] arr = null;
+
+ if (uuid != null) {
+ arr = new byte[16];
+
+ UNSAFE.putLong(arr, BYTE_ARR_OFF, uuid.getMostSignificantBits());
+ UNSAFE.putLong(arr, BYTE_ARR_OFF + 8, uuid.getLeastSignificantBits());
+ }
+
- return putByteArray(arr);
++ return putByteArray(name, arr);
+ }
+
+ /**
- * @return {@link UUID} or special
- * {@link GridTcpCommunicationMessageAdapter#UUID_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return {@link UUID}.
+ */
- public final UUID getUuid() {
- byte[] arr = getByteArray();
++ public final UUID getUuid(String name) {
++ byte[] arr = getByteArray(name);
+
- if (arr == BYTE_ARR_NOT_READ)
- return UUID_NOT_READ;
- else if (arr == null)
++ if (arr == null)
+ return null;
+ else {
+ long most = UNSAFE.getLong(arr, BYTE_ARR_OFF);
+ long least = UNSAFE.getLong(arr, BYTE_ARR_OFF + 8);
+
+ return new UUID(most, least);
+ }
+ }
+
+ /**
- * @param uuid {@link org.apache.ignite.lang.IgniteUuid}.
++ * @param name Field name.
++ * @param uuid {@link IgniteUuid}.
+ * @return Whether value was fully written.
+ */
- public final boolean putGridUuid(@Nullable IgniteUuid uuid) {
++ public final boolean putGridUuid(String name, @Nullable IgniteUuid uuid) {
+ byte[] arr = null;
+
+ if (uuid != null) {
+ arr = new byte[24];
+
+ UNSAFE.putLong(arr, BYTE_ARR_OFF, uuid.globalId().getMostSignificantBits());
+ UNSAFE.putLong(arr, BYTE_ARR_OFF + 8, uuid.globalId().getLeastSignificantBits());
+ UNSAFE.putLong(arr, BYTE_ARR_OFF + 16, uuid.localId());
+ }
+
- return putByteArray(arr);
++ return putByteArray(name, arr);
+ }
+
+ /**
- * @return {@link org.apache.ignite.lang.IgniteUuid} or special
- * {@link GridTcpCommunicationMessageAdapter#GRID_UUID_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return {@link IgniteUuid}.
+ */
- public final IgniteUuid getGridUuid() {
- byte[] arr = getByteArray();
++ public final IgniteUuid getGridUuid(String name) {
++ byte[] arr = getByteArray(name);
+
- if (arr == BYTE_ARR_NOT_READ)
- return GRID_UUID_NOT_READ;
- else if (arr == null)
++ if (arr == null)
+ return null;
+ else {
+ long most = UNSAFE.getLong(arr, BYTE_ARR_OFF);
+ long least = UNSAFE.getLong(arr, BYTE_ARR_OFF + 8);
+ long loc = UNSAFE.getLong(arr, BYTE_ARR_OFF + 16);
+
+ return new IgniteUuid(new UUID(most, least), loc);
+ }
+ }
+
+ /**
++ * @param name Field name.
+ * @param ver {@link GridClockDeltaVersion}.
+ * @return Whether value was fully written.
+ */
- public final boolean putClockDeltaVersion(@Nullable GridClockDeltaVersion ver) {
++ public final boolean putClockDeltaVersion(String name, @Nullable GridClockDeltaVersion ver) {
+ byte[] arr = null;
+
+ if (ver != null) {
+ arr = new byte[16];
+
+ UNSAFE.putLong(arr, BYTE_ARR_OFF, ver.version());
+ UNSAFE.putLong(arr, BYTE_ARR_OFF + 8, ver.topologyVersion());
+ }
+
- return putByteArray(arr);
++ return putByteArray(name, arr);
+ }
+
+ /**
- * @return {@link GridClockDeltaVersion} or special
- * {@link GridTcpCommunicationMessageAdapter#CLOCK_DELTA_VER_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return {@link GridClockDeltaVersion}.
+ */
- public final GridClockDeltaVersion getClockDeltaVersion() {
- byte[] arr = getByteArray();
++ public final GridClockDeltaVersion getClockDeltaVersion(String name) {
++ byte[] arr = getByteArray(name);
+
- if (arr == BYTE_ARR_NOT_READ)
- return CLOCK_DELTA_VER_NOT_READ;
- else if (arr == null)
++ if (arr == null)
+ return null;
+ else {
+ long ver = UNSAFE.getLong(arr, BYTE_ARR_OFF);
+ long topVer = UNSAFE.getLong(arr, BYTE_ARR_OFF + 8);
+
+ return new GridClockDeltaVersion(ver, topVer);
+ }
+ }
+
+ /**
++ * @param name Field name.
+ * @param list {@link GridByteArrayList}.
+ * @return Whether value was fully written.
+ */
- public final boolean putByteArrayList(@Nullable GridByteArrayList list) {
- byte[] arr = list != null ? list.internalArray() : null;
- int size = list != null ? list.size() : 0;
-
- return putArray(arr, BYTE_ARR_OFF, size, size);
++ public final boolean putByteArrayList(String name, @Nullable GridByteArrayList list) {
++ return putByteArray(name, list != null ? list.array() : null);
+ }
+
+ /**
- * @return {@link GridByteArrayList} or special
- * {@link GridTcpCommunicationMessageAdapter#BYTE_ARR_LIST_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return {@link GridByteArrayList}.
+ */
+ @SuppressWarnings("IfMayBeConditional")
- public final GridByteArrayList getByteArrayList() {
- byte[] arr = getByteArray();
++ public final GridByteArrayList getByteArrayList(String name) {
++ byte[] arr = getByteArray(name);
+
- if (arr == BYTE_ARR_NOT_READ)
- return BYTE_ARR_LIST_NOT_READ;
- else if (arr == null)
++ if (arr == null)
+ return null;
+ else
+ return new GridByteArrayList(arr);
+ }
+
+ /**
++ * @param name Field name.
+ * @param list {@link GridLongList}.
+ * @return Whether value was fully written.
+ */
- public final boolean putLongList(@Nullable GridLongList list) {
- long[] arr = list != null ? list.internalArray() : null;
- int size = list != null ? list.size() : 0;
-
- return putArray(arr, LONG_ARR_OFF, size, size << 3);
++ public final boolean putLongList(String name, @Nullable GridLongList list) {
++ return putLongArray(name, list != null ? list.array() : null);
+ }
+
+ /**
- * @return {@link GridLongList} or special
- * {@link GridTcpCommunicationMessageAdapter#LONG_LIST_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return {@link GridLongList}.
+ */
+ @SuppressWarnings("IfMayBeConditional")
- public final GridLongList getLongList() {
- long[] arr = getLongArray();
++ public final GridLongList getLongList(String name) {
++ long[] arr = getLongArray(name);
+
- if (arr == LONG_ARR_NOT_READ)
- return LONG_LIST_NOT_READ;
- else if (arr == null)
++ if (arr == null)
+ return null;
+ else
+ return new GridLongList(arr);
+ }
+
+ /**
++ * @param name Field name.
+ * @param ver {@link org.apache.ignite.internal.processors.cache.version.GridCacheVersion}.
+ * @return Whether value was fully written.
+ */
- public final boolean putCacheVersion(@Nullable GridCacheVersion ver) {
++ public final boolean putCacheVersion(String name, @Nullable GridCacheVersion ver) {
+ byte[] arr = null;
+
+ if (ver != null) {
+ arr = new byte[24];
+
+ UNSAFE.putInt(arr, BYTE_ARR_OFF, ver.topologyVersion());
+ UNSAFE.putInt(arr, BYTE_ARR_OFF + 4, ver.nodeOrderAndDrIdRaw());
+ UNSAFE.putLong(arr, BYTE_ARR_OFF + 8, ver.globalTime());
+ UNSAFE.putLong(arr, BYTE_ARR_OFF + 16, ver.order());
+ }
+
- return putByteArray(arr);
++ return putByteArray(name, arr);
+ }
+
+ /**
- * @return {@link GridCacheVersion} or special
- * {@link GridTcpCommunicationMessageAdapter#CACHE_VER_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return {@link GridCacheVersion}.
+ */
- public final GridCacheVersion getCacheVersion() {
- byte[] arr = getByteArray();
++ public final GridCacheVersion getCacheVersion(String name) {
++ byte[] arr = getByteArray(name);
+
- if (arr == BYTE_ARR_NOT_READ)
- return CACHE_VER_NOT_READ;
- else if (arr == null)
++ if (arr == null)
+ return null;
+ else {
+ int topVerDrId = UNSAFE.getInt(arr, BYTE_ARR_OFF);
+ int nodeOrder = UNSAFE.getInt(arr, BYTE_ARR_OFF + 4);
+ long globalTime = UNSAFE.getLong(arr, BYTE_ARR_OFF + 8);
+ long order = UNSAFE.getLong(arr, BYTE_ARR_OFF + 16);
+
+ return new GridCacheVersion(topVerDrId, nodeOrder, globalTime, order);
+ }
+ }
+
+ /**
++ * @param name Field name.
+ * @param id {@link GridDhtPartitionExchangeId}.
+ * @return Whether value was fully written.
+ */
- public final boolean putDhtPartitionExchangeId(@Nullable GridDhtPartitionExchangeId id) {
++ public final boolean putDhtPartitionExchangeId(String name, @Nullable GridDhtPartitionExchangeId id) {
+ byte[] arr = null;
+
+ if (id != null) {
+ arr = new byte[28];
+
+ UNSAFE.putLong(arr, BYTE_ARR_OFF, id.nodeId().getMostSignificantBits());
+ UNSAFE.putLong(arr, BYTE_ARR_OFF + 8, id.nodeId().getLeastSignificantBits());
+ UNSAFE.putInt(arr, BYTE_ARR_OFF + 16, id.event());
+ UNSAFE.putLong(arr, BYTE_ARR_OFF + 20, id.topologyVersion());
+ }
+
- return putByteArray(arr);
++ return putByteArray(name, arr);
+ }
+
+ /**
- * @return {@link GridDhtPartitionExchangeId} or special
- * {@link GridTcpCommunicationMessageAdapter#DHT_PART_EXCHANGE_ID_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return {@link GridDhtPartitionExchangeId}.
+ */
- public final GridDhtPartitionExchangeId getDhtPartitionExchangeId() {
- byte[] arr = getByteArray();
++ public final GridDhtPartitionExchangeId getDhtPartitionExchangeId(String name) {
++ byte[] arr = getByteArray(name);
+
- if (arr == BYTE_ARR_NOT_READ)
- return DHT_PART_EXCHANGE_ID_NOT_READ;
- else if (arr == null)
++ if (arr == null)
+ return null;
+ else {
+ long most = UNSAFE.getLong(arr, BYTE_ARR_OFF);
+ long least = UNSAFE.getLong(arr, BYTE_ARR_OFF + 8);
+ int evt = UNSAFE.getInt(arr, BYTE_ARR_OFF + 16);
+ long topVer = UNSAFE.getLong(arr, BYTE_ARR_OFF + 20);
+
+ return new GridDhtPartitionExchangeId(new UUID(most, least), evt, topVer);
+ }
+ }
+
+ /**
++ * @param name Field name.
+ * @param bytes {@link GridCacheValueBytes}.
+ * @return Whether value was fully written.
+ */
- public final boolean putValueBytes(@Nullable GridCacheValueBytes bytes) {
++ public final boolean putValueBytes(String name, @Nullable GridCacheValueBytes bytes) {
+ byte[] arr = null;
+
+ if (bytes != null) {
- if (bytes.get() != null) {
- int len = bytes.get().length;
++ byte[] bytes0 = bytes.get();
++
++ if (bytes0 != null) {
++ int len = bytes0.length;
+
+ arr = new byte[len + 2];
+
+ UNSAFE.putBoolean(arr, BYTE_ARR_OFF, true);
- UNSAFE.copyMemory(bytes.get(), BYTE_ARR_OFF, arr, BYTE_ARR_OFF + 1, len);
++ UNSAFE.copyMemory(bytes0, BYTE_ARR_OFF, arr, BYTE_ARR_OFF + 1, len);
+ UNSAFE.putBoolean(arr, BYTE_ARR_OFF + 1 + len, bytes.isPlain());
+ }
+ else {
+ arr = new byte[1];
+
+ UNSAFE.putBoolean(arr, BYTE_ARR_OFF, false);
+ }
+ }
+
- return putByteArray(arr);
++ return putByteArray(name, arr);
+ }
+
+ /**
- * @return {@link GridCacheValueBytes} or special
- * {@link GridTcpCommunicationMessageAdapter#VAL_BYTES_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return {@link GridCacheValueBytes}.
+ */
- public final GridCacheValueBytes getValueBytes() {
- byte[] arr = getByteArray();
++ public final GridCacheValueBytes getValueBytes(String name) {
++ byte[] arr = getByteArray(name);
+
- if (arr == BYTE_ARR_NOT_READ)
- return VAL_BYTES_NOT_READ;
- else if (arr == null)
++ if (arr == null)
+ return null;
+ else {
+ boolean notNull = UNSAFE.getBoolean(arr, BYTE_ARR_OFF);
+
+ if (notNull) {
+ int len = arr.length - 2;
+
+ assert len >= 0 : len;
+
+ byte[] bytesArr = new byte[len];
+
+ UNSAFE.copyMemory(arr, BYTE_ARR_OFF + 1, bytesArr, BYTE_ARR_OFF, len);
+
+ boolean isPlain = UNSAFE.getBoolean(arr, BYTE_ARR_OFF + 1 + len);
+
+ return new GridCacheValueBytes(bytesArr, isPlain);
+ }
+ else
+ return new GridCacheValueBytes();
+ }
+ }
+
+ /**
++ * @param name Field name.
+ * @param str {@link String}.
+ * @return Whether value was fully written.
+ */
- public final boolean putString(@Nullable String str) {
- return putByteArray(str != null ? str.getBytes() : null);
++ public final boolean putString(String name, @Nullable String str) {
++ return putByteArray(name, str != null ? str.getBytes() : null);
+ }
+
+ /**
- * @return {@link String} or special {@link GridTcpCommunicationMessageAdapter#STR_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return {@link String}.
+ */
+ @SuppressWarnings("IfMayBeConditional")
- public final String getString() {
- byte[] arr = getByteArray();
++ public final String getString(String name) {
++ byte[] arr = getByteArray(name);
+
- if (arr == BYTE_ARR_NOT_READ)
- return STR_NOT_READ;
- else if (arr == null)
++ if (arr == null)
+ return null;
+ else
+ return new String(arr);
+ }
+
+ /**
++ * @param name Field name.
+ * @param bits {@link BitSet}.
+ * @return Whether value was fully written.
+ */
- public final boolean putBitSet(@Nullable BitSet bits) {
- return putLongArray(bits != null ? bits.toLongArray() : null);
++ public final boolean putBitSet(String name, @Nullable BitSet bits) {
++ return putLongArray(name, bits != null ? bits.toLongArray() : null);
+ }
+
+ /**
- * @return {@link BitSet} or special {@link GridTcpCommunicationMessageAdapter#BIT_SET_NOT_READ}
- * value if it was not fully read.
++ * @param name Field name.
++ * @return {@link BitSet}.
+ */
+ @SuppressWarnings("IfMayBeConditional")
- public final BitSet getBitSet() {
- long[] arr = getLongArray();
++ public final BitSet getBitSet(String name) {
++ long[] arr = getLongArray(name);
+
- if (arr == LONG_ARR_NOT_READ)
- return BIT_SET_NOT_READ;
- else if (arr == null)
++ if (arr == null)
+ return null;
+ else
+ return BitSet.valueOf(arr);
+ }
+
+ /**
++ * @param name Field name.
+ * @param e Enum.
+ * @return Whether value was fully written.
+ */
- public final boolean putEnum(@Nullable Enum<?> e) {
- return putByte(e != null ? (byte)e.ordinal() : -1);
++ public final boolean putEnum(String name, @Nullable Enum<?> e) {
++ return putByte(name, e != null ? (byte)e.ordinal() : -1);
+ }
+
+ /**
++ * @param name Field name.
+ * @param msg {@link GridTcpCommunicationMessageAdapter}.
+ * @return Whether value was fully written.
+ */
- public final boolean putMessage(@Nullable GridTcpCommunicationMessageAdapter msg) {
- assert buf != null;
-
- if (!msgNotNullDone) {
- if (!putBoolean(msg != null))
- return false;
-
- msgNotNullDone = true;
- }
-
- if (msg != null) {
- if (!msgWriter.write(nodeId, msg, buf))
- return false;
-
- msgNotNullDone = false;
- }
-
- return true;
- }
-
- /**
- * @return {@link GridTcpCommunicationMessageAdapter} or special
- * {@link GridTcpCommunicationMessageAdapter#MSG_NOT_READ}
- * value if it was not fully read.
- */
- public final GridTcpCommunicationMessageAdapter getMessage() {
- assert buf != null;
-
- if (!msgNotNullDone) {
- if (!buf.hasRemaining())
- return MSG_NOT_READ;
-
- msgNotNull = getBoolean();
-
- msgNotNullDone = true;
- }
-
- if (msgNotNull) {
- if (!msgTypeDone) {
- if (!buf.hasRemaining())
- return MSG_NOT_READ;
-
- GridTcpMessageFactory factory = msgReader.messageFactory();
-
- byte type = getByte();
-
- msg = factory != null ? factory.create(type) : GridTcpCommunicationMessageFactory.create(type);
-
- msgTypeDone = true;
- }
-
- if (msgReader.read(nodeId, msg, buf)) {
- GridTcpCommunicationMessageAdapter msg0 = msg;
-
- msgNotNullDone = false;
- msgTypeDone = false;
- msg = null;
-
- return msg0;
- }
- else
- return MSG_NOT_READ;
- }
- else
- return null;
- }
-
- /**
- * @param arr Array.
- * @param off Offset.
- * @param len Length.
- * @param bytes Length in bytes.
- * @return Whether array was fully written
- */
- private boolean putArray(@Nullable Object arr, long off, int len, int bytes) {
- assert off > 0;
- assert len >= 0;
- assert bytes >= 0;
- assert bytes >= arrOff;
-
- if (!buf.hasRemaining())
- return false;
-
- int pos = buf.position();
-
- if (arr != null) {
- assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
-
- if (!arrHdrDone) {
- if (buf.remaining() < 5)
- return false;
-
- UNSAFE.putBoolean(heapArr, baseOff + pos++, true);
- UNSAFE.putInt(heapArr, baseOff + pos, len);
-
- pos += 4;
-
- buf.position(pos);
-
- arrHdrDone = true;
- }
-
- if (!buf.hasRemaining())
- return false;
-
- int left = bytes - arrOff;
- int remaining = buf.remaining();
-
- if (left <= remaining) {
- UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, left);
-
- pos += left;
-
- buf.position(pos);
-
- arrHdrDone = false;
- arrOff = 0;
- }
- else {
- UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
-
- pos += remaining;
-
- buf.position(pos);
-
- arrOff += remaining;
-
- return false;
- }
- }
- else {
- UNSAFE.putBoolean(heapArr, baseOff + pos++, false);
-
- buf.position(pos);
- }
-
- return true;
- }
-
- /**
- * @param creator Array creator.
- * @param lenShift Array length shift size.
- * @param off Base offset.
- * @return Array or special value if it was not fully read.
- */
- private <T> T getArray(ArrayCreator<T> creator, int lenShift, long off) {
- assert creator != null;
- assert lenShift >= 0;
-
- if (!arrHdrDone) {
- if (!buf.hasRemaining())
- return creator.create(-1);
-
- if (!getBoolean())
- return null;
-
- arrHdrDone = true;
- }
-
- if (tmpArr == null) {
- if (buf.remaining() < 4)
- return creator.create(-1);
-
- int len = getInt();
-
- if (len == 0) {
- arrHdrDone = false;
-
- return creator.create(0);
- }
-
- tmpArr = creator.create(len);
- tmpArrBytes = len << lenShift;
- }
-
- int toRead = tmpArrBytes - tmpArrOff;
- int remaining = buf.remaining();
- int pos = buf.position();
-
- if (remaining < toRead) {
- UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);
-
- buf.position(pos + remaining);
-
- tmpArrOff += remaining;
-
- return creator.create(-1);
- }
- else {
- UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
-
- buf.position(pos + toRead);
-
- T arr = (T)tmpArr;
-
- arrHdrDone = false;
- tmpArr = null;
- tmpArrBytes = 0;
- tmpArrOff = 0;
-
- return arr;
- }
- }
-
- /**
- * @param i Integer value.
- * @return Whether value was written.
- */
- public final boolean putIntClient(int i) {
- assert buf != null;
-
- if (buf.remaining() < 4)
- return false;
-
- putByte((byte)(0xFF & (i >>> 24)));
- putByte((byte)(0xFF & (i >>> 16)));
- putByte((byte)(0xFF & (i >>> 8)));
- putByte((byte)(0xFF & i));
-
- return true;
- }
-
- /**
- * @return Integer value.
- */
- public final int getIntClient() {
- assert buf != null;
- assert buf.remaining() >= 4;
-
- int val = 0;
-
- val |= (0xFF & getByte()) << 24;
- val |= (0xFF & getByte()) << 16;
- val |= (0xFF & getByte()) << 8;
- val |= (0xFF & getByte());
-
- return val;
- }
-
- /**
- * @param val Long value.
- * @return Whether value was written.
- */
- public final boolean putLongClient(long val) {
- assert buf != null;
-
- if (buf.remaining() < 8)
- return false;
-
- putByte((byte)(val >>> 56));
- putByte((byte)(0xFFL & (val >>> 48)));
- putByte((byte)(0xFFL & (val >>> 40)));
- putByte((byte)(0xFFL & (val >>> 32)));
- putByte((byte)(0xFFL & (val >>> 24)));
- putByte((byte)(0xFFL & (val >>> 16)));
- putByte((byte)(0xFFL & (val >>> 8)));
- putByte((byte) (0xFFL & val));
-
- return true;
- }
-
- /**
- * @return Long value.
- */
- public final long getLongClient() {
- assert buf != null;
- assert buf.remaining() >= 8;
-
- long x = 0;
-
- x |= (0xFFL & getByte()) << 56;
- x |= (0xFFL & getByte()) << 48;
- x |= (0xFFL & getByte()) << 40;
- x |= (0xFFL & getByte()) << 32;
- x |= (0xFFL & getByte()) << 24;
- x |= (0xFFL & getByte()) << 16;
- x |= (0xFFL & getByte()) << 8;
- x |= (0xFFL & getByte());
-
- return x;
- }
-
- /**
- * @param uuid {@link UUID}.
- * @return Whether value was fully written.
- */
- public final boolean putUuidClient(@Nullable UUID uuid) {
- byte[] arr = uuid != null ? U.uuidToBytes(uuid) : EMPTY_UUID_BYTES;
-
- return putByteArrayClient(arr);
- }
-
- /**
- * @param arr Byte array.
- * @return Whether array was fully written.
- */
- public final boolean putByteArrayClient(byte[] arr) {
- assert buf != null;
- assert arr != null;
-
- return putArrayClient(arr, BYTE_ARR_OFF, arr.length, arr.length);
- }
-
- /**
- * @param src Buffer.
- * @return Whether array was fully written
- */
- public boolean putByteBufferClient(ByteBuffer src) {
- assert src != null;
- assert src.hasArray();
-
- return putArrayClient(src.array(), BYTE_ARR_OFF + src.position(), src.remaining(), src.remaining());
- }
-
- /**
- * @param arr Array.
- * @param off Offset.
- * @param len Length.
- * @param bytes Length in bytes.
- * @return Whether array was fully written
- */
- private boolean putArrayClient(Object arr, long off, int len, int bytes) {
- assert off > 0;
- assert len >= 0;
- assert bytes >= 0;
- assert bytes >= arrOff;
- assert arr != null;
-
- if (!buf.hasRemaining())
- return false;
-
- int pos = buf.position();
-
- assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
-
- if (!arrHdrDone)
- arrHdrDone = true;
-
- if (!buf.hasRemaining())
- return false;
-
- int left = bytes - arrOff;
- int remaining = buf.remaining();
-
- if (left <= remaining) {
- UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, left);
-
- pos += left;
-
- buf.position(pos);
-
- arrHdrDone = false;
- arrOff = 0;
- }
- else {
- UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
-
- pos += remaining;
-
- buf.position(pos);
-
- arrOff += remaining;
-
- return false;
- }
-
- return true;
- }
-
- /**
- * @param len Array length.
- * @return Byte array or special {@link GridTcpCommunicationMessageAdapter#BYTE_ARR_NOT_READ}
- * value if it was not fully read.
- */
- public final byte[] getByteArrayClient(int len) {
- assert buf != null;
-
- return getArrayClient(BYTE_ARR_CREATOR, BYTE_ARR_OFF, len);
- }
-
- /**
- * @return {@link UUID} or special
- * {@link GridTcpCommunicationMessageAdapter#UUID_NOT_READ}
- * value if it was not fully read.
- */
- public final UUID getUuidClient() {
- byte[] arr = getByteArrayClient(16);
++ public final boolean putMessage(String name, @Nullable GridTcpCommunicationMessageAdapter msg) {
++ if (msg != null)
++ msg.setWriter(writer);
+
- assert arr != null;
-
- return arr == BYTE_ARR_NOT_READ ? UUID_NOT_READ : U.bytesToUuid(arr, 0);
++ return writer.writeMessage(name, msg);
+ }
+
+ /**
- * @param creator Array creator.
- * @param off Base offset.
- * @param len Length.
- * @return Array or special value if it was not fully read.
++ * @param name Field name.
++ * @return {@link GridTcpCommunicationMessageAdapter}.
+ */
- private <T> T getArrayClient(ArrayCreator<T> creator, long off, int len) {
- assert creator != null;
-
- if (tmpArr == null) {
- tmpArr = creator.create(len);
- tmpArrBytes = len;
- }
-
- int toRead = tmpArrBytes - tmpArrOff;
- int remaining = buf.remaining();
- int pos = buf.position();
-
- if (remaining < toRead) {
- UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);
-
- buf.position(pos + remaining);
-
- tmpArrOff += remaining;
-
- return creator.create(-1);
- }
- else {
- UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
-
- buf.position(pos + toRead);
-
- T arr = (T)tmpArr;
-
- arrHdrDone = false;
- tmpArr = null;
- tmpArrBytes = 0;
- tmpArrOff = 0;
-
- return arr;
- }
++ public final GridTcpCommunicationMessageAdapter getMessage(String name) {
++ return reader.readMessage(name);
+ }
+
- /**
- * Array creator.
- */
- private static interface ArrayCreator<T> {
- /**
- * @param len Array length or {@code -1} if array was not fully read.
- * @return New array.
- */
- public T create(int len);
- }
-
- /**
- * Dummy enum.
- */
- private enum DummyEnum {
- /** */
- DUMMY
++ public final boolean lastRead() {
++ return reader.isLastRead();
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
index 0000000,5097db7..d9daba6
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/IpcToNioAdapter.java
@@@ -1,0 -1,250 +1,241 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.util.ipc;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.nio.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+
+ /**
+ * Allows re-use of existing {@link GridNioFilter}s for IPC (specifically shared memory IPC)
+ * communications.
+ * <p>
+ * Note that this class consumes an entire thread inside the {@link #serve()} method
+ * in order to serve one {@link IpcEndpoint}.
+ */
+ public class IpcToNioAdapter<T> {
+ /** IPC endpoint whose input stream is read in {@code serve()} and whose output stream is written in {@code send()}. */
+ private final IpcEndpoint endp;
+
+ /** Filter chain that receives all session events; built in the constructor from the head filter and the supplied filters. */
+ private final GridNioFilterChain<T> chain;
+
+ /** The single session object passed to every chain callback made by this adapter. */
+ private final GridNioSessionImpl ses;
+
+ /** Holds a latch while reads are paused ({@code null} otherwise); the read loop awaits it after each processed read. */
+ private final AtomicReference<CountDownLatch> latchRef = new AtomicReference<>();
+
+ /** Buffer handed to the message writing routine in {@code send()}; allocated once in the constructor. */
+ private final ByteBuffer writeBuf;
+
+ /** Listener notified about the number of bytes received and sent. */
+ private final GridNioMetricsListener metricsLsnr;
+
- /** */
- private final GridNioMessageWriter msgWriter;
-
+ /**
+ * Creates an adapter for the given endpoint and builds the filter chain, the session
+ * and the write buffer it works with.
+ *
+ * @param metricsLsnr Metrics listener, must not be {@code null} (asserted).
+ * @param log Logger handed to the filter chain.
+ * @param endp Endpoint to read from and write to.
- * @param msgWriter Message writer.
+ * @param lsnr Server listener handed to the filter chain.
+ * @param filters Filters passed to the chain along with the internal head filter.
+ */
+ public IpcToNioAdapter(GridNioMetricsListener metricsLsnr, IgniteLogger log, IpcEndpoint endp,
- GridNioMessageWriter msgWriter, GridNioServerListener<T> lsnr, GridNioFilter... filters) {
++ GridNioServerListener<T> lsnr, GridNioFilter... filters) {
+ assert metricsLsnr != null;
- assert msgWriter != null;
+
+ this.metricsLsnr = metricsLsnr;
+ this.endp = endp;
- this.msgWriter = msgWriter;
+
+ chain = new GridNioFilterChain<>(log, lsnr, new HeadFilter(), filters);
+ ses = new GridNioSessionImpl(chain, null, null, true);
+
+ writeBuf = ByteBuffer.allocate(8 << 10); // 8 KB buffer backed by a heap array.
+
+ writeBuf.order(ByteOrder.nativeOrder());
+ }
+
+ /**
+ * Serves the endpoint: fires the session-opened event, then repeatedly reads bytes from the
+ * endpoint's input stream and pushes them through the filter chain. The loop ends when the
+ * thread is found interrupted, when the stream reports end-of-stream (the endpoint is closed
+ * in that case), or when an exception occurs. A read of zero bytes simply retries.
+ * The session-closed event is fired on every exit path.
+ *
+ * NOTE(review): every {@link Exception} raised inside the {@code try} block, including the
+ * {@link InterruptedException} from awaiting the pause latch, is caught and reported through
+ * {@code chain.onExceptionCaught(...)}, so it looks like this method never actually propagates
+ * {@link InterruptedException} and the interruption is not restored - confirm that this is intended.
+ *
+ * @throws InterruptedException If interrupted.
+ */
+ public void serve() throws InterruptedException {
+ try {
+ chain.onSessionOpened(ses);
+
+ InputStream in = endp.inputStream();
+
+ ByteBuffer readBuf = ByteBuffer.allocate(8 << 10);
+
+ readBuf.order(ByteOrder.nativeOrder());
+
+ assert readBuf.hasArray(); // The stream read below writes straight into the backing array.
+
+ while (!Thread.interrupted()) { // Thread.interrupted() also clears the thread's interrupt flag.
+ int pos = readBuf.position(); // Number of bytes kept from the previous pass (0 after clear()).
+
+ int read = in.read(readBuf.array(), pos, readBuf.remaining()); // Append after the kept bytes.
+
+ if (read > 0) {
+ metricsLsnr.onBytesReceived(read);
+
+ readBuf.position(0);
+ readBuf.limit(pos + read); // Expose [0, pos + read): kept bytes plus the freshly read ones.
+
+ chain.onMessageReceived(ses, readBuf);
+
+ if (readBuf.hasRemaining()) // Chain left some bytes unread: keep them for the next pass.
+ readBuf.compact();
+ else
+ readBuf.clear();
+
+ CountDownLatch latch = latchRef.get();
+
+ if (latch != null) // Reads are paused: block until the latch is counted down on resume.
+ latch.await();
+ }
+ else if (read < 0) { // End of stream.
+ endp.close();
+
+ break; // And close below.
+ }
+ }
+ }
+ catch (Exception e) {
+ chain.onExceptionCaught(ses, new IgniteCheckedException("Failed to read from IPC endpoint.", e));
+ }
+ finally {
+ try {
+ // Assuming remote end closed connection - pushing event from head to tail.
+ chain.onSessionClosed(ses);
+ }
+ catch (IgniteCheckedException e) {
+ chain.onExceptionCaught(ses, new IgniteCheckedException("Failed to process session close event " +
+ "for IPC endpoint.", e));
+ }
+ }
+ }
+
+ /**
+ * Handles write events on chain: writes the message to the endpoint's output stream in the
+ * calling thread, using {@code writeBuf}, and reports the written byte count to the metrics listener.
+ *
+ * @param msg Message to send.
+ * @return Finished future: created with {@code null} on success, or with the caught
+ * {@link IOException} / {@link IgniteCheckedException} on failure.
+ */
+ private GridNioFuture<?> send(GridTcpCommunicationMessageAdapter msg) {
+ assert writeBuf.hasArray();
+
+ try {
- // This method is called only on handshake,
- // so we don't need to provide node ID for
- // rolling updates support.
- int cnt = msgWriter.writeFully(null, msg, endp.outputStream(), writeBuf);
++ int cnt = U.writeMessageFully(msg, endp.outputStream(), writeBuf);
+
+ metricsLsnr.onBytesSent(cnt);
+ }
+ catch (IOException | IgniteCheckedException e) {
+ return new GridNioFinishedFuture<Object>(e); // Failure is reported via the future, not thrown.
+ }
+
+ return new GridNioFinishedFuture<>((Object)null);
+ }
+
+ /**
+ * Filter forwarding messages from chain's head to this server.
+ * Session events (opened, closed, exception, message received, timeouts) are passed on through
+ * the corresponding {@code proceed*} call, while write, close and pause/resume requests are
+ * handled here against the enclosing adapter's endpoint and pause latch.
+ */
+ private class HeadFilter extends GridNioFilterAdapter {
+ /**
+ * Assigns filter name.
+ */
+ protected HeadFilter() {
+ super("HeadFilter");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException {
+ proceedSessionOpened(ses);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionClosed(GridNioSession ses) throws IgniteCheckedException {
+ proceedSessionClosed(ses);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onExceptionCaught(GridNioSession ses, IgniteCheckedException ex) throws IgniteCheckedException {
+ proceedExceptionCaught(ses, ex);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) {
+ assert ses == IpcToNioAdapter.this.ses; // Only the adapter's own session is expected here.
+
+ return send((GridTcpCommunicationMessageAdapter)msg); // Any other message type fails the cast.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessageReceived(GridNioSession ses, Object msg) throws IgniteCheckedException {
+ proceedMessageReceived(ses, msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridNioFuture<?> onPauseReads(GridNioSession ses) throws IgniteCheckedException {
+ // This call should be synced externally to avoid races.
+ boolean b = latchRef.compareAndSet(null, new CountDownLatch(1)); // Installs the latch only if none is set.
+
+ assert b; // Pausing while a pause latch is already installed is treated as a programming error.
+
+ return new GridNioFinishedFuture<>(b);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridNioFuture<?> onResumeReads(GridNioSession ses) throws IgniteCheckedException {
+ // This call should be synced externally to avoid races.
+ CountDownLatch latch = latchRef.getAndSet(null);
+
+ if (latch != null)
+ latch.countDown(); // Releases the read loop if it is waiting on this latch.
+
+ return new GridNioFinishedFuture<Object>(latch != null); // Result tells whether reads were paused.
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridNioFuture<Boolean> onSessionClose(GridNioSession ses) {
+ assert ses == IpcToNioAdapter.this.ses;
+
+ boolean closed = IpcToNioAdapter.this.ses.setClosed(); // Presumably true only for the call that actually closes - verify.
+
+ if (closed)
+ endp.close();
+
+ return new GridNioFinishedFuture<>(closed);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionIdleTimeout(GridNioSession ses) throws IgniteCheckedException {
+ proceedSessionIdleTimeout(ses);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSessionWriteTimeout(GridNioSession ses) throws IgniteCheckedException {
+ proceedSessionWriteTimeout(ses);
+ }
+ }
+ }