You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/04 14:44:08 UTC
[4/5] incubator-ignite git commit: # gg-9869
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
deleted file mode 100644
index 7db41e6..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerProcessor.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.managers.communication.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.processors.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.worker.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.thread.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-
-/**
- *
- */
-public class IgniteDataStreamerProcessor<K, V> extends GridProcessorAdapter {
- /** Loaders map (access is not supposed to be highly concurrent). */
- private Collection<IgniteDataStreamerImpl> ldrs = new GridConcurrentHashSet<>();
-
- /** Busy lock. */
- private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
- /** Flushing thread. */
- private Thread flusher;
-
- /** */
- private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ = new DelayQueue<>();
-
- /** Marshaller. */
- private final Marshaller marsh;
-
- /**
- * @param ctx Kernal context.
- */
- public IgniteDataStreamerProcessor(GridKernalContext ctx) {
- super(ctx);
-
- ctx.io().addMessageListener(TOPIC_DATALOAD, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
- assert msg instanceof GridDataLoadRequest;
-
- processDataLoadRequest(nodeId, (GridDataLoadRequest)msg);
- }
- });
-
- marsh = ctx.config().getMarshaller();
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- if (ctx.config().isDaemon())
- return;
-
- flusher = new IgniteThread(new GridWorker(ctx.gridName(), "grid-data-loader-flusher", log) {
- @Override protected void body() throws InterruptedException {
- while (!isCancelled()) {
- IgniteDataStreamerImpl<K, V> ldr = flushQ.take();
-
- if (!busyLock.enterBusy())
- return;
-
- try {
- if (ldr.isClosed())
- continue;
-
- ldr.tryFlush();
-
- flushQ.offer(ldr);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- }
- });
-
- flusher.start();
-
- if (log.isDebugEnabled())
- log.debug("Started data streamer processor.");
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- if (ctx.config().isDaemon())
- return;
-
- ctx.io().removeMessageListener(TOPIC_DATALOAD);
-
- busyLock.block();
-
- U.interrupt(flusher);
- U.join(flusher, log);
-
- for (IgniteDataStreamerImpl<?, ?> ldr : ldrs) {
- if (log.isDebugEnabled())
- log.debug("Closing active data streamer on grid stop [ldr=" + ldr + ", cancel=" + cancel + ']');
-
- try {
- ldr.closeEx(cancel);
- }
- catch (IgniteInterruptedCheckedException e) {
- U.warn(log, "Interrupted while waiting for completion of the data streamer: " + ldr, e);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to close data streamer: " + ldr, e);
- }
- }
-
- if (log.isDebugEnabled())
- log.debug("Stopped data streamer processor.");
- }
-
- /**
- * @param cacheName Cache name ({@code null} for default cache).
- * @param compact {@code true} if data streamer should transfer data in compact format.
- * @return Data streamer.
- */
- public IgniteDataStreamerImpl<K, V> dataStreamer(@Nullable String cacheName, boolean compact) {
- if (!busyLock.enterBusy())
- throw new IllegalStateException("Failed to create data streamer (grid is stopping).");
-
- try {
- final IgniteDataStreamerImpl<K, V> ldr = new IgniteDataStreamerImpl<>(ctx, cacheName, flushQ, compact);
-
- ldrs.add(ldr);
-
- ldr.internalFuture().listenAsync(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
- boolean b = ldrs.remove(ldr);
-
- assert b : "Loader has not been added to set: " + ldr;
-
- if (log.isDebugEnabled())
- log.debug("Loader has been completed: " + ldr);
- }
- });
-
- return ldr;
- }
- finally {
- busyLock.leaveBusy();
- }
- }
-
- /**
- * @param cacheName Cache name ({@code null} for default cache).
- * @return Data streamer.
- */
- public IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) {
- return dataStreamer(cacheName, true);
- }
-
- /**
- * @param nodeId Sender ID.
- * @param req Request.
- */
- private void processDataLoadRequest(UUID nodeId, GridDataLoadRequest req) {
- if (!busyLock.enterBusy()) {
- if (log.isDebugEnabled())
- log.debug("Ignoring data load request (node is stopping): " + req);
-
- return;
- }
-
- try {
- if (log.isDebugEnabled())
- log.debug("Processing data load request: " + req);
-
- Object topic;
-
- try {
- topic = marsh.unmarshal(req.responseTopicBytes(), null);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal topic from request: " + req, e);
-
- return;
- }
-
- ClassLoader clsLdr;
-
- if (req.forceLocalDeployment())
- clsLdr = U.gridClassLoader();
- else {
- GridDeployment dep = ctx.deploy().getGlobalDeployment(
- req.deploymentMode(),
- req.sampleClassName(),
- req.sampleClassName(),
- req.userVersion(),
- nodeId,
- req.classLoaderId(),
- req.participants(),
- null);
-
- if (dep == null) {
- sendResponse(nodeId,
- topic,
- req.requestId(),
- new IgniteCheckedException("Failed to get deployment for request [sndId=" + nodeId +
- ", req=" + req + ']'),
- false);
-
- return;
- }
-
- clsLdr = dep.classLoader();
- }
-
- Collection<Map.Entry<K, V>> col;
- IgniteDataStreamer.Updater<K, V> updater;
-
- try {
- col = marsh.unmarshal(req.collectionBytes(), clsLdr);
- updater = marsh.unmarshal(req.updaterBytes(), clsLdr);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal message [nodeId=" + nodeId + ", req=" + req + ']', e);
-
- sendResponse(nodeId, topic, req.requestId(), e, false);
-
- return;
- }
-
- IgniteDataStreamerUpdateJob<K, V> job = new IgniteDataStreamerUpdateJob<>(ctx,
- log,
- req.cacheName(),
- col,
- req.ignoreDeploymentOwnership(),
- req.skipStore(),
- updater);
-
- Exception err = null;
-
- try {
- job.call();
- }
- catch (Exception e) {
- U.error(log, "Failed to finish update job.", e);
-
- err = e;
- }
-
- sendResponse(nodeId, topic, req.requestId(), err, req.forceLocalDeployment());
- }
- finally {
- busyLock.leaveBusy();
- }
- }
-
- /**
- * @param nodeId Node ID.
- * @param resTopic Response topic.
- * @param reqId Request ID.
- * @param err Error.
- * @param forceLocDep Force local deployment.
- */
- private void sendResponse(UUID nodeId, Object resTopic, long reqId, @Nullable Throwable err,
- boolean forceLocDep) {
- byte[] errBytes;
-
- try {
- errBytes = err != null ? marsh.marshal(err) : null;
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to marshal message.", e);
-
- return;
- }
-
- GridDataLoadResponse res = new GridDataLoadResponse(reqId, errBytes, forceLocDep);
-
- try {
- ctx.io().send(nodeId, resTopic, res, PUBLIC_POOL);
- }
- catch (IgniteCheckedException e) {
- if (ctx.discovery().alive(nodeId))
- U.error(log, "Failed to respond to node [nodeId=" + nodeId + ", res=" + res + ']', e);
- else if (log.isDebugEnabled())
- log.debug("Node has left the grid: " + nodeId);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void printMemoryStats() {
- X.println(">>>");
- X.println(">>> Data streamer processor memory stats [grid=" + ctx.gridName() + ']');
- X.println(">>> ldrsSize: " + ldrs.size());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
deleted file mode 100644
index 1a3db40..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerUpdateJob.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Job to put entries to cache on affinity node.
- */
-class IgniteDataStreamerUpdateJob<K, V> implements GridPlainCallable<Object> {
- /** */
- private final GridKernalContext ctx;
-
- /** */
- private final IgniteLogger log;
-
- /** Cache name. */
- private final String cacheName;
-
- /** Entries to put. */
- private final Collection<Map.Entry<K, V>> col;
-
- /** {@code True} to ignore deployment ownership. */
- private final boolean ignoreDepOwnership;
-
- /** */
- private final boolean skipStore;
-
- /** */
- private final IgniteDataStreamer.Updater<K, V> updater;
-
- /**
- * @param ctx Context.
- * @param log Log.
- * @param cacheName Cache name.
- * @param col Entries to put.
- * @param ignoreDepOwnership {@code True} to ignore deployment ownership.
- * @param updater Updater.
- */
- IgniteDataStreamerUpdateJob(
- GridKernalContext ctx,
- IgniteLogger log,
- @Nullable String cacheName,
- Collection<Map.Entry<K, V>> col,
- boolean ignoreDepOwnership,
- boolean skipStore,
- IgniteDataStreamer.Updater<K, V> updater) {
- this.ctx = ctx;
- this.log = log;
-
- assert col != null && !col.isEmpty();
- assert updater != null;
-
- this.cacheName = cacheName;
- this.col = col;
- this.ignoreDepOwnership = ignoreDepOwnership;
- this.skipStore = skipStore;
- this.updater = updater;
- }
-
- /** {@inheritDoc} */
- @Override public Object call() throws Exception {
- if (log.isDebugEnabled())
- log.debug("Running put job [nodeId=" + ctx.localNodeId() + ", size=" + col.size() + ']');
-
-// TODO IGNITE-77: restore adapter usage.
-// GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
-//
-// IgniteFuture<?> f = cache.context().preloader().startFuture();
-//
-// if (!f.isDone())
-// f.get();
-//
-// if (ignoreDepOwnership)
-// cache.context().deploy().ignoreOwnership(true);
-
- IgniteCacheProxy<K, V> cache = ctx.cache().jcache(cacheName);
-
- if (skipStore)
- cache = (IgniteCacheProxy<K, V>)cache.withSkipStore();
-
- if (ignoreDepOwnership)
- cache.context().deploy().ignoreOwnership(true);
-
- try {
- updater.update(cache, col);
-
- return null;
- }
- finally {
- if (ignoreDepOwnership)
- cache.context().deploy().ignoreOwnership(false);
-
- if (log.isDebugEnabled())
- log.debug("Update job finished on node: " + ctx.localNodeId());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
deleted file mode 100644
index 1090b86..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/package.html
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
-<html>
-<body>
- <!-- Package description. -->
- Data streamer processor.
-</body>
-</html>
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
new file mode 100644
index 0000000..d77b52e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadRequest.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+import org.jetbrains.annotations.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
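+ * Data load request message: carries the cache name, the updater and entry collection as bytes, and the deployment info (mode, sample class name, user version, class loader ID and participants).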
+ */
+public class GridDataLoadRequest implements Message {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Request ID. */
+ private long reqId;
+
+ /** Response topic bytes. */
+ private byte[] resTopicBytes;
+
+ /** Cache name. */
+ private String cacheName;
+
+ /** Cache updater bytes. */
+ private byte[] updaterBytes;
+
+ /** Entries to put. */
+ private byte[] colBytes;
+
+ /** {@code True} to ignore deployment ownership. */
+ private boolean ignoreDepOwnership;
+
+ /** Skip store flag. */
+ private boolean skipStore;
+
+ /** Deployment mode. */
+ private DeploymentMode depMode;
+
+ /** Sample class name. */
+ private String sampleClsName;
+
+ /** User version. */
+ private String userVer;
+
+ /** Node class loader participants. */
+ @GridToStringInclude
+ @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
+ private Map<UUID, IgniteUuid> ldrParticipants;
+
+ /** Class loader ID. */
+ private IgniteUuid clsLdrId;
+
+ /** {@code True} to force local deployment. */
+ private boolean forceLocDep;
+
+ /**
+ * {@code Externalizable} support.
+ */
+ public GridDataLoadRequest() {
+ // No-op.
+ }
+
+ /**
+ * @param reqId Request ID.
+ * @param resTopicBytes Response topic.
+ * @param cacheName Cache name.
+ * @param updaterBytes Cache updater.
+ * @param colBytes Collection bytes.
+ * @param ignoreDepOwnership Ignore ownership.
+ * @param skipStore Skip store flag.
+ * @param depMode Deployment mode.
+ * @param sampleClsName Sample class name.
+ * @param userVer User version.
+ * @param ldrParticipants Loader participants.
+ * @param clsLdrId Class loader ID.
+ * @param forceLocDep Force local deployment.
+ */
+ public GridDataLoadRequest(long reqId,
+ byte[] resTopicBytes,
+ @Nullable String cacheName,
+ byte[] updaterBytes,
+ byte[] colBytes,
+ boolean ignoreDepOwnership,
+ boolean skipStore,
+ DeploymentMode depMode,
+ String sampleClsName,
+ String userVer,
+ Map<UUID, IgniteUuid> ldrParticipants,
+ IgniteUuid clsLdrId,
+ boolean forceLocDep) {
+ this.reqId = reqId;
+ this.resTopicBytes = resTopicBytes;
+ this.cacheName = cacheName;
+ this.updaterBytes = updaterBytes;
+ this.colBytes = colBytes;
+ this.ignoreDepOwnership = ignoreDepOwnership;
+ this.skipStore = skipStore;
+ this.depMode = depMode;
+ this.sampleClsName = sampleClsName;
+ this.userVer = userVer;
+ this.ldrParticipants = ldrParticipants;
+ this.clsLdrId = clsLdrId;
+ this.forceLocDep = forceLocDep;
+ }
+
+ /**
+ * @return Request ID.
+ */
+ public long requestId() {
+ return reqId;
+ }
+
+ /**
+ * @return Response topic.
+ */
+ public byte[] responseTopicBytes() {
+ return resTopicBytes;
+ }
+
+ /**
+ * @return Cache name.
+ */
+ public String cacheName() {
+ return cacheName;
+ }
+
+ /**
+ * @return Updater.
+ */
+ public byte[] updaterBytes() {
+ return updaterBytes;
+ }
+
+ /**
+ * @return Collection bytes.
+ */
+ public byte[] collectionBytes() {
+ return colBytes;
+ }
+
+ /**
+ * @return {@code True} to ignore ownership.
+ */
+ public boolean ignoreDeploymentOwnership() {
+ return ignoreDepOwnership;
+ }
+
+ /**
+ * @return Skip store flag.
+ */
+ public boolean skipStore() {
+ return skipStore;
+ }
+
+ /**
+ * @return Deployment mode.
+ */
+ public DeploymentMode deploymentMode() {
+ return depMode;
+ }
+
+ /**
+ * @return Sample class name.
+ */
+ public String sampleClassName() {
+ return sampleClsName;
+ }
+
+ /**
+ * @return User version.
+ */
+ public String userVersion() {
+ return userVer;
+ }
+
+ /**
+ * @return Participants.
+ */
+ public Map<UUID, IgniteUuid> participants() {
+ return ldrParticipants;
+ }
+
+ /**
+ * @return Class loader ID.
+ */
+ public IgniteUuid classLoaderId() {
+ return clsLdrId;
+ }
+
+ /**
+ * @return {@code True} to force local deployment.
+ */
+ public boolean forceLocalDeployment() {
+ return forceLocDep;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDataLoadRequest.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeString("cacheName", cacheName))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeIgniteUuid("clsLdrId", clsLdrId))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeByteArray("colBytes", colBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
+ if (!writer.writeByte("depMode", depMode != null ? (byte)depMode.ordinal() : -1))
+ return false;
+
+ writer.incrementState();
+
+ case 4:
+ if (!writer.writeBoolean("forceLocDep", forceLocDep))
+ return false;
+
+ writer.incrementState();
+
+ case 5:
+ if (!writer.writeBoolean("ignoreDepOwnership", ignoreDepOwnership))
+ return false;
+
+ writer.incrementState();
+
+ case 6:
+ if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
+ return false;
+
+ writer.incrementState();
+
+ case 7:
+ if (!writer.writeLong("reqId", reqId))
+ return false;
+
+ writer.incrementState();
+
+ case 8:
+ if (!writer.writeByteArray("resTopicBytes", resTopicBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 9:
+ if (!writer.writeString("sampleClsName", sampleClsName))
+ return false;
+
+ writer.incrementState();
+
+ case 10:
+ if (!writer.writeBoolean("skipStore", skipStore))
+ return false;
+
+ writer.incrementState();
+
+ case 11:
+ if (!writer.writeByteArray("updaterBytes", updaterBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 12:
+ if (!writer.writeString("userVer", userVer))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ cacheName = reader.readString("cacheName");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ clsLdrId = reader.readIgniteUuid("clsLdrId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ colBytes = reader.readByteArray("colBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
+ byte depModeOrd;
+
+ depModeOrd = reader.readByte("depMode");
+
+ if (!reader.isLastRead())
+ return false;
+
+ depMode = DeploymentMode.fromOrdinal(depModeOrd);
+
+ reader.incrementState();
+
+ case 4:
+ forceLocDep = reader.readBoolean("forceLocDep");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 5:
+ ignoreDepOwnership = reader.readBoolean("ignoreDepOwnership");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 6:
+ ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 7:
+ reqId = reader.readLong("reqId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 8:
+ resTopicBytes = reader.readByteArray("resTopicBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 9:
+ sampleClsName = reader.readString("sampleClsName");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 10:
+ skipStore = reader.readBoolean("skipStore");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 11:
+ updaterBytes = reader.readByteArray("updaterBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 12:
+ userVer = reader.readString("userVer");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 62;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 13;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java
new file mode 100644
index 0000000..25ff9ce
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/GridDataLoadResponse.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.plugin.extensions.communication.*;
+
+import java.nio.*;
+
+/**
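+ * Data load response message: carries the request ID, the error bytes and the force-local-deployment flag.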
+ */
+public class GridDataLoadResponse implements Message {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Request ID. */
+ private long reqId;
+
+ /** Error bytes. */
+ private byte[] errBytes;
+
+ /** {@code True} to force local deployment. */
+ private boolean forceLocDep;
+
+ /**
+ * @param reqId Request ID.
+ * @param errBytes Error bytes.
+ * @param forceLocDep Force local deployment.
+ */
+ public GridDataLoadResponse(long reqId, byte[] errBytes, boolean forceLocDep) {
+ this.reqId = reqId;
+ this.errBytes = errBytes;
+ this.forceLocDep = forceLocDep;
+ }
+
+ /**
+ * {@code Externalizable} support.
+ */
+ public GridDataLoadResponse() {
+ // No-op.
+ }
+
+ /**
+ * @return Request ID.
+ */
+ public long requestId() {
+ return reqId;
+ }
+
+ /**
+ * @return Error bytes.
+ */
+ public byte[] errorBytes() {
+ return errBytes;
+ }
+
+ /**
+ * @return {@code True} to force local deployment.
+ */
+ public boolean forceLocalDeployment() {
+ return forceLocDep;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridDataLoadResponse.class, this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeByteArray("errBytes", errBytes))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeBoolean("forceLocDep", forceLocDep))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeLong("reqId", reqId))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ errBytes = reader.readByteArray("errBytes");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ forceLocDep = reader.readBoolean("forceLocDep");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ reqId = reader.readLong("reqId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return 63;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 3;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
new file mode 100644
index 0000000..629c7b1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerCacheUpdaters.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Bundled factory for cache updaters: hands out shared individual, batched and batched-sorted updater instances.
+ */
+public class IgniteDataStreamerCacheUpdaters {
+ /** Shared instance returned by {@link #individual()}. */
+ private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();
+
+ /** Shared instance returned by {@link #batched()}. */
+ private static final IgniteDataStreamer.Updater BATCHED = new Batched();
+
+ /** Shared instance returned by {@link #batchedSorted()}. */
+ private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();
+
+ /**
+ * Updates cache using an independent {@code put} or {@code remove} call on {@link IgniteCache} for every
+ * entry ({@code remove} when the entry value is {@code null}). Thus it is safe from deadlocks but performance
+ * is not the best.
+ *
+ * @return Individual updater.
+ */
+ public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
+ return INDIVIDUAL;
+ }
+
+ /**
+ * Updates cache using the batched methods {@code putAll} and {@code removeAll} (see
+ * {@link #updateAll(IgniteCache, Set, Map)}). Can cause deadlocks if the same keys are getting
+ * updated concurrently. Performance is generally better than in {@link #individual()}.
+ *
+ * @return Batched updater.
+ */
+ public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
+ return BATCHED;
+ }
+
+ /**
+ * Updates cache using the batched methods {@code putAll} and {@code removeAll} (see
+ * {@link #updateAll(IgniteCache, Set, Map)}). Keys are sorted in natural order and if all updates
+ * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}.
+ *
+ * @return Batched sorted updater.
+ */
+ public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
+ return BATCHED_SORTED;
+ }
+
+ /**
+ * Updates cache: removes {@code rmvCol} first (when it is not {@code null}), then puts {@code putMap} (when it is not {@code null}).
+ *
+ * @param cache Cache; it is cast to {@link IgniteCacheProxy} for the removal.
+ * @param rmvCol Keys to remove, may be {@code null} as long as {@code putMap} is not.
+ * @param putMap Entries to put, may be {@code null} as long as {@code rmvCol} is not.
+ * @throws IgniteException If failed.
+ */
+ protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol,
+ Map<K, V> putMap) {
+ assert rmvCol != null || putMap != null;
+
+ // Here we assume that there are no key duplicates, so the following calls are valid.
+ if (rmvCol != null)
+ ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol);
+
+ if (putMap != null)
+ cache.putAll(putMap);
+ }
+
+ /**
+ * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
+ */
+ private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} Removes the key when the entry value is {@code null}, puts the value otherwise. */
+ @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+ assert cache != null;
+ assert !F.isEmpty(entries);
+
+ for (Map.Entry<K, V> entry : entries) {
+ K key = entry.getKey();
+
+ assert key != null;
+
+ V val = entry.getValue();
+
+ if (val == null)
+ cache.remove(key);
+ else
+ cache.put(key, val);
+ }
+ }
+ }
+
+ /**
+ * Batched updater. Updates cache using batch operations thus is dead lock prone.
+ */
+ private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} Collects {@code null}-valued keys into a {@link HashSet} and the other entries into a {@link HashMap}, then calls {@code updateAll}. */
+ @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+ assert cache != null;
+ assert !F.isEmpty(entries);
+
+ Map<K, V> putAll = null; // Created on first use; stays null when no entry has a value.
+ Set<K> rmvAll = null; // Created on first use; stays null when no entry has a null value.
+
+ for (Map.Entry<K, V> entry : entries) {
+ K key = entry.getKey();
+
+ assert key != null;
+
+ V val = entry.getValue();
+
+ if (val == null) {
+ if (rmvAll == null)
+ rmvAll = new HashSet<>();
+
+ rmvAll.add(key);
+ }
+ else {
+ if (putAll == null)
+ putAll = new HashMap<>();
+
+ putAll.put(key, val);
+ }
+ }
+
+ updateAll(cache, rmvAll, putAll);
+ }
+ }
+
+ /**
+ * Batched sorted updater. Same batch operations as the batched updater, but keys are collected into {@link TreeSet}/{@link TreeMap}, i.e. in natural key order.
+ */
+ private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} Collects {@code null}-valued keys into a {@link TreeSet} and the other entries into a {@link TreeMap}, then calls {@code updateAll}. */
+ @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
+ assert cache != null;
+ assert !F.isEmpty(entries);
+
+ Map<K, V> putAll = null; // Created on first use; stays null when no entry has a value.
+ Set<K> rmvAll = null; // Created on first use; stays null when no entry has a null value.
+
+ for (Map.Entry<K, V> entry : entries) {
+ K key = entry.getKey();
+
+ assert key instanceof Comparable; // The comparator-less TreeSet/TreeMap below order keys by their natural ordering.
+
+ V val = entry.getValue();
+
+ if (val == null) {
+ if (rmvAll == null)
+ rmvAll = new TreeSet<>();
+
+ rmvAll.add(key);
+ }
+ else {
+ if (putAll == null)
+ putAll = new TreeMap<>();
+
+ putAll.put(key, val);
+ }
+ }
+
+ updateAll(cache, rmvAll, putAll);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
new file mode 100644
index 0000000..b6aa15c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastream/IgniteDataStreamerFuture.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastream;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Data streamer future. Cancelling it closes the data streamer it was created for via {@code closeEx(true)}.
+ */
+class IgniteDataStreamerFuture extends GridFutureAdapter<Object> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Data streamer closed by {@link #cancel()}; not assigned by the no-arg constructor. */
+ @GridToStringExclude
+ private IgniteDataStreamerImpl dataLdr;
+
+ /**
+ * Default constructor for {@link Externalizable} support. Leaves the data streamer reference unset.
+ */
+ public IgniteDataStreamerFuture() {
+ // No-op.
+ }
+
+ /**
+ * @param ctx Context, passed to the superclass constructor.
+ * @param dataLdr Data streamer, must not be {@code null}.
+ */
+ IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) {
+ super(ctx);
+
+ assert dataLdr != null;
+
+ this.dataLdr = dataLdr;
+ }
+
+ /** {@inheritDoc} Closes the data streamer and returns {@code true} when {@code onCancelled()} succeeds; returns {@code false} otherwise. */
+ @Override public boolean cancel() throws IgniteCheckedException {
+ checkValid(); // NOTE(review): presumably rejects instances made by the no-arg constructor (dataLdr unset) - confirm in GridFutureAdapter.
+
+ if (onCancelled()) {
+ dataLdr.closeEx(true); // NOTE(review): 'true' is presumably the cancel flag of closeEx - confirm against IgniteDataStreamerImpl.
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} The data streamer field is excluded via {@code @GridToStringExclude}. */
+ @Override public String toString() {
+ return S.toString(IgniteDataStreamerFuture.class, this, super.toString());
+ }
+}