You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/01 14:08:25 UTC
[76/82] [abbrv] ignite git commit: Platforms: reworked
PlatformCacheEntryProcessor interface.
Platforms: reworked PlatformCacheEntryProcessor interface.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e25f55c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e25f55c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e25f55c
Branch: refs/heads/ignite-1093-2
Commit: 0e25f55cfa33ae4af7c52bd4e2ee49297ef623f4
Parents: 980a934
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Sep 1 11:41:28 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Sep 1 11:41:28 2015 +0300
----------------------------------------------------------------------
.../processors/platform/PlatformContext.java | 3 +-
.../cache/PlatformCacheEntryProcessor.java | 27 +++
.../cache/PlatformCacheEntryProcessor.java | 220 -------------------
.../cache/PlatformCacheEntryProcessorImpl.java | 220 +++++++++++++++++++
4 files changed, 249 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0e25f55c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index 4c70360..cea8326 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.portable.PortableRawReaderEx;
import org.apache.ignite.internal.portable.PortableRawWriterEx;
import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFilterEx;
import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter;
+import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryProcessor;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery;
import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway;
import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterNodeFilter;
@@ -248,7 +249,7 @@ public interface PlatformContext {
* @param ptr Pointer.
* @return Entry processor.
*/
- public CacheEntryProcessor createCacheEntryProcessor(Object proc, long ptr);
+ public PlatformCacheEntryProcessor createCacheEntryProcessor(Object proc, long ptr);
/**
* Create cache entry filter.
http://git-wip-us.apache.org/repos/asf/ignite/blob/0e25f55c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java
new file mode 100644
index 0000000..3d8022f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import org.apache.ignite.cache.CacheEntryProcessor;
+
+/**
+ * Marker interface identifying platform cache entry processors; adds no members to {@link CacheEntryProcessor}.
+ */
+public interface PlatformCacheEntryProcessor extends CacheEntryProcessor {
+ // Marker only: no additional members.
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0e25f55c/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java
deleted file mode 100644
index fd14632..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cache;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.MutableEntry;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheEntryProcessor;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.portable.PortableRawReaderEx;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.PlatformProcessor;
-import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Interop cache entry processor. Delegates processing to native platform.
- */
-public class PlatformCacheEntryProcessor<K, V, T> implements CacheEntryProcessor<K, V, T>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Indicates that entry has not been modified */
- private static final byte ENTRY_STATE_INTACT = 0;
-
- /** Indicates that entry value has been set */
- private static final byte ENTRY_STATE_VALUE_SET = 1;
-
- /** Indicates that remove has been called on an entry */
- private static final byte ENTRY_STATE_REMOVED = 2;
-
- /** Indicates error in processor that is written as portable. */
- private static final byte ENTRY_STATE_ERR_PORTABLE = 3;
-
- /** Indicates error in processor that is written as string. */
- private static final byte ENTRY_STATE_ERR_STRING = 4;
-
- /** Native portable processor */
- private Object proc;
-
- /** Pointer to processor in the native platform. */
- private transient long ptr;
-
- /**
- * {@link java.io.Externalizable} support.
- */
- public PlatformCacheEntryProcessor() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param proc Native portable processor
- * @param ptr Pointer to processor in the native platform.
- */
- public PlatformCacheEntryProcessor(Object proc, long ptr) {
- this.proc = proc;
- this.ptr = ptr;
- }
-
- /** {@inheritDoc} */
- @Override public T process(MutableEntry<K, V> entry, Object... arguments) throws EntryProcessorException {
- try {
- IgniteKernal ignite = (IgniteKernal)entry.unwrap(Ignite.class);
-
- PlatformProcessor interopProc;
-
- try {
- interopProc = PlatformUtils.platformProcessor(ignite);
- }
- catch (IllegalStateException ex){
- throw new EntryProcessorException(ex);
- }
-
- interopProc.awaitStart();
-
- return execute0(interopProc.context(), entry);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
- /**
- * Executes interop entry processor on a given entry, updates entry and returns result.
- *
- * @param ctx Context.
- * @param entry Entry.
- * @return Processing result.
- * @throws org.apache.ignite.IgniteCheckedException
- */
- private T execute0(PlatformContext ctx, MutableEntry<K, V> entry)
- throws IgniteCheckedException {
- try (PlatformMemory outMem = ctx.memory().allocate()) {
- PlatformOutputStream out = outMem.output();
-
- PortableRawWriterEx writer = ctx.writer(out);
-
- writeEntryAndProcessor(entry, writer);
-
- out.synchronize();
-
- try (PlatformMemory inMem = ctx.memory().allocate()) {
- PlatformInputStream in = inMem.input();
-
- ctx.gateway().cacheInvoke(outMem.pointer(), inMem.pointer());
-
- in.synchronize();
-
- PortableRawReaderEx reader = ctx.reader(in);
-
- return readResultAndUpdateEntry(ctx, entry, reader);
- }
- }
- }
-
- /**
- * Writes mutable entry and entry processor to the stream.
- *
- * @param entry Entry to process.
- * @param writer Writer.
- */
- private void writeEntryAndProcessor(MutableEntry<K, V> entry, PortableRawWriterEx writer) {
- writer.writeObject(entry.getKey());
- writer.writeObject(entry.getValue());
-
- if (ptr != 0) {
- // Execute locally - we have a pointer to native processor.
- writer.writeBoolean(true);
- writer.writeLong(ptr);
- }
- else {
- // We are on a remote node. Send processor holder back to native.
- writer.writeBoolean(false);
- writer.writeObject(proc);
- }
- }
-
- /**
- * Reads processing result from stream, updates mutable entry accordingly, and returns the result.
- *
- * @param entry Mutable entry to update.
- * @param reader Reader.
- * @return Entry processing result
- * @throws javax.cache.processor.EntryProcessorException If processing has failed in user code.
- */
- @SuppressWarnings("unchecked")
- private T readResultAndUpdateEntry(PlatformContext ctx, MutableEntry<K, V> entry, PortableRawReaderEx reader) {
- byte state = reader.readByte();
-
- switch (state) {
- case ENTRY_STATE_VALUE_SET:
- entry.setValue((V)reader.readObject());
-
- break;
-
- case ENTRY_STATE_REMOVED:
- entry.remove();
-
- break;
-
- case ENTRY_STATE_ERR_PORTABLE:
- // Full exception
- Object nativeErr = reader.readObjectDetached();
-
- assert nativeErr != null;
-
- throw new EntryProcessorException("Failed to execute native cache entry processor.",
- ctx.createNativeException(nativeErr));
-
- case ENTRY_STATE_ERR_STRING:
- // Native exception was not serializable, we have only message.
- String errMsg = reader.readString();
-
- assert errMsg != null;
-
- throw new EntryProcessorException("Failed to execute native cache entry processor: " + errMsg);
-
- default:
- assert state == ENTRY_STATE_INTACT;
- }
-
- return (T)reader.readObject();
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(proc);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- proc = in.readObject();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/0e25f55c/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
new file mode 100644
index 0000000..16124fe
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cache;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.portable.PortableRawReaderEx;
+import org.apache.ignite.internal.portable.PortableRawWriterEx;
+import org.apache.ignite.internal.processors.platform.PlatformContext;
+import org.apache.ignite.internal.processors.platform.PlatformProcessor;
+import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
+import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
+import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
+import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Platform cache entry processor: writes the entry and processor to native platform and applies the reply to the entry.
+ */
+public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProcessor, Externalizable {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Result state: entry has not been modified. */
+ private static final byte ENTRY_STATE_INTACT = 0;
+
+ /** Result state: entry value has been set; the new value is read from the stream right after the state. */
+ private static final byte ENTRY_STATE_VALUE_SET = 1;
+
+ /** Result state: remove has been called on the entry. */
+ private static final byte ENTRY_STATE_REMOVED = 2;
+
+ /** Result state: error in processor that is written as portable. */
+ private static final byte ENTRY_STATE_ERR_PORTABLE = 3;
+
+ /** Result state: error in processor that is written as string (message only). */
+ private static final byte ENTRY_STATE_ERR_STRING = 4;
+
+ /** Native portable processor; the only field written by {@link #writeExternal}. */
+ private Object proc;
+
+ /** Pointer to processor in the native platform; transient, and zero when no local pointer is available. */
+ private transient long ptr;
+
+ /**
+ * No-arg constructor for {@link java.io.Externalizable} support; leaves {@code ptr} at zero.
+ */
+ public PlatformCacheEntryProcessorImpl() {
+ // No-op.
+ }
+
+ /**
+ * Creates a processor from both the serializable processor object and its native pointer.
+ *
+ * @param proc Native portable processor.
+ * @param ptr Pointer to processor in the native platform.
+ */
+ public PlatformCacheEntryProcessorImpl(Object proc, long ptr) {
+ this.proc = proc;
+ this.ptr = ptr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object process(MutableEntry entry, Object... args)
+ throws EntryProcessorException {
+ try {
+ IgniteKernal ignite = (IgniteKernal)entry.unwrap(Ignite.class); // Ignite instance is taken from the entry itself.
+
+ PlatformProcessor interopProc;
+
+ try {
+ interopProc = PlatformUtils.platformProcessor(ignite);
+ }
+ catch (IllegalStateException ex){
+ throw new EntryProcessorException(ex); // Rethrown as the exception type declared by process().
+ }
+
+ interopProc.awaitStart();
+
+ return execute0(interopProc.context(), entry); // Note: args are not forwarded to the native side.
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /**
+ * Executes interop entry processor on a given entry, updates entry and returns result.
+ *
+ * @param ctx Context supplying the memory chunks, the writer/reader and the gateway used for the call.
+ * @param entry Entry to hand over to the native processor and to update from its reply.
+ * @return Processing result read back from the native platform.
+ * @throws org.apache.ignite.IgniteCheckedException If the exchange fails (presumably from memory/gateway calls - confirm).
+ */
+ private Object execute0(PlatformContext ctx, MutableEntry entry)
+ throws IgniteCheckedException {
+ try (PlatformMemory outMem = ctx.memory().allocate()) {
+ PlatformOutputStream out = outMem.output();
+
+ PortableRawWriterEx writer = ctx.writer(out);
+
+ writeEntryAndProcessor(entry, writer);
+
+ out.synchronize();
+
+ try (PlatformMemory inMem = ctx.memory().allocate()) {
+ PlatformInputStream in = inMem.input();
+
+ ctx.gateway().cacheInvoke(outMem.pointer(), inMem.pointer()); // Request was written to outMem; reply is read from inMem below.
+
+ in.synchronize();
+
+ PortableRawReaderEx reader = ctx.reader(in);
+
+ return readResultAndUpdateEntry(ctx, entry, reader);
+ }
+ }
+ }
+
+ /**
+ * Writes entry key and value, then a boolean flag followed by either the native pointer or the processor object.
+ *
+ * @param entry Entry to process.
+ * @param writer Writer.
+ */
+ private void writeEntryAndProcessor(MutableEntry entry, PortableRawWriterEx writer) {
+ writer.writeObject(entry.getKey());
+ writer.writeObject(entry.getValue());
+
+ if (ptr != 0) {
+ // Execute locally - we have a pointer to native processor.
+ writer.writeBoolean(true);
+ writer.writeLong(ptr);
+ }
+ else {
+ // We are on a remote node. Send processor holder back to native.
+ writer.writeBoolean(false);
+ writer.writeObject(proc);
+ }
+ }
+
+ /**
+ * Reads processing result from stream, updates mutable entry accordingly, and returns the result.
+ * @param ctx Context used to create the native exception when the error is written as portable.
+ * @param entry Mutable entry to update.
+ * @param reader Reader.
+ * @return Entry processing result, read from the stream after the state (and after the new value, if one was set).
+ * @throws javax.cache.processor.EntryProcessorException If the stream carries one of the two error states.
+ */
+ @SuppressWarnings("unchecked")
+ private Object readResultAndUpdateEntry(PlatformContext ctx, MutableEntry entry, PortableRawReaderEx reader) {
+ byte state = reader.readByte();
+
+ switch (state) {
+ case ENTRY_STATE_VALUE_SET:
+ entry.setValue(reader.readObject());
+
+ break;
+
+ case ENTRY_STATE_REMOVED:
+ entry.remove();
+
+ break;
+
+ case ENTRY_STATE_ERR_PORTABLE:
+ // Full exception object is available.
+ Object nativeErr = reader.readObjectDetached();
+
+ assert nativeErr != null;
+
+ throw new EntryProcessorException("Failed to execute native cache entry processor.",
+ ctx.createNativeException(nativeErr));
+
+ case ENTRY_STATE_ERR_STRING:
+ // Native exception was not serializable, we have only message.
+ String errMsg = reader.readString();
+
+ assert errMsg != null;
+
+ throw new EntryProcessorException("Failed to execute native cache entry processor: " + errMsg);
+
+ default:
+ assert state == ENTRY_STATE_INTACT; // NOTE(review): with assertions disabled an unexpected state is handled like INTACT.
+ }
+
+ return reader.readObject(); // Reached for INTACT, VALUE_SET and REMOVED; error states throw above.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(proc); // Only proc is written; ptr is not part of the serialized form.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ proc = in.readObject(); // ptr is not restored here.
+ }
+}
\ No newline at end of file