You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/02/02 04:28:26 UTC
[34/52] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1'
of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-61
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsSyncMessage.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsSyncMessage.java
index 0000000,1f17b7e..fa917a0
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsSyncMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/fs/GridGgfsSyncMessage.java
@@@ -1,0 -1,161 +1,161 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.fs;
+
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.io.*;
+ import java.nio.*;
+
+ /**
+ * Basic sync message.
+ */
+ public class GridGgfsSyncMessage extends GridGgfsCommunicationMessage {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Coordinator node order. */
+ private long order;
+
+ /** Response flag. */
+ private boolean res;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridGgfsSyncMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param order Node order.
+ * @param res Response flag.
+ */
+ public GridGgfsSyncMessage(long order, boolean res) {
+ this.order = order;
+ this.res = res;
+ }
+
+ /**
+ * @return Coordinator node order.
+ */
+ public long order() {
+ return order;
+ }
+
+ /**
+ * @return {@code True} if response message.
+ */
+ public boolean response() {
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridGgfsSyncMessage.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridGgfsSyncMessage _clone = new GridGgfsSyncMessage();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ super.clone0(_msg);
+
+ GridGgfsSyncMessage _clone = (GridGgfsSyncMessage)_msg;
+
+ _clone.order = order;
+ _clone.res = res;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.writeTo(buf))
+ return false;
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putLong(order))
++ if (!commState.putLong("order", order))
+ return false;
+
+ commState.idx++;
+
+ case 1:
- if (!commState.putBoolean(res))
++ if (!commState.putBoolean("res", res))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!super.readFrom(buf))
+ return false;
+
+ switch (commState.idx) {
+ case 0:
- if (buf.remaining() < 8)
- return false;
++ order = commState.getLong("order");
+
- order = commState.getLong();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 1:
- if (buf.remaining() < 1)
- return false;
++ res = commState.getBoolean("res");
+
- res = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 72;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
index 0000000,5d3ad42..d9631bb
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
@@@ -1,0 -1,211 +1,211 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.plugin;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.*;
+ import org.apache.ignite.plugin.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.jetbrains.annotations.*;
+
+ import java.lang.reflect.*;
+ import java.util.*;
+
+ /**
+ * TODO 9447: move to internal package.
+ */
+ public class IgnitePluginProcessor extends GridProcessorAdapter {
+ /** */
+ private final Map<String, PluginProvider> plugins = new LinkedHashMap<>();
+
+ /** */
+ private final Map<PluginProvider, GridPluginContext> pluginCtxMap = new IdentityHashMap<>();
+
+ /** */
+ private volatile Map<Class<?>, Object[]> extensions;
+
+ /**
+ *
+ * @param ctx Kernal context.
+ * @param cfg Ignite configuration.
+ */
+ @SuppressWarnings("TypeMayBeWeakened")
+ public IgnitePluginProcessor(GridKernalContext ctx, IgniteConfiguration cfg) {
+ super(ctx);
+
+ ExtensionRegistry registry = new ExtensionRegistry();
+
+ if (cfg.getPluginConfigurations() != null) {
+ for (PluginConfiguration pluginCfg : cfg.getPluginConfigurations()) {
+ GridPluginContext pluginCtx = new GridPluginContext(ctx, pluginCfg, cfg);
+
+ PluginProvider provider;
+
+ try {
+ if (pluginCfg.providerClass() == null)
+ throw new IgniteException("Provider class is null.");
+
+ try {
- Constructor<? extends PluginProvider> ctr =
++ Constructor<? extends PluginProvider> ctr =
+ pluginCfg.providerClass().getConstructor(PluginContext.class);
+
+ provider = ctr.newInstance(pluginCtx);
+ }
+ catch (NoSuchMethodException ignore) {
+ try {
- Constructor<? extends PluginProvider> ctr =
++ Constructor<? extends PluginProvider> ctr =
+ pluginCfg.providerClass().getConstructor(pluginCfg.getClass());
+
+ provider = ctr.newInstance(pluginCfg);
+ }
+ catch (NoSuchMethodException ignored) {
+ provider = pluginCfg.providerClass().newInstance();
+ }
+ }
+ }
+ catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+ throw new IgniteException("Failed to create plugin provider instance.", e);
+ }
+
+ if (F.isEmpty(provider.name()))
+ throw new IgniteException("Plugin name can not be empty.");
+
+ if (provider.plugin() == null)
+ throw new IgniteException("Plugin is null.");
+
+ if (plugins.containsKey(provider.name()))
+ throw new IgniteException("Duplicated plugin name: " + provider.name());
+
+ plugins.put(provider.name(), provider);
+
+ pluginCtxMap.put(provider, pluginCtx);
+
+ provider.initExtensions(pluginCtx, registry);
+ }
+ }
+
+ extensions = registry.createExtensionMap();
+ }
+
+ /**
+ * @param extensionItf Extension interface class.
+ * @return Returns implementation for provided extension from all plugins.
+ */
+ public <T> T[] extensions(Class<T> extensionItf) {
+ Map<Class<?>, Object[]> extensions = this.extensions;
+
+ T[] res = (T[])extensions.get(extensionItf);
+
+ if (res != null)
+ return res;
+
+ res = (T[])Array.newInstance(extensionItf, 0);
+
+ // Store empty array to map to avoid array creation on the next access.
+ Map<Class<?>, Object[]> extensionsCp = new HashMap<>((extensions.size() + 1) * 2, 2.0f);
+
+ extensionsCp.put(extensionItf, res);
+
+ this.extensions = extensionsCp;
+
+ return res;
+ }
+
+ /**
+ * @param name Plugin name.
+ * @return Plugin provider.
+ */
+ @Nullable public PluginProvider pluginProvider(String name) {
+ return plugins.get(name);
+ }
+
+ /**
+ * @return All plugin providers.
+ */
+ public Collection<PluginProvider> allProviders() {
+ return plugins.values();
+ }
+
+ /**
+ * @param provider Plugin context.
+ * @return Plugin context.
+ */
+ public PluginContext pluginContextForProvider(PluginProvider provider) {
+ return pluginCtxMap.get(provider);
+ }
+
+ /**
+ * @param cls Component class.
+ * @param <T> Component type.
+ * @return Component class instance or {@code null} if no one plugin override this component.
+ */
+ public <T> T createComponent(Class<T> cls) {
+ for (PluginProvider plugin : plugins.values()) {
+ T comp = (T)plugin.createComponent(cls);
+
+ if (comp != null)
+ return comp;
+ }
+
+ return null;
+ }
+
+ /**
+ *
+ */
+ private static class ExtensionRegistry implements IgniteExtensionRegistry {
+ /** */
+ private final Map<Class<?>, List<Object>> extensionsCollector = new HashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public <T> void registerExtension(Class<T> extensionItf, T extensionImpl) {
+ List<Object> list = extensionsCollector.get(extensionItf);
+
+ if (list == null) {
+ list = new ArrayList<>();
+
+ extensionsCollector.put(extensionItf, list);
+ }
+
+ list.add(extensionImpl);
+ }
+
+ /**
+ * @return Map extension interface to array of implementation.
+ */
+ public Map<Class<?>, Object[]> createExtensionMap() {
+ Map<Class<?>, Object[]> extensions = new HashMap<>(extensionsCollector.size() * 2, 0.5f);
+
+ for (Map.Entry<Class<?>, List<Object>> entry : extensionsCollector.entrySet()) {
+ Class<?> extensionItf = entry.getKey();
+
+ List<Object> implementations = entry.getValue();
+
+ Object[] implArr = (Object[])Array.newInstance(extensionItf, implementations.size());
+
+ implArr = implementations.toArray(implArr);
+
+ extensions.put(extensionItf, implArr);
+ }
+
+ return extensions;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableOutputStream.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableOutputStream.java
index 0000000,cfbf8d4..4f23fd1
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/portable/GridPortableOutputStream.java
@@@ -1,0 -1,172 +1,165 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.portable;
+
+ /**
+ * Portable output stream.
+ */
+ public interface GridPortableOutputStream extends GridPortableStream, AutoCloseable {
+ /**
+ * Write byte value.
+ *
+ * @param val Byte value.
+ */
+ public void writeByte(byte val);
+
+ /**
+ * Write byte array.
+ *
+ * @param val Byte array.
+ */
+ public void writeByteArray(byte[] val);
+
+ /**
+ * Write boolean value.
+ *
+ * @param val Boolean value.
+ */
+ public void writeBoolean(boolean val);
+
+ /**
+ * Write boolean array.
+ *
+ * @param val Boolean array.
+ */
+ public void writeBooleanArray(boolean[] val);
+
+ /**
+ * Write short value.
+ *
+ * @param val Short value.
+ */
+ public void writeShort(short val);
+
+ /**
+ * Write short array.
+ *
+ * @param val Short array.
+ */
+ public void writeShortArray(short[] val);
+
+ /**
+ * Write char value.
+ *
+ * @param val Char value.
+ */
+ public void writeChar(char val);
+
+ /**
+ * Write char array.
+ *
+ * @param val Char array.
+ */
+ public void writeCharArray(char[] val);
+
+ /**
+ * Write int value.
+ *
+ * @param val Int value.
+ */
+ public void writeInt(int val);
+
+ /**
+ * Write int value to the given position.
+ *
+ * @param pos Position.
+ * @param val Value.
+ */
+ public void writeInt(int pos, int val);
+
+ /**
+ * Write int array.
+ *
+ * @param val Int array.
+ */
+ public void writeIntArray(int[] val);
+
+ /**
+ * Write float value.
+ *
+ * @param val Float value.
+ */
+ public void writeFloat(float val);
+
+ /**
+ * Write float array.
+ *
+ * @param val Float array.
+ */
+ public void writeFloatArray(float[] val);
+
+ /**
+ * Write long value.
+ *
+ * @param val Long value.
+ */
+ public void writeLong(long val);
+
+ /**
+ * Write long array.
+ *
+ * @param val Long array.
+ */
+ public void writeLongArray(long[] val);
+
+ /**
+ * Write double value.
+ *
+ * @param val Double value.
+ */
+ public void writeDouble(double val);
+
+ /**
+ * Write double array.
+ *
+ * @param val Double array.
+ */
+ public void writeDoubleArray(double[] val);
+
+ /**
+ * Write byte array.
+ *
+ * @param arr Array.
+ * @param off Offset.
+ * @param len Length.
+ */
+ public void write(byte[] arr, int off, int len);
+
+ /**
+ * Write data from unmanaged memory.
+ *
+ * @param addr Address.
+ * @param cnt Count.
+ */
+ public void write(long addr, int cnt);
+
+ /**
- * Ensure capacity.
- *
- * @param cnt Required byte count.
- */
- public void ensureCapacity(int cnt);
-
- /**
+ * Close the stream releasing resources.
+ */
+ @Override public void close();
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java
index 0000000,386b6eb..1c0366a
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java
@@@ -1,0 -1,129 +1,127 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.rest.client.message;
+
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.nio.*;
+
+ /**
+ * Client handshake wrapper for direct marshalling.
+ */
+ public class GridClientHandshakeRequestWrapper extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private static final long serialVersionUID = -5705048094821942662L;
+
+ /** Signal char. */
+ public static final byte HANDSHAKE_HEADER = (byte)0x91;
+
+ /** Handshake bytes. */
+ private byte[] bytes;
+
+ /**
+ *
+ */
+ public GridClientHandshakeRequestWrapper() {
+ // No-op.
+ }
+
+ /**
+ *
+ * @param req Handshake request.
+ */
+ public GridClientHandshakeRequestWrapper(GridClientHandshakeRequest req) {
+ bytes = req.rawBytes();
+ }
+
+ /**
+ * @return Handshake bytes.
+ */
+ public byte[] bytes() {
+ return bytes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putByteArrayClient(bytes))
++ if (!commState.putByteArray("bytes", bytes))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ switch (commState.idx) {
+ case 0:
- byte[] bytes0 = commState.getByteArrayClient(GridClientHandshakeRequest.PACKET_SIZE);
++ bytes = commState.getByteArray("bytes");
+
- if (bytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- bytes = bytes0;
-
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return HANDSHAKE_HEADER;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridClientHandshakeRequestWrapper _clone = new GridClientHandshakeRequestWrapper();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridClientHandshakeRequestWrapper _clone = (GridClientHandshakeRequestWrapper)_msg;
+
+ _clone.bytes = bytes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridClientHandshakeRequestWrapper.class, this);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java
index 0000000,1e20a9c..dff1f45
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java
@@@ -1,0 -1,96 +1,116 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.rest.client.message;
+
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.nio.*;
+
+ /**
+ * Client handshake wrapper for direct marshalling.
+ */
+ public class GridClientHandshakeResponseWrapper extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private static final long serialVersionUID = -1529807975073967381L;
+
+ /** */
+ private byte code;
+
+ /**
+ *
+ */
+ public GridClientHandshakeResponseWrapper() {
+ // No-op.
+ }
+
+ /**
+ * @param code Response code.
+ */
+ public GridClientHandshakeResponseWrapper(byte code) {
+ this.code = code;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
++ switch (commState.idx) {
++ case 0:
++ if (!commState.putByte("code", code))
++ return false;
++
++ commState.idx++;
++
++ }
++
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
++ switch (commState.idx) {
++ case 0:
++ code = commState.getByte("code");
++
++ if (!commState.lastRead())
++ return false;
++
++ commState.idx++;
++
++ }
++
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return code;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridClientHandshakeResponseWrapper _clone = new GridClientHandshakeResponseWrapper();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridClientHandshakeResponseWrapper _clone = (GridClientHandshakeResponseWrapper)_msg;
+
+ _clone.code = code;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridClientHandshakeResponseWrapper.class, this);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java
index 0000000,6ff66ce..6e6f485
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientMessageWrapper.java
@@@ -1,0 -1,266 +1,258 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.rest.client.message;
+
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ * Client message wrapper for direct marshalling.
+ */
+ public class GridClientMessageWrapper extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private static final long serialVersionUID = 5284375300887454697L;
+
+ /** Client request header. */
+ public static final byte REQ_HEADER = (byte)0x90;
+
+ /** */
+ private int msgSize;
+
+ /** */
+ private long reqId;
+
+ /** */
+ private UUID clientId;
+
+ /** */
+ private UUID destId;
+
+ /** */
+ private ByteBuffer msg;
+
+ /**
+ * @return Request ID.
+ */
+ public long requestId() {
+ return reqId;
+ }
+
+ /**
+ * @param reqId Request ID.
+ */
+ public void requestId(long reqId) {
+ this.reqId = reqId;
+ }
+
+ /**
+ * @return Message size.
+ */
+ public int messageSize() {
+ return msgSize;
+ }
+
+ /**
+ * @param msgSize Message size.
+ */
+ public void messageSize(int msgSize) {
+ this.msgSize = msgSize;
+ }
+
+ /**
+ * @return Client ID.
+ */
+ public UUID clientId() {
+ return clientId;
+ }
+
+ /**
+ * @param clientId Client ID.
+ */
+ public void clientId(UUID clientId) {
+ this.clientId = clientId;
+ }
+
+ /**
+ * @return Destination ID.
+ */
+ public UUID destinationId() {
+ return destId;
+ }
+
+ /**
+ * @param destId Destination ID.
+ */
+ public void destinationId(UUID destId) {
+ this.destId = destId;
+ }
+
+ /**
+ * @return Message buffer.
+ */
+ public ByteBuffer message() {
+ return msg;
+ }
+
+ /**
+ * @return Message bytes.
+ */
+ public byte[] messageArray() {
+ assert msg.hasArray();
+ assert msg.position() == 0 && msg.remaining() == msg.capacity();
+
+ return msg.array();
+ }
+
+ /**
+ * @param msg Message bytes.
+ */
+ public void message(ByteBuffer msg) {
+ this.msg = msg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putIntClient(msgSize))
++ if (!commState.putUuid("clientId", clientId))
+ return false;
+
+ commState.idx++;
+
+ case 1:
- if (!commState.putLongClient(reqId))
++ if (!commState.putUuid("destId", destId))
+ return false;
+
+ commState.idx++;
+
+ case 2:
- if (!commState.putUuidClient(clientId))
++ if (!commState.putByteBuffer("msg", msg))
+ return false;
+
+ commState.idx++;
+
+ case 3:
- if (!commState.putUuidClient(destId))
++ if (!commState.putInt("msgSize", msgSize))
+ return false;
+
+ commState.idx++;
+
+ case 4:
- if (!commState.putByteBufferClient(msg))
++ if (!commState.putLong("reqId", reqId))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ switch (commState.idx) {
+ case 0:
- if (buf.remaining() < 4)
- return false;
-
- msgSize = commState.getIntClient();
++ clientId = commState.getUuid("clientId");
+
- if (msgSize == 0) // Ping message.
- return true;
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 1:
- if (buf.remaining() < 8)
- return false;
++ destId = commState.getUuid("destId");
+
- reqId = commState.getLongClient();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 2:
- UUID clientId0 = commState.getUuidClient();
++ msg = commState.getByteBuffer("msg");
+
- if (clientId0 == UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- clientId = clientId0;
-
+ commState.idx++;
+
+ case 3:
- UUID destId0 = commState.getUuidClient();
++ msgSize = commState.getInt("msgSize");
+
- if (destId0 == UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- destId = destId0;
-
+ commState.idx++;
+
+ case 4:
- byte[] msg0 = commState.getByteArrayClient(msgSize - 40);
++ reqId = commState.getLong("reqId");
+
- if (msg0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- msg = ByteBuffer.wrap(msg0);
-
+ commState.idx++;
++
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return REQ_HEADER;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridClientMessageWrapper _clone = new GridClientMessageWrapper();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridClientMessageWrapper _clone = (GridClientMessageWrapper)_msg;
+
- _clone.reqId = reqId;
+ _clone.msgSize = msgSize;
++ _clone.reqId = reqId;
+ _clone.clientId = clientId;
+ _clone.destId = destId;
+ _clone.msg = msg;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridClientMessageWrapper.class, this);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientPingPacketWrapper.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientPingPacketWrapper.java
index 0000000,6fd95b5..64e61c9
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientPingPacketWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/client/message/GridClientPingPacketWrapper.java
@@@ -1,0 -1,89 +1,102 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.rest.client.message;
+
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.nio.*;
+
+ /**
+ * Ping packet wrapper for direct marshalling.
+ */
+ public class GridClientPingPacketWrapper extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private static final long serialVersionUID = -3956036611004055629L;
+
+ /** Ping message size (always zero). */
+ private int size;
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putIntClient(size))
++ if (!commState.putInt("size", size))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf) {
- throw new UnsupportedOperationException();
++ commState.setBuffer(buf);
++
++ switch (commState.idx) {
++ case 0:
++ size = commState.getInt("size");
++
++ if (!commState.lastRead())
++ return false;
++
++ commState.idx++;
++
++ }
++
++ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return GridClientMessageWrapper.REQ_HEADER;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridClientPingPacketWrapper _clone = new GridClientPingPacketWrapper();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridClientPingPacketWrapper _clone = (GridClientPingPacketWrapper)_msg;
+
+ _clone.size = size;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridClientPingPacketWrapper.class, this);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java
index 0000000,61d2822..4abd359
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultRequest.java
@@@ -1,0 -1,185 +1,181 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.rest.handlers.task;
+
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.util.direct.*;
+
+ import java.io.*;
+ import java.nio.*;
+
+ /**
+ * Task result request.
+ */
+ public class GridTaskResultRequest extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Task ID. */
+ private IgniteUuid taskId;
+
+ /** Topic. */
+ @GridDirectTransient
+ private Object topic;
+
+ /** Serialized topic. */
+ private byte[] topicBytes;
+
+ /**
+ * Public no-arg constructor for {@link Externalizable} support.
+ */
+ public GridTaskResultRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param taskId Task ID.
+ * @param topic Topic.
+ * @param topicBytes Serialized topic.
+ */
+ GridTaskResultRequest(IgniteUuid taskId, Object topic, byte[] topicBytes) {
+ this.taskId = taskId;
+ this.topic = topic;
+ this.topicBytes = topicBytes;
+ }
+
+ /**
+ * @return Task ID.
+ */
+ public IgniteUuid taskId() {
+ return taskId;
+ }
+
+ /**
+ * @param taskId Task ID.
+ */
+ public void taskId(IgniteUuid taskId) {
+ assert taskId != null;
+
+ this.taskId = taskId;
+ }
+
+ /**
+ * @return Topic.
+ */
+ public Object topic() {
+ return topic;
+ }
+
+ /**
+ * @return Serialized topic.
+ */
+ public byte[] topicBytes() {
+ return topicBytes;
+ }
+
+ /**
+ * @param topic Topic.
+ */
+ public void topic(String topic) {
+ assert topic != null;
+
+ this.topic = topic;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridTaskResultRequest _clone = new GridTaskResultRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridTaskResultRequest _clone = (GridTaskResultRequest)_msg;
+
+ _clone.taskId = taskId;
+ _clone.topic = topic;
+ _clone.topicBytes = topicBytes;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putGridUuid(taskId))
++ if (!commState.putGridUuid("taskId", taskId))
+ return false;
+
+ commState.idx++;
+
+ case 1:
- if (!commState.putByteArray(topicBytes))
++ if (!commState.putByteArray("topicBytes", topicBytes))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ switch (commState.idx) {
+ case 0:
- IgniteUuid taskId0 = commState.getGridUuid();
++ taskId = commState.getGridUuid("taskId");
+
- if (taskId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- taskId = taskId0;
-
+ commState.idx++;
+
+ case 1:
- byte[] topicBytes0 = commState.getByteArray();
++ topicBytes = commState.getByteArray("topicBytes");
+
- if (topicBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- topicBytes = topicBytes0;
-
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 73;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java
index 0000000,fb20156..06cec21
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/handlers/task/GridTaskResultResponse.java
@@@ -1,0 -1,233 +1,229 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.rest.handlers.task;
+
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.jetbrains.annotations.*;
+
+ import java.nio.*;
+
+ /**
+ * Task result response.
+ */
+ public class GridTaskResultResponse extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Result. */
+ @GridDirectTransient
+ private Object res;
+
+ /** Serialized result. */
+ private byte[] resBytes;
+
+ /** Finished flag. */
+ private boolean finished;
+
+ /** Flag indicating that task has ever been launched on node. */
+ private boolean found;
+
+ /** Error. */
+ private String err;
+
+ /**
+ * @return Task result.
+ */
+ @Nullable public Object result() {
+ return res;
+ }
+
+ /**
+ * @param res Task result.
+ */
+ public void result(@Nullable Object res) {
+ this.res = res;
+ }
+
+ /**
+ * @param resBytes Serialized result.
+ */
+ public void resultBytes(byte[] resBytes) {
+ this.resBytes = resBytes;
+ }
+
+ /**
+ * @return Serialized result.
+ */
+ public byte[] resultBytes() {
+ return resBytes;
+ }
+
+ /**
+ * @return {@code true} if finished.
+ */
+ public boolean finished() {
+ return finished;
+ }
+
+ /**
+ * @param finished {@code true} if finished.
+ */
+ public void finished(boolean finished) {
+ this.finished = finished;
+ }
+
+ /**
+ * @return {@code true} if found.
+ */
+ public boolean found() {
+ return found;
+ }
+
+ /**
+ * @param found {@code true} if found.
+ */
+ public void found(boolean found) {
+ this.found = found;
+ }
+
+ /**
+ * @return Error.
+ */
+ public String error() {
+ return err;
+ }
+
+ /**
+ * @param err Error.
+ */
+ public void error(String err) {
+ this.err = err;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridTaskResultResponse _clone = new GridTaskResultResponse();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridTaskResultResponse _clone = (GridTaskResultResponse)_msg;
+
+ _clone.res = res;
+ _clone.resBytes = resBytes;
+ _clone.finished = finished;
+ _clone.found = found;
+ _clone.err = err;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putString(err))
++ if (!commState.putString("err", err))
+ return false;
+
+ commState.idx++;
+
+ case 1:
- if (!commState.putBoolean(finished))
++ if (!commState.putBoolean("finished", finished))
+ return false;
+
+ commState.idx++;
+
+ case 2:
- if (!commState.putBoolean(found))
++ if (!commState.putBoolean("found", found))
+ return false;
+
+ commState.idx++;
+
+ case 3:
- if (!commState.putByteArray(resBytes))
++ if (!commState.putByteArray("resBytes", resBytes))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ switch (commState.idx) {
+ case 0:
- String err0 = commState.getString();
++ err = commState.getString("err");
+
- if (err0 == STR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- err = err0;
-
+ commState.idx++;
+
+ case 1:
- if (buf.remaining() < 1)
- return false;
++ finished = commState.getBoolean("finished");
+
- finished = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 2:
- if (buf.remaining() < 1)
- return false;
++ found = commState.getBoolean("found");
+
- found = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 3:
- byte[] resBytes0 = commState.getByteArray();
++ resBytes = commState.getByteArray("resBytes");
+
- if (resBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- resBytes = resBytes0;
-
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 74;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java
index 0000000,55f04aa..3cc4358
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java
@@@ -1,0 -1,269 +1,282 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.rest.protocols.tcp;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.marshaller.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.nio.charset.*;
+ import java.util.*;
+
+ import static org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage.*;
+
+ /**
+ * Memcached message wrapper for direct marshalling.
+ */
+ public class GridMemcachedMessageWrapper extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private static final long serialVersionUID = 3053626103006980626L;
+
+ /** UTF-8 charset. */
+ private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+ /**
+ * Memcached message bytes.
+ */
+ private byte[] bytes;
+
+ /**
+ *
+ */
+ public GridMemcachedMessageWrapper() {
+ // No-op.
+ }
+
+ /**
+ * @param msg Message.
+ * @param jdkMarshaller JDK marshaller.
+ * @throws IgniteCheckedException If failed to marshal.
+ */
+ public GridMemcachedMessageWrapper(GridMemcachedMessage msg, IgniteMarshaller jdkMarshaller) throws IgniteCheckedException {
+ bytes = encodeMemcache(msg, jdkMarshaller);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putByteArrayClient(bytes))
++ if (!commState.putByteArray("bytes", bytes))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf) {
- throw new UnsupportedOperationException();
++ commState.setBuffer(buf);
++
++ switch (commState.idx) {
++ case 0:
++ bytes = commState.getByteArray("bytes");
++
++ if (!commState.lastRead())
++ return false;
++
++ commState.idx++;
++
++ }
++
++ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return MEMCACHE_RES_FLAG;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridMemcachedMessageWrapper _clone = new GridMemcachedMessageWrapper();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridMemcachedMessageWrapper _clone = (GridMemcachedMessageWrapper)_msg;
+
+ _clone.bytes = bytes;
+ }
+
+ /**
+ * Encodes memcache message to a raw byte array.
+ *
+ * @param msg Message being serialized.
+ * @param jdkMarshaller JDK marshaller.
+ * @return Serialized message.
+ * @throws IgniteCheckedException If serialization failed.
+ */
+ private byte[] encodeMemcache(GridMemcachedMessage msg, IgniteMarshaller jdkMarshaller) throws IgniteCheckedException {
+ GridByteArrayList res = new GridByteArrayList(HDR_LEN - 1);
+
+ int keyLen = 0;
+
+ int keyFlags = 0;
+
+ if (msg.key() != null) {
+ ByteArrayOutputStream rawKey = new ByteArrayOutputStream();
+
+ keyFlags = encodeObj(msg.key(), rawKey, jdkMarshaller);
+
+ msg.key(rawKey.toByteArray());
+
+ keyLen = rawKey.size();
+ }
+
+ int dataLen = 0;
+
+ int valFlags = 0;
+
+ if (msg.value() != null) {
+ ByteArrayOutputStream rawVal = new ByteArrayOutputStream();
+
+ valFlags = encodeObj(msg.value(), rawVal, jdkMarshaller);
+
+ msg.value(rawVal.toByteArray());
+
+ dataLen = rawVal.size();
+ }
+
+ int flagsLen = 0;
+
+ if (msg.addFlags())
+ flagsLen = FLAGS_LENGTH;
+
+ res.add(msg.operationCode());
+
+ // Cast is required due to packet layout.
+ res.add((short)keyLen);
+
+ // Cast is required due to packet layout.
+ res.add((byte)flagsLen);
+
+ // Data type is always 0x00.
+ res.add((byte)0x00);
+
+ res.add((short)msg.status());
+
+ res.add(keyLen + flagsLen + dataLen);
+
+ res.add(msg.opaque(), 0, msg.opaque().length);
+
+ // CAS, unused.
+ res.add(0L);
+
+ assert res.size() == HDR_LEN - 1;
+
+ if (flagsLen > 0) {
+ res.add((short) keyFlags);
+ res.add((short) valFlags);
+ }
+
+ assert msg.key() == null || msg.key() instanceof byte[];
+ assert msg.value() == null || msg.value() instanceof byte[];
+
+ if (keyLen > 0)
+ res.add((byte[])msg.key(), 0, ((byte[])msg.key()).length);
+
+ if (dataLen > 0)
+ res.add((byte[])msg.value(), 0, ((byte[])msg.value()).length);
+
+ return res.entireArray();
+ }
+
+ /**
+ * Encodes given object to a byte array and returns flags that describe the type of serialized object.
+ *
+ * @param obj Object to serialize.
+ * @param out Output stream to which object should be written.
+ * @param jdkMarshaller JDK marshaller.
+ * @return Serialization flags.
+ * @throws IgniteCheckedException If JDK serialization failed.
+ */
+ private int encodeObj(Object obj, ByteArrayOutputStream out, IgniteMarshaller jdkMarshaller) throws IgniteCheckedException {
+ int flags = 0;
+
+ byte[] data = null;
+
+ if (obj instanceof String)
+ data = ((String)obj).getBytes(UTF_8);
+ else if (obj instanceof Boolean) {
+ data = new byte[] {(byte)((Boolean)obj ? '1' : '0')};
+
+ flags |= BOOLEAN_FLAG;
+ }
+ else if (obj instanceof Integer) {
+ data = U.intToBytes((Integer) obj);
+
+ flags |= INT_FLAG;
+ }
+ else if (obj instanceof Long) {
+ data = U.longToBytes((Long)obj);
+
+ flags |= LONG_FLAG;
+ }
+ else if (obj instanceof Date) {
+ data = U.longToBytes(((Date)obj).getTime());
+
+ flags |= DATE_FLAG;
+ }
+ else if (obj instanceof Byte) {
+ data = new byte[] {(Byte)obj};
+
+ flags |= BYTE_FLAG;
+ }
+ else if (obj instanceof Float) {
+ data = U.intToBytes(Float.floatToIntBits((Float)obj));
+
+ flags |= FLOAT_FLAG;
+ }
+ else if (obj instanceof Double) {
+ data = U.longToBytes(Double.doubleToLongBits((Double)obj));
+
+ flags |= DOUBLE_FLAG;
+ }
+ else if (obj instanceof byte[]) {
+ data = (byte[])obj;
+
+ flags |= BYTE_ARR_FLAG;
+ }
+ else {
+ jdkMarshaller.marshal(obj, out);
+
+ flags |= SERIALIZED_FLAG;
+ }
+
+ if (data != null)
+ out.write(data, 0, data.length);
+
+ return flags;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridMemcachedMessageWrapper.class, this);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java
index 0000000,55d7c95..7a95fb2
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java
@@@ -1,0 -1,519 +1,515 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.rest.protocols.tcp;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.client.marshaller.*;
+ import org.apache.ignite.internal.processors.rest.client.message.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.nio.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.nio.*;
+ import java.nio.charset.*;
+ import java.util.*;
+
+ import static org.apache.ignite.internal.processors.rest.protocols.tcp.GridMemcachedMessage.*;
+ import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*;
+
+ /**
+ *
+ */
+ public class GridTcpRestDirectParser implements GridNioParser {
+ /** UTF-8 charset. */
+ private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+ /** Message metadata key. */
+ private static final int MSG_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+ /** Protocol handler. */
+ private final GridTcpRestProtocol proto;
+
- /** Message reader. */
- private final GridNioMessageReader msgReader;
-
+ /**
+ * @param proto Protocol handler.
- * @param msgReader Message reader.
+ */
- public GridTcpRestDirectParser(GridTcpRestProtocol proto, GridNioMessageReader msgReader) {
++ public GridTcpRestDirectParser(GridTcpRestProtocol proto) {
+ this.proto = proto;
- this.msgReader = msgReader;
+ }
+
+ /** {@inheritDoc} */
- @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
++ @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf)
++ throws IOException, IgniteCheckedException {
+ ParserState state = ses.removeMeta(PARSER_STATE.ordinal());
+
+ if (state != null) {
+ assert state.packetType() == GridClientPacketType.MEMCACHE;
+
+ Object memcacheMsg = parseMemcachePacket(ses, buf, state);
+
+ if (memcacheMsg == null)
+ ses.addMeta(PARSER_STATE.ordinal(), state);
+
+ return memcacheMsg;
+ }
+
+ GridTcpCommunicationMessageAdapter msg = ses.removeMeta(MSG_META_KEY);
+
+ if (msg == null && buf.hasRemaining()) {
+ byte type = buf.get(buf.position());
+
+ if (type == GridClientMessageWrapper.REQ_HEADER) {
+ buf.get();
+
+ msg = new GridClientMessageWrapper();
+ }
+ else if (type == GridClientHandshakeRequestWrapper.HANDSHAKE_HEADER) {
+ buf.get();
+
+ msg = new GridClientHandshakeRequestWrapper();
+ }
+ else if (type == MEMCACHE_REQ_FLAG) {
+ state = new ParserState();
+
+ state.packet(new GridMemcachedMessage());
+ state.packetType(GridClientPacketType.MEMCACHE);
+
+ Object memcacheMsg = parseMemcachePacket(ses, buf, state);
+
+ if (memcacheMsg == null)
+ ses.addMeta(PARSER_STATE.ordinal(), state);
+
+ return memcacheMsg;
+ }
+ else
+ throw new IOException("Invalid message type: " + type);
+ }
+
+ boolean finished = false;
+
+ if (buf.hasRemaining())
- finished = msgReader.read(null, msg, buf);
++ finished = msg.readFrom(buf);
+
+ if (finished) {
+ if (msg instanceof GridClientMessageWrapper) {
+ GridClientMessageWrapper clientMsg = (GridClientMessageWrapper)msg;
+
+ if (clientMsg.messageSize() == 0)
+ return GridClientPingPacket.PING_MESSAGE;
+
+ GridClientMarshaller marsh = proto.marshaller(ses);
+
+ GridClientMessage ret = marsh.unmarshal(clientMsg.messageArray());
+
+ ret.requestId(clientMsg.requestId());
+ ret.clientId(clientMsg.clientId());
+ ret.destinationId(clientMsg.destinationId());
+
+ return ret;
+ }
+ else {
+ assert msg instanceof GridClientHandshakeRequestWrapper;
+
+ GridClientHandshakeRequestWrapper req = (GridClientHandshakeRequestWrapper)msg;
+
+ GridClientHandshakeRequest ret = new GridClientHandshakeRequest();
+
+ ret.putBytes(req.bytes(), 0, 4);
+
+ return ret;
+ }
+ }
+ else {
+ ses.addMeta(MSG_META_KEY, msg);
+
+ return null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+ // No encoding needed for direct messages.
+ throw new UnsupportedEncodingException();
+ }
+
+ /**
+ * Parses memcache protocol message.
+ *
+ * @param ses Session.
+ * @param buf Buffer containing not parsed bytes.
+ * @param state Current parser state.
+ * @return Parsed packet.s
+ * @throws IOException If packet cannot be parsed.
+ * @throws IgniteCheckedException If deserialization error occurred.
+ */
+ @Nullable private GridClientMessage parseMemcachePacket(GridNioSession ses, ByteBuffer buf, ParserState state)
+ throws IOException, IgniteCheckedException {
+ assert state.packetType() == GridClientPacketType.MEMCACHE;
+ assert state.packet() != null;
+ assert state.packet() instanceof GridMemcachedMessage;
+
+ GridMemcachedMessage req = (GridMemcachedMessage)state.packet();
+ ByteArrayOutputStream tmp = state.buffer();
+ int i = state.index();
+
+ while (buf.remaining() > 0) {
+ byte b = buf.get();
+
+ if (i == 0)
+ req.requestFlag(b);
+ else if (i == 1)
+ req.operationCode(b);
+ else if (i == 2 || i == 3) {
+ tmp.write(b);
+
+ if (i == 3) {
+ req.keyLength(U.bytesToShort(tmp.toByteArray(), 0));
+
+ tmp.reset();
+ }
+ }
+ else if (i == 4)
+ req.extrasLength(b);
+ else if (i >= 8 && i <= 11) {
+ tmp.write(b);
+
+ if (i == 11) {
+ req.totalLength(U.bytesToInt(tmp.toByteArray(), 0));
+
+ tmp.reset();
+ }
+ }
+ else if (i >= 12 && i <= 15) {
+ tmp.write(b);
+
+ if (i == 15) {
+ req.opaque(tmp.toByteArray());
+
+ tmp.reset();
+ }
+ }
+ else if (i >= HDR_LEN && i < HDR_LEN + req.extrasLength()) {
+ tmp.write(b);
+
+ if (i == HDR_LEN + req.extrasLength() - 1) {
+ req.extras(tmp.toByteArray());
+
+ tmp.reset();
+ }
+ }
+ else if (i >= HDR_LEN + req.extrasLength() &&
+ i < HDR_LEN + req.extrasLength() + req.keyLength()) {
+ tmp.write(b);
+
+ if (i == HDR_LEN + req.extrasLength() + req.keyLength() - 1) {
+ req.key(tmp.toByteArray());
+
+ tmp.reset();
+ }
+ }
+ else if (i >= HDR_LEN + req.extrasLength() + req.keyLength() &&
+ i < HDR_LEN + req.totalLength()) {
+ tmp.write(b);
+
+ if (i == HDR_LEN + req.totalLength() - 1) {
+ req.value(tmp.toByteArray());
+
+ tmp.reset();
+ }
+ }
+
+ if (i == HDR_LEN + req.totalLength() - 1)
+ // Assembled the packet.
+ return assemble(ses, req);
+
+ i++;
+ }
+
+ state.index(i);
+
+ return null;
+ }
+
+ /**
+ * Validates incoming packet and deserializes all fields that need to be deserialized.
+ *
+ * @param ses Session on which packet is being parsed.
+ * @param req Raw packet.
+ * @return Same packet with fields deserialized.
+ * @throws IOException If parsing failed.
+ * @throws IgniteCheckedException If deserialization failed.
+ */
+ private GridClientMessage assemble(GridNioSession ses, GridMemcachedMessage req) throws IOException, IgniteCheckedException {
+ byte[] extras = req.extras();
+
+ // First, decode key and value, if any
+ if (req.key() != null || req.value() != null) {
+ short keyFlags = 0;
+ short valFlags = 0;
+
+ if (req.hasFlags()) {
+ if (extras == null || extras.length < FLAGS_LENGTH)
+ throw new IOException("Failed to parse incoming packet (flags required for command) [ses=" +
+ ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']');
+
+ keyFlags = U.bytesToShort(extras, 0);
+ valFlags = U.bytesToShort(extras, 2);
+ }
+
+ if (req.key() != null) {
+ assert req.key() instanceof byte[];
+
+ byte[] rawKey = (byte[])req.key();
+
+ // Only values can be hessian-encoded.
+ req.key(decodeObj(keyFlags, rawKey));
+ }
+
+ if (req.value() != null) {
+ assert req.value() instanceof byte[];
+
+ byte[] rawVal = (byte[])req.value();
+
+ req.value(decodeObj(valFlags, rawVal));
+ }
+ }
+
+ if (req.hasExpiration()) {
+ if (extras == null || extras.length < 8)
+ throw new IOException("Failed to parse incoming packet (expiration value required for command) [ses=" +
+ ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']');
+
+ req.expiration(U.bytesToInt(extras, 4) & 0xFFFFFFFFL);
+ }
+
+ if (req.hasInitial()) {
+ if (extras == null || extras.length < 16)
+ throw new IOException("Failed to parse incoming packet (initial value required for command) [ses=" +
+ ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']');
+
+ req.initial(U.bytesToLong(extras, 8));
+ }
+
+ if (req.hasDelta()) {
+ if (extras == null || extras.length < 8)
+ throw new IOException("Failed to parse incoming packet (delta value required for command) [ses=" +
+ ses + ", opCode=" + Integer.toHexString(req.operationCode() & 0xFF) + ']');
+
+ req.delta(U.bytesToLong(extras, 0));
+ }
+
+ if (extras != null) {
+ // Clients that include cache name must always include flags.
+ int len = 4;
+
+ if (req.hasExpiration())
+ len += 4;
+
+ if (req.hasDelta())
+ len += 8;
+
+ if (req.hasInitial())
+ len += 8;
+
+ if (extras.length - len > 0) {
+ byte[] cacheName = new byte[extras.length - len];
+
+ U.arrayCopy(extras, len, cacheName, 0, extras.length - len);
+
+ req.cacheName(new String(cacheName, UTF_8));
+ }
+ }
+
+ return req;
+ }
+
+ /**
+ * Decodes value from a given byte array to the object according to the flags given.
+ *
+ * @param flags Flags.
+ * @param bytes Byte array to decode.
+ * @return Decoded value.
+ * @throws IgniteCheckedException If deserialization failed.
+ */
+ private Object decodeObj(short flags, byte[] bytes) throws IgniteCheckedException {
+ assert bytes != null;
+
+ if ((flags & SERIALIZED_FLAG) != 0)
+ return proto.jdkMarshaller().unmarshal(bytes, null);
+
+ int masked = flags & 0xff00;
+
+ switch (masked) {
+ case BOOLEAN_FLAG:
+ return bytes[0] == '1';
+ case INT_FLAG:
+ return U.bytesToInt(bytes, 0);
+ case LONG_FLAG:
+ return U.bytesToLong(bytes, 0);
+ case DATE_FLAG:
+ return new Date(U.bytesToLong(bytes, 0));
+ case BYTE_FLAG:
+ return bytes[0];
+ case FLOAT_FLAG:
+ return Float.intBitsToFloat(U.bytesToInt(bytes, 0));
+ case DOUBLE_FLAG:
+ return Double.longBitsToDouble(U.bytesToLong(bytes, 0));
+ case BYTE_ARR_FLAG:
+ return bytes;
+ default:
+ return new String(bytes, UTF_8);
+ }
+ }
+
+ /**
+ * Holder for parser state and temporary buffer.
+ */
+ protected static class ParserState {
+ /** Parser index. */
+ private int idx;
+
+ /** Temporary data buffer. */
+ private ByteArrayOutputStream buf = new ByteArrayOutputStream();
+
+ /** Packet being assembled. */
+ private GridClientMessage packet;
+
+ /** Packet type. */
+ private GridClientPacketType packetType;
+
+ /** Header data. */
+ private HeaderData hdr;
+
+ /**
+ * @return Stored parser index.
+ */
+ public int index() {
+ return idx;
+ }
+
+ /**
+ * @param idx Index to store.
+ */
+ public void index(int idx) {
+ this.idx = idx;
+ }
+
+ /**
+ * @return Temporary data buffer.
+ */
+ public ByteArrayOutputStream buffer() {
+ return buf;
+ }
+
+ /**
+ * @return Pending packet.
+ */
+ @Nullable public GridClientMessage packet() {
+ return packet;
+ }
+
+ /**
+ * @param packet Pending packet.
+ */
+ public void packet(GridClientMessage packet) {
+ assert this.packet == null;
+
+ this.packet = packet;
+ }
+
+ /**
+ * @return Pending packet type.
+ */
+ public GridClientPacketType packetType() {
+ return packetType;
+ }
+
+ /**
+ * @param packetType Pending packet type.
+ */
+ public void packetType(GridClientPacketType packetType) {
+ this.packetType = packetType;
+ }
+
+ /**
+ * @return Header.
+ */
+ public HeaderData header() {
+ return hdr;
+ }
+
+ /**
+ * @param hdr Header.
+ */
+ public void header(HeaderData hdr) {
+ this.hdr = hdr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ParserState.class, this);
+ }
+ }
+
+ /**
+ * Header.
+ */
+ protected static class HeaderData {
+ /** Request Id. */
+ private final long reqId;
+
+ /** Request Id. */
+ private final UUID clientId;
+
+ /** Request Id. */
+ private final UUID destId;
+
+ /**
+ * @param reqId Request Id.
+ * @param clientId Client Id.
+ * @param destId Destination Id.
+ */
+ private HeaderData(long reqId, UUID clientId, UUID destId) {
+ this.reqId = reqId;
+ this.clientId = clientId;
+ this.destId = destId;
+ }
+
+ /**
+ * @return Request Id.
+ */
+ public long reqId() {
+ return reqId;
+ }
+
+ /**
+ * @return Client Id.
+ */
+ public UUID clientId() {
+ return clientId;
+ }
+
+ /**
+ * @return Destination Id.
+ */
+ public UUID destinationId() {
+ return destId;
+ }
+ }
+ }