You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2015/02/02 04:28:25 UTC
[33/52] [abbrv] incubator-ignite git commit: Merge branch 'sprint-1'
of https://git-wip-us.apache.org/repos/asf/incubator-ignite into ignite-61
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
index 0000000,a4f6488..71ed911
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
@@@ -1,0 -1,335 +1,280 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.rest.protocols.tcp;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.marshaller.*;
+ import org.apache.ignite.marshaller.jdk.*;
+ import org.apache.ignite.spi.*;
+ import org.apache.ignite.client.marshaller.*;
+ import org.apache.ignite.client.marshaller.jdk.*;
+ import org.apache.ignite.client.marshaller.optimized.*;
+ import org.apache.ignite.client.ssl.*;
+ import org.apache.ignite.internal.processors.rest.*;
+ import org.apache.ignite.internal.processors.rest.client.message.*;
+ import org.apache.ignite.internal.processors.rest.protocols.*;
-import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.nio.*;
+ import org.apache.ignite.internal.util.nio.ssl.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import javax.net.ssl.*;
+ import java.io.*;
+ import java.net.*;
+ import java.nio.*;
+ import java.util.*;
+
+ import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.*;
+
+ /**
+ * TCP binary protocol implementation.
+ */
+ public class GridTcpRestProtocol extends GridRestProtocolAdapter {
+ /** Server. */
+ private GridNioServer<GridClientMessage> srv;
+
+ /** JDK marshaller. */
+ private final IgniteMarshaller jdkMarshaller = new IgniteJdkMarshaller();
+
+ /** NIO server listener. */
+ private GridTcpRestNioListener lsnr;
+
- /** Message reader. */
- private final GridNioMessageReader msgReader = new GridNioMessageReader() {
- @Override public boolean read(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, ByteBuffer buf) {
- assert msg != null;
- assert buf != null;
-
- msg.messageReader(this, nodeId);
-
- return msg.readFrom(buf);
- }
-
- @Nullable @Override public GridTcpMessageFactory messageFactory() {
- return null;
- }
- };
-
- /** Message writer. */
- private final GridNioMessageWriter msgWriter = new GridNioMessageWriter() {
- @Override public boolean write(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, ByteBuffer buf) {
- assert msg != null;
- assert buf != null;
-
- msg.messageWriter(this, nodeId);
-
- return msg.writeTo(buf);
- }
-
- @Override public int writeFully(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, OutputStream out,
- ByteBuffer buf) throws IOException {
- assert msg != null;
- assert out != null;
- assert buf != null;
- assert buf.hasArray();
-
- msg.messageWriter(this, nodeId);
-
- boolean finished = false;
- int cnt = 0;
-
- while (!finished) {
- finished = msg.writeTo(buf);
-
- out.write(buf.array(), 0, buf.position());
-
- cnt += buf.position();
-
- buf.clear();
- }
-
- return cnt;
- }
- };
-
+ /** @param ctx Context. */
+ public GridTcpRestProtocol(GridKernalContext ctx) {
+ super(ctx);
+ }
+
+ /**
+ * @return JDK marshaller.
+ */
+ IgniteMarshaller jdkMarshaller() {
+ return jdkMarshaller;
+ }
+
+ /**
+ * Returns marshaller.
+ *
+ * @param ses Session.
+ * @return Marshaller.
+ */
+ GridClientMarshaller marshaller(GridNioSession ses) {
+ GridClientMarshaller marsh = ses.meta(MARSHALLER.ordinal());
+
+ assert marsh != null;
+
+ return marsh;
+ }
+
+ /**
+ * @param ses Session.
+ * @return Whether portable marshaller is used.
+ */
+ boolean portableMode(GridNioSession ses) {
+ return ctx.portable().isPortable(marshaller(ses));
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return "TCP binary";
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("BusyWait")
+ @Override public void start(final GridRestProtocolHandler hnd) throws IgniteCheckedException {
+ assert hnd != null;
+
+ ClientConnectionConfiguration cfg = ctx.config().getClientConnectionConfiguration();
+
+ assert cfg != null;
+
+ lsnr = new GridTcpRestNioListener(log, this, hnd, ctx);
+
- GridNioParser parser = new GridTcpRestDirectParser(this, msgReader);
++ GridNioParser parser = new GridTcpRestDirectParser(this);
+
+ try {
+ host = resolveRestTcpHost(ctx.config());
+
+ SSLContext sslCtx = null;
+
+ if (cfg.isRestTcpSslEnabled()) {
+ GridSslContextFactory factory = cfg.getRestTcpSslContextFactory();
+
+ if (factory == null)
+ // Thrown SSL exception instead of IgniteCheckedException for writing correct warning message into log.
+ throw new SSLException("SSL is enabled, but SSL context factory is not specified.");
+
+ sslCtx = factory.createSslContext();
+ }
+
+ int lastPort = cfg.getRestTcpPort() + cfg.getRestPortRange() - 1;
+
+ for (int port0 = cfg.getRestTcpPort(); port0 <= lastPort; port0++) {
+ if (startTcpServer(host, port0, lsnr, parser, sslCtx, cfg)) {
+ port = port0;
+
+ if (log.isInfoEnabled())
+ log.info(startInfo());
+
+ return;
+ }
+ }
+
+ U.warn(log, "Failed to start TCP binary REST server (possibly all ports in range are in use) " +
+ "[firstPort=" + cfg.getRestTcpPort() + ", lastPort=" + lastPort + ", host=" + host + ']');
+ }
+ catch (SSLException e) {
+ U.warn(log, "Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage(),
+ "Failed to start " + name() + " protocol on port " + port + ". Check if SSL context factory is " +
+ "properly configured.");
+ }
+ catch (IOException e) {
+ U.warn(log, "Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage(),
+ "Failed to start " + name() + " protocol on port " + port + ". " +
+ "Check restTcpHost configuration property.");
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStart() {
+ super.onKernalStart();
+
+ Map<Byte, GridClientMarshaller> marshMap = new HashMap<>();
+
+ marshMap.put(GridClientOptimizedMarshaller.ID, new GridClientOptimizedMarshaller());
+ marshMap.put(GridClientJdkMarshaller.ID, new GridClientJdkMarshaller());
+ marshMap.put((byte)0, ctx.portable().portableMarshaller());
+
+ lsnr.marshallers(marshMap);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ if (srv != null) {
+ ctx.ports().deregisterPorts(getClass());
+
+ srv.stop();
+ }
+
+ if (log.isInfoEnabled())
+ log.info(stopInfo());
+ }
+
+ /**
+ * Resolves host for REST TCP server using grid configuration.
+ *
+ * @param cfg Grid configuration.
+ * @return REST host.
+ * @throws IOException If failed to resolve REST host.
+ */
+ private InetAddress resolveRestTcpHost(IgniteConfiguration cfg) throws IOException {
+ String host = cfg.getClientConnectionConfiguration().getRestTcpHost();
+
+ if (host == null)
+ host = cfg.getLocalHost();
+
+ return U.resolveLocalHost(host);
+ }
+
+ /**
+ * Tries to start server with given parameters.
+ *
+ * @param hostAddr Host on which server should be bound.
+ * @param port Port on which server should be bound.
+ * @param lsnr Server message listener.
+ * @param parser Server message parser.
+ * @param sslCtx SSL context in case if SSL is enabled.
+ * @param cfg Configuration for other parameters.
+ * @return {@code True} if server successfully started, {@code false} if port is used and
+ * server was unable to start.
+ */
+ private boolean startTcpServer(InetAddress hostAddr, int port, GridNioServerListener<GridClientMessage> lsnr,
+ GridNioParser parser, @Nullable SSLContext sslCtx, ClientConnectionConfiguration cfg) {
+ try {
+ GridNioFilter codec = new GridNioCodecFilter(parser, log, true);
+
+ GridNioFilter[] filters;
+
+ if (sslCtx != null) {
+ GridNioSslFilter sslFilter = new GridNioSslFilter(sslCtx, log);
+
+ sslFilter.directMode(true);
+
+ boolean auth = cfg.isRestTcpSslClientAuth();
+
+ sslFilter.wantClientAuth(auth);
+
+ sslFilter.needClientAuth(auth);
+
+ filters = new GridNioFilter[] {
+ codec,
+ sslFilter
+ };
+ }
+ else
+ filters = new GridNioFilter[] { codec };
+
+ srv = GridNioServer.<GridClientMessage>builder()
+ .address(hostAddr)
+ .port(port)
+ .listener(lsnr)
+ .logger(log)
+ .selectorCount(cfg.getRestTcpSelectorCount())
+ .gridName(ctx.gridName())
+ .tcpNoDelay(cfg.isRestTcpNoDelay())
+ .directBuffer(cfg.isRestTcpDirectBuffer())
+ .byteOrder(ByteOrder.nativeOrder())
+ .socketSendBufferSize(cfg.getRestTcpSendBufferSize())
+ .socketReceiveBufferSize(cfg.getRestTcpReceiveBufferSize())
+ .sendQueueLimit(cfg.getRestTcpSendQueueLimit())
+ .filters(filters)
+ .directMode(true)
- .messageWriter(msgWriter)
+ .build();
+
+ srv.idleTimeout(cfg.getRestIdleTimeout());
+
+ srv.start();
+
+ ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass());
+
+ return true;
+ }
+ catch (IgniteCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to start " + name() + " protocol on port " + port + ": " + e.getMessage());
+
+ return false;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected String getAddressPropertyName() {
+ return GridNodeAttributes.ATTR_REST_TCP_ADDRS;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected String getHostNamePropertyName() {
+ return GridNodeAttributes.ATTR_REST_TCP_HOST_NAMES;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected String getPortPropertyName() {
+ return GridNodeAttributes.ATTR_REST_TCP_PORT;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java
index 0000000,0afb745..efc80cb
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerCancelRequest.java
@@@ -1,0 -1,123 +1,121 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.streamer;
+
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.util.direct.*;
+
+ import java.io.*;
+ import java.nio.*;
+
+ /**
+ * Streamer cancel request.
+ */
+ public class GridStreamerCancelRequest extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Cancelled future ID. */
+ private IgniteUuid cancelledFutId;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public GridStreamerCancelRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param cancelledFutId Cancelled future ID.
+ */
+ public GridStreamerCancelRequest(IgniteUuid cancelledFutId) {
+ this.cancelledFutId = cancelledFutId;
+ }
+
+ /**
+ * @return Cancelled future ID.
+ */
+ public IgniteUuid cancelledFutureId() {
+ return cancelledFutId;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridStreamerCancelRequest _clone = new GridStreamerCancelRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridStreamerCancelRequest _clone = (GridStreamerCancelRequest)_msg;
+
+ _clone.cancelledFutId = cancelledFutId;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putGridUuid(cancelledFutId))
++ if (!commState.putGridUuid("cancelledFutId", cancelledFutId))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ switch (commState.idx) {
+ case 0:
- IgniteUuid cancelledFutId0 = commState.getGridUuid();
++ cancelledFutId = commState.getGridUuid("cancelledFutId");
+
- if (cancelledFutId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- cancelledFutId = cancelledFutId0;
-
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 75;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java
index 0000000,5e9f937..e62a003
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerExecutionRequest.java
@@@ -1,0 -1,382 +1,374 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.streamer;
+
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.tostring.*;
+ import org.jetbrains.annotations.*;
+
+ import java.nio.*;
+ import java.util.*;
+
+ /**
+ *
+ */
+ public class GridStreamerExecutionRequest extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Force local deployment flag. */
+ private boolean forceLocDep;
+
+ /** Serialized batch in case if P2P class loading is enabled. */
+ @GridToStringExclude
+ private byte[] batchBytes;
+
+ /** Deployment mode. */
+ private IgniteDeploymentMode depMode;
+
+ /** Deployment sample class name. */
+ private String sampleClsName;
+
+ /** Deployment user version. */
+ private String userVer;
+
+ /** Node class loader participants. */
+ @GridToStringInclude
+ @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
+ private Map<UUID, IgniteUuid> ldrParticipants;
+
+ /** Class loader ID. */
+ private IgniteUuid clsLdrId;
+
+ /**
+ *
+ */
+ public GridStreamerExecutionRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param forceLocDep Force local deployment flag.
+ * @param batchBytes Batch serialized bytes.
+ * @param depMode Deployment mode.
+ * @param sampleClsName Sample class name.
+ * @param userVer User version.
+ * @param ldrParticipants Loader participants.
+ * @param clsLdrId Class loader ID.
+ */
+ public GridStreamerExecutionRequest(
+ boolean forceLocDep,
+ byte[] batchBytes,
+ @Nullable IgniteDeploymentMode depMode,
+ @Nullable String sampleClsName,
+ @Nullable String userVer,
+ @Nullable Map<UUID, IgniteUuid> ldrParticipants,
+ @Nullable IgniteUuid clsLdrId
+ ) {
+ assert batchBytes != null;
+
+ this.forceLocDep = forceLocDep;
+ this.batchBytes = batchBytes;
+ this.depMode = depMode;
+ this.sampleClsName = sampleClsName;
+ this.userVer = userVer;
+ this.ldrParticipants = ldrParticipants;
+ this.clsLdrId = clsLdrId;
+ }
+
+ /**
+ * @return Force local deployment flag.
+ */
+ public boolean forceLocalDeployment() {
+ return forceLocDep;
+ }
+
+ /**
+ * @return Deployment mode.
+ */
+ public IgniteDeploymentMode deploymentMode() {
+ return depMode;
+ }
+
+ /**
+ * @return Deployment sample class name.
+ */
+ public String sampleClassName() {
+ return sampleClsName;
+ }
+
+ /**
+ * @return Deployment user version.
+ */
+ public String userVersion() {
+ return userVer;
+ }
+
+ /**
+ * @return Node class loader participants.
+ */
+ public Map<UUID, IgniteUuid> loaderParticipants() {
+ return ldrParticipants;
+ }
+
+ /**
+ * @return Class loader ID.
+ */
+ public IgniteUuid classLoaderId() {
+ return clsLdrId;
+ }
+
+ /**
+ * @return Serialized batch in case if P2P class loading is enabled.
+ */
+ public byte[] batchBytes() {
+ return batchBytes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridStreamerExecutionRequest.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridStreamerExecutionRequest _clone = new GridStreamerExecutionRequest();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridStreamerExecutionRequest _clone = (GridStreamerExecutionRequest)_msg;
+
+ _clone.forceLocDep = forceLocDep;
+ _clone.batchBytes = batchBytes;
+ _clone.depMode = depMode;
+ _clone.sampleClsName = sampleClsName;
+ _clone.userVer = userVer;
+ _clone.ldrParticipants = ldrParticipants;
+ _clone.clsLdrId = clsLdrId;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putByteArray(batchBytes))
++ if (!commState.putByteArray("batchBytes", batchBytes))
+ return false;
+
+ commState.idx++;
+
+ case 1:
- if (!commState.putGridUuid(clsLdrId))
++ if (!commState.putGridUuid("clsLdrId", clsLdrId))
+ return false;
+
+ commState.idx++;
+
+ case 2:
- if (!commState.putEnum(depMode))
++ if (!commState.putEnum("depMode", depMode))
+ return false;
+
+ commState.idx++;
+
+ case 3:
- if (!commState.putBoolean(forceLocDep))
++ if (!commState.putBoolean("forceLocDep", forceLocDep))
+ return false;
+
+ commState.idx++;
+
+ case 4:
+ if (ldrParticipants != null) {
+ if (commState.it == null) {
- if (!commState.putInt(ldrParticipants.size()))
++ if (!commState.putInt(null, ldrParticipants.size()))
+ return false;
+
+ commState.it = ldrParticipants.entrySet().iterator();
+ }
+
+ while (commState.it.hasNext() || commState.cur != NULL) {
+ if (commState.cur == NULL)
+ commState.cur = commState.it.next();
+
+ Map.Entry<UUID, IgniteUuid> e = (Map.Entry<UUID, IgniteUuid>)commState.cur;
+
+ if (!commState.keyDone) {
- if (!commState.putUuid(e.getKey()))
++ if (!commState.putUuid(null, e.getKey()))
+ return false;
+
+ commState.keyDone = true;
+ }
+
- if (!commState.putGridUuid(e.getValue()))
++ if (!commState.putGridUuid(null, e.getValue()))
+ return false;
+
+ commState.keyDone = false;
+
+ commState.cur = NULL;
+ }
+
+ commState.it = null;
+ } else {
- if (!commState.putInt(-1))
++ if (!commState.putInt(null, -1))
+ return false;
+ }
+
+ commState.idx++;
+
+ case 5:
- if (!commState.putString(sampleClsName))
++ if (!commState.putString("sampleClsName", sampleClsName))
+ return false;
+
+ commState.idx++;
+
+ case 6:
- if (!commState.putString(userVer))
++ if (!commState.putString("userVer", userVer))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ switch (commState.idx) {
+ case 0:
- byte[] batchBytes0 = commState.getByteArray();
++ batchBytes = commState.getByteArray("batchBytes");
+
- if (batchBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- batchBytes = batchBytes0;
-
+ commState.idx++;
+
+ case 1:
- IgniteUuid clsLdrId0 = commState.getGridUuid();
++ clsLdrId = commState.getGridUuid("clsLdrId");
+
- if (clsLdrId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- clsLdrId = clsLdrId0;
-
+ commState.idx++;
+
+ case 2:
- if (buf.remaining() < 1)
- return false;
++ byte depMode0 = commState.getByte("depMode");
+
- byte depMode0 = commState.getByte();
++ if (!commState.lastRead())
++ return false;
+
+ depMode = IgniteDeploymentMode.fromOrdinal(depMode0);
+
+ commState.idx++;
+
+ case 3:
- if (buf.remaining() < 1)
- return false;
++ forceLocDep = commState.getBoolean("forceLocDep");
+
- forceLocDep = commState.getBoolean();
++ if (!commState.lastRead())
++ return false;
+
+ commState.idx++;
+
+ case 4:
+ if (commState.readSize == -1) {
- if (buf.remaining() < 4)
- return false;
++ commState.readSize = commState.getInt(null);
+
- commState.readSize = commState.getInt();
++ if (!commState.lastRead())
++ return false;
+ }
+
+ if (commState.readSize >= 0) {
+ if (ldrParticipants == null)
- ldrParticipants = U.newHashMap(commState.readSize);
++ ldrParticipants = new HashMap<>(commState.readSize, 1.0f);
+
+ for (int i = commState.readItems; i < commState.readSize; i++) {
+ if (!commState.keyDone) {
- UUID _val = commState.getUuid();
++ UUID _val = commState.getUuid(null);
+
- if (_val == UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ commState.cur = _val;
+ commState.keyDone = true;
+ }
+
- IgniteUuid _val = commState.getGridUuid();
++ IgniteUuid _val = commState.getGridUuid(null);
+
- if (_val == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
+ ldrParticipants.put((UUID)commState.cur, _val);
+
+ commState.keyDone = false;
+
+ commState.readItems++;
+ }
+ }
+
+ commState.readSize = -1;
+ commState.readItems = 0;
+ commState.cur = null;
+
+ commState.idx++;
+
+ case 5:
- String sampleClsName0 = commState.getString();
++ sampleClsName = commState.getString("sampleClsName");
+
- if (sampleClsName0 == STR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- sampleClsName = sampleClsName0;
-
+ commState.idx++;
+
+ case 6:
- String userVer0 = commState.getString();
++ userVer = commState.getString("userVer");
+
- if (userVer0 == STR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- userVer = userVer0;
-
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 76;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java
index 0000000,7ca0e36..fdd21df
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/streamer/GridStreamerResponse.java
@@@ -1,0 -1,160 +1,156 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.streamer;
+
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.nio.*;
+
+ /**
+ *
+ */
+ public class GridStreamerResponse extends GridTcpCommunicationMessageAdapter {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private IgniteUuid futId;
+
+ /** */
+ private byte[] errBytes;
+
+ /**
+ *
+ */
+ public GridStreamerResponse() {
+ // No-op.
+ }
+
+ /**
+ * @param futId Future ID.
+ * @param errBytes Serialized error, if any.
+ */
+ public GridStreamerResponse(IgniteUuid futId, @Nullable byte[] errBytes) {
+ assert futId != null;
+
+ this.futId = futId;
+ this.errBytes = errBytes;
+ }
+
+ /**
+ * @return Future ID.
+ */
+ public IgniteUuid futureId() {
+ return futId;
+ }
+
+ /**
+ * @return Serialized error.
+ */
+ public byte[] errorBytes() {
+ return errBytes;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridStreamerResponse.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
+ @Override public GridTcpCommunicationMessageAdapter clone() {
+ GridStreamerResponse _clone = new GridStreamerResponse();
+
+ clone0(_clone);
+
+ return _clone;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
+ GridStreamerResponse _clone = (GridStreamerResponse)_msg;
+
+ _clone.futId = futId;
+ _clone.errBytes = errBytes;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean writeTo(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
++ if (!commState.putByte(null, directType()))
+ return false;
+
+ commState.typeWritten = true;
+ }
+
+ switch (commState.idx) {
+ case 0:
- if (!commState.putByteArray(errBytes))
++ if (!commState.putByteArray("errBytes", errBytes))
+ return false;
+
+ commState.idx++;
+
+ case 1:
- if (!commState.putGridUuid(futId))
++ if (!commState.putGridUuid("futId", futId))
+ return false;
+
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("all")
+ @Override public boolean readFrom(ByteBuffer buf) {
+ commState.setBuffer(buf);
+
+ switch (commState.idx) {
+ case 0:
- byte[] errBytes0 = commState.getByteArray();
++ errBytes = commState.getByteArray("errBytes");
+
- if (errBytes0 == BYTE_ARR_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- errBytes = errBytes0;
-
+ commState.idx++;
+
+ case 1:
- IgniteUuid futId0 = commState.getGridUuid();
++ futId = commState.getGridUuid("futId");
+
- if (futId0 == GRID_UUID_NOT_READ)
++ if (!commState.lastRead())
+ return false;
+
- futId = futId0;
-
+ commState.idx++;
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 77;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3dbb6acc/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java
index 0000000,bce75f6..d167e55
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridLongList.java
@@@ -1,0 -1,500 +1,504 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.util;
+
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.util.*;
+
+ /**
+ * Minimal list API to work with primitive longs. This list exists
+ * to avoid boxing/unboxing when using standard list from Java.
+ */
+ public class GridLongList implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long[] arr;
+
+ /** */
+ private int idx;
+
+ /**
+ *
+ */
+ public GridLongList() {
+ // No-op.
+ }
+
+ /**
+ * @param size Size.
+ */
+ public GridLongList(int size) {
+ arr = new long[size];
+ // idx = 0
+ }
+
+ /**
+ * @param arr Array.
+ */
+ public GridLongList(long[] arr) {
+ this.arr = arr;
+
+ idx = arr.length;
+ }
+
+ /**
+ * @param vals Values.
+ * @return List from values.
+ */
+ public static GridLongList asList(long... vals) {
+ if (F.isEmpty(vals))
+ return new GridLongList();
+
+ return new GridLongList(vals);
+ }
+
+ /**
+ * @param arr Array.
+ * @param size Size.
+ */
+ private GridLongList(long[] arr, int size) {
+ this.arr = arr;
+ idx = size;
+ }
+
+ /**
+ * @return Copy of this list.
+ */
+ public GridLongList copy() {
+ if (idx == 0)
+ return new GridLongList();
+
+ return new GridLongList(Arrays.copyOf(arr, idx));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof GridLongList))
+ return false;
+
+ GridLongList that = (GridLongList)o;
+
+ if (idx != that.idx)
+ return false;
+
+ if (idx == 0 || arr == that.arr)
+ return true;
+
+ for (int i = 0; i < idx; i++) {
+ if (arr[i] != that.arr[i])
+ return false;
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ int res = 1;
+
+ for (int i = 0; i < idx; i++) {
+ long element = arr[i];
+ int elementHash = (int)(element ^ (element >>> 32));
+ res = 31 * res + elementHash;
+ }
+
+ return res;
+ }
+
+ /**
+ * @param l List to add all elements of.
+ */
+ public void addAll(GridLongList l) {
+ assert l != null;
+
+ if (l.isEmpty())
+ return;
+
+ if (arr == null)
+ arr = new long[4];
+
+ int len = arr.length;
+
+ while (len < idx + l.size())
+ len <<= 1;
+
+ arr = Arrays.copyOf(arr, len);
+
+ System.arraycopy(l.arr, 0, arr, idx, l.size());
+
+ idx += l.size();
+ }
+
+ /**
+ * Add element to this array.
+ * @param x Value.
+ */
+ public void add(long x) {
+ if (arr == null)
+ arr = new long[4];
+ else if (arr.length == idx)
+ arr = Arrays.copyOf(arr, arr.length << 1);
+
+ arr[idx++] = x;
+ }
+
+ /**
+ * Clears the list.
+ */
+ public void clear() {
+ idx = 0;
+ }
+
+ /**
+ * Gets the last element.
+ *
+ * @return The last element.
+ */
+ public long last() {
+ return arr[idx - 1];
+ }
+
+ /**
+ * Removes and returns the last element of the list. Complementary method to {@link #add(long)} for stack like usage.
+ *
+ * @return Removed element.
+ * @throws NoSuchElementException If the list is empty.
+ */
+ public long remove() throws NoSuchElementException {
+ if (idx == 0)
+ throw new NoSuchElementException();
+
+ return arr[--idx];
+ }
+
+ /**
+ * Returns (possibly reordered) copy of this list, excluding all elements of given list.
+ *
+ * @param l List of elements to remove.
+ * @return New list without all elements from {@code l}.
+ */
+ public GridLongList copyWithout(GridLongList l) {
+ assert l != null;
+
+ if (idx == 0)
+ return new GridLongList();
+
+ if (l.idx == 0)
+ return new GridLongList(Arrays.copyOf(arr, idx));
+
+ long[] newArr = Arrays.copyOf(arr, idx);
+ int newIdx = idx;
+
+ for (int i = 0; i < l.size(); i++) {
+ long rmVal = l.get(i);
+
+ for (int j = 0; j < newIdx; j++) {
+ if (newArr[j] == rmVal) {
+
+ while (newIdx > 0 && newArr[newIdx - 1] == rmVal)
+ newIdx--;
+
+ if (newIdx > 0) {
+ newArr[j] = newArr[newIdx - 1];
+ newIdx--;
+ }
+ }
+ }
+ }
+
+ return new GridLongList(newArr, newIdx);
+ }
+
+ /**
+ * @param i Index.
+ * @return Value.
+ */
+ public long get(int i) {
+ assert i < idx;
+
+ return arr[i];
+ }
+
+ /**
+ * @return Size.
+ */
+ public int size() {
+ return idx;
+ }
+
+ /**
+ * @return {@code True} if this list has no elements.
+ */
+ public boolean isEmpty() {
+ return idx == 0;
+ }
+
+ /**
+ * @param l Element to find.
+ * @return {@code True} if found.
+ */
+ public boolean contains(long l) {
+ for (int i = 0; i < idx; i++) {
+ if (arr[i] == l)
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * @param l List to check.
+ * @return {@code True} if this list contains all the elements of passed in list.
+ */
+ public boolean containsAll(GridLongList l) {
+ for (int i = 0; i < l.size(); i++) {
+ if (!contains(l.get(i)))
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * @return {@code True} if there are no duplicates.
+ */
+ public boolean distinct() {
+ for (int i = 0; i < idx; i++) {
+ for (int j = i + 1; j < idx; j++) {
+ if (arr[i] == arr[j])
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * @param size New size.
+ * @param last If {@code true} the last elements will be removed, otherwise the first.
+ */
+ public void truncate(int size, boolean last) {
+ assert size >= 0 && size <= idx;
+
+ if (size == idx)
+ return;
+
+ if (!last && idx != 0 && size != 0)
+ System.arraycopy(arr, idx - size, arr, 0, size);
+
+ idx = size;
+ }
+
+ /**
+ * Removes element by given index.
+ *
+ * @param i Index.
+ * @return Removed value.
+ */
+ public long removeIndex(int i) {
+ assert i < idx : i;
+
+ long res = arr[i];
+
+ if (i == idx - 1) { // Last element.
+ idx = i;
+ }
+ else {
+ System.arraycopy(arr, i + 1, arr, i, idx - i - 1);
+ idx--;
+ }
+
+ return res;
+ }
+
+ /**
+ * Removes value from this list.
+ *
+ * @param startIdx Index to begin search with.
+ * @param val Value.
+ * @return Index of removed value if the value was found and removed or {@code -1} otherwise.
+ */
+ public int removeValue(int startIdx, long val) {
+ assert startIdx >= 0;
+
+ for (int i = startIdx; i < idx; i++) {
+ if (arr[i] == val) {
+ removeIndex(i);
+
+ return i;
+ }
+ }
+
+ return -1;
+ }
+
+ /**
+ * Removes value from this list.
+ *
+ * @param startIdx Index to begin search with.
+ * @param oldVal Old value.
+ * @param newVal New value.
+ * @return Index of replaced value if the value was found and replaced or {@code -1} otherwise.
+ */
+ public int replaceValue(int startIdx, long oldVal, long newVal) {
+ for (int i = startIdx; i < idx; i++) {
+ if (arr[i] == oldVal) {
+ arr[i] = newVal;
+
+ return i;
+ }
+ }
+
+ return -1;
+ }
+
+ /**
- * @return Internal array.
++ * @return Array copy.
+ */
- public long[] internalArray() {
- return arr;
++ public long[] array() {
++ long[] res = new long[idx];
++
++ System.arraycopy(arr, 0, res, 0, idx);
++
++ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeInt(idx);
+
+ for (int i = 0; i < idx; i++)
+ out.writeLong(arr[i]);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ idx = in.readInt();
+
+ arr = new long[idx];
+
+ for (int i = 0; i < idx; i++)
+ arr[i] = in.readLong();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ SB b = new SB("[");
+
+ for (int i = 0; i < idx; i++) {
+ if (i != 0)
+ b.a(',');
+
+ b.a(arr[i]);
+ }
+
+ b.a(']');
+
+ return S.toString(GridLongList.class, this, "arr", b);
+ }
+
+ /**
+ * @param in Input to read list from.
+ * @return Grid long list.
+ * @throws IOException If failed.
+ */
+ @Nullable public static GridLongList readFrom(DataInput in) throws IOException {
+ int idx = in.readInt();
+
+ if (idx == -1)
+ return null;
+
+ long[] arr = new long[idx];
+
+ for (int i = 0; i < idx; i++)
+ arr[i] = in.readLong();
+
+ return new GridLongList(arr);
+ }
+
+ /**
+ * @param out Output to write to.
+ * @param list List.
+ * @throws IOException If failed.
+ */
+ public static void writeTo(DataOutput out, @Nullable GridLongList list) throws IOException {
+ out.writeInt(list != null ? list.idx : -1);
+
+ if (list != null) {
+ for (int i = 0; i < list.idx; i++)
+ out.writeLong(list.arr[i]);
+ }
+ }
+
+ /**
+ * @param to To list.
+ * @param from From list.
+ * @return To list (passed in or created).
+ */
+ public static GridLongList addAll(@Nullable GridLongList to, GridLongList from) {
+ if (to == null) {
+ GridLongList res = new GridLongList(from.size());
+
+ res.addAll(from);
+
+ return res;
+ }
+ else {
+ to.addAll(from);
+
+ return to;
+ }
+ }
+
+ /**
+ * Sorts this list.
+ * Use {@code copy().sort()} if you need a defensive copy.
+ *
+ * @return {@code this} For chaining.
+ */
+ public GridLongList sort() {
+ if (idx > 1)
+ Arrays.sort(arr, 0, idx);
+
+ return this;
+ }
+
+ /**
+ * Removes given number of elements from the end. If the given number of elements is higher than
+ * list size, then list will be cleared.
+ *
+ * @param cnt Count to pop from the end.
+ */
+ public void pop(int cnt) {
+ assert cnt >= 0 : cnt;
+
+ if (idx < cnt)
+ idx = 0;
+ else
+ idx -= cnt;
+ }
+ }