You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/08/28 13:13:44 UTC
ignite git commit: IGNITE-1317: Moved platform cache to Ignite.
Repository: ignite
Updated Branches:
refs/heads/ignite-1317 [created] 4d650ded7
IGNITE-1317: Moved platform cache to Ignite.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4d650ded
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4d650ded
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4d650ded
Branch: refs/heads/ignite-1317
Commit: 4d650ded767e901e43abe098f48b01df94298398
Parents: 26f0ee0
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Aug 28 14:14:18 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Aug 28 14:14:18 2015 +0300
----------------------------------------------------------------------
.../processors/platform/PlatformContext.java | 22 +-
.../cache/PlatformCacheEntryFilter.java | 29 +
.../platform/cache/PlatformCache.java | 1056 ++++++++++++++++++
.../cache/PlatformCacheEntryFilterImpl.java | 105 ++
.../cache/PlatformCacheEntryProcessor.java | 212 ++++
5 files changed, 1423 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4d650ded/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index 5275e0d..cbcc91b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -18,12 +18,14 @@
package org.apache.ignite.internal.processors.platform;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.events.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.communication.*;
import org.apache.ignite.internal.portable.*;
import org.apache.ignite.internal.processors.cache.query.continuous.*;
+import org.apache.ignite.internal.processors.platform.cache.*;
import org.apache.ignite.internal.processors.platform.cache.query.*;
import org.apache.ignite.internal.processors.platform.callback.*;
import org.apache.ignite.internal.processors.platform.compute.*;
@@ -223,10 +225,28 @@ public interface PlatformContext {
/**
* Create closure job.
*
- * @param task Task.
+ * @param task Native task.
* @param ptr Pointer.
* @param job Native job.
* @return Closure job.
*/
public PlatformJob createClosureJob(Object task, long ptr, Object job);
+
+ /**
+ * Create cache entry processor wrapping a processor supplied by the native platform.
+ *
+ * @param proc Native processor.
+ * @param ptr Pointer. NOTE(review): presumably a handle to the processor on the native side;
+ * confirm with the implementation.
+ * @return Entry processor.
+ */
+ public CacheEntryProcessor createCacheEntryProcessor(Object proc, long ptr);
+
+ /**
+ * Create cache entry filter wrapping a filter supplied by the native platform.
+ *
+ * @param filter Native filter.
+ * @param ptr Pointer. NOTE(review): presumably a handle to the filter on the native side;
+ * confirm with the implementation.
+ * @return Entry filter.
+ */
+ public PlatformCacheEntryFilter createCacheEntryFilter(Object filter, long ptr);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4d650ded/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilter.java
new file mode 100644
index 0000000..ac7cba4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+
+/**
+ * Platform cache entry filter interface.
+ * <p>
+ * Combines {@link GridLoadCacheCloseablePredicate} and {@link CacheQueryCloseableScanBiPredicate}
+ * into a single type and declares no members of its own.
+ *
+ * @param <K> Key type.
+ * @param <V> Value type.
+ */
+public interface PlatformCacheEntryFilter<K, V> extends GridLoadCacheCloseablePredicate<K, V>,
+ CacheQueryCloseableScanBiPredicate<K, V> {
+ // No-op: marker interface, all behavior comes from the two parent interfaces.
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4d650ded/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
new file mode 100644
index 0000000..dff9d67
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -0,0 +1,1056 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.cache.query.*;
+import org.apache.ignite.internal.processors.platform.compute.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.expiry.*;
+import javax.cache.processor.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Native cache wrapper implementation.
+ */
+@SuppressWarnings({"unchecked", "UnusedDeclaration", "TryFinallyCanBeTryWithResources"})
+public class PlatformCache extends PlatformAbstractTarget {
+ /** Operation code: clear a single key. */
+ public static final int OP_CLEAR = 1;
+
+ /** Operation code: clear a set of keys. */
+ public static final int OP_CLEAR_ALL = 2;
+
+ /** Operation code: check whether the cache contains a key. */
+ public static final int OP_CONTAINS_KEY = 3;
+
+ /** Operation code: check whether the cache contains a set of keys. */
+ public static final int OP_CONTAINS_KEYS = 4;
+
+ /** Operation code: get value by key. */
+ public static final int OP_GET = 5;
+
+ /** Operation code: get values for a set of keys. */
+ public static final int OP_GET_ALL = 6;
+
+ /** Operation code: get-and-put. */
+ public static final int OP_GET_AND_PUT = 7;
+
+ /** Operation code: get-and-put-if-absent. */
+ public static final int OP_GET_AND_PUT_IF_ABSENT = 8;
+
+ /** Operation code: get-and-remove. */
+ public static final int OP_GET_AND_REMOVE = 9;
+
+ /** Operation code: get-and-replace. */
+ public static final int OP_GET_AND_REPLACE = 10;
+
+ /** Operation code: write cache name. */
+ public static final int OP_GET_NAME = 11;
+
+ /** Operation code: invoke entry processor on a single key. */
+ public static final int OP_INVOKE = 12;
+
+ /** Operation code: invoke entry processor on a set of keys. */
+ public static final int OP_INVOKE_ALL = 13;
+
+ /** Operation code: check whether a key is locally locked. */
+ public static final int OP_IS_LOCAL_LOCKED = 14;
+
+ /** Operation code: load cache (see {@code loadCache0} with {@code loc == false}). */
+ public static final int OP_LOAD_CACHE = 15;
+
+ /** Operation code: local evict of a collection of keys. */
+ public static final int OP_LOC_EVICT = 16;
+
+ /** Operation code: local load cache (see {@code loadCache0} with {@code loc == true}). */
+ public static final int OP_LOC_LOAD_CACHE = 17;
+
+ /** Operation code: local promote of a set of keys. */
+ public static final int OP_LOC_PROMOTE = 18;
+
+ // NOTE(review): code 19 is not defined in this class; numbering continues from 20.
+
+ /** Operation code: local clear of a single key. */
+ public static final int OP_LOCAL_CLEAR = 20;
+
+ /** Operation code: local clear of a set of keys. */
+ public static final int OP_LOCAL_CLEAR_ALL = 21;
+
+ /** Operation code: create and register a lock for a single key. */
+ public static final int OP_LOCK = 22;
+
+ /** Operation code: create and register a lock for a collection of keys. */
+ public static final int OP_LOCK_ALL = 23;
+
+ /** Operation code: write cache metrics. */
+ public static final int OP_METRICS = 24;
+
+ /** Operation code: local peek. NOTE(review): private, unlike most other operation codes. */
+ private static final int OP_PEEK = 25;
+
+ /** Operation code: put. NOTE(review): private, unlike most other operation codes. */
+ private static final int OP_PUT = 26;
+
+ /** Operation code: put all. NOTE(review): private, unlike most other operation codes. */
+ private static final int OP_PUT_ALL = 27;
+
+ /** Operation code: put-if-absent. */
+ public static final int OP_PUT_IF_ABSENT = 28;
+
+ /** Operation code: start continuous query. */
+ public static final int OP_QRY_CONTINUOUS = 29;
+
+ /** Operation code: scan query. */
+ public static final int OP_QRY_SCAN = 30;
+
+ /** Operation code: SQL query. */
+ public static final int OP_QRY_SQL = 31;
+
+ /** Operation code: SQL fields query. */
+ public static final int OP_QRY_SQL_FIELDS = 32;
+
+ /** Operation code: text query. */
+ public static final int OP_QRY_TXT = 33;
+
+ /** Operation code: remove a set of keys. */
+ public static final int OP_REMOVE_ALL = 34;
+
+ /** Operation code: remove by key and value (two-argument {@code remove}). */
+ public static final int OP_REMOVE_BOOL = 35;
+
+ /** Operation code: remove by key (one-argument {@code remove}). */
+ public static final int OP_REMOVE_OBJ = 36;
+
+ /** Operation code: two-argument {@code replace}. */
+ public static final int OP_REPLACE_2 = 37;
+
+ /** Operation code: three-argument {@code replace}. */
+ public static final int OP_REPLACE_3 = 38;
+
+ /** Underlying JCache. */
+ private final IgniteCacheProxy cache;
+
+ /** Whether this cache is created with "keepPortable" flag on the other side. */
+ private final boolean keepPortable;
+
+ /** Future result writer returned by {@code futureWriter} for {@link #OP_GET_ALL}. */
+ private static final GetAllWriter WRITER_GET_ALL = new GetAllWriter();
+
+ /** Future error writer returned by {@code futureWriter} for {@link #OP_INVOKE}. */
+ private static final EntryProcessorExceptionWriter WRITER_PROC_ERR = new EntryProcessorExceptionWriter();
+
+ /** Future result writer returned by {@code futureWriter} for {@link #OP_INVOKE_ALL}. */
+ private static final EntryProcessorResultsWriter WRITER_INVOKE_ALL = new EntryProcessorResultsWriter();
+
+ /** Map with currently active locks, keyed by the ID handed out by {@code registerLock}. */
+ private final ConcurrentMap<Long, Lock> lockMap = GridConcurrentFactory.newMap();
+
+ /** Lock ID sequence. Static, so IDs are drawn from one sequence shared by all instances. */
+ private static final AtomicLong LOCK_ID_GEN = new AtomicLong();
+
+    /**
+     * Creates a native cache wrapper around the given cache.
+     *
+     * @param platformCtx Platform context, passed to the parent target.
+     * @param cache Underlying cache; it is cast to {@link IgniteCacheProxy}.
+     * @param keepPortable Keep portable flag.
+     */
+    public PlatformCache(PlatformContext platformCtx, IgniteCache cache, boolean keepPortable) {
+        super(platformCtx);
+
+        IgniteCacheProxy proxy = (IgniteCacheProxy)cache;
+
+        this.cache = proxy;
+        this.keepPortable = keepPortable;
+    }
+
+ /**
+ * Gets cache with "skip-store" flag set.
+ *
+ * @return Cache with "skip-store" flag set.
+ */
+ public PlatformCache withSkipStore() {
+ if (cache.delegate().skipStore())
+ return this;
+
+ return new PlatformCache(platformCtx, cache.withSkipStore(), keepPortable);
+ }
+
+    /**
+     * Gets cache with "keep portable" flag.
+     * <p>
+     * The new wrapper is built on {@code cache.keepPortable()}. The previous implementation
+     * called {@code cache.withSkipStore()} here, which silently disabled the cache store for
+     * every caller that only asked for portable mode.
+     *
+     * @return This instance if the flag is already set, otherwise a new wrapper with the
+     *      "keep portable" flag set.
+     */
+    public PlatformCache withKeepPortable() {
+        if (keepPortable)
+            return this;
+
+        // Switch the underlying projection to portable mode; skip-store is left untouched.
+        return new PlatformCache(platformCtx, cache.keepPortable(), true);
+    }
+
+    /**
+     * Returns a wrapper whose underlying cache uses the given expiry policy.
+     * <p>
+     * The three values are encoded durations understood by {@code InteropExpiryPolicy}.
+     *
+     * @param create Encoded expiry for create.
+     * @param update Encoded expiry for update.
+     * @param access Encoded expiry for access.
+     * @return New wrapper with the same "keep portable" setting.
+     */
+    public PlatformCache withExpiryPolicy(final long create, final long update, final long access) {
+        InteropExpiryPolicy plc = new InteropExpiryPolicy(create, update, access);
+
+        IgniteCache withPlc = cache.withExpiryPolicy(plc);
+
+        return new PlatformCache(platformCtx, withPlc, keepPortable);
+    }
+
+    /**
+     * Returns a wrapper whose underlying cache works in asynchronous mode.
+     *
+     * @return This instance if the underlying cache is already asynchronous, otherwise a new
+     *      wrapper around {@code cache.withAsync()}.
+     */
+    public PlatformCache withAsync() {
+        return cache.isAsync()
+            ? this
+            : new PlatformCache(platformCtx, (IgniteCache)cache.withAsync(), keepPortable);
+    }
+
+ /**
+ * Gets cache with no-retries mode enabled.
+ *
+ * @return Cache with no-retries mode enabled.
+ */
+ public PlatformCache withNoRetries() {
+ CacheOperationContext opCtx = cache.operationContext();
+
+ if (opCtx != null && opCtx.noRetries())
+ return this;
+
+ return new PlatformCache(platformCtx, cache.withNoRetries(), keepPortable);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+ switch (type) {
+ case OP_PUT:
+ cache.put(reader.readObjectDetached(), reader.readObjectDetached());
+
+ return TRUE;
+
+ case OP_REMOVE_BOOL:
+ return cache.remove(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
+
+ case OP_REMOVE_ALL:
+ cache.removeAll(PlatformUtils.readSet(reader));
+
+ return TRUE;
+
+ case OP_PUT_ALL:
+ cache.putAll(PlatformUtils.readMap(reader));
+
+ return TRUE;
+
+ case OP_LOC_EVICT:
+ cache.localEvict(PlatformUtils.readCollection(reader));
+
+ return TRUE;
+
+ case OP_CONTAINS_KEY:
+ return cache.containsKey(reader.readObjectDetached()) ? TRUE : FALSE;
+
+ case OP_CONTAINS_KEYS:
+ return cache.containsKeys(PlatformUtils.readSet(reader)) ? TRUE : FALSE;
+
+ case OP_LOC_PROMOTE: {
+ cache.localPromote(PlatformUtils.readSet(reader));
+
+ break;
+ }
+
+ case OP_REPLACE_3:
+ return cache.replace(reader.readObjectDetached(), reader.readObjectDetached(),
+ reader.readObjectDetached()) ? TRUE : FALSE;
+
+ case OP_LOC_LOAD_CACHE:
+ loadCache0(reader, true);
+
+ break;
+
+ case OP_LOAD_CACHE:
+ loadCache0(reader, false);
+
+ break;
+
+ case OP_CLEAR:
+ cache.clear(reader.readObjectDetached());
+
+ break;
+
+ case OP_CLEAR_ALL:
+ cache.clearAll(PlatformUtils.readSet(reader));
+
+ break;
+
+ case OP_LOCAL_CLEAR:
+ cache.localClear(reader.readObjectDetached());
+
+ break;
+
+ case OP_LOCAL_CLEAR_ALL:
+ cache.localClearAll(PlatformUtils.readSet(reader));
+
+ break;
+
+ case OP_PUT_IF_ABSENT: {
+ return cache.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
+ }
+
+ case OP_REPLACE_2: {
+ return cache.replace(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
+ }
+
+ case OP_REMOVE_OBJ: {
+ return cache.remove(reader.readObjectDetached()) ? TRUE : FALSE;
+ }
+
+ case OP_IS_LOCAL_LOCKED:
+ return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? TRUE : FALSE;
+
+ default:
+ throw new IgniteCheckedException("Unsupported operation type: " + type);
+ }
+
+ return TRUE;
+ }
+
+    /**
+     * Loads cache via {@code localLoadCache} or {@code loadCache}.
+     * <p>
+     * Reads, in order: the predicate object, the predicate pointer (only when the predicate is
+     * not {@code null}) and the argument array.
+     *
+     * @param reader Reader positioned at the load arguments.
+     * @param loc {@code true} to call {@code localLoadCache}, {@code false} to call {@code loadCache}.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void loadCache0(PortableRawReaderEx reader, boolean loc) throws IgniteCheckedException {
+        Object nativePred = reader.readObjectDetached();
+
+        // The pointer is present in the stream only when a predicate was supplied.
+        PlatformCacheEntryFilter filter = nativePred != null
+            ? platformCtx.createCacheEntryFilter(nativePred, reader.readLong())
+            : null;
+
+        Object[] loadArgs = reader.readObjectArray();
+
+        if (!loc)
+            cache.loadCache(filter, loadArgs);
+        else
+            cache.localLoadCache(filter, loadArgs);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Handles query operations: SQL, SQL fields, text and scan queries return a cursor, a
+     * continuous query is created, started and returned.
+     */
+    @Override protected Object processInOpObject(int type, PortableRawReaderEx reader)
+        throws IgniteCheckedException {
+        switch (type) {
+            case OP_QRY_SQL: {
+                Query sqlQry = readSqlQuery(reader);
+
+                return runQuery(reader, sqlQry);
+            }
+
+            case OP_QRY_SQL_FIELDS: {
+                Query fieldsQry = readFieldsQuery(reader);
+
+                return runFieldsQuery(reader, fieldsQry);
+            }
+
+            case OP_QRY_TXT: {
+                Query txtQry = readTextQuery(reader);
+
+                return runQuery(reader, txtQry);
+            }
+
+            case OP_QRY_SCAN: {
+                Query scanQry = readScanQuery(reader);
+
+                return runQuery(reader, scanQry);
+            }
+
+            case OP_QRY_CONTINUOUS: {
+                // Read order is fixed: pointer, local flag, filter presence flag, filter, buffer size,
+                // time interval, auto-unsubscribe flag, initial query.
+                long nativePtr = reader.readLong();
+                boolean locOnly = reader.readBoolean();
+                boolean filterPresent = reader.readBoolean();
+                Object nativeFilter = reader.readObjectDetached();
+                int bufSize = reader.readInt();
+                long timeInterval = reader.readLong();
+                boolean autoUnsubscribe = reader.readBoolean();
+                Query initQry = readInitialQuery(reader);
+
+                PlatformContinuousQuery contQry =
+                    platformCtx.createContinuousQuery(nativePtr, filterPresent, nativeFilter);
+
+                contQry.start(cache, locOnly, bufSize, timeInterval, autoUnsubscribe, initQry);
+
+                return contQry;
+            }
+
+            default:
+                return throwUnsupported(type);
+        }
+    }
+
+ /**
+ * Read arguments for SQL query.
+ *
+ * @param reader Reader.
+ * @return Arguments.
+ */
+ @Nullable private Object[] readQueryArgs(PortableRawReaderEx reader) {
+ int cnt = reader.readInt();
+
+ if (cnt > 0) {
+ Object[] args = new Object[cnt];
+
+ for (int i = 0; i < cnt; i++)
+ args[i] = reader.readObjectDetached();
+
+ return args;
+ }
+ else
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Handles output-only operations: {@link #OP_GET_NAME} writes the cache name and
+ * {@link #OP_METRICS} writes a flat sequence of cache metric values. Any other type is
+ * reported through {@code throwUnsupported}.
+ */
+ @Override protected void processOutOp(int type, PortableRawWriterEx w) throws IgniteCheckedException {
+ switch (type) {
+ case OP_GET_NAME:
+ w.writeObject(cache.getName());
+
+ break;
+
+ case OP_METRICS:
+ CacheMetrics metrics = cache.metrics();
+
+ // NOTE(review): values are written positionally, without field names, so the reading
+ // side presumably depends on this exact order and on these primitive types - confirm
+ // before reordering or inserting anything.
+ w.writeLong(metrics.getCacheGets());
+ w.writeLong(metrics.getCachePuts());
+ w.writeLong(metrics.getCacheHits());
+ w.writeLong(metrics.getCacheMisses());
+ w.writeLong(metrics.getCacheTxCommits());
+ w.writeLong(metrics.getCacheTxRollbacks());
+ w.writeLong(metrics.getCacheEvictions());
+ w.writeLong(metrics.getCacheRemovals());
+ w.writeFloat(metrics.getAveragePutTime());
+ w.writeFloat(metrics.getAverageGetTime());
+ w.writeFloat(metrics.getAverageRemoveTime());
+ w.writeFloat(metrics.getAverageTxCommitTime());
+ w.writeFloat(metrics.getAverageTxRollbackTime());
+ w.writeString(metrics.name());
+ w.writeLong(metrics.getOverflowSize());
+ w.writeLong(metrics.getOffHeapEntriesCount());
+ w.writeLong(metrics.getOffHeapAllocatedSize());
+ w.writeInt(metrics.getSize());
+ w.writeInt(metrics.getKeySize());
+ w.writeBoolean(metrics.isEmpty());
+ w.writeInt(metrics.getDhtEvictQueueCurrentSize());
+ // Transaction bookkeeping sizes.
+ w.writeInt(metrics.getTxThreadMapSize());
+ w.writeInt(metrics.getTxXidMapSize());
+ w.writeInt(metrics.getTxCommitQueueSize());
+ w.writeInt(metrics.getTxPrepareQueueSize());
+ w.writeInt(metrics.getTxStartVersionCountsSize());
+ w.writeInt(metrics.getTxCommittedVersionsSize());
+ w.writeInt(metrics.getTxRolledbackVersionsSize());
+ w.writeInt(metrics.getTxDhtThreadMapSize());
+ w.writeInt(metrics.getTxDhtXidMapSize());
+ w.writeInt(metrics.getTxDhtCommitQueueSize());
+ w.writeInt(metrics.getTxDhtPrepareQueueSize());
+ w.writeInt(metrics.getTxDhtStartVersionCountsSize());
+ w.writeInt(metrics.getTxDhtCommittedVersionsSize());
+ w.writeInt(metrics.getTxDhtRolledbackVersionsSize());
+ // Write-behind store settings and counters.
+ w.writeBoolean(metrics.isWriteBehindEnabled());
+ w.writeInt(metrics.getWriteBehindFlushSize());
+ w.writeInt(metrics.getWriteBehindFlushThreadCount());
+ w.writeLong(metrics.getWriteBehindFlushFrequency());
+ w.writeInt(metrics.getWriteBehindStoreBatchSize());
+ w.writeInt(metrics.getWriteBehindTotalCriticalOverflowCount());
+ w.writeInt(metrics.getWriteBehindCriticalOverflowCount());
+ w.writeInt(metrics.getWriteBehindErrorRetryCount());
+ w.writeInt(metrics.getWriteBehindBufferSize());
+ // Configuration flags and hit/miss percentages.
+ w.writeString(metrics.getKeyType());
+ w.writeString(metrics.getValueType());
+ w.writeBoolean(metrics.isStoreByValue());
+ w.writeBoolean(metrics.isStatisticsEnabled());
+ w.writeBoolean(metrics.isManagementEnabled());
+ w.writeBoolean(metrics.isReadThrough());
+ w.writeBoolean(metrics.isWriteThrough());
+ w.writeFloat(metrics.getCacheHitPercentage());
+ w.writeFloat(metrics.getCacheMissPercentage());
+
+ break;
+
+ default:
+ throwUnsupported(type);
+ }
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Handles operations that read arguments and write a result: the get family, local peek,
+     * entry processor invocation and lock registration. An unknown operation type is reported
+     * through {@code throwUnsupported}, consistent with the other dispatch methods of this
+     * class. Previously it was silently ignored and nothing was written to the writer.
+     */
+    @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"})
+    @Override protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer,
+        Object arg) throws IgniteCheckedException {
+        switch (type) {
+            case OP_GET: {
+                writer.writeObjectDetached(cache.get(reader.readObjectDetached()));
+
+                break;
+            }
+
+            case OP_GET_AND_PUT: {
+                writer.writeObjectDetached(cache.getAndPut(reader.readObjectDetached(), reader.readObjectDetached()));
+
+                break;
+            }
+
+            case OP_GET_AND_REPLACE: {
+                writer.writeObjectDetached(cache.getAndReplace(reader.readObjectDetached(),
+                    reader.readObjectDetached()));
+
+                break;
+            }
+
+            case OP_GET_AND_REMOVE: {
+                writer.writeObjectDetached(cache.getAndRemove(reader.readObjectDetached()));
+
+                break;
+            }
+
+            case OP_GET_AND_PUT_IF_ABSENT: {
+                writer.writeObjectDetached(cache.getAndPutIfAbsent(reader.readObjectDetached(),
+                    reader.readObjectDetached()));
+
+                break;
+            }
+
+            case OP_PEEK: {
+                Object key = reader.readObjectDetached();
+
+                CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt());
+
+                writer.writeObjectDetached(cache.localPeek(key, modes));
+
+                break;
+            }
+
+            case OP_GET_ALL: {
+                Set keys = PlatformUtils.readSet(reader);
+
+                Map entries = cache.getAll(keys);
+
+                PlatformUtils.writeNullableMap(writer, entries);
+
+                break;
+            }
+
+            case OP_INVOKE: {
+                Object key = reader.readObjectDetached();
+
+                CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
+
+                try {
+                    writer.writeObjectDetached(cache.invoke(key, proc));
+                }
+                catch (EntryProcessorException ex) {
+                    // A failure raised by the native processor is written back as its native cause;
+                    // anything else is propagated unchanged.
+                    if (ex.getCause() instanceof PlatformNativeException)
+                        writer.writeObjectDetached(((PlatformNativeException)ex.getCause()).cause());
+                    else
+                        throw ex;
+                }
+
+                break;
+            }
+
+            case OP_INVOKE_ALL: {
+                Set<Object> keys = PlatformUtils.readSet(reader);
+
+                CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
+
+                writeInvokeAllResult(writer, cache.invokeAll(keys, proc));
+
+                break;
+            }
+
+            case OP_LOCK:
+                writer.writeLong(registerLock(cache.lock(reader.readObjectDetached())));
+
+                break;
+
+            case OP_LOCK_ALL:
+                writer.writeLong(registerLock(cache.lockAll(PlatformUtils.readCollection(reader))));
+
+                break;
+
+            default:
+                // Fail loudly instead of returning with an empty result stream.
+                throwUnsupported(type);
+        }
+    }
+
+ /** {@inheritDoc} */
+ @Override protected Exception convertException(Exception e) {
+ if (e instanceof CachePartialUpdateException)
+ return new PlatformCachePartialUpdateException((CachePartialUpdateException)e, platformCtx, keepPortable);
+
+ return super.convertException(e);
+ }
+
+    /**
+     * Writes the result of the InvokeAll cache method.
+     * <p>
+     * Layout: entry count ({@code -1} for a {@code null} map), then for every entry the key, a
+     * boolean exception flag, and either the result or the error written by {@code writeError}.
+     *
+     * @param writer Writer.
+     * @param results Results.
+     */
+    private static void writeInvokeAllResult(PortableRawWriterEx writer, Map<Object, EntryProcessorResult> results) {
+        if (results == null) {
+            writer.writeInt(-1);
+
+            return;
+        }
+
+        writer.writeInt(results.size());
+
+        for (Map.Entry<Object, EntryProcessorResult> e : results.entrySet()) {
+            writer.writeObjectDetached(e.getKey());
+
+            EntryProcessorResult keyRes = e.getValue();
+
+            try {
+                Object val = keyRes.get();
+
+                // Flag first: no exception for this key.
+                writer.writeBoolean(false);
+
+                writer.writeObjectDetached(val);
+            }
+            catch (Exception err) {
+                // Flag first: exception for this key.
+                writer.writeBoolean(true);
+
+                writeError(writer, err);
+            }
+        }
+    }
+
+    /**
+     * Writes an error either as a native exception or as a pair of strings.
+     * <p>
+     * If the cause is a {@code PlatformNativeException}, its native cause is written. Otherwise
+     * the exception class name is written, followed by the exception message.
+     *
+     * @param writer Writer.
+     * @param ex Exception.
+     */
+    private static void writeError(PortableRawWriterEx writer, Exception ex) {
+        Throwable cause = ex.getCause();
+
+        if (!(cause instanceof PlatformNativeException)) {
+            writer.writeObjectDetached(ex.getClass().getName());
+            writer.writeObjectDetached(ex.getMessage());
+
+            return;
+        }
+
+        writer.writeObjectDetached(((PlatformNativeException)cause).cause());
+    }
+
+ /** <inheritDoc /> */
+ @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
+ return cache.future();
+ }
+
+ /** <inheritDoc /> */
+ @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) {
+ if (opId == OP_GET_ALL)
+ return WRITER_GET_ALL;
+
+ if (opId == OP_INVOKE)
+ return WRITER_PROC_ERR;
+
+ if (opId == OP_INVOKE_ALL)
+ return WRITER_INVOKE_ALL;
+
+ return null;
+ }
+
+    /**
+     * Clears the whole cache by delegating to the no-argument {@code clear()} of the
+     * underlying cache.
+     *
+     * @throws IgniteCheckedException Declared by this method.
+     */
+    public void clear() throws IgniteCheckedException {
+        cache.clear();
+    }
+
+    /**
+     * Removes all entries by delegating to the no-argument {@code removeAll()} of the
+     * underlying cache.
+     *
+     * @throws IgniteCheckedException In case of error.
+     */
+    public void removeAll() throws IgniteCheckedException {
+        cache.removeAll();
+    }
+
+    /**
+     * Reads cache size.
+     *
+     * @param peekModes Encoded peek modes, decoded with {@code PlatformUtils.decodeCachePeekModes}.
+     * @param loc {@code true} to use {@code localSize}, {@code false} to use {@code size}.
+     * @return Size.
+     */
+    public int size(int peekModes, boolean loc) {
+        CachePeekMode[] decoded = PlatformUtils.decodeCachePeekModes(peekModes);
+
+        if (loc)
+            return cache.localSize(decoded);
+
+        return cache.size(decoded);
+    }
+
+    /**
+     * Creates a platform iterator over the entries of the underlying cache.
+     *
+     * @return Cache iterator.
+     */
+    public PlatformCacheIterator iterator() {
+        Iterator<Cache.Entry> entries = cache.iterator();
+
+        return new PlatformCacheIterator(platformCtx, entries);
+    }
+
+    /**
+     * Creates a platform iterator over local entries.
+     *
+     * @param peekModes Encoded peek modes.
+     * @return Cache iterator.
+     */
+    public PlatformCacheIterator localIterator(int peekModes) {
+        CachePeekMode[] decoded = PlatformUtils.decodeCachePeekModes(peekModes);
+
+        Iterator<Cache.Entry> locEntries = cache.localEntries(decoded).iterator();
+
+        return new PlatformCacheIterator(platformCtx, locEntries);
+    }
+
+ /**
+ * Enters a lock.
+ *
+ * @param id Lock id.
+ */
+ public void enterLock(long id) throws InterruptedException {
+ lock(id).lockInterruptibly();
+ }
+
+ /**
+ * Exits a lock.
+ *
+ * @param id Lock id.
+ */
+ public void exitLock(long id) {
+ lock(id).unlock();
+ }
+
+    /**
+     * Attempts to enter a lock.
+     *
+     * @param id Lock id.
+     * @param timeout Timeout, in milliseconds. The special value {@code -1} selects the
+     *      non-timed {@link Lock#tryLock()}, which acquires the lock only if it is free at the
+     *      time of the call and does not wait. NOTE(review): earlier documentation called
+     *      {@code -1} an "infinite timeout", which does not match this code - confirm the
+     *      intended contract with the native side.
+     * @return {@code true} if the lock was acquired.
+     * @throws InterruptedException If the thread is interrupted while waiting in the timed variant.
+     */
+    public boolean tryEnterLock(long id, long timeout) throws InterruptedException {
+        if (timeout == -1)
+            return lock(id).tryLock();
+
+        return lock(id).tryLock(timeout, TimeUnit.MILLISECONDS);
+    }
+
+ /**
+ * Rebalances the cache.
+ *
+ * @param futId Future id.
+ */
+ public void rebalance(long futId) {
+ PlatformFutureUtils.listen(platformCtx, cache.rebalance().chain(new C1<IgniteFuture, Object>() {
+ @Override
+ public Object apply(IgniteFuture fut) {
+ return null;
+ }
+ }), futId, PlatformFutureUtils.TYP_OBJ);
+ }
+
+    /**
+     * Unregisters a lock by removing it from the active lock map.
+     *
+     * @param id Lock id.
+     */
+    public void closeLock(long id) {
+        // The removal stays outside the assert so it also runs with assertions disabled.
+        Lock removed = lockMap.remove(id);
+
+        assert removed != null : "Failed to unregister lock: " + id;
+    }
+
+    /**
+     * Looks up a registered lock by its id.
+     *
+     * @param id Id.
+     * @return Lock. Absence is only checked by an assertion.
+     */
+    private Lock lock(long id) {
+        Lock found = lockMap.get(id);
+
+        assert found != null : "Lock not found for ID: " + id;
+
+        return found;
+    }
+
+    /**
+     * Registers a lock in the active lock map under a freshly generated id.
+     *
+     * @param lock Lock to register.
+     * @return Registered lock id.
+     */
+    private long registerLock(Lock lock) {
+        long lockId = LOCK_ID_GEN.incrementAndGet();
+
+        lockMap.put(lockId, lock);
+
+        return lockId;
+    }
+
+    /**
+     * Runs the specified query and wraps its cursor.
+     * <p>
+     * The cursor batch size is the query page size when it is positive, otherwise
+     * {@code Query.DFLT_PAGE_SIZE}. Any exception is passed through
+     * {@code PlatformUtils.unwrapQueryException}.
+     *
+     * @param reader Reader (not used by this method).
+     * @param qry Query to run.
+     * @return Platform query cursor.
+     * @throws IgniteCheckedException If the query fails.
+     */
+    private PlatformQueryCursor runQuery(PortableRawReaderEx reader, Query qry)
+        throws IgniteCheckedException {
+        try {
+            QueryCursorEx cur = (QueryCursorEx)cache.query(qry);
+
+            int batchSize = qry.getPageSize() > 0 ? qry.getPageSize() : Query.DFLT_PAGE_SIZE;
+
+            return new PlatformQueryCursor(platformCtx, cur, batchSize);
+        }
+        catch (Exception e) {
+            throw PlatformUtils.unwrapQueryException(e);
+        }
+    }
+
+    /**
+     * Runs the specified fields query and wraps its cursor.
+     * <p>
+     * The cursor batch size is the query page size when it is positive, otherwise
+     * {@code Query.DFLT_PAGE_SIZE}. Any exception is passed through
+     * {@code PlatformUtils.unwrapQueryException}.
+     *
+     * @param reader Reader (not used by this method).
+     * @param qry Query to run.
+     * @return Platform fields query cursor.
+     * @throws IgniteCheckedException If the query fails.
+     */
+    private PlatformFieldsQueryCursor runFieldsQuery(PortableRawReaderEx reader, Query qry)
+        throws IgniteCheckedException {
+        try {
+            QueryCursorEx cur = (QueryCursorEx)cache.query(qry);
+
+            int batchSize = qry.getPageSize() > 0 ? qry.getPageSize() : Query.DFLT_PAGE_SIZE;
+
+            return new PlatformFieldsQueryCursor(platformCtx, cur, batchSize);
+        }
+        catch (Exception e) {
+            throw PlatformUtils.unwrapQueryException(e);
+        }
+    }
+
+    /**
+     * Reads the initial query of a continuous query.
+     * <p>
+     * The stream starts with an {@code int} type tag: {@code -1} means "no query", otherwise
+     * the tag must be one of the scan, SQL or text query operation codes.
+     *
+     * @param reader Reader.
+     * @return Query, or {@code null} if the tag is {@code -1}.
+     * @throws IgniteCheckedException If the tag is not supported.
+     */
+    private Query readInitialQuery(PortableRawReaderEx reader)
+        throws IgniteCheckedException {
+        int qryType = reader.readInt();
+
+        switch (qryType) {
+            case -1:
+                return null;
+
+            case OP_QRY_SCAN:
+                return readScanQuery(reader);
+
+            case OP_QRY_SQL:
+                return readSqlQuery(reader);
+
+            case OP_QRY_TXT:
+                return readTextQuery(reader);
+
+            default:
+                throw new IgniteCheckedException("Unsupported query type: " + qryType);
+        }
+    }
+
+    /**
+     * Reads an SQL query: local flag, SQL text, type name, page size, then the arguments.
+     *
+     * @param reader Reader.
+     * @return SQL query.
+     */
+    private Query readSqlQuery(PortableRawReaderEx reader) {
+        boolean locOnly = reader.readBoolean();
+        String sqlText = reader.readString();
+        String typeName = reader.readString();
+        int pageSize = reader.readInt();
+
+        Object[] qryArgs = readQueryArgs(reader);
+
+        SqlQuery qry = new SqlQuery(typeName, sqlText).setPageSize(pageSize);
+
+        return qry.setArgs(qryArgs).setLocal(locOnly);
+    }
+
+    /**
+     * Reads an SQL fields query: local flag, SQL text, page size, then the arguments.
+     *
+     * @param reader Reader.
+     * @return SQL fields query.
+     */
+    private Query readFieldsQuery(PortableRawReaderEx reader) {
+        boolean locOnly = reader.readBoolean();
+        String sqlText = reader.readString();
+        int pageSize = reader.readInt();
+
+        Object[] qryArgs = readQueryArgs(reader);
+
+        SqlFieldsQuery qry = new SqlFieldsQuery(sqlText).setPageSize(pageSize);
+
+        return qry.setArgs(qryArgs).setLocal(locOnly);
+    }
+
+    /**
+     * Reads a text query: local flag, search text, type name, page size.
+     *
+     * @param reader Reader.
+     * @return Text query.
+     */
+    private Query readTextQuery(PortableRawReaderEx reader) {
+        boolean locOnly = reader.readBoolean();
+        String searchText = reader.readString();
+        String typeName = reader.readString();
+        int pageSize = reader.readInt();
+
+        Query qry = new TextQuery(typeName, searchText).setPageSize(pageSize);
+
+        return qry.setLocal(locOnly);
+    }
+
+    /**
+     * Reads a scan query.
+     * <p>
+     * Stream layout: local flag, page size, partition presence flag, partition (only if
+     * present), predicate object, predicate pointer (only if the predicate is not {@code null}).
+     *
+     * @param reader Reader.
+     * @return Scan query.
+     */
+    private Query readScanQuery(PortableRawReaderEx reader) {
+        boolean locOnly = reader.readBoolean();
+        int pageSize = reader.readInt();
+
+        Integer part = null;
+
+        // The partition number follows only when the presence flag is set.
+        if (reader.readBoolean())
+            part = reader.readInt();
+
+        ScanQuery scanQry = new ScanQuery().setPageSize(pageSize);
+
+        scanQry.setPartition(part);
+
+        Object nativePred = reader.readObjectDetached();
+
+        if (nativePred != null)
+            scanQry.setFilter(platformCtx.createCacheEntryFilter(nativePred, reader.readLong()));
+
+        scanQry.setLocal(locOnly);
+
+        return scanQry;
+    }
+
+    /**
+     * Writes the result of an asynchronous getAll operation as a nullable map.
+     */
+    private static class GetAllWriter implements PlatformFutureUtils.Writer {
+        /** {@inheritDoc} */
+        @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
+            assert obj instanceof Map;
+
+            Map entries = (Map)obj;
+
+            PlatformUtils.writeNullableMap(writer, entries);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean canWrite(Object obj, Throwable err) {
+            // Only successful results are handled here.
+            return err == null;
+        }
+    }
+
+    /**
+     * Writes an {@link EntryProcessorException} produced by an asynchronous invoke operation.
+     */
+    private static class EntryProcessorExceptionWriter implements PlatformFutureUtils.Writer {
+        /** {@inheritDoc} */
+        @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
+            writeError(writer, (EntryProcessorException)err);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean canWrite(Object obj, Throwable err) {
+            // Handles entry processor failures only.
+            return err instanceof EntryProcessorException;
+        }
+    }
+
+ /**
+ * Writes results of InvokeAll method.
+ */
+ private static class EntryProcessorResultsWriter implements PlatformFutureUtils.Writer {
+ /** <inheritDoc /> */
+ @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
+ writeInvokeAllResult(writer, (Map)obj);
+ }
+
+ /** <inheritDoc /> */
+ @Override public boolean canWrite(Object obj, Throwable err) {
+ return obj != null && err == null;
+ }
+ }
+
+    /**
+     * Interop expiry policy built from three encoded durations.
+     * <p>
+     * Encoding: {@code -2} maps to {@code null}, {@code -1} to {@link Duration#ETERNAL},
+     * {@code 0} to {@link Duration#ZERO}, and any other value is taken as milliseconds.
+     */
+    private static class InteropExpiryPolicy implements ExpiryPolicy {
+        /** Duration: unchanged. */
+        private static final long DUR_UNCHANGED = -2;
+
+        /** Duration: eternal. */
+        private static final long DUR_ETERNAL = -1;
+
+        /** Duration: zero. */
+        private static final long DUR_ZERO = 0;
+
+        /** Expiry for create. */
+        private final Duration create;
+
+        /** Expiry for update. */
+        private final Duration update;
+
+        /** Expiry for access. */
+        private final Duration access;
+
+        /**
+         * Decodes the three encoded durations.
+         *
+         * @param create Encoded expiry for create.
+         * @param update Encoded expiry for update.
+         * @param access Encoded expiry for access.
+         */
+        public InteropExpiryPolicy(long create, long update, long access) {
+            this.create = convert(create);
+            this.update = convert(update);
+            this.access = convert(access);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForCreation() {
+            return create;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForUpdate() {
+            return update;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForAccess() {
+            return access;
+        }
+
+        /**
+         * Converts an encoded duration to an actual duration.
+         *
+         * @param dur Encoded duration.
+         * @return Actual duration, or {@code null} for the "unchanged" marker.
+         */
+        private static Duration convert(long dur) {
+            if (dur == DUR_UNCHANGED)
+                return null;
+
+            if (dur == DUR_ETERNAL)
+                return Duration.ETERNAL;
+
+            if (dur == DUR_ZERO)
+                return Duration.ZERO;
+
+            // Everything else is expected to be a positive millisecond count.
+            assert dur > 0;
+
+            return new Duration(TimeUnit.MILLISECONDS, dur);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4d650ded/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
new file mode 100644
index 0000000..fee2995
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.resources.*;
+
+/**
+ * Interop filter. Delegates apply to native platform.
+ */
+public class PlatformCacheEntryFilterImpl<K, V> extends PlatformAbstractPredicate
+ implements PlatformCacheEntryFilter<K, V> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * {@link java.io.Externalizable} support.
+ */
+ public PlatformCacheEntryFilterImpl() {
+ super();
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param pred .Net portable predicate.
+ * @param ptr Pointer to predicate in the native platform.
+ * @param ctx Kernal context.
+ */
+ public PlatformCacheEntryFilterImpl(Object pred, long ptr, PlatformContext ctx) {
+ super(pred, ptr, ctx);
+
+ assert pred != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean apply(K k, V v) {
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ writer.writeObject(k);
+ writer.writeObject(v);
+
+ out.synchronize();
+
+ return ctx.gateway().cacheEntryFilterApply(ptr, mem.pointer()) != 0;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onClose() {
+ if (ptr == 0)
+ return;
+
+ assert ctx != null;
+
+ ctx.gateway().cacheEntryFilterDestroy(ptr);
+
+ ptr = 0;
+ }
+
+ /**
+ * @param ignite Ignite instance.
+ */
+ @IgniteInstanceResource
+ public void setIgniteInstance(Ignite ignite) {
+ ctx = PlatformUtils.platformContext(ignite);
+
+ if (ptr != 0)
+ return;
+
+ try (PlatformMemory mem = ctx.memory().allocate()) {
+ PlatformOutputStream out = mem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ writer.writeObject(pred);
+
+ out.synchronize();
+
+ ptr = ctx.gateway().cacheEntryFilterCreate(mem.pointer());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4d650ded/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java
new file mode 100644
index 0000000..ab9ad7c
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+
+/**
+ * Interop cache entry processor. Delegates processing to native platform.
+ */
+public class PlatformCacheEntryProcessor<K, V, T> implements CacheEntryProcessor<K, V, T>, Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Indicates that entry has not been modified */
+ private static final byte ENTRY_STATE_INTACT = 0;
+
+ /** Indicates that entry value has been set */
+ private static final byte ENTRY_STATE_VALUE_SET = 1;
+
+ /** Indicates that remove has been called on an entry */
+ private static final byte ENTRY_STATE_REMOVED = 2;
+
+ /** Indicates error in processor that is written as portable. */
+ private static final byte ENTRY_STATE_ERR_PORTABLE = 3;
+
+ /** Indicates error in processor that is written as string. */
+ private static final byte ENTRY_STATE_ERR_STRING = 4;
+
+ /** Native portable processor */
+ private Object proc;
+
+ /** Pointer to processor in the native platform. */
+ private transient long ptr;
+
+ /**
+ * {@link java.io.Externalizable} support.
+ */
+ public PlatformCacheEntryProcessor() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param proc Native portable processor
+ * @param ptr Pointer to processor in the native platform.
+ */
+ public PlatformCacheEntryProcessor(Object proc, long ptr) {
+ this.proc = proc;
+ this.ptr = ptr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public T process(MutableEntry<K, V> entry, Object... arguments) throws EntryProcessorException {
+ try {
+ IgniteKernal ignite = (IgniteKernal)entry.unwrap(Ignite.class);
+
+ PlatformProcessor interopProc;
+
+ try {
+ interopProc = PlatformUtils.platformProcessor(ignite);
+ }
+ catch (IllegalStateException ex){
+ throw new EntryProcessorException(ex);
+ }
+
+ interopProc.awaitStart();
+
+ return execute0(interopProc.context(), entry);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /**
+ * Executes interop entry processor on a given entry, updates entry and returns result.
+ *
+ * @param ctx Context.
+ * @param entry Entry.
+ * @return Processing result.
+ * @throws org.apache.ignite.IgniteCheckedException
+ */
+ private T execute0(PlatformContext ctx, MutableEntry<K, V> entry)
+ throws IgniteCheckedException {
+ try (PlatformMemory outMem = ctx.memory().allocate()) {
+ PlatformOutputStream out = outMem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ writeEntryAndProcessor(entry, writer);
+
+ out.synchronize();
+
+ try (PlatformMemory inMem = ctx.memory().allocate()) {
+ PlatformInputStream in = inMem.input();
+
+ ctx.gateway().cacheInvoke(outMem.pointer(), inMem.pointer());
+
+ in.synchronize();
+
+ PortableRawReaderEx reader = ctx.reader(in);
+
+ return readResultAndUpdateEntry(ctx, entry, reader);
+ }
+ }
+ }
+
+ /**
+ * Writes mutable entry and entry processor to the stream.
+ *
+ * @param entry Entry to process.
+ * @param writer Writer.
+ */
+ private void writeEntryAndProcessor(MutableEntry<K, V> entry, PortableRawWriterEx writer) {
+ writer.writeObject(entry.getKey());
+ writer.writeObject(entry.getValue());
+
+ if (ptr != 0) {
+ // Execute locally - we have a pointer to native processor.
+ writer.writeBoolean(true);
+ writer.writeLong(ptr);
+ }
+ else {
+ // We are on a remote node. Send processor holder back to native.
+ writer.writeBoolean(false);
+ writer.writeObject(proc);
+ }
+ }
+
+ /**
+ * Reads processing result from stream, updates mutable entry accordingly, and returns the result.
+ *
+ * @param entry Mutable entry to update.
+ * @param reader Reader.
+ * @return Entry processing result
+ * @throws javax.cache.processor.EntryProcessorException If processing has failed in user code.
+ */
+ @SuppressWarnings("unchecked")
+ private T readResultAndUpdateEntry(PlatformContext ctx, MutableEntry<K, V> entry, PortableRawReaderEx reader) {
+ byte state = reader.readByte();
+
+ switch (state) {
+ case ENTRY_STATE_VALUE_SET:
+ entry.setValue((V)reader.readObject());
+
+ break;
+
+ case ENTRY_STATE_REMOVED:
+ entry.remove();
+
+ break;
+
+ case ENTRY_STATE_ERR_PORTABLE:
+ // Full exception
+ Object nativeErr = reader.readObjectDetached();
+
+ assert nativeErr != null;
+
+ throw new EntryProcessorException("Failed to execute native cache entry processor.",
+ ctx.createNativeException(nativeErr));
+
+ case ENTRY_STATE_ERR_STRING:
+ // Native exception was not serializable, we have only message.
+ String errMsg = reader.readString();
+
+ assert errMsg != null;
+
+ throw new EntryProcessorException("Failed to execute native cache entry processor: " + errMsg);
+
+ default:
+ assert state == ENTRY_STATE_INTACT;
+ }
+
+ return (T)reader.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(proc);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ proc = in.readObject();
+ }
+}