You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/04 14:44:08 UTC

[4/5] incubator-ignite git commit: # gg-9869

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
deleted file mode 100644
index 7db41e6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.managers.communication.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.processors.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-
-/**
- *
- */
-public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter {
-    /** Loaders map (access is not supposed to be highly concurrent). */
-    private Collection<IgniteDataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
-
-    /** Busy lock. */
-    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
-    /** Flushing thread. */
-    private Thread flusher;
-
-    /** */
-    private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ = new DelayQueue<>();
-
-    /** Marshaller. */
-    private final Marshaller marsh;
-
-    /**
-     * @param ctx Kernal context.
-     */
-    public IgniteDataStreamerProcessor(GridKernalContext ctx) {
-        super(ctx);
-
-        ctx.io().addMessageListener(TOPIC_DATALOAD, new GridMessageListener() {
-            @Override public void onMessage(UUID nodeId, Object msg) {
-                assert msg instanceof GridDataLoadRequest;
-
-                processDataLoadRequest(nodeId, (GridDataLoadRequest)msg);
-            }
-        });
-
-        marsh = ctx.config().getMarshaller();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        if (ctx.config().isDaemon())
-            return;
-
-        flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
-            @Override protected void body() throws InterruptedException {
-                while (!isCancelled()) {
-                    IgniteDataStreamerImpl<K, V> ldr = flushQ.take();
-
-                    if (!busyLock.enterBusy())
-                        return;
-
-                    try {
-                        if (ldr.isClosed())
-                            continue;
-
-                        ldr.tryFlush();
-
-                        flushQ.offer(ldr);
-                    }
-                    finally {
-                        busyLock.leaveBusy();
-                    }
-                }
-            }
-        });
-
-        flusher.start();
-
-        if (log.isDebugEnabled())
-            log.debug("Started data streamer processor.");
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        if (ctx.config().isDaemon())
-            return;
-
-        ctx.io().removeMessageListener(TOPIC_DATALOAD);
-
-        busyLock.block();
-
-        U.interrupt(flusher);
-        U.join(flusher, log);
-
-        for (IgniteDataStreamerImpl<?, ?> ldr : ldrs) {
-            if (log.isDebugEnabled())
-                log.debug("Closing active data streamer on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');
-
-            try {
-                ldr.closeEx(cancel);
-            }
-            catch (IgniteInterruptedCheckedException e) {
-                U.warn(log, "Interrupted while waiting for completion of the data streamer: " + ldr, e);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to close data streamer: " + ldr, e);
-            }
-        }
-
-        if (log.isDebugEnabled())
-            log.debug("Stopped data streamer processor.");
-    }
-
-    /**
-     * @param cacheName Cache name ({@code null} for default cache).
-     * @param compact {@code true} if data streamer should transfer data in compact format.
-     * @return Data streamer.
-     */
-    public IgniteDataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName, boolean compact) {
-        if (!busyLock.enterBusy())
-            throw new IllegalStateException("Failed to create data streamer (grid is stopping).");
-
-        try {
-            final IgniteDataStreamerImpl<K, V> ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact);
-
-            ldrs.add(ldr);
-
-            ldr.internalFuture().listenAsync(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> f) {
-                    boolean b = ldrs.remove(ldr);
-
-                    assert b : "Loader has not been added to set: " + ldr;
-
-                    if (log.isDebugEnabled())
-                        log.debug("Loader has been completed: " + ldr);
-                }
-            });
-
-            return ldr;
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /**
-     * @param cacheName Cache name ({@code null} for default cache).
-     * @return Data streamer.
-     */
-    public IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) {
-        return dataStreamer(cacheName, true);
-    }
-
-    /**
-     * @param nodeId Sender ID.
-     * @param req Request.
-     */
-    private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) {
-        if (!busyLock.enterBusy()) {
-            if (log.isDebugEnabled())
-                log.debug("Ignoring data load request (node is stopping): " + req);
-
-            return;
-        }
-
-        try {
-            if (log.isDebugEnabled())
-                log.debug("Processing data load request: " + req);
-
-            Object topic;
-
-            try {
-                topic = marsh.unmarshal(req.responseTopicBytes(), null);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to unmarshal topic from request: " + req, e);
-
-                return;
-            }
-
-            ClassLoader clsLdr;
-
-            if (req.forceLocalDeployment())
-                clsLdr = U.gridClassLoader();
-            else {
-                GridDeployment dep = ctx.deploy().getGlobalDeployment(
-                    req.deploymentMode(),
-                    req.sampleClassName(),
-                    req.sampleClassName(),
-                    req.userVersion(),
-                    nodeId,
-                    req.classLoaderId(),
-                    req.participants(),
-                    null);
-
-                if (dep == null) {
-                    sendResponse(nodeId,
-                        topic,
-                        req.requestId(),
-                        new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId +
-                            ", req=" + req + ']'),
-                        false);
-
-                    return;
-                }
-
-                clsLdr = dep.classLoader();
-            }
-
-            Collection<Map.Entry<K, V>> col;
-            IgniteDataStreamer.Updater<K, V> updater;
-
-            try {
-                col = marsh.unmarshal(req.collectionBytes(), clsLdr);
-                updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);
-
-                sendResponse(nodeId, topic, req.requestId(), e, false);
-
-                return;
-            }
-
-            IgniteDataStreamerUpdateJob<K, V> job = new IgniteDataStreamerUpdateJob<>(ctx,
-                log,
-                req.cacheName(),
-                col,
-                req.ignoreDeploymentOwnership(),
-                req.skipStore(),
-                updater);
-
-            Exception err = null;
-
-            try {
-                job.call();
-            }
-            catch (Exception e) {
-                U.error(log, "Failed to finish update job.", e);
-
-                err = e;
-            }
-
-            sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /**
-     * @param nodeId Node ID.
-     * @param resTopic Response topic.
-     * @param reqId Request ID.
-     * @param err Error.
-     * @param forceLocDep Force local deployment.
-     */
-    private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err,
-        boolean forceLocDep) {
-        byte[] errBytes;
-
-        try {
-            errBytes = err != null ? marsh.marshal(err) : null;
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to marshal message.", e);
-
-            return;
-        }
-
-        GridDataLoadResponse res = new GridDataLoadResponse(reqId, errBytes, forceLocDep);
-
-        try {
-            ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
-        }
-        catch (IgniteCheckedException e) {
-            if (ctx.discovery().alive(nodeId))
-                U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e);
-            else if (log.isDebugEnabled())
-                log.debug("Node has left the grid: " + nodeId);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void printMemoryStats() {
-        X.println(">>>");
-        X.println(">>> Data streamer processor memory stats [grid=" + ctx.gridName() + ']');
-        X.println(">>>   ldrsSize: " + ldrs.size());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
deleted file mode 100644
index 1a3db40..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Job to put entries to cache on affinity node.
- */
-class IgniteDataStreamerUpdateJob<K, V> implements GridPlainCallable<Object> {
-    /** */
-    private final GridKernalContext ctx;
-
-    /** */
-    private final IgniteLogger log;
-
-    /** Cache name. */
-    private final String cacheName;
-
-    /** Entries to put. */
-    private final Collection<Map.Entry<K, V>> col;
-
-    /** {@code True} to ignore deployment ownership. */
-    private final boolean ignoreDepOwnership;
-
-    /** */
-    private final boolean skipStore;
-
-    /** */
-    private final IgniteDataStreamer.Updater<K, V> updater;
-
-    /**
-     * @param ctx Context.
-     * @param log Log.
-     * @param cacheName Cache name.
-     * @param col Entries to put.
-     * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
-     * @param updater Updater.
-     */
-    IgniteDataStreamerUpdateJob(
-        GridKernalContext ctx,
-        IgniteLogger log,
-        @Nullable String cacheName,
-        Collection<Map.Entry<K, V>> col,
-        boolean ignoreDepOwnership,
-        boolean skipStore,
-        IgniteDataStreamer.Updater<K, V> updater) {
-        this.ctx = ctx;
-        this.log = log;
-
-        assert col != null && !col.isEmpty();
-        assert updater != null;
-
-        this.cacheName = cacheName;
-        this.col = col;
-        this.ignoreDepOwnership = ignoreDepOwnership;
-        this.skipStore = skipStore;
-        this.updater = updater;
-    }
-
-    /** {@inheritDoc} */
-    @Override public Object call() throws Exception {
-        if (log.isDebugEnabled())
-            log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']');
-
-//        TODO IGNITE-77: restore adapter usage.
-//        GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
-//
-//        IgniteFuture<?> f = cache.context().preloader().startFuture();
-//
-//        if (!f.isDone())
-//            f.get();
-//
-//        if (ignoreDepOwnership)
-//            cache.context().deploy().ignoreOwnership(true);
-
-        IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName);
-
-        if (skipStore)
-            cache = (IgniteCacheProxy<K, V>)cache.withSkipStore();
-
-        if (ignoreDepOwnership)
-            cache.context().deploy().ignoreOwnership(true);
-
-        try {
-            updater.update(cache, col);
-
-            return null;
-        }
-        finally {
-            if (ignoreDepOwnership)
-                cache.context().deploy().ignoreOwnership(false);
-
-            if (log.isDebugEnabled())
-                log.debug("Update job finished on node: " + ctx.localNodeId());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
deleted file mode 100644
index 1090b86..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
-
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<body>
-    <!-- Package description. -->
-    Data streamer processor.
-</body>
-</html>

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
new file mode 100644
index 0000000..d77b52e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Data load request message: carries the serialized updater and entry collection for a cache, plus deployment info.
+ */
+public class GridDataLoadRequest implements Message {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Request ID. */
+    private long reqId;
+
+    /** Response topic bytes. */
+    private byte[] resTopicBytes;
+
+    /** Cache name. */
+    private String cacheName;
+
+    /** Cache updater bytes. */
+    private byte[] updaterBytes;
+
+    /** Entries to put (collection bytes). */
+    private byte[] colBytes;
+
+    /** {@code True} to ignore deployment ownership. */
+    private boolean ignoreDepOwnership;
+
+    /** Skip store flag. */
+    private boolean skipStore;
+
+    /** Deployment mode. */
+    private DeploymentMode depMode;
+
+    /** Sample class name. */
+    private String sampleClsName;
+
+    /** User version. */
+    private String userVer;
+
+    /** Node class loader participants. */
+    @GridToStringInclude
+    @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
+    private Map<UUID, IgniteUuid> ldrParticipants;
+
+    /** Class loader ID. */
+    private IgniteUuid clsLdrId;
+
+    /** Force local deployment flag. */
+    private boolean forceLocDep;
+
+    /**
+     * {@code Externalizable} support.
+     */
+    public GridDataLoadRequest() {
+        // No-op.
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @param resTopicBytes Response topic.
+     * @param cacheName Cache name.
+     * @param updaterBytes Cache updater.
+     * @param colBytes Collection bytes.
+     * @param ignoreDepOwnership Ignore ownership.
+     * @param skipStore Skip store flag.
+     * @param depMode Deployment mode.
+     * @param sampleClsName Sample class name.
+     * @param userVer User version.
+     * @param ldrParticipants Loader participants.
+     * @param clsLdrId Class loader ID.
+     * @param forceLocDep Force local deployment.
+     */
+    public GridDataLoadRequest(long reqId,
+        byte[] resTopicBytes,
+        @Nullable String cacheName,
+        byte[] updaterBytes,
+        byte[] colBytes,
+        boolean ignoreDepOwnership,
+        boolean skipStore,
+        DeploymentMode depMode,
+        String sampleClsName,
+        String userVer,
+        Map<UUID, IgniteUuid> ldrParticipants,
+        IgniteUuid clsLdrId,
+        boolean forceLocDep) {
+        this.reqId = reqId;
+        this.resTopicBytes = resTopicBytes;
+        this.cacheName = cacheName;
+        this.updaterBytes = updaterBytes;
+        this.colBytes = colBytes;
+        this.ignoreDepOwnership = ignoreDepOwnership;
+        this.skipStore = skipStore;
+        this.depMode = depMode;
+        this.sampleClsName = sampleClsName;
+        this.userVer = userVer;
+        this.ldrParticipants = ldrParticipants;
+        this.clsLdrId = clsLdrId;
+        this.forceLocDep = forceLocDep;
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Response topic bytes.
+     */
+    public byte[] responseTopicBytes() {
+        return resTopicBytes;
+    }
+
+    /**
+     * @return Cache name.
+     */
+    public String cacheName() {
+        return cacheName;
+    }
+
+    /**
+     * @return Updater bytes.
+     */
+    public byte[] updaterBytes() {
+        return updaterBytes;
+    }
+
+    /**
+     * @return Collection bytes.
+     */
+    public byte[] collectionBytes() {
+        return colBytes;
+    }
+
+    /**
+     * @return {@code True} to ignore ownership.
+     */
+    public boolean ignoreDeploymentOwnership() {
+        return ignoreDepOwnership;
+    }
+
+    /**
+     * @return Skip store flag.
+     */
+    public boolean skipStore() {
+        return skipStore;
+    }
+
+    /**
+     * @return Deployment mode.
+     */
+    public DeploymentMode deploymentMode() {
+        return depMode;
+    }
+
+    /**
+     * @return Sample class name.
+     */
+    public String sampleClassName() {
+        return sampleClsName;
+    }
+
+    /**
+     * @return User version.
+     */
+    public String userVersion() {
+        return userVer;
+    }
+
+    /**
+     * @return Participants.
+     */
+    public Map<UUID, IgniteUuid> participants() {
+        return ldrParticipants;
+    }
+
+    /**
+     * @return Class loader ID.
+     */
+    public IgniteUuid classLoaderId() {
+        return clsLdrId;
+    }
+
+    /**
+     * @return {@code True} to force local deployment.
+     */
+    public boolean forceLocalDeployment() {
+        return forceLocDep;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDataLoadRequest.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // Cases fall through on purpose: start at the writer's current state, then write every later field.
+            case 0:
+                if (!writer.writeString("cacheName", cacheName))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeIgniteUuid("clsLdrId", clsLdrId))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeByteArray("colBytes", colBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeByte("depMode", depMode != null ? (byte)depMode.ordinal() : -1)) // -1 encodes a null deployment mode.
+                    return false;
+
+                writer.incrementState();
+
+            case 4:
+                if (!writer.writeBoolean("forceLocDep", forceLocDep))
+                    return false;
+
+                writer.incrementState();
+
+            case 5:
+                if (!writer.writeBoolean("ignoreDepOwnership", ignoreDepOwnership))
+                    return false;
+
+                writer.incrementState();
+
+            case 6:
+                if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeLong("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+
+            case 8:
+                if (!writer.writeByteArray("resTopicBytes", resTopicBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 9:
+                if (!writer.writeString("sampleClsName", sampleClsName))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
+                if (!writer.writeBoolean("skipStore", skipStore))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeByteArray("updaterBytes", updaterBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
+                if (!writer.writeString("userVer", userVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) { // Cases fall through on purpose; field order mirrors writeTo.
+            case 0:
+                cacheName = reader.readString("cacheName");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                clsLdrId = reader.readIgniteUuid("clsLdrId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                colBytes = reader.readByteArray("colBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                byte depModeOrd;
+
+                depModeOrd = reader.readByte("depMode");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                depMode = DeploymentMode.fromOrdinal(depModeOrd); // NOTE(review): presumably maps the -1 written for null back to null - confirm.
+
+                reader.incrementState();
+
+            case 4:
+                forceLocDep = reader.readBoolean("forceLocDep");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 5:
+                ignoreDepOwnership = reader.readBoolean("ignoreDepOwnership");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 6:
+                ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                reqId = reader.readLong("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 8:
+                resTopicBytes = reader.readByteArray("resTopicBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 9:
+                sampleClsName = reader.readString("sampleClsName");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
+                skipStore = reader.readBoolean("skipStore");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                updaterBytes = reader.readByteArray("updaterBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
+                userVer = reader.readString("userVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 62;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 13; // Matches the 13 states (0-12) handled in writeTo and readFrom.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java
new file mode 100644
index 0000000..25ff9ce
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.nio.*;
+
+/**
+ * Data load response message carrying a request ID, error bytes and a force-local-deployment flag.
+ */
+public class GridDataLoadResponse implements Message {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Request ID. */
+    private long reqId;
+
+    /** Error bytes. */
+    private byte[] errBytes;
+
+    /** Force local deployment flag. */
+    private boolean forceLocDep;
+
+    /**
+     * @param reqId Request ID.
+     * @param errBytes Error bytes.
+     * @param forceLocDep Force local deployment.
+     */
+    public GridDataLoadResponse(long reqId, byte[] errBytes, boolean forceLocDep) {
+        this.reqId = reqId;
+        this.errBytes = errBytes;
+        this.forceLocDep = forceLocDep;
+    }
+
+    /**
+     * Empty constructor ({@code Externalizable} support); NOTE(review): fields are presumably filled by {@code readFrom} - confirm.
+     */
+    public GridDataLoadResponse() {
+        // No-op.
+    }
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Error bytes.
+     */
+    public byte[] errorBytes() {
+        return errBytes;
+    }
+
+    /**
+     * @return {@code True} to force local deployment.
+     */
+    public boolean forceLocalDeployment() {
+        return forceLocDep;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridDataLoadResponse.class, this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // No breaks on purpose: start at the current state and fall through the remaining fields.
+            case 0:
+                if (!writer.writeByteArray("errBytes", errBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeBoolean("forceLocDep", forceLocDep))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeLong("reqId", reqId))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true; // Reached only after every remaining field was written.
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) { // No breaks on purpose: same field order as writeTo(), resumed from the current state.
+            case 0:
+                errBytes = reader.readByteArray("errBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                forceLocDep = reader.readBoolean("forceLocDep");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                reqId = reader.readLong("reqId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return true; // Reached only after every remaining field was read.
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return 63; // Type code passed to writer.writeHeader() in writeTo().
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 3; // One per state in writeTo()/readFrom(): errBytes, forceLocDep, reqId.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
new file mode 100644
index 0000000..629c7b1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Bundled factory for cache updaters.
+ */
+public class IgniteDataStreamerCacheUpdaters {
+    /** Shared instance returned by {@link #individual()}. */
+    private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();
+
+    /** Shared instance returned by {@link #batched()}. */
+    private static final IgniteDataStreamer.Updater BATCHED = new Batched();
+
+    /** Shared instance returned by {@link #batchedSorted()}. */
+    private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();
+
+    /**
+     * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance
+     * is not the best.
+     *
+     * @return Individual updater.
+     */
+    public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
+        return INDIVIDUAL;
+    }
+
+    /**
+     * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting
+     * updated concurrently. Performance is generally better than in {@link #individual()}.
+     *
+     * @return Batched updater.
+     */
+    public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
+        return BATCHED;
+    }
+
+    /**
+     * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
+     * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates
+     * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}.
+     *
+     * @return Batched sorted updater.
+     */
+    public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
+        return BATCHED_SORTED;
+    }
+
+    /**
+     * Updates cache: removes {@code rmvCol} first, then puts {@code putMap}. At least one of them must be non-null.
+     *
+     * @param cache Cache.
+     * @param rmvCol Keys to remove, or {@code null} if there is nothing to remove.
+     * @param putMap Entries to put, or {@code null} if there is nothing to put.
+     * @throws IgniteException If failed.
+     */
+    protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol,
+        @Nullable Map<K, V> putMap) {
+        assert rmvCol != null || putMap != null;
+
+        // Here we assume that there are no key duplicates, so the following calls are valid.
+        if (rmvCol != null)
+            ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol);
+
+        if (putMap != null)
+            cache.putAll(putMap);
+    }
+
+    /**
+     * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
+     */
+    private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key != null;
+
+                V val = entry.getValue();
+
+                if (val == null) // Null value means the key is removed.
+                    cache.remove(key);
+                else
+                    cache.put(key, val);
+            }
+        }
+    }
+
+    /**
+     * Batched updater. Updates cache using batch operations thus is dead lock prone.
+     */
+    private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            Map<K, V> putAll = null; // Created lazily; stays null when there is nothing to put.
+            Set<K> rmvAll = null; // Created lazily; stays null when there is nothing to remove.
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key != null;
+
+                V val = entry.getValue();
+
+                if (val == null) { // Null value means the key is removed.
+                    if (rmvAll == null)
+                        rmvAll = new HashSet<>();
+
+                    rmvAll.add(key);
+                }
+                else {
+                    if (putAll == null)
+                        putAll = new HashMap<>();
+
+                    putAll.put(key, val);
+                }
+            }
+
+            updateAll(cache, rmvAll, putAll);
+        }
+    }
+
+    /**
+     * Batched sorted updater. Collects keys into sorted collections before applying the same batch operations.
+     */
+    private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+            assert cache != null;
+            assert !F.isEmpty(entries);
+
+            Map<K, V> putAll = null; // Created lazily; stays null when there is nothing to put.
+            Set<K> rmvAll = null; // Created lazily; stays null when there is nothing to remove.
+
+            for (Map.Entry<K, V> entry : entries) {
+                K key = entry.getKey();
+
+                assert key instanceof Comparable;
+
+                V val = entry.getValue();
+
+                if (val == null) { // Null value means the key is removed.
+                    if (rmvAll == null)
+                        rmvAll = new TreeSet<>();
+
+                    rmvAll.add(key);
+                }
+                else {
+                    if (putAll == null)
+                        putAll = new TreeMap<>();
+
+                    putAll.put(key, val);
+                }
+            }
+
+            updateAll(cache, rmvAll, putAll);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
new file mode 100644
index 0000000..b6aa15c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Future bound to a data streamer; a successful {@link #cancel()} calls {@code closeEx(true)} on that streamer.
+ */
+class IgniteDataStreamerFuture extends GridFutureAdapter<Object> {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Streamer that is closed on cancellation. */
+    @GridToStringExclude
+    private IgniteDataStreamerImpl dataLdr;
+
+    /**
+     * No-arg constructor kept for {@link Externalizable} support.
+     */
+    public IgniteDataStreamerFuture() {
+        // No-op.
+    }
+
+    /**
+     * @param ctx Kernal context.
+     * @param dataLdr Data streamer, must not be {@code null}.
+     */
+    IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) {
+        super(ctx);
+
+        assert dataLdr != null;
+
+        this.dataLdr = dataLdr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean cancel() throws IgniteCheckedException {
+        checkValid();
+
+        boolean cancelled = onCancelled();
+
+        // Close the streamer only when onCancelled() reported success.
+        if (cancelled)
+            dataLdr.closeEx(true);
+
+        return cancelled;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteDataStreamerFuture.class, this, super.toString());
+    }
+}