You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/04 14:44:09 UTC
[5/5] incubator-ignite git commit: # gg-9869
# gg-9869
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9c8217c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9c8217c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9c8217c1
Branch: refs/heads/ignite-394
Commit: 9c8217c17851487f783bf4fc05d75b4d2996c251
Parents: 96f426b
Author: Artem Shutak <as...@gridgain.com>
Authored: Wed Mar 4 15:27:48 2015 +0300
Committer: Artem Shutak <as...@gridgain.com>
Committed: Wed Mar 4 15:27:48 2015 +0300
----------------------------------------------------------------------
.../ignite/internal/GridKernalContext.java | 2 +-
.../ignite/internal/GridKernalContextImpl.java | 2 +-
.../apache/ignite/internal/IgniteKernal.java | 2 +-
.../communication/GridIoMessageFactory.java | 2 +-
.../processors/cache/GridCacheAdapter.java | 2 +-
.../GridDistributedCacheAdapter.java | 2 +-
.../dataload/GridDataLoadRequest.java | 450 ------
.../dataload/GridDataLoadResponse.java | 166 --
.../IgniteDataStreamerCacheUpdaters.java | 199 ---
.../dataload/IgniteDataStreamerFuture.java | 75 -
.../dataload/IgniteDataStreamerImpl.java | 1453 ------------------
.../dataload/IgniteDataStreamerProcessor.java | 316 ----
.../dataload/IgniteDataStreamerUpdateJob.java | 119 --
.../internal/processors/dataload/package.html | 24 -
.../datastream/GridDataLoadRequest.java | 450 ++++++
.../datastream/GridDataLoadResponse.java | 166 ++
.../IgniteDataStreamerCacheUpdaters.java | 199 +++
.../datastream/IgniteDataStreamerFuture.java | 75 +
.../datastream/IgniteDataStreamerImpl.java | 1453 ++++++++++++++++++
.../datastream/IgniteDataStreamerProcessor.java | 316 ++++
.../datastream/IgniteDataStreamerUpdateJob.java | 119 ++
.../internal/processors/datastream/package.html | 24 +
.../processors/igfs/IgfsDataManager.java | 2 +-
.../IgniteDataStreamerImplSelfTest.java | 214 ---
.../IgniteDataStreamerPerformanceTest.java | 199 ---
.../IgniteDataStreamerProcessorSelfTest.java | 924 -----------
.../IgniteDataStreamerImplSelfTest.java | 214 +++
.../IgniteDataStreamerPerformanceTest.java | 199 +++
.../IgniteDataStreamerProcessorSelfTest.java | 924 +++++++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +-
30 files changed, 4147 insertions(+), 4147 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 616eac7..48752cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -35,7 +35,7 @@ import org.apache.ignite.internal.processors.clock.*;
import org.apache.ignite.internal.processors.closure.*;
import org.apache.ignite.internal.processors.cluster.*;
import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
import org.apache.ignite.internal.processors.datastructures.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.igfs.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index a38ca92..edc5e00 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -38,7 +38,7 @@ import org.apache.ignite.internal.processors.clock.*;
import org.apache.ignite.internal.processors.closure.*;
import org.apache.ignite.internal.processors.cluster.*;
import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
import org.apache.ignite.internal.processors.datastructures.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.igfs.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 60df162..a13da36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -41,7 +41,7 @@ import org.apache.ignite.internal.processors.clock.*;
import org.apache.ignite.internal.processors.closure.*;
import org.apache.ignite.internal.processors.cluster.*;
import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
import org.apache.ignite.internal.processors.datastructures.*;
import org.apache.ignite.internal.processors.hadoop.*;
import org.apache.ignite.internal.processors.job.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 6109d74..2079233 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.processors.clock.*;
import org.apache.ignite.internal.processors.continuous.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
import org.apache.ignite.internal.processors.igfs.*;
import org.apache.ignite.internal.processors.rest.handlers.task.*;
import org.apache.ignite.internal.processors.streamer.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 83118c4..dd9efd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -32,7 +32,7 @@ import org.apache.ignite.internal.processors.cache.dr.*;
import org.apache.ignite.internal.processors.cache.query.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
import org.apache.ignite.internal.processors.dr.*;
import org.apache.ignite.internal.processors.task.*;
import org.apache.ignite.internal.transactions.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 16419f9..f745b5f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -26,7 +26,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.processors.dataload.*;
+import org.apache.ignite.internal.processors.datastream.*;
import org.apache.ignite.internal.processors.task.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.typedef.internal.*;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
deleted file mode 100644
index b3828ed..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadRequest.java
+++ /dev/null
@@ -1,450 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-import org.jetbrains.annotations.*;
-
-import java.nio.*;
-import java.util.*;
-
-/**
- *
- */
-public class GridDataLoadRequest implements Message {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private long reqId;
-
- /** */
- private byte[] resTopicBytes;
-
- /** Cache name. */
- private String cacheName;
-
- /** */
- private byte[] updaterBytes;
-
- /** Entries to put. */
- private byte[] colBytes;
-
- /** {@code True} to ignore deployment ownership. */
- private boolean ignoreDepOwnership;
-
- /** */
- private boolean skipStore;
-
- /** */
- private DeploymentMode depMode;
-
- /** */
- private String sampleClsName;
-
- /** */
- private String userVer;
-
- /** Node class loader participants. */
- @GridToStringInclude
- @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
- private Map<UUID, IgniteUuid> ldrParticipants;
-
- /** */
- private IgniteUuid clsLdrId;
-
- /** */
- private boolean forceLocDep;
-
- /**
- * {@code Externalizable} support.
- */
- public GridDataLoadRequest() {
- // No-op.
- }
-
- /**
- * @param reqId Request ID.
- * @param resTopicBytes Response topic.
- * @param cacheName Cache name.
- * @param updaterBytes Cache updater.
- * @param colBytes Collection bytes.
- * @param ignoreDepOwnership Ignore ownership.
- * @param skipStore Skip store flag.
- * @param depMode Deployment mode.
- * @param sampleClsName Sample class name.
- * @param userVer User version.
- * @param ldrParticipants Loader participants.
- * @param clsLdrId Class loader ID.
- * @param forceLocDep Force local deployment.
- */
- public GridDataLoadRequest(long reqId,
- byte[] resTopicBytes,
- @Nullable String cacheName,
- byte[] updaterBytes,
- byte[] colBytes,
- boolean ignoreDepOwnership,
- boolean skipStore,
- DeploymentMode depMode,
- String sampleClsName,
- String userVer,
- Map<UUID, IgniteUuid> ldrParticipants,
- IgniteUuid clsLdrId,
- boolean forceLocDep) {
- this.reqId = reqId;
- this.resTopicBytes = resTopicBytes;
- this.cacheName = cacheName;
- this.updaterBytes = updaterBytes;
- this.colBytes = colBytes;
- this.ignoreDepOwnership = ignoreDepOwnership;
- this.skipStore = skipStore;
- this.depMode = depMode;
- this.sampleClsName = sampleClsName;
- this.userVer = userVer;
- this.ldrParticipants = ldrParticipants;
- this.clsLdrId = clsLdrId;
- this.forceLocDep = forceLocDep;
- }
-
- /**
- * @return Request ID.
- */
- public long requestId() {
- return reqId;
- }
-
- /**
- * @return Response topic.
- */
- public byte[] responseTopicBytes() {
- return resTopicBytes;
- }
-
- /**
- * @return Cache name.
- */
- public String cacheName() {
- return cacheName;
- }
-
- /**
- * @return Updater.
- */
- public byte[] updaterBytes() {
- return updaterBytes;
- }
-
- /**
- * @return Collection bytes.
- */
- public byte[] collectionBytes() {
- return colBytes;
- }
-
- /**
- * @return {@code True} to ignore ownership.
- */
- public boolean ignoreDeploymentOwnership() {
- return ignoreDepOwnership;
- }
-
- /**
- * @return Skip store flag.
- */
- public boolean skipStore() {
- return skipStore;
- }
-
- /**
- * @return Deployment mode.
- */
- public DeploymentMode deploymentMode() {
- return depMode;
- }
-
- /**
- * @return Sample class name.
- */
- public String sampleClassName() {
- return sampleClsName;
- }
-
- /**
- * @return User version.
- */
- public String userVersion() {
- return userVer;
- }
-
- /**
- * @return Participants.
- */
- public Map<UUID, IgniteUuid> participants() {
- return ldrParticipants;
- }
-
- /**
- * @return Class loader ID.
- */
- public IgniteUuid classLoaderId() {
- return clsLdrId;
- }
-
- /**
- * @return {@code True} to force local deployment.
- */
- public boolean forceLocalDeployment() {
- return forceLocDep;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDataLoadRequest.class, this);
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeString("cacheName", cacheName))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeIgniteUuid("clsLdrId", clsLdrId))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeByteArray("colBytes", colBytes))
- return false;
-
- writer.incrementState();
-
- case 3:
- if (!writer.writeByte("depMode", depMode != null ? (byte)depMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeBoolean("forceLocDep", forceLocDep))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeBoolean("ignoreDepOwnership", ignoreDepOwnership))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeMap("ldrParticipants", ldrParticipants, MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeLong("reqId", reqId))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeByteArray("resTopicBytes", resTopicBytes))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeString("sampleClsName", sampleClsName))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeBoolean("skipStore", skipStore))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeByteArray("updaterBytes", updaterBytes))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeString("userVer", userVer))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- switch (reader.state()) {
- case 0:
- cacheName = reader.readString("cacheName");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- clsLdrId = reader.readIgniteUuid("clsLdrId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- colBytes = reader.readByteArray("colBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 3:
- byte depModeOrd;
-
- depModeOrd = reader.readByte("depMode");
-
- if (!reader.isLastRead())
- return false;
-
- depMode = DeploymentMode.fromOrdinal(depModeOrd);
-
- reader.incrementState();
-
- case 4:
- forceLocDep = reader.readBoolean("forceLocDep");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- ignoreDepOwnership = reader.readBoolean("ignoreDepOwnership");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- ldrParticipants = reader.readMap("ldrParticipants", MessageCollectionItemType.UUID, MessageCollectionItemType.IGNITE_UUID, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- reqId = reader.readLong("reqId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- resTopicBytes = reader.readByteArray("resTopicBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- sampleClsName = reader.readString("sampleClsName");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- skipStore = reader.readBoolean("skipStore");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- updaterBytes = reader.readByteArray("updaterBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- userVer = reader.readString("userVer");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 62;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 13;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java
deleted file mode 100644
index 835e3bd..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/GridDataLoadResponse.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-
-import java.nio.*;
-
-/**
- *
- */
-public class GridDataLoadResponse implements Message {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private long reqId;
-
- /** */
- private byte[] errBytes;
-
- /** */
- private boolean forceLocDep;
-
- /**
- * @param reqId Request ID.
- * @param errBytes Error bytes.
- * @param forceLocDep Force local deployment.
- */
- public GridDataLoadResponse(long reqId, byte[] errBytes, boolean forceLocDep) {
- this.reqId = reqId;
- this.errBytes = errBytes;
- this.forceLocDep = forceLocDep;
- }
-
- /**
- * {@code Externalizable} support.
- */
- public GridDataLoadResponse() {
- // No-op.
- }
-
- /**
- * @return Request ID.
- */
- public long requestId() {
- return reqId;
- }
-
- /**
- * @return Error bytes.
- */
- public byte[] errorBytes() {
- return errBytes;
- }
-
- /**
- * @return {@code True} to force local deployment.
- */
- public boolean forceLocalDeployment() {
- return forceLocDep;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridDataLoadResponse.class, this);
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType(), fieldsCount()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeByteArray("errBytes", errBytes))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeBoolean("forceLocDep", forceLocDep))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeLong("reqId", reqId))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!reader.beforeMessageRead())
- return false;
-
- switch (reader.state()) {
- case 0:
- errBytes = reader.readByteArray("errBytes");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- forceLocDep = reader.readBoolean("forceLocDep");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- reqId = reader.readLong("reqId");
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public byte directType() {
- return 63;
- }
-
- /** {@inheritDoc} */
- @Override public byte fieldsCount() {
- return 3;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
deleted file mode 100644
index 1742041..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerCacheUpdaters.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Bundled factory for cache updaters.
- */
-public class IgniteDataStreamerCacheUpdaters {
- /** */
- private static final IgniteDataStreamer.Updater INDIVIDUAL = new Individual();
-
- /** */
- private static final IgniteDataStreamer.Updater BATCHED = new Batched();
-
- /** */
- private static final IgniteDataStreamer.Updater BATCHED_SORTED = new BatchedSorted();
-
- /**
- * Updates cache using independent {@link org.apache.ignite.cache.GridCache#put(Object, Object, org.apache.ignite.lang.IgnitePredicate[])} and
- * {@link org.apache.ignite.cache.GridCache#remove(Object, org.apache.ignite.lang.IgnitePredicate[])} operations. Thus it is safe from deadlocks but performance
- * is not the best.
- *
- * @return Single updater.
- */
- public static <K, V> IgniteDataStreamer.Updater<K, V> individual() {
- return INDIVIDUAL;
- }
-
- /**
- * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
- * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Can cause deadlocks if the same keys are getting
- * updated concurrently. Performance is generally better than in {@link #individual()}.
- *
- * @return Batched updater.
- */
- public static <K, V> IgniteDataStreamer.Updater<K, V> batched() {
- return BATCHED;
- }
-
- /**
- * Updates cache using batched methods {@link org.apache.ignite.cache.GridCache#putAll(Map, org.apache.ignite.lang.IgnitePredicate[])} and
- * {@link org.apache.ignite.cache.GridCache#removeAll(Collection, org.apache.ignite.lang.IgnitePredicate[])}. Keys are sorted in natural order and if all updates
- * use the same rule deadlock can not happen. Performance is generally better than in {@link #individual()}.
- *
- * @return Batched sorted updater.
- */
- public static <K extends Comparable<?>, V> IgniteDataStreamer.Updater<K, V> batchedSorted() {
- return BATCHED_SORTED;
- }
-
- /**
- * Updates cache.
- *
- * @param cache Cache.
- * @param rmvCol Keys to remove.
- * @param putMap Entries to put.
- * @throws IgniteException If failed.
- */
- protected static <K, V> void updateAll(IgniteCache<K, V> cache, @Nullable Set<K> rmvCol,
- Map<K, V> putMap) {
- assert rmvCol != null || putMap != null;
-
- // Here we assume that there are no key duplicates, so the following calls are valid.
- if (rmvCol != null)
- ((IgniteCacheProxy<K, V>)cache).removeAll(rmvCol);
-
- if (putMap != null)
- cache.putAll(putMap);
- }
-
- /**
- * Simple cache updater implementation. Updates keys one by one thus is not dead lock prone.
- */
- private static class Individual<K, V> implements IgniteDataStreamer.Updater<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
- assert cache != null;
- assert !F.isEmpty(entries);
-
- for (Map.Entry<K, V> entry : entries) {
- K key = entry.getKey();
-
- assert key != null;
-
- V val = entry.getValue();
-
- if (val == null)
- cache.remove(key);
- else
- cache.put(key, val);
- }
- }
- }
-
- /**
- * Batched updater. Updates cache using batch operations thus is dead lock prone.
- */
- private static class Batched<K, V> implements IgniteDataStreamer.Updater<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
- assert cache != null;
- assert !F.isEmpty(entries);
-
- Map<K, V> putAll = null;
- Set<K> rmvAll = null;
-
- for (Map.Entry<K, V> entry : entries) {
- K key = entry.getKey();
-
- assert key != null;
-
- V val = entry.getValue();
-
- if (val == null) {
- if (rmvAll == null)
- rmvAll = new HashSet<>();
-
- rmvAll.add(key);
- }
- else {
- if (putAll == null)
- putAll = new HashMap<>();
-
- putAll.put(key, val);
- }
- }
-
- updateAll(cache, rmvAll, putAll);
- }
- }
-
- /**
- * Batched updater. Updates cache using batch operations thus is dead lock prone.
- */
- private static class BatchedSorted<K, V> implements IgniteDataStreamer.Updater<K, V> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** {@inheritDoc} */
- @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
- assert cache != null;
- assert !F.isEmpty(entries);
-
- Map<K, V> putAll = null;
- Set<K> rmvAll = null;
-
- for (Map.Entry<K, V> entry : entries) {
- K key = entry.getKey();
-
- assert key instanceof Comparable;
-
- V val = entry.getValue();
-
- if (val == null) {
- if (rmvAll == null)
- rmvAll = new TreeSet<>();
-
- rmvAll.add(key);
- }
- else {
- if (putAll == null)
- putAll = new TreeMap<>();
-
- putAll.put(key, val);
- }
- }
-
- updateAll(cache, rmvAll, putAll);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java
deleted file mode 100644
index 5730655..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerFuture.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-
-/**
- * Data streamer future.
- */
-class IgniteDataStreamerFuture extends GridFutureAdapter<Object> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Data streamer. */
- @GridToStringExclude
- private IgniteDataStreamerImpl dataLdr;
-
- /**
- * Default constructor for {@link Externalizable} support.
- */
- public IgniteDataStreamerFuture() {
- // No-op.
- }
-
- /**
- * @param ctx Context.
- * @param dataLdr Data streamer.
- */
- IgniteDataStreamerFuture(GridKernalContext ctx, IgniteDataStreamerImpl dataLdr) {
- super(ctx);
-
- assert dataLdr != null;
-
- this.dataLdr = dataLdr;
- }
-
- /** {@inheritDoc} */
- @Override public boolean cancel() throws IgniteCheckedException {
- checkValid();
-
- if (onCancelled()) {
- dataLdr.closeEx(true);
-
- return true;
- }
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IgniteDataStreamerFuture.class, this, super.toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/9c8217c1/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
deleted file mode 100644
index 1231e27..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dataload/IgniteDataStreamerImpl.java
+++ /dev/null
@@ -1,1453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.dataload;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.cluster.*;
-import org.apache.ignite.internal.managers.communication.*;
-import org.apache.ignite.internal.managers.deployment.*;
-import org.apache.ignite.internal.managers.eventstorage.*;
-import org.apache.ignite.internal.processors.affinity.*;
-import org.apache.ignite.internal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.dht.*;
-import org.apache.ignite.internal.processors.cache.version.*;
-import org.apache.ignite.internal.processors.dr.*;
-import org.apache.ignite.internal.processors.portable.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.jdk8.backport.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.Map.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.events.EventType.*;
-import static org.apache.ignite.internal.GridTopic.*;
-import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
-
-/**
- * Data streamer implementation.
- */
-@SuppressWarnings("unchecked")
-public class IgniteDataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed {
- /** Isolated updater. */
- private static final Updater ISOLATED_UPDATER = new IsolatedUpdater();
-
- /** Cache updater. */
- private Updater<K, V> updater = ISOLATED_UPDATER;
-
- /** */
- private byte[] updaterBytes;
-
- /** Max remap count before issuing an error. */
- private static final int DFLT_MAX_REMAP_CNT = 32;
-
- /** Log reference. */
- private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
- /** Cache name ({@code null} for default cache). */
- private final String cacheName;
-
- /** Portable enabled flag. */
- private final boolean portableEnabled;
-
- /**
- * If {@code true} then data will be transferred in compact format (only keys and values).
- * Otherwise full map entry will be transferred (this is required by DR internal logic).
- */
- private final boolean compact;
-
- /** Per-node buffer size. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- private int bufSize = DFLT_PER_NODE_BUFFER_SIZE;
-
- /** */
- private int parallelOps = DFLT_MAX_PARALLEL_OPS;
-
- /** */
- private long autoFlushFreq;
-
- /** Mapping. */
- @GridToStringInclude
- private ConcurrentMap<UUID, Buffer> bufMappings = new ConcurrentHashMap8<>();
-
- /** Logger. */
- private IgniteLogger log;
-
- /** Discovery listener. */
- private final GridLocalEventListener discoLsnr;
-
- /** Context. */
- private final GridKernalContext ctx;
-
- /** Communication topic for responses. */
- private final Object topic;
-
- /** */
- private byte[] topicBytes;
-
- /** {@code True} if data streamer has been cancelled. */
- private volatile boolean cancelled;
-
- /** Active futures of this data streamer. */
- @GridToStringInclude
- private final Collection<IgniteInternalFuture<?>> activeFuts = new GridConcurrentHashSet<>();
-
- /** Closure to remove from active futures. */
- @GridToStringExclude
- private final IgniteInClosure<IgniteInternalFuture<?>> rmvActiveFut = new IgniteInClosure<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> t) {
- boolean rmv = activeFuts.remove(t);
-
- assert rmv;
- }
- };
-
- /** Job peer deploy aware. */
- private volatile GridPeerDeployAware jobPda;
-
- /** Deployment class. */
- private Class<?> depCls;
-
- /** Future to track loading finish. */
- private final GridFutureAdapter<?> fut;
-
- /** Public API future to track loading finish. */
- private final IgniteFuture<?> publicFut;
-
- /** Busy lock. */
- private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
- /** Closed flag. */
- private final AtomicBoolean closed = new AtomicBoolean();
-
- /** */
- private volatile long lastFlushTime = U.currentTimeMillis();
-
- /** */
- private final DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ;
-
- /** */
- private boolean skipStore;
-
- /** */
- private int maxRemapCnt = DFLT_MAX_REMAP_CNT;
-
- /**
- * @param ctx Grid kernal context.
- * @param cacheName Cache name ({@code null} for default cache).
- * @param flushQ Flush queue this streamer is added to when auto-flush is enabled.
- * @param compact If {@code true} data is transferred in compact mode (only keys and values).
- * Otherwise full map entry will be transferred (this is required by DR internal logic).
- */
- public IgniteDataStreamerImpl(
- final GridKernalContext ctx,
- @Nullable final String cacheName,
- DelayQueue<IgniteDataStreamerImpl<K, V>> flushQ,
- boolean compact
- ) {
- assert ctx != null;
-
- this.ctx = ctx;
- this.cacheName = cacheName;
- this.flushQ = flushQ;
- this.compact = compact;
-
- log = U.logger(ctx, logRef, IgniteDataStreamerImpl.class);
-
- ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
-
- if (node == null)
- throw new IllegalStateException("Cache doesn't exist: " + cacheName);
-
- portableEnabled = ctx.portable().portableEnabled(node, cacheName);
-
- discoLsnr = new GridLocalEventListener() {
- @Override public void onEvent(Event evt) {
- assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT;
-
- DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
-
- UUID id = discoEvt.eventNode().id();
-
- // Remap regular mappings.
- final Buffer buf = bufMappings.remove(id);
-
- if (buf != null) {
- // Only async notification is possible since
- // discovery thread may be trapped otherwise.
- ctx.closure().callLocalSafe(
- new Callable<Object>() {
- @Override public Object call() throws Exception {
- buf.onNodeLeft();
-
- return null;
- }
- },
- true /* system pool */
- );
- }
- }
- };
-
- ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
-
- // Generate unique topic for this loader.
- topic = TOPIC_DATALOAD.topic(IgniteUuid.fromUuid(ctx.localNodeId()));
-
- ctx.io().addMessageListener(topic, new GridMessageListener() {
- @Override public void onMessage(UUID nodeId, Object msg) {
- assert msg instanceof GridDataLoadResponse;
-
- GridDataLoadResponse res = (GridDataLoadResponse)msg;
-
- if (log.isDebugEnabled())
- log.debug("Received data load response: " + res);
-
- Buffer buf = bufMappings.get(nodeId);
-
- if (buf != null)
- buf.onResponse(res);
-
- else if (log.isDebugEnabled()) // No buffer is mapped for the sender any more; close the bracket so the message is well-formed.
- log.debug("Ignoring response since node has left [nodeId=" + nodeId + ']');
- }
- });
-
- if (log.isDebugEnabled())
- log.debug("Added response listener within topic: " + topic);
-
- fut = new IgniteDataStreamerFuture(ctx, this);
-
- publicFut = new IgniteFutureImpl<>(fut);
- }
-
- /**
- * Enters busy lock.
- */
- private void enterBusy() {
- if (!busyLock.enterBusy())
- throw new IllegalStateException("Data streamer has been closed.");
- }
-
- /**
- * Leaves busy lock.
- */
- private void leaveBusy() {
- busyLock.leaveBusy();
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> future() {
- return publicFut;
- }
-
- /**
- * @return Internal future.
- */
- public IgniteInternalFuture<?> internalFuture() {
- return fut;
- }
-
- /** {@inheritDoc} */
- @Override public void deployClass(Class<?> depCls) {
- this.depCls = depCls;
- }
-
- /** {@inheritDoc} */
- @Override public void updater(Updater<K, V> updater) {
- A.notNull(updater, "updater");
-
- this.updater = updater;
- }
-
- /** {@inheritDoc} */
- @Override public boolean allowOverwrite() {
- return updater != ISOLATED_UPDATER;
- }
-
- /** {@inheritDoc} */
- @Override public void allowOverwrite(boolean allow) {
- if (allow == allowOverwrite())
- return;
-
- ClusterNode node = F.first(ctx.grid().cluster().forCacheNodes(cacheName).nodes());
-
- if (node == null)
- throw new IgniteException("Failed to get node for cache: " + cacheName);
-
- updater = allow ? IgniteDataStreamerCacheUpdaters.<K, V>individual() : ISOLATED_UPDATER;
- }
-
- /** {@inheritDoc} */
- @Override public boolean skipStore() {
- return skipStore;
- }
-
- /** {@inheritDoc} */
- @Override public void skipStore(boolean skipStore) {
- this.skipStore = skipStore;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public String cacheName() {
- return cacheName;
- }
-
- /** {@inheritDoc} */
- @Override public int perNodeBufferSize() {
- return bufSize;
- }
-
- /** {@inheritDoc} */
- @Override public void perNodeBufferSize(int bufSize) {
- A.ensure(bufSize > 0, "bufSize > 0");
-
- this.bufSize = bufSize;
- }
-
- /** {@inheritDoc} */
- @Override public int perNodeParallelLoadOperations() {
- return parallelOps;
- }
-
- /** {@inheritDoc} */
- @Override public void perNodeParallelLoadOperations(int parallelOps) {
- this.parallelOps = parallelOps;
- }
-
- /** {@inheritDoc} */
- @Override public long autoFlushFrequency() {
- return autoFlushFreq;
- }
-
- /** {@inheritDoc} */
- @Override public void autoFlushFrequency(long autoFlushFreq) {
- A.ensure(autoFlushFreq >= 0, "autoFlushFreq >= 0");
-
- long old = this.autoFlushFreq;
-
- if (autoFlushFreq != old) {
- this.autoFlushFreq = autoFlushFreq;
-
- if (autoFlushFreq != 0 && old == 0)
- flushQ.add(this);
- else if (autoFlushFreq == 0)
- flushQ.remove(this);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
- A.notNull(entries, "entries");
-
- return addData(entries.entrySet());
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries) {
- A.notEmpty(entries, "entries");
-
- enterBusy();
-
- try {
- GridFutureAdapter<Object> resFut = new GridFutureAdapter<>(ctx);
-
- resFut.listenAsync(rmvActiveFut);
-
- activeFuts.add(resFut);
-
- Collection<K> keys = null;
-
- if (entries.size() > 1) {
- keys = new GridConcurrentHashSet<>(entries.size(), U.capacity(entries.size()), 1);
-
- for (Map.Entry<K, V> entry : entries)
- keys.add(entry.getKey());
- }
-
- load0(entries, resFut, keys, 0);
-
- return new IgniteFutureImpl<>(resFut);
- }
- catch (IgniteException e) {
- return new IgniteFinishedFutureImpl<>(ctx, e);
- }
- finally {
- leaveBusy();
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) {
- A.notNull(entry, "entry");
-
- return addData(F.asList(entry));
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> addData(K key, V val) {
- A.notNull(key, "key");
-
- return addData(new Entry0<>(key, val));
- }
-
- /** {@inheritDoc} */
- @Override public IgniteFuture<?> removeData(K key) {
- return addData(key, null);
- }
-
- /**
- * @param entries Entries to map to nodes and hand over to the per-node buffers.
- * @param resFut Result future, completed when all entries are loaded or the operation fails.
- * @param activeKeys Keys still awaiting completion ({@code null} for a single-entry load).
- * @param remaps Number of remaps already performed for these entries.
- */
- private void load0(
- Collection<? extends Map.Entry<K, V>> entries,
- final GridFutureAdapter<Object> resFut,
- @Nullable final Collection<K> activeKeys,
- final int remaps
- ) {
- assert entries != null;
-
- Map<ClusterNode, Collection<Map.Entry<K, V>>> mappings = new HashMap<>();
-
- boolean initPda = ctx.deploy().enabled() && jobPda == null;
-
- for (Map.Entry<K, V> entry : entries) {
- List<ClusterNode> nodes;
-
- try {
- K key = entry.getKey();
-
- assert key != null;
-
- if (initPda) {
- jobPda = new DataStreamerPda(key, entry.getValue(), updater);
-
- initPda = false;
- }
-
- nodes = nodes(key);
- }
- catch (IgniteCheckedException e) {
- resFut.onDone(e);
-
- return;
- }
-
- if (F.isEmpty(nodes)) {
- resFut.onDone(new ClusterTopologyException("Failed to map key to node " +
- "(no nodes with cache found in topology) [infos=" + entries.size() +
- ", cacheName=" + cacheName + ']'));
-
- return;
- }
-
- for (ClusterNode node : nodes) {
- Collection<Map.Entry<K, V>> col = mappings.get(node);
-
- if (col == null)
- mappings.put(node, col = new ArrayList<>());
-
- col.add(entry);
- }
- }
-
- for (final Map.Entry<ClusterNode, Collection<Map.Entry<K, V>>> e : mappings.entrySet()) {
- final UUID nodeId = e.getKey().id();
-
- Buffer buf = bufMappings.get(nodeId);
-
- if (buf == null) {
- Buffer old = bufMappings.putIfAbsent(nodeId, buf = new Buffer(e.getKey()));
-
- if (old != null)
- buf = old;
- }
-
- final Collection<Map.Entry<K, V>> entriesForNode = e.getValue();
-
- IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> t) {
- try {
- t.get();
-
- if (activeKeys != null) {
- for (Map.Entry<K, V> e : entriesForNode)
- activeKeys.remove(e.getKey());
-
- if (activeKeys.isEmpty())
- resFut.onDone();
- }
- else {
- assert entriesForNode.size() == 1;
-
- // That has been a single key,
- // so complete result future right away.
- resFut.onDone();
- }
- }
- catch (IgniteCheckedException e1) {
- if (log.isDebugEnabled())
- log.debug("Future finished with error [nodeId=" + nodeId + ", err=" + e1 + ']');
-
- if (cancelled) {
- resFut.onDone(new IgniteCheckedException("Data streamer has been cancelled: " +
- IgniteDataStreamerImpl.this, e1));
- }
- else if (remaps + 1 > maxRemapCnt) { // Remap budget exhausted: fail with e1 as the cause, not as a separate onDone argument.
- resFut.onDone(new IgniteCheckedException("Failed to finish operation (too many remaps): "
- + remaps, e1));
- }
- else
- load0(entriesForNode, resFut, activeKeys, remaps + 1);
- }
- }
- };
-
- GridFutureAdapter<?> f;
-
- try {
- f = buf.update(entriesForNode, lsnr);
- }
- catch (IgniteInterruptedCheckedException e1) {
- resFut.onDone(e1);
-
- return;
- }
-
- if (ctx.discovery().node(nodeId) == null) { // Node is gone from discovery: drop its buffer and fail the pending future.
- if (bufMappings.remove(nodeId, buf))
- buf.onNodeLeft();
-
- if (f != null)
- f.onDone(new ClusterTopologyCheckedException("Failed to wait for request completion " +
- "(node has left): " + nodeId));
- }
- }
- }
-
- /**
- * @param key Key to map.
- * @return Nodes to send requests to.
- * @throws IgniteCheckedException If failed.
- */
- private List<ClusterNode> nodes(K key) throws IgniteCheckedException {
- GridAffinityProcessor aff = ctx.affinity();
-
- return !allowOverwrite() ? aff.mapKeyToPrimaryAndBackups(cacheName, key) :
- Collections.singletonList(aff.mapKeyToNode(cacheName, key));
- }
-
- /**
- * Performs flush.
- *
- * @throws IgniteCheckedException If failed.
- */
- private void doFlush() throws IgniteCheckedException {
- lastFlushTime = U.currentTimeMillis();
-
- List<IgniteInternalFuture> activeFuts0 = null;
-
- int doneCnt = 0;
-
- for (IgniteInternalFuture<?> f : activeFuts) {
- if (!f.isDone()) {
- if (activeFuts0 == null)
- activeFuts0 = new ArrayList<>((int)(activeFuts.size() * 1.2));
-
- activeFuts0.add(f);
- }
- else {
- f.get();
-
- doneCnt++;
- }
- }
-
- if (activeFuts0 == null || activeFuts0.isEmpty())
- return;
-
- while (true) {
- Queue<IgniteInternalFuture<?>> q = null;
-
- for (Buffer buf : bufMappings.values()) {
- IgniteInternalFuture<?> flushFut = buf.flush();
-
- if (flushFut != null) {
- if (q == null)
- q = new ArrayDeque<>(bufMappings.size() * 2);
-
- q.add(flushFut);
- }
- }
-
- if (q != null) {
- assert !q.isEmpty();
-
- boolean err = false;
-
- for (IgniteInternalFuture fut = q.poll(); fut != null; fut = q.poll()) {
- try {
- fut.get();
- }
- catch (IgniteCheckedException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to flush buffer: " + e);
-
- err = true;
- }
- }
-
- if (err)
- // Remaps needed - flush buffers.
- continue;
- }
-
- doneCnt = 0;
-
- for (int i = 0; i < activeFuts0.size(); i++) {
- IgniteInternalFuture f = activeFuts0.get(i);
-
- if (f == null)
- doneCnt++;
- else if (f.isDone()) {
- f.get();
-
- doneCnt++;
-
- activeFuts0.set(i, null);
- }
- else
- break;
- }
-
- if (doneCnt == activeFuts0.size())
- return;
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("ForLoopReplaceableByForEach")
- @Override public void flush() throws IgniteException {
- enterBusy();
-
- try {
- doFlush();
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
- * Flushes every internal buffer if buffer was flushed before passed in
- * threshold.
- * <p>
- * Does not wait for result and does not fail on errors assuming that this method
- * should be called periodically.
- */
- @Override public void tryFlush() throws IgniteInterruptedException {
- if (!busyLock.enterBusy())
- return;
-
- try {
- for (Buffer buf : bufMappings.values())
- buf.flush();
-
- lastFlushTime = U.currentTimeMillis();
- }
- catch (IgniteInterruptedCheckedException e) {
- throw U.convertException(e);
- }
- finally {
- leaveBusy();
- }
- }
-
- /**
- * @param cancel {@code True} to close with cancellation.
- * @throws IgniteException If failed.
- */
- @Override public void close(boolean cancel) throws IgniteException {
- try {
- closeEx(cancel);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
- /**
- * @param cancel {@code True} to close with cancellation.
- * @throws IgniteCheckedException If failed.
- */
- public void closeEx(boolean cancel) throws IgniteCheckedException {
- if (!closed.compareAndSet(false, true))
- return;
-
- busyLock.block();
-
- if (log.isDebugEnabled())
- log.debug("Closing data streamer [ldr=" + this + ", cancel=" + cancel + ']');
-
- IgniteCheckedException e = null;
-
- try {
- // Assuming that no methods are called on this loader after this method is called.
- if (cancel) {
- cancelled = true;
-
- for (Buffer buf : bufMappings.values())
- buf.cancelAll();
- }
- else
- doFlush();
-
- ctx.event().removeLocalEventListener(discoLsnr);
-
- ctx.io().removeMessageListener(topic);
- }
- catch (IgniteCheckedException e0) {
- e = e0;
- }
-
- fut.onDone(null, e);
-
- if (e != null)
- throw e;
- }
-
- /**
- * @return {@code true} If the loader is closed.
- */
- boolean isClosed() {
- return fut.isDone();
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IgniteException {
- close(false);
- }
-
- /**
- * @return Max remap count.
- */
- public int maxRemapCount() {
- return maxRemapCnt;
- }
-
- /**
- * @param maxRemapCnt New max remap count.
- */
- public void maxRemapCount(int maxRemapCnt) {
- this.maxRemapCnt = maxRemapCnt;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IgniteDataStreamerImpl.class, this);
- }
-
- /** {@inheritDoc} */
- @Override public long getDelay(TimeUnit unit) {
- return unit.convert(nextFlushTime() - U.currentTimeMillis(), TimeUnit.MILLISECONDS);
- }
-
- /**
- * @return Next flush time.
- */
- private long nextFlushTime() {
- return lastFlushTime + autoFlushFreq;
- }
-
- /** {@inheritDoc} Orders streamers by next flush time; equal flush times compare as {@code 0}. */
- @Override public int compareTo(Delayed o) {
- return Long.compare(nextFlushTime(), ((IgniteDataStreamerImpl)o).nextFlushTime());
- }
-
- /**
- *
- */
- private class Buffer {
- /** Node. */
- private final ClusterNode node;
-
- /** Active futures. */
- private final Collection<IgniteInternalFuture<Object>> locFuts;
-
- /** Buffered entries. */
- private List<Map.Entry<K, V>> entries;
-
- /** */
- @GridToStringExclude
- private GridFutureAdapter<Object> curFut;
-
- /** Local node flag. */
- private final boolean isLocNode;
-
- /** ID generator. */
- private final AtomicLong idGen = new AtomicLong();
-
- /** Active futures. */
- private final ConcurrentMap<Long, GridFutureAdapter<Object>> reqs;
-
- /** */
- private final Semaphore sem;
-
- /** Closure to signal on task finish. */
- @GridToStringExclude
- private final IgniteInClosure<IgniteInternalFuture<Object>> signalC = new IgniteInClosure<IgniteInternalFuture<Object>>() {
- @Override public void apply(IgniteInternalFuture<Object> t) {
- signalTaskFinished(t);
- }
- };
-
- /**
- * @param node Node.
- */
- Buffer(ClusterNode node) {
- assert node != null;
-
- this.node = node;
-
- locFuts = new GridConcurrentHashSet<>();
- reqs = new ConcurrentHashMap8<>();
-
- // Cache local node flag.
- isLocNode = node.equals(ctx.discovery().localNode());
-
- entries = newEntries();
- curFut = new GridFutureAdapter<>(ctx);
- curFut.listenAsync(signalC);
-
- sem = new Semaphore(parallelOps);
- }
-
- /**
- * @param newEntries Infos.
- * @param lsnr Listener for the operation future.
- * @throws IgniteInterruptedCheckedException If failed.
- * @return Future for operation.
- */
- @Nullable GridFutureAdapter<?> update(Iterable<Map.Entry<K, V>> newEntries,
- IgniteInClosure<IgniteInternalFuture<?>> lsnr) throws IgniteInterruptedCheckedException {
- List<Map.Entry<K, V>> entries0 = null;
- GridFutureAdapter<Object> curFut0;
-
- synchronized (this) {
- curFut0 = curFut;
-
- curFut0.listenAsync(lsnr);
-
- for (Map.Entry<K, V> entry : newEntries)
- entries.add(entry);
-
- if (entries.size() >= bufSize) {
- entries0 = entries;
-
- entries = newEntries();
- curFut = new GridFutureAdapter<>(ctx);
- curFut.listenAsync(signalC);
- }
- }
-
- if (entries0 != null) {
- submit(entries0, curFut0);
-
- if (cancelled)
- curFut0.onDone(new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this));
- }
-
- return curFut0;
- }
-
- /**
- * @return Fresh collection with some space for outgrowth.
- */
- private List<Map.Entry<K, V>> newEntries() {
- return new ArrayList<>((int)(bufSize * 1.2));
- }
-
- /**
- * @return Future if any submitted.
- *
- * @throws IgniteInterruptedCheckedException If thread has been interrupted.
- */
- @Nullable IgniteInternalFuture<?> flush() throws IgniteInterruptedCheckedException {
- List<Map.Entry<K, V>> entries0 = null;
- GridFutureAdapter<Object> curFut0 = null;
-
- synchronized (this) {
- if (!entries.isEmpty()) {
- entries0 = entries;
- curFut0 = curFut;
-
- entries = newEntries();
- curFut = new GridFutureAdapter<>(ctx);
- curFut.listenAsync(signalC);
- }
- }
-
- if (entries0 != null)
- submit(entries0, curFut0);
-
- // Create compound future for this flush.
- GridCompoundFuture<Object, Object> res = null;
-
- for (IgniteInternalFuture<Object> f : locFuts) {
- if (res == null)
- res = new GridCompoundFuture<>(ctx);
-
- res.add(f);
- }
-
- for (IgniteInternalFuture<Object> f : reqs.values()) {
- if (res == null)
- res = new GridCompoundFuture<>(ctx);
-
- res.add(f);
- }
-
- if (res != null)
- res.markInitialized();
-
- return res;
- }
-
- /**
- * Increments active tasks count.
- *
- * @throws IgniteInterruptedCheckedException If thread has been interrupted.
- */
- private void incrementActiveTasks() throws IgniteInterruptedCheckedException {
- U.acquire(sem);
- }
-
- /**
- * @param f Future that finished.
- */
- private void signalTaskFinished(IgniteInternalFuture<Object> f) {
- assert f != null;
-
- sem.release();
- }
-
- /**
- * @param entries Entries to submit.
- * @param curFut Current future.
- * @throws IgniteInterruptedCheckedException If interrupted.
- */
- private void submit(final Collection<Map.Entry<K, V>> entries, final GridFutureAdapter<Object> curFut)
- throws IgniteInterruptedCheckedException {
- assert entries != null;
- assert !entries.isEmpty();
- assert curFut != null;
-
- incrementActiveTasks();
-
- IgniteInternalFuture<Object> fut;
-
- if (isLocNode) {
- fut = ctx.closure().callLocalSafe(
- new IgniteDataStreamerUpdateJob<>(ctx, log, cacheName, entries, false, skipStore, updater), false);
-
- locFuts.add(fut);
-
- fut.listenAsync(new IgniteInClosure<IgniteInternalFuture<Object>>() {
- @Override public void apply(IgniteInternalFuture<Object> t) {
- try {
- boolean rmv = locFuts.remove(t);
-
- assert rmv;
-
- curFut.onDone(t.get());
- }
- catch (IgniteCheckedException e) {
- curFut.onDone(e);
- }
- }
- });
- }
- else {
- byte[] entriesBytes;
-
- try {
- if (compact) {
- entriesBytes = ctx.config().getMarshaller()
- .marshal(new Entries0<>(entries, portableEnabled ? ctx.portable() : null));
- }
- else
- entriesBytes = ctx.config().getMarshaller().marshal(entries);
-
- if (updaterBytes == null) {
- assert updater != null;
-
- updaterBytes = ctx.config().getMarshaller().marshal(updater);
- }
-
- if (topicBytes == null)
- topicBytes = ctx.config().getMarshaller().marshal(topic);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to marshal (request will not be sent).", e);
-
- return;
- }
-
- GridDeployment dep = null;
- GridPeerDeployAware jobPda0 = null;
-
- if (ctx.deploy().enabled()) {
- try {
- jobPda0 = jobPda;
-
- assert jobPda0 != null;
-
- dep = ctx.deploy().deploy(jobPda0.deployClass(), jobPda0.classLoader());
-
- GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
-
- if (cache != null)
- cache.context().deploy().onEnter();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to deploy class (request will not be sent): " + jobPda0.deployClass(), e);
-
- return;
- }
-
- if (dep == null)
- U.warn(log, "Failed to deploy class (request will be sent): " + jobPda0.deployClass());
- }
-
- long reqId = idGen.incrementAndGet();
-
- fut = curFut;
-
- reqs.put(reqId, (GridFutureAdapter<Object>)fut);
-
- GridDataLoadRequest req = new GridDataLoadRequest(
- reqId,
- topicBytes,
- cacheName,
- updaterBytes,
- entriesBytes,
- true,
- skipStore,
- dep != null ? dep.deployMode() : null,
- dep != null ? jobPda0.deployClass().getName() : null,
- dep != null ? dep.userVersion() : null,
- dep != null ? dep.participants() : null,
- dep != null ? dep.classLoaderId() : null,
- dep == null);
-
- try {
- ctx.io().send(node, TOPIC_DATALOAD, req, PUBLIC_POOL);
-
- if (log.isDebugEnabled())
- log.debug("Sent request to node [nodeId=" + node.id() + ", req=" + req + ']');
- }
- catch (IgniteCheckedException e) {
- if (ctx.discovery().alive(node) && ctx.discovery().pingNode(node.id()))
- ((GridFutureAdapter<Object>)fut).onDone(e);
- else
- ((GridFutureAdapter<Object>)fut).onDone(new ClusterTopologyCheckedException("Failed to send " +
- "request (node has left): " + node.id()));
- }
- }
- }
-
- /**
- *
- */
- void onNodeLeft() {
- assert !isLocNode;
- assert bufMappings.get(node.id()) != this;
-
- if (log.isDebugEnabled())
- log.debug("Forcibly completing futures (node has left): " + node.id());
-
- Exception e = new ClusterTopologyCheckedException("Failed to wait for request completion " +
- "(node has left): " + node.id());
-
- for (GridFutureAdapter<Object> f : reqs.values())
- f.onDone(e);
-
- // Make sure to complete current future.
- GridFutureAdapter<Object> curFut0;
-
- synchronized (this) {
- curFut0 = curFut;
- }
-
- curFut0.onDone(e);
- }
-
- /**
- * @param res Response.
- */
- void onResponse(GridDataLoadResponse res) {
- if (log.isDebugEnabled())
- log.debug("Received data load response: " + res);
-
- GridFutureAdapter<?> f = reqs.remove(res.requestId());
-
- if (f == null) {
- if (log.isDebugEnabled())
- log.debug("Future for request has not been found: " + res.requestId());
-
- return;
- }
-
- Throwable err = null;
-
- byte[] errBytes = res.errorBytes();
-
- if (errBytes != null) {
- try {
- GridPeerDeployAware jobPda0 = jobPda;
-
- err = ctx.config().getMarshaller().unmarshal(
- errBytes,
- jobPda0 != null ? jobPda0.classLoader() : U.gridClassLoader());
- }
- catch (IgniteCheckedException e) {
- f.onDone(null, new IgniteCheckedException("Failed to unmarshal response.", e));
-
- return;
- }
- }
-
- f.onDone(null, err);
-
- if (log.isDebugEnabled())
- log.debug("Finished future [fut=" + f + ", reqId=" + res.requestId() + ", err=" + err + ']');
- }
-
- /**
- *
- */
- void cancelAll() {
- IgniteCheckedException err = new IgniteCheckedException("Data streamer has been cancelled: " + IgniteDataStreamerImpl.this);
-
- for (IgniteInternalFuture<?> f : locFuts) {
- try {
- f.cancel();
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to cancel mini-future.", e);
- }
- }
-
- for (GridFutureAdapter<?> f : reqs.values())
- f.onDone(err);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- int size;
-
- synchronized (this) {
- size = entries.size();
- }
-
- return S.toString(Buffer.class, this,
- "entriesCnt", size,
- "locFutsSize", locFuts.size(),
- "reqsSize", reqs.size());
- }
- }
-
- /**
-  * Peer-deploy aware object of the data streamer. The deploy class and its class loader are
-  * resolved on first request and cached afterwards.
-  */
- private class DataStreamerPda implements GridPeerDeployAware {
-     /** */
-     private static final long serialVersionUID = 0L;
-
-     /** Cached deploy class, {@code null} until {@link #deployClass()} is first called. */
-     private Class<?> cls;
-
-     /** Cached class loader, {@code null} until {@link #classLoader()} is first called. */
-     private ClassLoader ldr;
-
-     /** Candidate objects inspected to detect the deploy class. */
-     private Collection<Object> objs;
-
-     /**
-      * Creates a peer-deploy aware object over the given candidates.
-      *
-      * @param objs Objects used to detect the deploy class and class loader.
-      */
-     private DataStreamerPda(Object... objs) {
-         this.objs = Arrays.asList(objs);
-     }
-
-     /** {@inheritDoc} */
-     @Override public Class<?> deployClass() {
-         if (cls != null)
-             return cls;
-
-         Class<?> detected;
-
-         if (depCls == null) {
-             detected = null;
-
-             // Walk the candidates until a non-JDK class is detected or the candidates run out.
-             Iterator<Object> it = objs.iterator();
-
-             while ((detected == null || U.isJdk(detected)) && it.hasNext()) {
-                 Object candidate = it.next();
-
-                 if (candidate != null)
-                     detected = U.detectClass(candidate);
-             }
-
-             // Nothing usable was found: fall back to the streamer implementation class.
-             if (detected == null || U.isJdk(detected))
-                 detected = IgniteDataStreamerImpl.class;
-         }
-         else
-             detected = depCls; // Explicitly configured deploy class takes precedence.
-
-         assert detected != null : "Failed to detect deploy class [objs=" + objs + ']';
-
-         cls = detected;
-
-         return cls;
-     }
-
-     /** {@inheritDoc} */
-     @Override public ClassLoader classLoader() {
-         if (ldr != null)
-             return ldr;
-
-         ClassLoader resolved = deployClass().getClassLoader();
-
-         // Class.getClassLoader() may return null (bootstrap loader), so fall back to the grid loader.
-         if (resolved == null)
-             resolved = U.gridClassLoader();
-
-         assert resolved != null : "Failed to detect classloader [objs=" + objs + ']';
-
-         ldr = resolved;
-
-         return ldr;
-     }
- }
-
- /**
-  * Simple mutable key-value pair that can be written and read through {@link Externalizable}.
-  * <p>
-  * Equality and hash code follow the {@link Map.Entry} contract, so an instance compares equal
-  * to any other {@code Map.Entry} holding an equal key and an equal value.
-  */
- private static class Entry0<K, V> implements Map.Entry<K, V>, Externalizable {
-     /** */
-     private static final long serialVersionUID = 0L;
-
-     /** Entry key; {@code null} only until {@link #readExternal(ObjectInput)} has run. */
-     private K key;
-
-     /** Entry value, possibly {@code null}. */
-     private V val;
-
-     /**
-      * @param key Key, must not be {@code null}.
-      * @param val Value, may be {@code null}.
-      */
-     private Entry0(K key, @Nullable V val) {
-         assert key != null;
-
-         this.key = key;
-         this.val = val;
-     }
-
-     /**
-      * For {@link Externalizable}.
-      */
-     @SuppressWarnings("UnusedDeclaration")
-     public Entry0() {
-         // No-op.
-     }
-
-     /** {@inheritDoc} */
-     @Override public K getKey() {
-         return key;
-     }
-
-     /** {@inheritDoc} */
-     @Override public V getValue() {
-         return val;
-     }
-
-     /** {@inheritDoc} */
-     @Override public V setValue(V val) {
-         V old = this.val;
-
-         this.val = val;
-
-         return old;
-     }
-
-     /** {@inheritDoc} */
-     @Override public void writeExternal(ObjectOutput out) throws IOException {
-         out.writeObject(key);
-         out.writeObject(val);
-     }
-
-     /** {@inheritDoc} */
-     @SuppressWarnings("unchecked") // The stream holds exactly what writeExternal() wrote: key, then value.
-     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-         key = (K)in.readObject();
-         val = (V)in.readObject();
-     }
-
-     /**
-      * Compares this entry with another object as specified by {@link Map.Entry#equals(Object)}.
-      *
-      * @param o Object to compare with.
-      * @return {@code true} if {@code o} is a {@code Map.Entry} with an equal key and an equal value.
-      */
-     @Override public boolean equals(Object o) {
-         if (this == o)
-             return true;
-
-         if (!(o instanceof Map.Entry))
-             return false;
-
-         Map.Entry<?, ?> other = (Map.Entry<?, ?>)o;
-
-         Object otherKey = other.getKey();
-         Object otherVal = other.getValue();
-
-         // Null-safe comparison: the key is null on an instance that has not been deserialized yet.
-         return (key == null ? otherKey == null : key.equals(otherKey)) &&
-             (val == null ? otherVal == null : val.equals(otherVal));
-     }
-
-     /**
-      * Computes the hash code as specified by {@link Map.Entry#hashCode()}.
-      *
-      * @return XOR of the key hash code and the value hash code, with {@code null} counted as 0.
-      */
-     @Override public int hashCode() {
-         return (key == null ? 0 : key.hashCode()) ^ (val == null ? 0 : val.hashCode());
-     }
- }
-
- /**
-  * Collection wrapper over map entries with a compact serialized form: the number of entries
-  * followed by the key and the value of each entry.
-  */
- private static class Entries0<K, V> extends AbstractCollection<Map.Entry<K, V>> implements Externalizable {
-     /** */
-     private static final long serialVersionUID = 0L;
-
-     /** Wrapped entries. */
-     private Collection<Map.Entry<K, V>> delegate;
-
-     /** Optional portable processor; when set, keys and values are converted with it on write. */
-     private GridPortableProcessor portable;
-
-     /**
-      * @param delegate Entries to wrap.
-      * @param portable Portable processor, or {@code null} to write keys and values as they are.
-      */
-     private Entries0(Collection<Map.Entry<K, V>> delegate, GridPortableProcessor portable) {
-         this.delegate = delegate;
-         this.portable = portable;
-     }
-
-     /**
-      * For {@link Externalizable}.
-      */
-     public Entries0() {
-         // No-op.
-     }
-
-     /** {@inheritDoc} */
-     @Override public Iterator<Entry<K, V>> iterator() {
-         return delegate.iterator();
-     }
-
-     /** {@inheritDoc} */
-     @Override public int size() {
-         return delegate.size();
-     }
-
-     /** {@inheritDoc} */
-     @Override public void writeExternal(ObjectOutput out) throws IOException {
-         out.writeInt(delegate.size());
-
-         if (portable == null) {
-             // No portable processor: keys and values are written unchanged.
-             for (Map.Entry<K, V> e : delegate) {
-                 out.writeObject(e.getKey());
-                 out.writeObject(e.getValue());
-             }
-
-             return;
-         }
-
-         for (Map.Entry<K, V> e : delegate) {
-             out.writeObject(portable.marshalToPortable(e.getKey()));
-             out.writeObject(portable.marshalToPortable(e.getValue()));
-         }
-     }
-
-     /** {@inheritDoc} */
-     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-         int cnt = in.readInt();
-
-         delegate = new ArrayList<>(cnt);
-
-         // Pairs are read in the order they were written: key first, then value.
-         for (int left = cnt; left > 0; left--) {
-             Object key = in.readObject();
-             Object val = in.readObject();
-
-             delegate.add(new Entry0<>((K)key, (V)val));
-         }
-     }
- }
-
- /**
-  * Updater that sets the initial value of each entry directly on the internal cache.
-  */
- private static class IsolatedUpdater<K, V> implements Updater<K, V> {
-     /** */
-     private static final long serialVersionUID = 0L;
-
-     /** {@inheritDoc} */
-     @Override public void update(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) {
-         GridCacheAdapter<K, V> target = ((IgniteCacheProxy<K, V>)cache).context().cache();
-
-         // For a near cache the values go to the DHT cache behind it.
-         if (target.isNear())
-             target = target.context().near().dht();
-
-         GridCacheContext<K, V> cacheCtx = target.context();
-
-         // Topology version and cache version are obtained once and shared by the whole batch.
-         long topVer = cacheCtx.affinity().affinityTopologyVersion();
-
-         GridCacheVersion loadVer = cacheCtx.versions().next(topVer);
-
-         boolean portableEnabled = cacheCtx.portableEnabled();
-
-         for (Map.Entry<K, V> e : entries) {
-             try {
-                 K key = e.getKey();
-                 V val = e.getValue();
-
-                 if (portableEnabled) {
-                     key = (K)cacheCtx.marshalToPortable(key);
-                     val = (V)cacheCtx.marshalToPortable(val);
-                 }
-
-                 GridCacheEntryEx<K, V> cacheEntry = target.entryEx(key, topVer);
-
-                 cacheEntry.unswap(true, false);
-
-                 cacheEntry.initialValue(val, null, loadVer, CU.TTL_ETERNAL, CU.EXPIRE_TIME_ETERNAL, false, topVer,
-                     GridDrType.DR_LOAD);
-
-                 cacheCtx.evicts().touch(cacheEntry, topVer);
-             }
-             catch (GridDhtInvalidPartitionException | GridCacheEntryRemovedException ignored) {
-                 // No-op: the entry is skipped and the batch continues.
-             }
-             catch (IgniteCheckedException ex) {
-                 // Log the failure and continue with the remaining entries.
-                 IgniteLogger log = cache.unwrap(Ignite.class).log();
-
-                 U.error(log, "Failed to set initial value for cache entry: " + e, ex);
-             }
-         }
-     }
- }
-}