You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/08/28 14:37:09 UTC
ignite git commit: IGNITE-1319: Moved platform services to Ignite.
Repository: ignite
Updated Branches:
refs/heads/ignite-1319 [created] 8714caa18
IGNITE-1319: Moved platform services to Ignite.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8714caa1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8714caa1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8714caa1
Branch: refs/heads/ignite-1319
Commit: 8714caa181d043aa741ca6b485250e575b0bc1dd
Parents: 16c095a
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Aug 28 15:37:50 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Aug 28 15:37:50 2015 +0300
----------------------------------------------------------------------
.../processors/platform/PlatformContext.java | 9 +
.../platform/services/PlatformService.java | 44 ++++
.../platform/dotnet/PlatformDotNetService.java | 27 ++
.../dotnet/PlatformDotNetServiceImpl.java | 47 ++++
.../services/PlatformAbstractService.java | 223 ++++++++++++++++
.../platform/services/PlatformServices.java | 252 +++++++++++++++++++
6 files changed, 602 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/8714caa1/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index 9b4a891..064cd91 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.platform.cache.query.*;
import org.apache.ignite.internal.processors.platform.callback.*;
import org.apache.ignite.internal.processors.platform.compute.*;
import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.stream.*;
import org.jetbrains.annotations.*;
@@ -260,4 +261,12 @@ public interface PlatformContext {
* @return Stream receiver.
*/
public StreamReceiver createStreamReceiver(Object rcv, long ptr, boolean keepPortable);
+
+ /**
+ * Create cluster node filter.
+ *
+ * @param filter Native filter.
+ * @return Cluster node filter.
+ */
+ public IgnitePredicate<ClusterNode> createClusterNodeFilter(Object filter);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8714caa1/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformService.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformService.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformService.java
new file mode 100644
index 0000000..7b1daad
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformService.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.services;
+
+import org.apache.ignite.*;
+import org.apache.ignite.services.*;
+
+/**
+ * Base class for all platform services.
+ */
+public interface PlatformService extends Service {
+ /**
+ * Invokes native service method.
+ *
+ * @param mthdName Method name.
+ * @param srvKeepPortable Server keep portable flag.
+ * @param args Arguments.
+ * @return Resulting data.
+ * @throws org.apache.ignite.IgniteCheckedException If failed.
+ */
+ public Object invokeMethod(String mthdName, boolean srvKeepPortable, Object[] args) throws IgniteCheckedException;
+
+ /**
+ * Gets native pointer.
+ *
+ * @return Native pointer.
+ */
+ public long pointer();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8714caa1/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetService.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetService.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetService.java
new file mode 100644
index 0000000..7c61cf8
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetService.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.dotnet;
+
+import org.apache.ignite.internal.processors.platform.services.*;
+
+/**
+ * Marker interface to denote a service implemented on .Net platform.
+ */
+public interface PlatformDotNetService extends PlatformService {
+ // No-op.
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8714caa1/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java
new file mode 100644
index 0000000..74e143d
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetServiceImpl.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.dotnet;
+
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.services.*;
+
+/**
+ * Interop .Net service.
+ */
+public class PlatformDotNetServiceImpl extends PlatformAbstractService implements PlatformDotNetService {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Default constructor for serialization.
+ */
+ public PlatformDotNetServiceImpl() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param svc Service.
+ * @param ctx Context.
+ * @param srvKeepPortable Whether to keep objects portable on server if possible.
+ */
+ public PlatformDotNetServiceImpl(Object svc, PlatformContext ctx, boolean srvKeepPortable) {
+ super(svc, ctx, srvKeepPortable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8714caa1/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java
new file mode 100644
index 0000000..d53b9b5
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformAbstractService.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.services;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.services.*;
+
+import java.io.*;
+
+/**
+ * Base platform service implementation.
+ */
+public abstract class PlatformAbstractService implements PlatformService, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** .Net portable service. */
+ protected Object svc;
+
+ /** Whether to keep objects portable on server if possible. */
+ protected boolean srvKeepPortable;
+
+ /** Pointer to deployed service. */
+ protected transient long ptr;
+
+ /** Context. */
+ protected transient PlatformContext platformCtx;
+
+ /**
+ * Default constructor for serialization.
+ */
+ public PlatformAbstractService() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param svc Service.
+ * @param ctx Context.
+ * @param srvKeepPortable Whether to keep objects portable on server if possible.
+ */
+ public PlatformAbstractService(Object svc, PlatformContext ctx, boolean srvKeepPortable) {
+ assert svc != null;
+ assert ctx != null;
+
+ this.svc = svc;
+ this.platformCtx = ctx;
+ this.srvKeepPortable = srvKeepPortable;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void init(ServiceContext ctx) throws Exception {
+ assert ptr == 0;
+ assert platformCtx != null;
+
+ try (PlatformMemory mem = platformCtx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = platformCtx.writer(out);
+
+ writer.writeBoolean(srvKeepPortable);
+ writer.writeObject(svc);
+
+ writeServiceContext(ctx, writer);
+
+ out.synchronize();
+
+ ptr = platformCtx.gateway().serviceInit(mem.pointer());
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void execute(ServiceContext ctx) throws Exception {
+ assert ptr != 0;
+ assert platformCtx != null;
+
+ try (PlatformMemory mem = platformCtx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = platformCtx.writer(out);
+
+ writer.writeBoolean(srvKeepPortable);
+
+ writeServiceContext(ctx, writer);
+
+ out.synchronize();
+
+ platformCtx.gateway().serviceExecute(ptr, mem.pointer());
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void cancel(ServiceContext ctx) {
+ assert ptr != 0;
+ assert platformCtx != null;
+
+ try (PlatformMemory mem = platformCtx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = platformCtx.writer(out);
+
+ writer.writeBoolean(srvKeepPortable);
+
+ writeServiceContext(ctx, writer);
+
+ out.synchronize();
+
+ platformCtx.gateway().serviceCancel(ptr, mem.pointer());
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /**
+ * Writes service context.
+ *
+ * @param ctx Context.
+ * @param writer Writer.
+ */
+ private void writeServiceContext(ServiceContext ctx, PortableRawWriterEx writer) {
+ writer.writeString(ctx.name());
+ writer.writeUuid(ctx.executionId());
+ writer.writeBoolean(ctx.isCancelled());
+ writer.writeString(ctx.cacheName());
+ writer.writeObject(ctx.affinityKey());
+ }
+
+ /** {@inheritDoc} */
+ @Override public long pointer() {
+ assert ptr != 0;
+
+ return ptr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object invokeMethod(String mthdName, boolean srvKeepPortable, Object[] args)
+ throws IgniteCheckedException {
+ assert ptr != 0;
+ assert platformCtx != null;
+
+ try (PlatformMemory outMem = platformCtx.memory().allocate()) {
+ PlatformOutputStream out = outMem.output();
+ PortableRawWriterEx writer = platformCtx.writer(out);
+
+ writer.writeBoolean(srvKeepPortable);
+ writer.writeString(mthdName);
+
+ if (args == null)
+ writer.writeBoolean(false);
+ else {
+ writer.writeBoolean(true);
+ writer.writeInt(args.length);
+
+ for (Object arg : args)
+ writer.writeObjectDetached(arg);
+ }
+
+ out.synchronize();
+
+ try (PlatformMemory inMem = platformCtx.memory().allocate()) {
+ PlatformInputStream in = inMem.input();
+
+ PortableRawReaderEx reader = platformCtx.reader(in);
+
+ platformCtx.gateway().serviceInvokeMethod(ptr, outMem.pointer(), inMem.pointer());
+
+ in.synchronize();
+
+ return PlatformUtils.readInvocationResult(platformCtx, reader);
+ }
+ }
+ }
+
+ /**
+ * @param ignite Ignite instance.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ @IgniteInstanceResource
+ public void setIgniteInstance(Ignite ignite) {
+ platformCtx = PlatformUtils.platformContext(ignite);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ svc = in.readObject();
+ srvKeepPortable = in.readBoolean();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(svc);
+ out.writeBoolean(srvKeepPortable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8714caa1/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
new file mode 100644
index 0000000..d0956f9
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/services/PlatformServices.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.services;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.dotnet.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.services.*;
+
+import java.util.*;
+
+/**
+ * Interop services.
+ */
+@SuppressWarnings({"UnusedDeclaration"})
+public class PlatformServices extends PlatformAbstractTarget {
+ /** */
+ private static final int OP_DOTNET_DEPLOY = 1;
+
+ /** */
+ private static final int OP_DOTNET_DEPLOY_MULTIPLE = 2;
+
+ /** */
+ private static final int OP_DOTNET_SERVICES = 3;
+
+ /** */
+ private static final int OP_DOTNET_INVOKE = 4;
+
+ /** */
+ private static final int OP_DESCRIPTORS = 5;
+
+ /** */
+ private final IgniteServices services;
+
+ /** Server keep portable flag. */
+ private final boolean srvKeepPortable;
+
+ /**
+ * Ctor.
+ *
+ * @param platformCtx Context.
+ * @param services Services facade.
+ * @param srvKeepPortable Server keep portable flag.
+ */
+ public PlatformServices(PlatformContext platformCtx, IgniteServices services, boolean srvKeepPortable) {
+ super(platformCtx);
+
+ assert services != null;
+
+ this.services = services;
+ this.srvKeepPortable = srvKeepPortable;
+ }
+
+ /**
+ * Gets services with asynchronous mode enabled.
+ *
+ * @return Services with asynchronous mode enabled.
+ */
+ public PlatformServices withAsync() {
+ if (services.isAsync())
+ return this;
+
+ return new PlatformServices(platformCtx, services.withAsync(), srvKeepPortable);
+ }
+
+ /**
+ * Gets services with server "keep portable" mode enabled.
+ *
+ * @return Services with server "keep portable" mode enabled.
+ */
+ public PlatformServices withServerKeepPortable() {
+ return srvKeepPortable ? this : new PlatformServices(platformCtx, services, true);
+ }
+
+ /**
+ * Cancels service deployment.
+ *
+ * @param name Name of service to cancel.
+ */
+ public void cancel(String name) {
+ services.cancel(name);
+ }
+
+ /**
+ * Cancels all deployed services.
+ */
+ public void cancelAll() {
+ services.cancelAll();
+ }
+
+ /**
+ * Gets a remote handle on the service.
+ *
+ * @param name Service name.
+ * @param sticky Whether or not Ignite should always contact the same remote service.
+ * @return Either proxy over remote service or local service if it is deployed locally.
+ */
+ public Object dotNetServiceProxy(String name, boolean sticky) {
+ return services.serviceProxy(name, PlatformDotNetService.class, sticky);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+ switch (type) {
+ case OP_DOTNET_DEPLOY: {
+ ServiceConfiguration cfg = new ServiceConfiguration();
+
+ cfg.setName(reader.readString());
+ cfg.setService(new PlatformDotNetServiceImpl(reader.readObjectDetached(), platformCtx, srvKeepPortable));
+ cfg.setTotalCount(reader.readInt());
+ cfg.setMaxPerNodeCount(reader.readInt());
+ cfg.setCacheName(reader.readString());
+ cfg.setAffinityKey(reader.readObjectDetached());
+
+ Object filter = reader.readObjectDetached();
+
+ if (filter != null)
+ cfg.setNodeFilter(platformCtx.createClusterNodeFilter(filter));
+
+ services.deploy(cfg);
+
+ return TRUE;
+ }
+
+ case OP_DOTNET_DEPLOY_MULTIPLE: {
+ String name = reader.readString();
+ Object svc = reader.readObjectDetached();
+ int totalCnt = reader.readInt();
+ int maxPerNodeCnt = reader.readInt();
+
+ services.deployMultiple(name, new PlatformDotNetServiceImpl(svc, platformCtx, srvKeepPortable),
+ totalCnt, maxPerNodeCnt);
+
+ return TRUE;
+ }
+ }
+
+ return super.processInOp(type, reader);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer,
+ Object arg) throws IgniteCheckedException {
+ switch (type) {
+ case OP_DOTNET_SERVICES: {
+ Collection<Service> svcs = services.services(reader.readString());
+
+ PlatformUtils.writeNullableCollection(writer, svcs,
+ new PlatformWriterClosure<Service>() {
+ @Override public void write(PortableRawWriterEx writer, Service svc) {
+ writer.writeLong(((PlatformService) svc).pointer());
+ }
+ },
+ new IgnitePredicate<Service>() {
+ @Override public boolean apply(Service svc) {
+ return svc instanceof PlatformDotNetService;
+ }
+ }
+ );
+
+ return;
+ }
+
+ case OP_DOTNET_INVOKE: {
+ assert arg != null;
+ assert arg instanceof PlatformDotNetService;
+
+ String mthdName = reader.readString();
+
+ Object[] args;
+
+ if (reader.readBoolean()) {
+ args = new Object[reader.readInt()];
+
+ for (int i = 0; i < args.length; i++)
+ args[i] = reader.readObjectDetached();
+ }
+ else
+ args = null;
+
+ try {
+ Object result = ((PlatformDotNetService)arg).invokeMethod(mthdName, srvKeepPortable, args);
+
+ PlatformUtils.writeInvocationResult(writer, result, null);
+ }
+ catch (Exception e) {
+ PlatformUtils.writeInvocationResult(writer, null, e);
+ }
+
+ return;
+ }
+ }
+
+ super.processInOutOp(type, reader, writer, arg);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void processOutOp(int type, PortableRawWriterEx writer) throws IgniteCheckedException {
+ switch (type) {
+ case OP_DESCRIPTORS: {
+ Collection<ServiceDescriptor> descs = services.serviceDescriptors();
+
+ PlatformUtils.writeCollection(writer, descs, new PlatformWriterClosure<ServiceDescriptor>() {
+ @Override public void write(PortableRawWriterEx writer, ServiceDescriptor d) {
+ writer.writeString(d.name());
+ writer.writeString(d.cacheName());
+ writer.writeInt(d.maxPerNodeCount());
+ writer.writeInt(d.totalCount());
+ writer.writeUuid(d.originNodeId());
+ writer.writeObject(d.affinityKey());
+
+ Map<UUID, Integer> top = d.topologySnapshot();
+
+ PlatformUtils.writeMap(writer, top, new PlatformWriterBiClosure<UUID, Integer>() {
+ @Override public void write(PortableRawWriterEx writer, UUID key, Integer val) {
+ writer.writeUuid(key);
+ writer.writeInt(val);
+ }
+ });
+ }
+ });
+
+ return;
+ }
+ }
+
+ super.processOutOp(type, writer);
+ }
+
+ /** <inheritDoc /> */
+ @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
+ return services.future();
+ }
+}