You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/08 14:46:28 UTC

[46/50] ignite git commit: Extract custom processors to a separate class

Extract custom processors to a separate class


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae7b1b2b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae7b1b2b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae7b1b2b

Branch: refs/heads/ignite-3199-1
Commit: ae7b1b2bf76284b714a27f8c62eb9750d5648f2a
Parents: a304542
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Thu Sep 8 14:53:07 2016 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Thu Sep 8 14:53:07 2016 +0300

----------------------------------------------------------------------
 .../platform/cache/PlatformCache.java           | 68 +++-------------
 .../platform/cache/PlatformCacheInvoker.java    | 83 ++++++++++++++++++++
 2 files changed, 96 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7b1b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index 276867f..3a0b51a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.platform.cache;
 
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryRawReader;
 import org.apache.ignite.cache.CacheEntryProcessor;
 import org.apache.ignite.cache.CacheMetrics;
 import org.apache.ignite.cache.CachePartialUpdateException;
@@ -49,11 +50,6 @@ import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
 import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
 import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
 import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure;
-import org.apache.ignite.internal.processors.platform.websession.LockEntryProcessor;
-import org.apache.ignite.internal.processors.platform.websession.SessionStateLockInfo;
-import org.apache.ignite.internal.processors.platform.websession.SessionStateData;
-import org.apache.ignite.internal.processors.platform.websession.SetAndUnlockEntryProcessor;
-import org.apache.ignite.internal.processors.platform.websession.UnlockEntryProcessor;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.typedef.C1;
@@ -63,6 +59,7 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.jetbrains.annotations.Nullable;
 
 import javax.cache.Cache;
+import javax.cache.CacheException;
 import javax.cache.expiry.Duration;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.integration.CompletionListener;
