You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/08 14:46:28 UTC
[46/50] ignite git commit: Extract custom processors to a separate
class
Extract custom processors to a separate class
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae7b1b2b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae7b1b2b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae7b1b2b
Branch: refs/heads/ignite-3199-1
Commit: ae7b1b2bf76284b714a27f8c62eb9750d5648f2a
Parents: a304542
Author: Pavel Tupitsyn <pt...@apache.org>
Authored: Thu Sep 8 14:53:07 2016 +0300
Committer: Pavel Tupitsyn <pt...@apache.org>
Committed: Thu Sep 8 14:53:07 2016 +0300
----------------------------------------------------------------------
.../platform/cache/PlatformCache.java | 68 +++-------------
.../platform/cache/PlatformCacheInvoker.java | 83 ++++++++++++++++++++
2 files changed, 96 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7b1b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index 276867f..3a0b51a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.platform.cache;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cache.CachePartialUpdateException;
@@ -49,11 +50,6 @@ import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformListenable;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.processors.platform.utils.PlatformWriterClosure;
-import org.apache.ignite.internal.processors.platform.websession.LockEntryProcessor;
-import org.apache.ignite.internal.processors.platform.websession.SessionStateLockInfo;
-import org.apache.ignite.internal.processors.platform.websession.SessionStateData;
-import org.apache.ignite.internal.processors.platform.websession.SetAndUnlockEntryProcessor;
-import org.apache.ignite.internal.processors.platform.websession.UnlockEntryProcessor;
import org.apache.ignite.internal.util.GridConcurrentFactory;
import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.C1;
@@ -63,6 +59,7 @@ import org.apache.ignite.lang.IgniteFuture;
import org.jetbrains.annotations.Nullable;
import javax.cache.Cache;
+import javax.cache.CacheException;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.integration.CompletionListener;
@@ -201,15 +198,6 @@ public class PlatformCache extends PlatformAbstractTarget {
/** */
public static final int OP_INVOKE_INTERNAL = 41;
- /** */
- public static final int OP_INVOKE_INTERNAL_SESSION_LOCK = 1;
-
- /** */
- public static final int OP_INVOKE_INTERNAL_SESSION_UNLOCK = 2;
-
- /** */
- public static final int OP_INVOKE_INTERNAL_SESSION_SET_AND_UNLOCK = 3;
-
/** Underlying JCache in binary mode. */
private final IgniteCacheProxy cache;
@@ -220,13 +208,13 @@ public class PlatformCache extends PlatformAbstractTarget {
private final boolean keepBinary;
/** */
- private static final GetAllWriter WRITER_GET_ALL = new GetAllWriter();
+ private static final PlatformFutureUtils.Writer WRITER_GET_ALL = new GetAllWriter();
/** */
- private static final EntryProcessorInvokeWriter WRITER_INVOKE = new EntryProcessorInvokeWriter();
+ private static final PlatformFutureUtils.Writer WRITER_INVOKE = new EntryProcessorInvokeWriter();
/** */
- private static final EntryProcessorInvokeAllWriter WRITER_INVOKE_ALL = new EntryProcessorInvokeAllWriter();
+ private static final PlatformFutureUtils.Writer WRITER_INVOKE_ALL = new EntryProcessorInvokeAllWriter();
/** Map with currently active locks. */
private final ConcurrentMap<Long, Lock> lockMap = GridConcurrentFactory.newMap();
@@ -297,7 +285,7 @@ public class PlatformCache extends PlatformAbstractTarget {
if (cache.isAsync())
return this;
- return new PlatformCache(platformCtx, (IgniteCache)cacheRaw.withAsync(), keepBinary);
+ return new PlatformCache(platformCtx, cacheRaw.withAsync(), keepBinary);
}
/**
@@ -469,38 +457,8 @@ public class PlatformCache extends PlatformAbstractTarget {
});
}
- case OP_INVOKE_INTERNAL: {
- int opCode = reader.readInt();
-
- String key = reader.readString();
-
- switch (opCode) {
- case OP_INVOKE_INTERNAL_SESSION_LOCK: {
- SessionStateLockInfo lockInfo = reader.readObject();
-
- Object res = cacheRaw.invoke(key, new LockEntryProcessor(), lockInfo);
-
- return writeResult(mem, res);
- }
-
- case OP_INVOKE_INTERNAL_SESSION_UNLOCK: {
- SessionStateLockInfo lockInfo = reader.readObject();
-
- cacheRaw.invoke(key, new UnlockEntryProcessor(), lockInfo);
-
- return FALSE;
- }
-
- case OP_INVOKE_INTERNAL_SESSION_SET_AND_UNLOCK:
- SessionStateData data = reader.readObject();
-
- cacheRaw.invoke(key, new SetAndUnlockEntryProcessor(), data);
-
- return FALSE;
- }
-
- return writeResult(mem, null);
- }
+ case OP_INVOKE_INTERNAL:
+ return writeResult(mem, PlatformCacheInvoker.invoke(reader, cacheRaw));
case OP_LOCK:
return registerLock(cache.lock(reader.readObjectDetached()));
@@ -720,7 +678,7 @@ public class PlatformCache extends PlatformAbstractTarget {
return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e, platformCtx, keepBinary);
if (e.getCause() instanceof EntryProcessorException)
- return (EntryProcessorException) e.getCause();
+ return (Exception)e.getCause();
return super.convertException(e);
}
@@ -798,7 +756,7 @@ public class PlatformCache extends PlatformAbstractTarget {
* Clears the contents of the cache, without notifying listeners or CacheWriters.
*
* @throws IllegalStateException if the cache is closed.
- * @throws javax.cache.CacheException if there is a problem during the clear
+ * @throws CacheException if there is a problem during the clear
*/
public void clear() throws IgniteCheckedException {
cache.clear();
@@ -807,7 +765,7 @@ public class PlatformCache extends PlatformAbstractTarget {
/**
* Removes all entries.
*
- * @throws org.apache.ignite.IgniteCheckedException In case of error.
+ * @throws IgniteCheckedException In case of error.
*/
public void removeAll() throws IgniteCheckedException {
cache.removeAll();
@@ -1024,7 +982,7 @@ public class PlatformCache extends PlatformAbstractTarget {
/**
* Reads text query.
*/
- private Query readTextQuery(BinaryRawReaderEx reader) {
+ private Query readTextQuery(BinaryRawReader reader) {
boolean loc = reader.readBoolean();
String txt = reader.readString();
String typ = reader.readString();
@@ -1143,7 +1101,7 @@ public class PlatformCache extends PlatformAbstractTarget {
* @param update Expiry for update.
* @param access Expiry for access.
*/
- public InteropExpiryPolicy(long create, long update, long access) {
+ private InteropExpiryPolicy(long create, long update, long access) {
this.create = convert(create);
this.update = convert(update);
this.access = convert(access);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae7b1b2b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheInvoker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheInvoker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheInvoker.java
new file mode 100644
index 0000000..64d1a8b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheInvoker.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.processors.platform.websession.LockEntryProcessor;
+import org.apache.ignite.internal.processors.platform.websession.SessionStateData;
+import org.apache.ignite.internal.processors.platform.websession.SessionStateLockInfo;
+import org.apache.ignite.internal.processors.platform.websession.SetAndUnlockEntryProcessor;
+import org.apache.ignite.internal.processors.platform.websession.UnlockEntryProcessor;
+
+/**
+ * Custom entry processor invoker.
+ */
+public class PlatformCacheInvoker {
+ /** */
+ public static final int OP_SESSION_LOCK = 1;
+
+ /** */
+ public static final int OP_SESSION_UNLOCK = 2;
+
+ /** */
+ public static final int OP_SESSION_SET_AND_UNLOCK = 3;
+
+ /**
+ * Invokes the custom processor.
+ *
+ * @param reader Reader.
+ * @param cache Cache.
+ *
+ * @return Result.
+ */
+ @SuppressWarnings("unchecked")
+ public static Object invoke(BinaryRawReaderEx reader, IgniteCache cache) {
+ int opCode = reader.readInt();
+
+ String key = reader.readString();
+
+ Object res = null;
+
+ switch (opCode) {
+ case OP_SESSION_LOCK: {
+ SessionStateLockInfo lockInfo = reader.readObject();
+
+ res = cache.invoke(key, new LockEntryProcessor(), lockInfo);
+
+ break;
+ }
+
+ case OP_SESSION_UNLOCK: {
+ SessionStateLockInfo lockInfo = reader.readObject();
+
+ cache.invoke(key, new UnlockEntryProcessor(), lockInfo);
+
+ break;
+ }
+
+ case OP_SESSION_SET_AND_UNLOCK:
+ SessionStateData data = reader.readObject();
+
+ cache.invoke(key, new SetAndUnlockEntryProcessor(), data);
+
+ break;
+ }
+ return res;
+ }
+}