You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/08/28 13:13:44 UTC

ignite git commit: IGNITE-1317: Moved platform cache to Ignite.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1317 [created] 4d650ded7


IGNITE-1317: Moved platform cache to Ignite.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4d650ded
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4d650ded
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4d650ded

Branch: refs/heads/ignite-1317
Commit: 4d650ded767e901e43abe098f48b01df94298398
Parents: 26f0ee0
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Aug 28 14:14:18 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Aug 28 14:14:18 2015 +0300

----------------------------------------------------------------------
 .../processors/platform/PlatformContext.java    |   22 +-
 .../cache/PlatformCacheEntryFilter.java         |   29 +
 .../platform/cache/PlatformCache.java           | 1056 ++++++++++++++++++
 .../cache/PlatformCacheEntryFilterImpl.java     |  105 ++
 .../cache/PlatformCacheEntryProcessor.java      |  212 ++++
 5 files changed, 1423 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4d650ded/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index 5275e0d..cbcc91b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -18,12 +18,14 @@
 package org.apache.ignite.internal.processors.platform;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.events.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.communication.*;
 import org.apache.ignite.internal.portable.*;
 import org.apache.ignite.internal.processors.cache.query.continuous.*;
+import org.apache.ignite.internal.processors.platform.cache.*;
 import org.apache.ignite.internal.processors.platform.cache.query.*;
 import org.apache.ignite.internal.processors.platform.callback.*;
 import org.apache.ignite.internal.processors.platform.compute.*;