@@ -201,15 +198,6 @@ public class PlatformCache extends PlatformAbstractTarget {
     /** */
     public static final int OP_INVOKE_INTERNAL = 41;
 
-    /** */
-    public static final int OP_INVOKE_INTERNAL_SESSION_LOCK = 1;
-
-    /** */
-    public static final int OP_INVOKE_INTERNAL_SESSION_UNLOCK = 2;
-
-    /** */
-    public static final int OP_INVOKE_INTERNAL_SESSION_SET_AND_UNLOCK = 3;
-
     /** Underlying JCache in binary mode. */
     private final IgniteCacheProxy cache;
 
@@ -220,13 +208,13 @@ public class PlatformCache extends PlatformAbstractTarget {
     private final boolean keepBinary;
 
     /** */
-    private static final GetAllWriter WRITER_GET_ALL = new GetAllWriter();
+    private static final PlatformFutureUtils.Writer WRITER_GET_ALL = new GetAllWriter();
 
     /** */
-    private static final EntryProcessorInvokeWriter WRITER_INVOKE = new EntryProcessorInvokeWriter();
+    private static final PlatformFutureUtils.Writer WRITER_INVOKE = new EntryProcessorInvokeWriter();
 
     /** */
-    private static final EntryProcessorInvokeAllWriter WRITER_INVOKE_ALL = new EntryProcessorInvokeAllWriter();
+    private static final PlatformFutureUtils.Writer WRITER_INVOKE_ALL = new EntryProcessorInvokeAllWriter();
 
     /** Map with currently active locks. */
     private final ConcurrentMap<Long, Lock> lockMap = GridConcurrentFactory.newMap();
@@ -297,7 +285,7 @@ public class PlatformCache extends PlatformAbstractTarget {
         if (cache.isAsync())
             return this;
 
-        return new PlatformCache(platformCtx, (IgniteCache)cacheRaw.withAsync(), keepBinary);
+        return new PlatformCache(platformCtx, cacheRaw.withAsync(), keepBinary);
     }
 
     /**
@@ -469,38 +457,8 @@ public class PlatformCache extends PlatformAbstractTarget {
                     });
                 }
 
-                case OP_INVOKE_INTERNAL: {
-                    int opCode = reader.readInt();
-
-                    String key = reader.readString();
-
-                    switch (opCode) {
-                        case OP_INVOKE_INTERNAL_SESSION_LOCK: {
-                            SessionStateLockInfo lockInfo = reader.readObject();
-
-                            Object res = cacheRaw.invoke(key, new LockEntryProcessor(), lockInfo);
-
-                            return writeResult(mem, res);
-                        }
-
-                        case OP_INVOKE_INTERNAL_SESSION_UNLOCK: {
-                            SessionStateLockInfo lockInfo = reader.readObject();
-
-                            cacheRaw.invoke(key, new UnlockEntryProcessor(), lockInfo);
-
-                            return FALSE;
-                        }
-
-                        case OP_INVOKE_INTERNAL_SESSION_SET_AND_UNLOCK:
-                            SessionStateData data = reader.readObject();
-
-                            cacheRaw.invoke(key, new SetAndUnlockEntryProcessor(), data);
-
-                            return FALSE;
-                    }
-
-                    return writeResult(mem, null);
-                }
+                case OP_INVOKE_INTERNAL:
+                    return writeResult(mem, PlatformCacheInvoker.invoke(reader, cacheRaw));
 
                 case OP_LOCK:
                     return registerLock(cache.lock(reader.readObjectDetached()));
@@ -720,7 +678,7 @@ public class PlatformCache extends PlatformAbstractTarget {
             return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e, platformCtx, keepBinary);
 
         if (e.getCause() instanceof EntryProcessorException)
-            return (EntryProcessorException) e.getCause();
+            return (Exception)e.getCause();
 
         return super.convertException(e);
     }
@@ -798,7 +756,7 @@ public class PlatformCache extends PlatformAbstractTarget {
      * Clears the contents of the cache, without notifying listeners or CacheWriters.
      *
      * @throws IllegalStateException if the cache is closed.
-     * @throws javax.cache.CacheException if there is a problem during the clear
+     * @throws CacheException if there is a problem during the clear
      */
     public void clear() throws IgniteCheckedException {
         cache.clear();
@@ -807,7 +765,7 @@ public class PlatformCache extends PlatformAbstractTarget {
     /**
      * Removes all entries.
      *
-     * @throws org.apache.ignite.IgniteCheckedException In case of error.
+     * @throws IgniteCheckedException In case of error.
      */
     public void removeAll() throws IgniteCheckedException {
         cache.removeAll();
@@ -1024,7 +982,7 @@ public class PlatformCache extends PlatformAbstractTarget {
     /**
      * Reads text query.
      */
-    private Query readTextQuery(BinaryRawReaderEx reader) {
+    private Query readTextQuery(BinaryRawReader reader) {
         boolean loc = reader.readBoolean();
         String txt = reader.readString();
         String typ = reader.readString();
@@ -1143,7 +1101,7 @@ public class PlatformCache extends PlatformAbstractTarget {
          * @param update Expiry for update.
          * @param access Expiry for access.
          */
-        public InteropExpiryPolicy(long create, long update, long access) {
+        private InteropExpiryPolicy(long create, long update, long access) {
             this.create = convert(create);
             this.update = convert(update);
             this.access = convert(access);

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7b1b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheInvoker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheInvoker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheInvoker.java
new file mode 100644
index 0000000..64d1a8b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheInvoker.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.processors.platform.websession.LockEntryProcessor;
+import org.apache.ignite.internal.processors.platform.websession.SessionStateData;
+import org.apache.ignite.internal.processors.platform.websession.SessionStateLockInfo;
+import org.apache.ignite.internal.processors.platform.websession.SetAndUnlockEntryProcessor;
+import org.apache.ignite.internal.processors.platform.websession.UnlockEntryProcessor;
+
+/**
+ * Custom entry processor invoker.
+ */
+public class PlatformCacheInvoker {
+    /** */
+    public static final int OP_SESSION_LOCK = 1;
+
+    /** */
+    public static final int OP_SESSION_UNLOCK = 2;
+
+    /** */
+    public static final int OP_SESSION_SET_AND_UNLOCK = 3;
+
+    /**
+     * Invokes the custom processor.
+     *
+     * @param reader Reader.
+     * @param cache Cache.
+     *
+     * @return Result.
+     */
+    @SuppressWarnings("unchecked")
+    public static Object invoke(BinaryRawReaderEx reader, IgniteCache cache) {
+        int opCode = reader.readInt();
+
+        String key = reader.readString();
+
+        Object res = null;
+
+        switch (opCode) {
+            case OP_SESSION_LOCK: {
+                SessionStateLockInfo lockInfo = reader.readObject();
+
+                res = cache.invoke(key, new LockEntryProcessor(), lockInfo);
+
+                break;
+            }
+
+            case OP_SESSION_UNLOCK: {
+                SessionStateLockInfo lockInfo = reader.readObject();
+
+                cache.invoke(key, new UnlockEntryProcessor(), lockInfo);
+
+                break;
+            }
+
+            case OP_SESSION_SET_AND_UNLOCK:
+                SessionStateData data = reader.readObject();
+
+                cache.invoke(key, new SetAndUnlockEntryProcessor(), data);
+
+                break;
+        }
+        return res;
+    }
+}