@@ -223,10 +225,28 @@ public interface PlatformContext {
     /**
      * Create closure job.
      *
-     * @param task Task.
+     * @param task Native task.
      * @param ptr Pointer.
      * @param job Native job.
      * @return Closure job.
      */
     public PlatformJob createClosureJob(Object task, long ptr, Object job);
+
+    /**
+     * Create cache entry processor.
+     *
+     * @param proc Native processor.
+     * @param ptr Pointer.
+     * @return Entry processor.
+     */
+    public CacheEntryProcessor createCacheEntryProcessor(Object proc, long ptr);
+
+    /**
+     * Create cache entry filter.
+     *
+     * @param filter Native filter.
+     * @param ptr Pointer.
+     * @return Entry filter.
+     */
+    public PlatformCacheEntryFilter createCacheEntryFilter(Object filter, long ptr);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4d650ded/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilter.java
new file mode 100644
index 0000000..ac7cba4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+
+/**
+ * Platform cache entry filter interface.
+ */
+public interface PlatformCacheEntryFilter<K, V> extends GridLoadCacheCloseablePredicate<K, V>,
+    CacheQueryCloseableScanBiPredicate<K, V> {
+    // No-op.
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4d650ded/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
new file mode 100644
index 0000000..dff9d67
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -0,0 +1,1056 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.query.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.cache.query.*;
+import org.apache.ignite.internal.processors.platform.compute.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.*;
+import javax.cache.expiry.*;
+import javax.cache.processor.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+import java.util.concurrent.locks.*;
+
+/**
+ * Native cache wrapper implementation.
+ */
+@SuppressWarnings({"unchecked", "UnusedDeclaration", "TryFinallyCanBeTryWithResources"})
+public class PlatformCache extends PlatformAbstractTarget {
+    /** */
+    public static final int OP_CLEAR = 1;
+
+    /** */
+    public static final int OP_CLEAR_ALL = 2;
+
+    /** */
+    public static final int OP_CONTAINS_KEY = 3;
+
+    /** */
+    public static final int OP_CONTAINS_KEYS = 4;
+
+    /** */
+    public static final int OP_GET = 5;
+
+    /** */
+    public static final int OP_GET_ALL = 6;
+
+    /** */
+    public static final int OP_GET_AND_PUT = 7;
+
+    /** */
+    public static final int OP_GET_AND_PUT_IF_ABSENT = 8;
+
+    /** */
+    public static final int OP_GET_AND_REMOVE = 9;
+
+    /** */
+    public static final int OP_GET_AND_REPLACE = 10;
+
+    /** */
+    public static final int OP_GET_NAME = 11;
+
+    /** */
+    public static final int OP_INVOKE = 12;
+
+    /** */
+    public static final int OP_INVOKE_ALL = 13;
+
+    /** */
+    public static final int OP_IS_LOCAL_LOCKED = 14;
+
+    /** */
+    public static final int OP_LOAD_CACHE = 15;
+
+    /** */
+    public static final int OP_LOC_EVICT = 16;
+
+    /** */
+    public static final int OP_LOC_LOAD_CACHE = 17;
+
+    /** */
+    public static final int OP_LOC_PROMOTE = 18;
+
+    /** */
+    public static final int OP_LOCAL_CLEAR = 20;
+
+    /** */
+    public static final int OP_LOCAL_CLEAR_ALL = 21;
+
+    /** */
+    public static final int OP_LOCK = 22;
+
+    /** */
+    public static final int OP_LOCK_ALL = 23;
+
+    /** */
+    public static final int OP_METRICS = 24;
+
+    /** */
+    private static final int OP_PEEK = 25;
+
+    /** */
+    private static final int OP_PUT = 26;
+
+    /** */
+    private static final int OP_PUT_ALL = 27;
+
+    /** */
+    public static final int OP_PUT_IF_ABSENT = 28;
+
+    /** */
+    public static final int OP_QRY_CONTINUOUS = 29;
+
+    /** */
+    public static final int OP_QRY_SCAN = 30;
+
+    /** */
+    public static final int OP_QRY_SQL = 31;
+
+    /** */
+    public static final int OP_QRY_SQL_FIELDS = 32;
+
+    /** */
+    public static final int OP_QRY_TXT = 33;
+
+    /** */
+    public static final int OP_REMOVE_ALL = 34;
+
+    /** */
+    public static final int OP_REMOVE_BOOL = 35;
+
+    /** */
+    public static final int OP_REMOVE_OBJ = 36;
+
+    /** */
+    public static final int OP_REPLACE_2 = 37;
+
+    /** */
+    public static final int OP_REPLACE_3 = 38;
+
+    /** Underlying JCache. */
+    private final IgniteCacheProxy cache;
+
+    /** Whether this cache is created with "keepPortable" flag on the other side. */
+    private final boolean keepPortable;
+
+    /** */
+    private static final GetAllWriter WRITER_GET_ALL = new GetAllWriter();
+
+    /** */
+    private static final EntryProcessorExceptionWriter WRITER_PROC_ERR = new EntryProcessorExceptionWriter();
+
+    /** */
+    private static final EntryProcessorResultsWriter WRITER_INVOKE_ALL = new EntryProcessorResultsWriter();
+
+    /** Map with currently active locks. */
+    private final ConcurrentMap<Long, Lock> lockMap = GridConcurrentFactory.newMap();
+
+    /** Lock ID sequence. */
+    private static final AtomicLong LOCK_ID_GEN = new AtomicLong();
+
+    /**
+     * Constructor.
+     *
+     * @param platformCtx Context.
+     * @param cache Underlying cache.
+     * @param keepPortable Keep portable flag.
+     */
+    public PlatformCache(PlatformContext platformCtx, IgniteCache cache, boolean keepPortable) {
+        super(platformCtx);
+
+        this.cache = (IgniteCacheProxy)cache;
+        this.keepPortable = keepPortable;
+    }
+
+    /**
+     * Gets cache with "skip-store" flag set.
+     *
+     * @return Cache with "skip-store" flag set.
+     */
+    public PlatformCache withSkipStore() {
+        if (cache.delegate().skipStore())
+            return this;
+
+        return new PlatformCache(platformCtx, cache.withSkipStore(), keepPortable);
+    }
+
+    /**
+     * Gets cache with "keep portable" flag.
+     *
+     * @return Cache with "keep portable" flag set.
+     */
+    public PlatformCache withKeepPortable() {
+        if (keepPortable)
+            return this;
+
+        return new PlatformCache(platformCtx, cache.withSkipStore(), true);
+    }
+
+    /**
+     * Gets cache with provided expiry policy.
+     *
+     * @param create Create.
+     * @param update Update.
+     * @param access Access.
+     * @return Cache.
+     */
+    public PlatformCache withExpiryPolicy(final long create, final long update, final long access) {
+        IgniteCache cache0 = cache.withExpiryPolicy(new InteropExpiryPolicy(create, update, access));
+
+        return new PlatformCache(platformCtx, cache0, keepPortable);
+    }
+
+    /**
+     * Gets cache with asynchronous mode enabled.
+     *
+     * @return Cache with asynchronous mode enabled.
+     */
+    public PlatformCache withAsync() {
+        if (cache.isAsync())
+            return this;
+
+        return new PlatformCache(platformCtx, (IgniteCache)cache.withAsync(), keepPortable);
+    }
+
+    /**
+     * Gets cache with no-retries mode enabled.
+     *
+     * @return Cache with no-retries mode enabled.
+     */
+    public PlatformCache withNoRetries() {
+        CacheOperationContext opCtx = cache.operationContext();
+
+        if (opCtx != null && opCtx.noRetries())
+            return this;
+
+        return new PlatformCache(platformCtx, cache.withNoRetries(), keepPortable);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case OP_PUT:
+                cache.put(reader.readObjectDetached(), reader.readObjectDetached());
+
+                return TRUE;
+
+            case OP_REMOVE_BOOL:
+                return cache.remove(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
+
+            case OP_REMOVE_ALL:
+                cache.removeAll(PlatformUtils.readSet(reader));
+
+                return TRUE;
+
+            case OP_PUT_ALL:
+                cache.putAll(PlatformUtils.readMap(reader));
+
+                return TRUE;
+
+            case OP_LOC_EVICT:
+                cache.localEvict(PlatformUtils.readCollection(reader));
+
+                return TRUE;
+
+            case OP_CONTAINS_KEY:
+                return cache.containsKey(reader.readObjectDetached()) ? TRUE : FALSE;
+
+            case OP_CONTAINS_KEYS:
+                return cache.containsKeys(PlatformUtils.readSet(reader)) ? TRUE : FALSE;
+
+            case OP_LOC_PROMOTE: {
+                cache.localPromote(PlatformUtils.readSet(reader));
+
+                break;
+            }
+
+            case OP_REPLACE_3:
+                return cache.replace(reader.readObjectDetached(), reader.readObjectDetached(),
+                    reader.readObjectDetached()) ? TRUE : FALSE;
+
+            case OP_LOC_LOAD_CACHE:
+                loadCache0(reader, true);
+
+                break;
+
+            case OP_LOAD_CACHE:
+                loadCache0(reader, false);
+
+                break;
+
+            case OP_CLEAR:
+                cache.clear(reader.readObjectDetached());
+
+                break;
+
+            case OP_CLEAR_ALL:
+                cache.clearAll(PlatformUtils.readSet(reader));
+
+                break;
+
+            case OP_LOCAL_CLEAR:
+                cache.localClear(reader.readObjectDetached());
+
+                break;
+
+            case OP_LOCAL_CLEAR_ALL:
+                cache.localClearAll(PlatformUtils.readSet(reader));
+
+                break;
+
+            case OP_PUT_IF_ABSENT: {
+                return cache.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
+            }
+
+            case OP_REPLACE_2: {
+                return cache.replace(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE;
+            }
+
+            case OP_REMOVE_OBJ: {
+                return cache.remove(reader.readObjectDetached()) ? TRUE : FALSE;
+            }
+
+            case OP_IS_LOCAL_LOCKED:
+                return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? TRUE : FALSE;
+
+            default:
+                throw new IgniteCheckedException("Unsupported operation type: " + type);
+        }
+
+        return TRUE;
+    }
+
+    /**
+     * Loads cache via localLoadCache or loadCache.
+     */
+    private void loadCache0(PortableRawReaderEx reader, boolean loc) throws IgniteCheckedException {
+        PlatformCacheEntryFilter filter = null;
+
+        Object pred = reader.readObjectDetached();
+
+        if (pred != null)
+            filter = platformCtx.createCacheEntryFilter(pred, reader.readLong());
+
+        Object[] args = reader.readObjectArray();
+
+        if (loc)
+            cache.localLoadCache(filter, args);
+        else
+            cache.loadCache(filter, args);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Object processInOpObject(int type, PortableRawReaderEx reader)
+        throws IgniteCheckedException {
+        switch (type) {
+            case OP_QRY_SQL:
+                return runQuery(reader, readSqlQuery(reader));
+
+            case OP_QRY_SQL_FIELDS:
+                return runFieldsQuery(reader, readFieldsQuery(reader));
+
+            case OP_QRY_TXT:
+                return runQuery(reader, readTextQuery(reader));
+
+            case OP_QRY_SCAN:
+                return runQuery(reader, readScanQuery(reader));
+
+            case OP_QRY_CONTINUOUS: {
+                long ptr = reader.readLong();
+                boolean loc = reader.readBoolean();
+                boolean hasFilter = reader.readBoolean();
+                Object filter = reader.readObjectDetached();
+                int bufSize = reader.readInt();
+                long timeInterval = reader.readLong();
+                boolean autoUnsubscribe = reader.readBoolean();
+                Query initQry = readInitialQuery(reader);
+
+                PlatformContinuousQuery qry = platformCtx.createContinuousQuery(ptr, hasFilter, filter);
+
+                qry.start(cache, loc, bufSize, timeInterval, autoUnsubscribe, initQry);
+
+                return qry;
+            }
+
+            default:
+                return throwUnsupported(type);
+        }
+    }
+
+    /**
+     * Read arguments for SQL query.
+     *
+     * @param reader Reader.
+     * @return Arguments.
+     */
+    @Nullable private Object[] readQueryArgs(PortableRawReaderEx reader) {
+        int cnt = reader.readInt();
+
+        if (cnt > 0) {
+            Object[] args = new Object[cnt];
+
+            for (int i = 0; i < cnt; i++)
+                args[i] = reader.readObjectDetached();
+
+            return args;
+        }
+        else
+            return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void processOutOp(int type, PortableRawWriterEx w) throws IgniteCheckedException {
+        switch (type) {
+            case OP_GET_NAME:
+                w.writeObject(cache.getName());
+
+                break;
+
+            case OP_METRICS:
+                CacheMetrics metrics = cache.metrics();
+
+                w.writeLong(metrics.getCacheGets());
+                w.writeLong(metrics.getCachePuts());
+                w.writeLong(metrics.getCacheHits());
+                w.writeLong(metrics.getCacheMisses());
+                w.writeLong(metrics.getCacheTxCommits());
+                w.writeLong(metrics.getCacheTxRollbacks());
+                w.writeLong(metrics.getCacheEvictions());
+                w.writeLong(metrics.getCacheRemovals());
+                w.writeFloat(metrics.getAveragePutTime());
+                w.writeFloat(metrics.getAverageGetTime());
+                w.writeFloat(metrics.getAverageRemoveTime());
+                w.writeFloat(metrics.getAverageTxCommitTime());
+                w.writeFloat(metrics.getAverageTxRollbackTime());
+                w.writeString(metrics.name());
+                w.writeLong(metrics.getOverflowSize());
+                w.writeLong(metrics.getOffHeapEntriesCount());
+                w.writeLong(metrics.getOffHeapAllocatedSize());
+                w.writeInt(metrics.getSize());
+                w.writeInt(metrics.getKeySize());
+                w.writeBoolean(metrics.isEmpty());
+                w.writeInt(metrics.getDhtEvictQueueCurrentSize());
+                w.writeInt(metrics.getTxThreadMapSize());
+                w.writeInt(metrics.getTxXidMapSize());
+                w.writeInt(metrics.getTxCommitQueueSize());
+                w.writeInt(metrics.getTxPrepareQueueSize());
+                w.writeInt(metrics.getTxStartVersionCountsSize());
+                w.writeInt(metrics.getTxCommittedVersionsSize());
+                w.writeInt(metrics.getTxRolledbackVersionsSize());
+                w.writeInt(metrics.getTxDhtThreadMapSize());
+                w.writeInt(metrics.getTxDhtXidMapSize());
+                w.writeInt(metrics.getTxDhtCommitQueueSize());
+                w.writeInt(metrics.getTxDhtPrepareQueueSize());
+                w.writeInt(metrics.getTxDhtStartVersionCountsSize());
+                w.writeInt(metrics.getTxDhtCommittedVersionsSize());
+                w.writeInt(metrics.getTxDhtRolledbackVersionsSize());
+                w.writeBoolean(metrics.isWriteBehindEnabled());
+                w.writeInt(metrics.getWriteBehindFlushSize());
+                w.writeInt(metrics.getWriteBehindFlushThreadCount());
+                w.writeLong(metrics.getWriteBehindFlushFrequency());
+                w.writeInt(metrics.getWriteBehindStoreBatchSize());
+                w.writeInt(metrics.getWriteBehindTotalCriticalOverflowCount());
+                w.writeInt(metrics.getWriteBehindCriticalOverflowCount());
+                w.writeInt(metrics.getWriteBehindErrorRetryCount());
+                w.writeInt(metrics.getWriteBehindBufferSize());
+                w.writeString(metrics.getKeyType());
+                w.writeString(metrics.getValueType());
+                w.writeBoolean(metrics.isStoreByValue());
+                w.writeBoolean(metrics.isStatisticsEnabled());
+                w.writeBoolean(metrics.isManagementEnabled());
+                w.writeBoolean(metrics.isReadThrough());
+                w.writeBoolean(metrics.isWriteThrough());
+                w.writeFloat(metrics.getCacheHitPercentage());
+                w.writeFloat(metrics.getCacheMissPercentage());
+
+                break;
+
+            default:
+                throwUnsupported(type);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"})
+    @Override protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer,
+        Object arg) throws IgniteCheckedException {
+        switch (type) {
+            case OP_GET: {
+                writer.writeObjectDetached(cache.get(reader.readObjectDetached()));
+
+                break;
+            }
+
+            case OP_GET_AND_PUT: {
+                writer.writeObjectDetached(cache.getAndPut(reader.readObjectDetached(), reader.readObjectDetached()));
+
+                break;
+            }
+
+            case OP_GET_AND_REPLACE: {
+                writer.writeObjectDetached(cache.getAndReplace(reader.readObjectDetached(),
+                    reader.readObjectDetached()));
+
+                break;
+            }
+
+            case OP_GET_AND_REMOVE: {
+                writer.writeObjectDetached(cache.getAndRemove(reader.readObjectDetached()));
+
+                break;
+            }
+
+            case OP_GET_AND_PUT_IF_ABSENT: {
+                writer.writeObjectDetached(cache.getAndPutIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()));
+
+                break;
+            }
+
+            case OP_PEEK: {
+                Object key = reader.readObjectDetached();
+
+                CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt());
+
+                writer.writeObjectDetached(cache.localPeek(key, modes));
+
+                break;
+            }
+
+            case OP_GET_ALL: {
+                Set keys = PlatformUtils.readSet(reader);
+
+                Map entries = cache.getAll(keys);
+
+                PlatformUtils.writeNullableMap(writer, entries);
+
+                break;
+            }
+
+            case OP_INVOKE: {
+                Object key = reader.readObjectDetached();
+
+                CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
+
+                try {
+                    writer.writeObjectDetached(cache.invoke(key, proc));
+                }
+                catch (EntryProcessorException ex)
+                {
+                    if (ex.getCause() instanceof PlatformNativeException)
+                        writer.writeObjectDetached(((PlatformNativeException)ex.getCause()).cause());
+                    else
+                        throw ex;
+                }
+
+                break;
+            }
+
+            case OP_INVOKE_ALL: {
+                Set<Object> keys = PlatformUtils.readSet(reader);
+
+                CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0);
+
+                writeInvokeAllResult(writer, cache.invokeAll(keys, proc));
+
+                break;
+            }
+
+            case OP_LOCK:
+                writer.writeLong(registerLock(cache.lock(reader.readObjectDetached())));
+
+                break;
+
+            case OP_LOCK_ALL:
+                writer.writeLong(registerLock(cache.lockAll(PlatformUtils.readCollection(reader))));
+
+                break;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Exception convertException(Exception e) {
+        if (e instanceof CachePartialUpdateException)
+            return new PlatformCachePartialUpdateException((CachePartialUpdateException)e, platformCtx, keepPortable);
+
+        return super.convertException(e);
+    }
+
+    /**
+     * Writes the result of InvokeAll cache method.
+     *
+     * @param writer Writer.
+     * @param results Results.
+     */
+    private static void writeInvokeAllResult(PortableRawWriterEx writer, Map<Object, EntryProcessorResult> results) {
+        if (results == null) {
+            writer.writeInt(-1);
+
+            return;
+        }
+
+        writer.writeInt(results.size());
+
+        for (Map.Entry<Object, EntryProcessorResult> entry : results.entrySet()) {
+            writer.writeObjectDetached(entry.getKey());
+
+            EntryProcessorResult procRes = entry.getValue();
+
+            try {
+                Object res = procRes.get();
+
+                writer.writeBoolean(false);  // No exception
+
+                writer.writeObjectDetached(res);
+            }
+            catch (Exception ex) {
+                writer.writeBoolean(true);  // Exception
+
+                writeError(writer, ex);
+            }
+        }
+    }
+
+    /**
+     * Writes an error to the writer either as a native exception, or as a couple of strings.
+     * @param writer Writer.
+     * @param ex Exception.
+     */
+    private static void writeError(PortableRawWriterEx writer, Exception ex) {
+        if (ex.getCause() instanceof PlatformNativeException)
+            writer.writeObjectDetached(((PlatformNativeException)ex.getCause()).cause());
+        else {
+            writer.writeObjectDetached(ex.getClass().getName());
+            writer.writeObjectDetached(ex.getMessage());
+        }
+    }
+
+    /** <inheritDoc /> */
+    @Override protected IgniteFuture currentFuture() throws IgniteCheckedException {
+        return cache.future();
+    }
+
+    /** <inheritDoc /> */
+    @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) {
+        if (opId == OP_GET_ALL)
+            return WRITER_GET_ALL;
+
+        if (opId == OP_INVOKE)
+            return WRITER_PROC_ERR;
+
+        if (opId == OP_INVOKE_ALL)
+            return WRITER_INVOKE_ALL;
+
+        return null;
+    }
+
+    /**
+     * Clears the contents of the cache, without notifying listeners or
+     * {@link javax.cache.integration.CacheWriter}s.
+     *
+     * @throws IllegalStateException if the cache is closed.
+     * @throws javax.cache.CacheException        if there is a problem during the clear
+     */
+    public void clear() throws IgniteCheckedException {
+        cache.clear();
+    }
+
+    /**
+     * Removes all entries.
+     *
+     * @throws org.apache.ignite.IgniteCheckedException In case of error.
+     */
+    public void removeAll() throws IgniteCheckedException {
+        cache.removeAll();
+    }
+
+    /**
+     * Read cache size.
+     *
+     * @param peekModes Encoded peek modes.
+     * @param loc Local mode flag.
+     * @return Size.
+     */
+    public int size(int peekModes, boolean loc) {
+        CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(peekModes);
+
+        return loc ? cache.localSize(modes) :  cache.size(modes);
+    }
+
+    /**
+     * Create cache iterator.
+     *
+     * @return Cache iterator.
+     */
+    public PlatformCacheIterator iterator() {
+        Iterator<Cache.Entry> iter = cache.iterator();
+
+        return new PlatformCacheIterator(platformCtx, iter);
+    }
+
+    /**
+     * Create cache iterator over local entries.
+     *
+     * @param peekModes Peke modes.
+     * @return Cache iterator.
+     */
+    public PlatformCacheIterator localIterator(int peekModes) {
+        CachePeekMode[] peekModes0 = PlatformUtils.decodeCachePeekModes(peekModes);
+
+        Iterator<Cache.Entry> iter = cache.localEntries(peekModes0).iterator();
+
+        return new PlatformCacheIterator(platformCtx, iter);
+    }
+
+    /**
+     * Enters a lock.
+     *
+     * @param id Lock id.
+     */
+    public void enterLock(long id) throws InterruptedException {
+        lock(id).lockInterruptibly();
+    }
+
+    /**
+     * Exits a lock.
+     *
+     * @param id Lock id.
+     */
+    public void exitLock(long id) {
+        lock(id).unlock();
+    }
+
+    /**
+     * Attempts to enter a lock.
+     *
+     * @param id Lock id.
+     * @param timeout Timeout, in milliseconds. -1 for infinite timeout.
+     */
+    public boolean tryEnterLock(long id, long timeout) throws InterruptedException {
+        return timeout == -1
+            ? lock(id).tryLock()
+            : lock(id).tryLock(timeout, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Rebalances the cache.
+     *
+     * @param futId Future id.
+     */
+    public void rebalance(long futId) {
+        PlatformFutureUtils.listen(platformCtx, cache.rebalance().chain(new C1<IgniteFuture, Object>() {
+            @Override
+            public Object apply(IgniteFuture fut) {
+                return null;
+            }
+        }), futId, PlatformFutureUtils.TYP_OBJ);
+    }
+
+    /**
+     * Unregister lock.
+     *
+     * @param id Lock id.
+     */
+    public void closeLock(long id){
+        Lock lock = lockMap.remove(id);
+
+        assert lock != null : "Failed to unregister lock: " + id;
+    }
+
+    /**
+     * Get lock by id.
+     *
+     * @param id Id.
+     * @return Lock.
+     */
+    private Lock lock(long id) {
+        Lock lock = lockMap.get(id);
+
+        assert lock != null : "Lock not found for ID: " + id;
+
+        return lock;
+    }
+
+    /**
+     * Registers a lock in a map.
+     *
+     * @param lock Lock to register.
+     * @return Registered lock id.
+     */
+    private long registerLock(Lock lock) {
+        long id = LOCK_ID_GEN.incrementAndGet();
+
+        lockMap.put(id, lock);
+
+        return id;
+    }
+
+    /**
+     * Runs specified query.
+     */
+    private PlatformQueryCursor runQuery(PortableRawReaderEx reader, Query qry)
+        throws IgniteCheckedException {
+
+        try {
+            QueryCursorEx cursor = (QueryCursorEx) cache.query(qry);
+
+            return new PlatformQueryCursor(platformCtx, cursor,
+                qry.getPageSize() > 0 ? qry.getPageSize(): Query.DFLT_PAGE_SIZE);
+        }
+        catch (Exception err) {
+            throw PlatformUtils.unwrapQueryException(err);
+        }
+    }
+
+    /**
+     * Runs specified fields query.
+     */
+    private PlatformFieldsQueryCursor runFieldsQuery(PortableRawReaderEx reader, Query qry)
+        throws IgniteCheckedException {
+        try {
+            QueryCursorEx cursor = (QueryCursorEx) cache.query(qry);
+
+            return new PlatformFieldsQueryCursor(platformCtx, cursor,
+                qry.getPageSize() > 0 ? qry.getPageSize() : Query.DFLT_PAGE_SIZE);
+        }
+        catch (Exception err) {
+            throw PlatformUtils.unwrapQueryException(err);
+        }
+    }
+
+    /**
+     * Reads the query of specified type.
+     */
+    private Query readInitialQuery(PortableRawReaderEx reader)
+        throws IgniteCheckedException {
+        int typ = reader.readInt();
+
+        switch (typ) {
+            case -1:
+                return null;
+
+            case OP_QRY_SCAN:
+                return readScanQuery(reader);
+
+            case OP_QRY_SQL:
+                return readSqlQuery(reader);
+
+            case OP_QRY_TXT:
+                return readTextQuery(reader);
+        }
+
+        throw new IgniteCheckedException("Unsupported query type: " + typ);
+    }
+
+    /**
+     * Reads sql query.
+     */
+    private Query readSqlQuery(PortableRawReaderEx reader) {
+        boolean loc = reader.readBoolean();
+        String sql = reader.readString();
+        String typ = reader.readString();
+        final int pageSize = reader.readInt();
+
+        Object[] args = readQueryArgs(reader);
+
+        return new SqlQuery(typ, sql).setPageSize(pageSize).setArgs(args).setLocal(loc);
+    }
+
+    /**
+     * Reads fields query.
+     */
+    private Query readFieldsQuery(PortableRawReaderEx reader) {
+        boolean loc = reader.readBoolean();
+        String sql = reader.readString();
+        final int pageSize = reader.readInt();
+
+        Object[] args = readQueryArgs(reader);
+
+        return new SqlFieldsQuery(sql).setPageSize(pageSize).setArgs(args).setLocal(loc);
+    }
+
+    /**
+     * Reads text query.
+     */
+    private Query readTextQuery(PortableRawReaderEx reader) {
+        boolean loc = reader.readBoolean();
+        String txt = reader.readString();
+        String typ = reader.readString();
+        final int pageSize = reader.readInt();
+
+        return new TextQuery(typ, txt).setPageSize(pageSize).setLocal(loc);
+    }
+
+    /**
+     * Reads scan query.
+     */
+    private Query readScanQuery(PortableRawReaderEx reader) {
+        boolean loc = reader.readBoolean();
+        final int pageSize = reader.readInt();
+
+        boolean hasPart = reader.readBoolean();
+
+        Integer part = hasPart ? reader.readInt() : null;
+
+        ScanQuery qry = new ScanQuery().setPageSize(pageSize);
+
+        qry.setPartition(part);
+
+        Object pred = reader.readObjectDetached();
+
+        if (pred != null)
+            qry.setFilter(platformCtx.createCacheEntryFilter(pred, reader.readLong()));
+
+        qry.setLocal(loc);
+
+        return qry;
+    }
+
+    /**
+     * Writes error with EntryProcessorException cause.
+     */
+    private static class GetAllWriter implements PlatformFutureUtils.Writer {
+        /** <inheritDoc /> */
+        @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
+            assert obj instanceof Map;
+
+            PlatformUtils.writeNullableMap(writer, (Map) obj);
+        }
+
+        /** <inheritDoc /> */
+        @Override public boolean canWrite(Object obj, Throwable err) {
+            return err == null;
+        }
+    }
+
+    /**
+     * Writes error with EntryProcessorException cause.
+     */
+    private static class EntryProcessorExceptionWriter implements PlatformFutureUtils.Writer {
+        /** <inheritDoc /> */
+        @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
+            EntryProcessorException entryEx = (EntryProcessorException) err;
+
+            writeError(writer, entryEx);
+        }
+
+        /** <inheritDoc /> */
+        @Override public boolean canWrite(Object obj, Throwable err) {
+            return err instanceof EntryProcessorException;
+        }
+    }
+
+    /**
+     * Writes results of InvokeAll method.
+     */
+    private static class EntryProcessorResultsWriter implements PlatformFutureUtils.Writer {
+        /** <inheritDoc /> */
+        @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) {
+            writeInvokeAllResult(writer, (Map)obj);
+        }
+
+        /** <inheritDoc /> */
+        @Override public boolean canWrite(Object obj, Throwable err) {
+            return obj != null && err == null;
+        }
+    }
+
+    /**
+     * Interop expiry policy.
+     */
+    private static class InteropExpiryPolicy implements ExpiryPolicy {
+        /** Duration: unchanged. */
+        private static final long DUR_UNCHANGED = -2;
+
+        /** Duration: eternal. */
+        private static final long DUR_ETERNAL = -1;
+
+        /** Duration: zero. */
+        private static final long DUR_ZERO = 0;
+
+        /** Expiry for create. */
+        private final Duration create;
+
+        /** Expiry for update. */
+        private final Duration update;
+
+        /** Expiry for access. */
+        private final Duration access;
+
+        /**
+         * Constructor.
+         *
+         * @param create Expiry for create.
+         * @param update Expiry for update.
+         * @param access Expiry for access.
+         */
+        public InteropExpiryPolicy(long create, long update, long access) {
+            this.create = convert(create);
+            this.update = convert(update);
+            this.access = convert(access);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForCreation() {
+            return create;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForUpdate() {
+            return update;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Duration getExpiryForAccess() {
+            return access;
+        }
+
+        /**
+         * Convert encoded duration to actual duration.
+         *
+         * @param dur Encoded duration.
+         * @return Actual duration.
+         */
+        private static Duration convert(long dur) {
+            if (dur == DUR_UNCHANGED)
+                return null;
+            else if (dur == DUR_ETERNAL)
+                return Duration.ETERNAL;
+            else if (dur == DUR_ZERO)
+                return Duration.ZERO;
+            else {
+                assert dur > 0;
+
+                return new Duration(TimeUnit.MILLISECONDS, dur);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4d650ded/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
new file mode 100644
index 0000000..fee2995
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.resources.*;
+
+/**
+ * Interop filter. Delegates apply to native platform.
+ */
+public class PlatformCacheEntryFilterImpl<K, V> extends PlatformAbstractPredicate
+    implements PlatformCacheEntryFilter<K, V> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * {@link java.io.Externalizable} support.
+     */
+    public PlatformCacheEntryFilterImpl() {
+        super();
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param pred .Net portable predicate.
+     * @param ptr Pointer to predicate in the native platform.
+     * @param ctx Kernal context.
+     */
+    public PlatformCacheEntryFilterImpl(Object pred, long ptr, PlatformContext ctx) {
+        super(pred, ptr, ctx);
+
+        assert pred != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(K k, V v) {
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = ctx.writer(out);
+
+            writer.writeObject(k);
+            writer.writeObject(v);
+
+            out.synchronize();
+
+            return ctx.gateway().cacheEntryFilterApply(ptr, mem.pointer()) != 0;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onClose() {
+        if (ptr == 0)
+            return;
+
+        assert ctx != null;
+
+        ctx.gateway().cacheEntryFilterDestroy(ptr);
+
+        ptr = 0;
+    }
+
+    /**
+     * @param ignite Ignite instance.
+     */
+    @IgniteInstanceResource
+    public void setIgniteInstance(Ignite ignite) {
+        ctx = PlatformUtils.platformContext(ignite);
+
+        if (ptr != 0)
+            return;
+
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = ctx.writer(out);
+
+            writer.writeObject(pred);
+
+            out.synchronize();
+
+            ptr = ctx.gateway().cacheEntryFilterCreate(mem.pointer());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/4d650ded/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java
new file mode 100644
index 0000000..ab9ad7c
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+
+/**
+ * Interop cache entry processor. Delegates processing to native platform.
+ */
+public class PlatformCacheEntryProcessor<K, V, T> implements CacheEntryProcessor<K, V, T>, Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Indicates that entry has not been modified  */
+    private static final byte ENTRY_STATE_INTACT = 0;
+
+    /** Indicates that entry value has been set  */
+    private static final byte ENTRY_STATE_VALUE_SET = 1;
+
+    /** Indicates that remove has been called on an entry  */
+    private static final byte ENTRY_STATE_REMOVED = 2;
+
+    /** Indicates error in processor that is written as portable.  */
+    private static final byte ENTRY_STATE_ERR_PORTABLE = 3;
+
+    /** Indicates error in processor that is written as string.  */
+    private static final byte ENTRY_STATE_ERR_STRING = 4;
+
+    /** Native portable processor */
+    private Object proc;
+
+    /** Pointer to processor in the native platform. */
+    private transient long ptr;
+
+    /**
+     * {@link java.io.Externalizable} support.
+     */
+    public PlatformCacheEntryProcessor() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param proc Native portable processor
+     * @param ptr Pointer to processor in the native platform.
+     */
+    public PlatformCacheEntryProcessor(Object proc, long ptr) {
+        this.proc = proc;
+        this.ptr = ptr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public T process(MutableEntry<K, V> entry, Object... arguments) throws EntryProcessorException {
+        try {
+            IgniteKernal ignite = (IgniteKernal)entry.unwrap(Ignite.class);
+
+            PlatformProcessor interopProc;
+
+            try {
+                interopProc = PlatformUtils.platformProcessor(ignite);
+            }
+            catch (IllegalStateException ex){
+                throw new EntryProcessorException(ex);
+            }
+
+            interopProc.awaitStart();
+
+            return execute0(interopProc.context(), entry);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /**
+     * Executes interop entry processor on a given entry, updates entry and returns result.
+     *
+     * @param ctx Context.
+     * @param entry Entry.
+     * @return Processing result.
+     * @throws org.apache.ignite.IgniteCheckedException
+     */
+    private T execute0(PlatformContext ctx, MutableEntry<K, V> entry)
+        throws IgniteCheckedException {
+        try (PlatformMemory outMem = ctx.memory().allocate()) {
+            PlatformOutputStream out = outMem.output();
+
+            PortableRawWriterEx writer = ctx.writer(out);
+
+            writeEntryAndProcessor(entry, writer);
+
+            out.synchronize();
+
+            try (PlatformMemory inMem = ctx.memory().allocate()) {
+                PlatformInputStream in = inMem.input();
+
+                ctx.gateway().cacheInvoke(outMem.pointer(), inMem.pointer());
+
+                in.synchronize();
+
+                PortableRawReaderEx reader = ctx.reader(in);
+
+                return readResultAndUpdateEntry(ctx, entry, reader);
+            }
+        }
+    }
+
+    /**
+     * Writes mutable entry and entry processor to the stream.
+     *
+     * @param entry Entry to process.
+     * @param writer Writer.
+     */
+    private void writeEntryAndProcessor(MutableEntry<K, V> entry, PortableRawWriterEx writer) {
+        writer.writeObject(entry.getKey());
+        writer.writeObject(entry.getValue());
+
+        if (ptr != 0) {
+            // Execute locally - we have a pointer to native processor.
+            writer.writeBoolean(true);
+            writer.writeLong(ptr);
+        }
+        else {
+            // We are on a remote node. Send processor holder back to native.
+            writer.writeBoolean(false);
+            writer.writeObject(proc);
+        }
+    }
+
+    /**
+     * Reads processing result from stream, updates mutable entry accordingly, and returns the result.
+     *
+     * @param entry Mutable entry to update.
+     * @param reader Reader.
+     * @return Entry processing result
+     * @throws javax.cache.processor.EntryProcessorException If processing has failed in user code.
+     */
+    @SuppressWarnings("unchecked")
+    private T readResultAndUpdateEntry(PlatformContext ctx, MutableEntry<K, V> entry, PortableRawReaderEx reader) {
+        byte state = reader.readByte();
+
+        switch (state) {
+            case ENTRY_STATE_VALUE_SET:
+                entry.setValue((V)reader.readObject());
+
+                break;
+
+            case ENTRY_STATE_REMOVED:
+                entry.remove();
+
+                break;
+
+            case ENTRY_STATE_ERR_PORTABLE:
+                // Full exception
+                Object nativeErr = reader.readObjectDetached();
+
+                assert nativeErr != null;
+
+                throw new EntryProcessorException("Failed to execute native cache entry processor.",
+                    ctx.createNativeException(nativeErr));
+
+            case ENTRY_STATE_ERR_STRING:
+                // Native exception was not serializable, we have only message.
+                String errMsg = reader.readString();
+
+                assert errMsg != null;
+
+                throw new EntryProcessorException("Failed to execute native cache entry processor: " + errMsg);
+
+            default:
+                assert state == ENTRY_STATE_INTACT;
+        }
+
+        return (T)reader.readObject();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(proc);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        proc = in.readObject();
+    }
+}