You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/18 12:04:10 UTC
[06/14] ignite git commit: IGNITE-1513: Merged Java to core module.
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
deleted file mode 100644
index f59a63f..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cache;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.MutableEntry;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.portable.PortableRawReaderEx;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.PlatformProcessor;
-import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Platform cache entry processor. Delegates processing to native platform.
- */
-public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProcessor, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Indicates that entry has not been modified */
- private static final byte ENTRY_STATE_INTACT = 0;
-
- /** Indicates that entry value has been set */
- private static final byte ENTRY_STATE_VALUE_SET = 1;
-
- /** Indicates that remove has been called on an entry */
- private static final byte ENTRY_STATE_REMOVED = 2;
-
- /** Indicates error in processor that is written as portable. */
- private static final byte ENTRY_STATE_ERR_PORTABLE = 3;
-
- /** Indicates error in processor that is written as string. */
- private static final byte ENTRY_STATE_ERR_STRING = 4;
-
- /** Native portable processor */
- private Object proc;
-
- /** Pointer to processor in the native platform. */
- private transient long ptr;
-
- /**
- * {@link java.io.Externalizable} support.
- */
- public PlatformCacheEntryProcessorImpl() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param proc Native portable processor
- * @param ptr Pointer to processor in the native platform.
- */
- public PlatformCacheEntryProcessorImpl(Object proc, long ptr) {
- this.proc = proc;
- this.ptr = ptr;
- }
-
- /** {@inheritDoc} */
- @Override public Object process(MutableEntry entry, Object... args)
- throws EntryProcessorException {
- try {
- IgniteKernal ignite = (IgniteKernal)entry.unwrap(Ignite.class);
-
- PlatformProcessor interopProc;
-
- try {
- interopProc = PlatformUtils.platformProcessor(ignite);
- }
- catch (IllegalStateException ex){
- throw new EntryProcessorException(ex);
- }
-
- interopProc.awaitStart();
-
- return execute0(interopProc.context(), entry);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
- /**
- * Executes interop entry processor on a given entry, updates entry and returns result.
- *
- * @param ctx Context.
- * @param entry Entry.
- * @return Processing result.
- * @throws org.apache.ignite.IgniteCheckedException If processing fails.
- */
- private Object execute0(PlatformContext ctx, MutableEntry entry)
- throws IgniteCheckedException {
- try (PlatformMemory outMem = ctx.memory().allocate()) {
- PlatformOutputStream out = outMem.output();
-
- PortableRawWriterEx writer = ctx.writer(out);
-
- writeEntryAndProcessor(entry, writer);
-
- out.synchronize();
-
- try (PlatformMemory inMem = ctx.memory().allocate()) {
- PlatformInputStream in = inMem.input();
-
- ctx.gateway().cacheInvoke(outMem.pointer(), inMem.pointer());
-
- in.synchronize();
-
- PortableRawReaderEx reader = ctx.reader(in);
-
- return readResultAndUpdateEntry(ctx, entry, reader);
- }
- }
- }
-
- /**
- * Writes mutable entry and entry processor to the stream.
- *
- * @param entry Entry to process.
- * @param writer Writer.
- */
- private void writeEntryAndProcessor(MutableEntry entry, PortableRawWriterEx writer) {
- writer.writeObject(entry.getKey());
- writer.writeObject(entry.getValue());
-
- if (ptr != 0) {
- // Execute locally - we have a pointer to native processor.
- writer.writeBoolean(true);
- writer.writeLong(ptr);
- }
- else {
- // We are on a remote node. Send processor holder back to native.
- writer.writeBoolean(false);
- writer.writeObject(proc);
- }
- }
-
- /**
- * Reads processing result from stream, updates mutable entry accordingly, and returns the result.
- *
- * @param entry Mutable entry to update.
- * @param reader Reader.
- * @return Entry processing result
- * @throws javax.cache.processor.EntryProcessorException If processing has failed in user code.
- */
- @SuppressWarnings("unchecked")
- private Object readResultAndUpdateEntry(PlatformContext ctx, MutableEntry entry, PortableRawReaderEx reader) {
- byte state = reader.readByte();
-
- switch (state) {
- case ENTRY_STATE_VALUE_SET:
- entry.setValue(reader.readObjectDetached());
-
- break;
-
- case ENTRY_STATE_REMOVED:
- entry.remove();
-
- break;
-
- case ENTRY_STATE_ERR_PORTABLE:
- // Full exception
- Object nativeErr = reader.readObjectDetached();
-
- assert nativeErr != null;
-
- throw new EntryProcessorException("Failed to execute native cache entry processor.",
- ctx.createNativeException(nativeErr));
-
- case ENTRY_STATE_ERR_STRING:
- // Native exception was not serializable, we have only message.
- String errMsg = reader.readString();
-
- assert errMsg != null;
-
- throw new EntryProcessorException("Failed to execute native cache entry processor: " + errMsg);
-
- default:
- assert state == ENTRY_STATE_INTACT;
- }
-
- return reader.readObjectDetached();
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(proc);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- proc = in.readObject();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java
deleted file mode 100644
index 78ca683..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheIterator.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cache;
-
-import java.util.Iterator;
-import javax.cache.Cache;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-
-/**
- * Interop cache iterator.
- */
-public class PlatformCacheIterator extends PlatformAbstractTarget {
- /** Operation: next entry. */
- private static final int OP_NEXT = 1;
-
- /** Iterator. */
- private final Iterator<Cache.Entry> iter;
-
- /**
- * Constructor.
- *
- * @param platformCtx Context.
- * @param iter Iterator.
- */
- public PlatformCacheIterator(PlatformContext platformCtx, Iterator<Cache.Entry> iter) {
- super(platformCtx);
-
- this.iter = iter;
- }
-
- /** {@inheritDoc} */
- @Override protected void processOutStream(int type, PortableRawWriterEx writer) throws IgniteCheckedException {
- switch (type) {
- case OP_NEXT:
- if (iter.hasNext()) {
- Cache.Entry e = iter.next();
-
- assert e != null;
-
- writer.writeBoolean(true);
-
- writer.writeObjectDetached(e.getKey());
- writer.writeObjectDetached(e.getValue());
- }
- else
- writer.writeBoolean(false);
-
- break;
-
- default:
- super.processOutStream(type, writer);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java
deleted file mode 100644
index ef17a06..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCachePartialUpdateException.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cache;
-
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.PlatformExtendedException;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-
-import java.util.Collection;
-
-/**
- * Interop cache partial update exception.
- */
-public class PlatformCachePartialUpdateException extends PlatformExtendedException {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Keep portable flag. */
- private final boolean keepPortable;
-
- /**
- * Constructor.
- *
- * @param cause Root cause.
- * @param ctx Context.
- * @param keepPortable Keep portable flag.
- */
- public PlatformCachePartialUpdateException(CachePartialUpdateCheckedException cause, PlatformContext ctx,
- boolean keepPortable) {
- super(cause, ctx);
- this.keepPortable = keepPortable;
- }
-
- /** {@inheritDoc} */
- @Override public void writeData(PortableRawWriterEx writer) {
- Collection keys = ((CachePartialUpdateCheckedException)getCause()).failedKeys();
-
- writer.writeBoolean(keepPortable);
-
- PlatformUtils.writeNullableCollection(writer, keys);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
deleted file mode 100644
index 9dd7416..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinity.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cache.affinity;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
-import org.apache.ignite.internal.portable.PortableRawReaderEx;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Native cache affinity wrapper implementation.
- */
-@SuppressWarnings({"unchecked", "UnusedDeclaration", "TryFinallyCanBeTryWithResources"})
-public class PlatformAffinity extends PlatformAbstractTarget {
- /** */
- public static final int OP_AFFINITY_KEY = 1;
-
- /** */
- public static final int OP_ALL_PARTITIONS = 2;
-
- /** */
- public static final int OP_BACKUP_PARTITIONS = 3;
-
- /** */
- public static final int OP_IS_BACKUP = 4;
-
- /** */
- public static final int OP_IS_PRIMARY = 5;
-
- /** */
- public static final int OP_IS_PRIMARY_OR_BACKUP = 6;
-
- /** */
- public static final int OP_MAP_KEY_TO_NODE = 7;
-
- /** */
- public static final int OP_MAP_KEY_TO_PRIMARY_AND_BACKUPS = 8;
-
- /** */
- public static final int OP_MAP_KEYS_TO_NODES = 9;
-
- /** */
- public static final int OP_MAP_PARTITION_TO_NODE = 10;
-
- /** */
- public static final int OP_MAP_PARTITION_TO_PRIMARY_AND_BACKUPS = 11;
-
- /** */
- public static final int OP_MAP_PARTITIONS_TO_NODES = 12;
-
- /** */
- public static final int OP_PARTITION = 13;
-
- /** */
- public static final int OP_PRIMARY_PARTITIONS = 14;
-
- /** */
- private static final C1<ClusterNode, UUID> TO_NODE_ID = new C1<ClusterNode, UUID>() {
- @Nullable @Override public UUID apply(ClusterNode node) {
- return node != null ? node.id() : null;
- }
- };
-
- /** Underlying cache affinity. */
- private final Affinity<Object> aff;
-
- /** Discovery manager */
- private final GridDiscoveryManager discovery;
-
- /**
- * Constructor.
- *
- * @param platformCtx Context.
- * @param igniteCtx Ignite context.
- * @param name Cache name.
- */
- public PlatformAffinity(PlatformContext platformCtx, GridKernalContext igniteCtx, @Nullable String name)
- throws IgniteCheckedException {
- super(platformCtx);
-
- this.aff = igniteCtx.grid().affinity(name);
-
- if (aff == null)
- throw new IgniteCheckedException("Cache with the given name doesn't exist: " + name);
-
- discovery = igniteCtx.discovery();
- }
-
- /** {@inheritDoc} */
- @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
- switch (type) {
- case OP_PARTITION:
- return aff.partition(reader.readObjectDetached());
-
- case OP_IS_PRIMARY: {
- UUID nodeId = reader.readUuid();
-
- Object key = reader.readObjectDetached();
-
- ClusterNode node = discovery.node(nodeId);
-
- if (node == null)
- return FALSE;
-
- return aff.isPrimary(node, key) ? TRUE : FALSE;
- }
-
- case OP_IS_BACKUP: {
- UUID nodeId = reader.readUuid();
-
- Object key = reader.readObjectDetached();
-
- ClusterNode node = discovery.node(nodeId);
-
- if (node == null)
- return FALSE;
-
- return aff.isBackup(node, key) ? TRUE : FALSE;
- }
-
- case OP_IS_PRIMARY_OR_BACKUP: {
- UUID nodeId = reader.readUuid();
-
- Object key = reader.readObjectDetached();
-
- ClusterNode node = discovery.node(nodeId);
-
- if (node == null)
- return FALSE;
-
- return aff.isPrimaryOrBackup(node, key) ? TRUE : FALSE;
- }
-
- default:
- return super.processInStreamOutLong(type, reader);
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"})
- @Override protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer)
- throws IgniteCheckedException {
- switch (type) {
- case OP_PRIMARY_PARTITIONS: {
- UUID nodeId = reader.readObject();
-
- ClusterNode node = discovery.node(nodeId);
-
- int[] parts = node != null ? aff.primaryPartitions(node) : U.EMPTY_INTS;
-
- writer.writeIntArray(parts);
-
- break;
- }
-
- case OP_BACKUP_PARTITIONS: {
- UUID nodeId = reader.readObject();
-
- ClusterNode node = discovery.node(nodeId);
-
- int[] parts = node != null ? aff.backupPartitions(node) : U.EMPTY_INTS;
-
- writer.writeIntArray(parts);
-
- break;
- }
-
- case OP_ALL_PARTITIONS: {
- UUID nodeId = reader.readObject();
-
- ClusterNode node = discovery.node(nodeId);
-
- int[] parts = node != null ? aff.allPartitions(node) : U.EMPTY_INTS;
-
- writer.writeIntArray(parts);
-
- break;
- }
-
- case OP_AFFINITY_KEY: {
- Object key = reader.readObjectDetached();
-
- writer.writeObject(aff.affinityKey(key));
-
- break;
- }
-
- case OP_MAP_KEY_TO_NODE: {
- Object key = reader.readObjectDetached();
-
- ClusterNode node = aff.mapKeyToNode(key);
-
- platformCtx.writeNode(writer, node);
-
- break;
- }
-
- case OP_MAP_PARTITION_TO_NODE: {
- int part = reader.readObject();
-
- ClusterNode node = aff.mapPartitionToNode(part);
-
- platformCtx.writeNode(writer, node);
-
- break;
- }
-
- case OP_MAP_KEY_TO_PRIMARY_AND_BACKUPS: {
- Object key = reader.readObjectDetached();
-
- platformCtx.writeNodes(writer, aff.mapKeyToPrimaryAndBackups(key));
-
- break;
- }
-
- case OP_MAP_PARTITION_TO_PRIMARY_AND_BACKUPS: {
- int part = reader.readObject();
-
- platformCtx.writeNodes(writer, aff.mapPartitionToPrimaryAndBackups(part));
-
- break;
- }
-
- case OP_MAP_KEYS_TO_NODES: {
- Collection<Object> keys = reader.readCollection();
-
- Map<ClusterNode, Collection<Object>> map = aff.mapKeysToNodes(keys);
-
- writer.writeInt(map.size());
-
- for (Map.Entry<ClusterNode, Collection<Object>> e : map.entrySet()) {
- platformCtx.addNode(e.getKey());
-
- writer.writeUuid(e.getKey().id());
- writer.writeObject(e.getValue());
- }
-
- break;
- }
-
- case OP_MAP_PARTITIONS_TO_NODES: {
- Collection<Integer> parts = reader.readCollection();
-
- Map<Integer, ClusterNode> map = aff.mapPartitionsToNodes(parts);
-
- writer.writeInt(map.size());
-
- for (Map.Entry<Integer, ClusterNode> e : map.entrySet()) {
- platformCtx.addNode(e.getValue());
-
- writer.writeInt(e.getKey());
-
- writer.writeUuid(e.getValue().id());
- }
-
- break;
- }
-
- default:
- super.processInStreamOutStream(type, reader, writer);
- }
- }
-
- /**
- * @return Number of partitions in cache.
- */
- public int partitions() {
- return aff.partitions();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
deleted file mode 100644
index 6c2c873..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformAbstractQueryCursor.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cache.query;
-
-import java.util.Iterator;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-
-/**
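- * Base class for platform query cursors; writes entries of the underlying cursor to the output stream.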
- */
-public abstract class PlatformAbstractQueryCursor<T> extends PlatformAbstractTarget implements AutoCloseable {
- /** Get all entries. */
- private static final int OP_GET_ALL = 1;
-
- /** Get a batch of entries (up to batch size). */
- private static final int OP_GET_BATCH = 2;
-
- /** Get single entry. */
- private static final int OP_GET_SINGLE = 3;
-
- /** Underlying cursor. */
- private final QueryCursorEx<T> cursor;
-
- /** Batch size. */
- private final int batchSize;
-
- /** Underlying iterator. */
- private Iterator<T> iter;
-
- /**
- * Constructor.
- *
- * @param platformCtx Context.
- * @param cursor Underlying cursor.
- * @param batchSize Batch size.
- */
- public PlatformAbstractQueryCursor(PlatformContext platformCtx, QueryCursorEx<T> cursor, int batchSize) {
- super(platformCtx);
-
- this.cursor = cursor;
- this.batchSize = batchSize;
- }
-
- /** {@inheritDoc} */
- @Override protected void processOutStream(int type, final PortableRawWriterEx writer) throws IgniteCheckedException {
- switch (type) {
- case OP_GET_BATCH: {
- assert iter != null : "iterator() has not been called";
-
- try {
- int cntPos = writer.reserveInt();
-
- int cnt;
-
- for (cnt = 0; cnt < batchSize; cnt++) {
- if (iter.hasNext())
- write(writer, iter.next());
- else
- break;
- }
-
- writer.writeInt(cntPos, cnt);
- }
- catch (Exception err) {
- throw PlatformUtils.unwrapQueryException(err);
- }
-
- break;
- }
-
- case OP_GET_SINGLE: {
- assert iter != null : "iterator() has not been called";
-
- try {
- if (iter.hasNext()) {
- write(writer, iter.next());
-
- return;
- }
- }
- catch (Exception err) {
- throw PlatformUtils.unwrapQueryException(err);
- }
-
- throw new IgniteCheckedException("No more data available.");
- }
-
- case OP_GET_ALL: {
- try {
- int pos = writer.reserveInt();
-
- Consumer<T> consumer = new Consumer<>(this, writer);
-
- cursor.getAll(consumer);
-
- writer.writeInt(pos, consumer.cnt);
- }
- catch (Exception err) {
- throw PlatformUtils.unwrapQueryException(err);
- }
-
- break;
- }
-
- default:
- super.processOutStream(type, writer);
- }
- }
-
- /**
- * Get cursor iterator.
- */
- public void iterator() {
- iter = cursor.iterator();
- }
-
- /**
- * Check whether next iterator entry exists.
- *
- * @return {@code True} if exists.
- */
- @SuppressWarnings("UnusedDeclaration")
- public boolean iteratorHasNext() {
- assert iter != null : "iterator() has not been called";
-
- return iter.hasNext();
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws Exception {
- cursor.close();
- }
-
- /**
- * Write value to the stream. Extension point to perform conversions on the object before writing it.
- *
- * @param writer Writer.
- * @param val Value.
- */
- protected abstract void write(PortableRawWriterEx writer, T val);
-
- /**
- * Query cursor consumer.
- */
- private static class Consumer<T> implements QueryCursorEx.Consumer<T> {
- /** Current query cursor. */
- private final PlatformAbstractQueryCursor<T> cursor;
-
- /** Writer. */
- private final PortableRawWriterEx writer;
-
- /** Count. */
- private int cnt;
-
- /**
- * Constructor.
- *
- * @param writer Writer.
- */
- public Consumer(PlatformAbstractQueryCursor<T> cursor, PortableRawWriterEx writer) {
- this.cursor = cursor;
- this.writer = writer;
- }
-
- /** {@inheritDoc} */
- @Override public void consume(T val) throws IgniteCheckedException {
- cursor.write(writer, val);
-
- cnt++;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
deleted file mode 100644
index 453e233..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryImpl.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cache.query;
-
-import java.io.ObjectStreamException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.cache.Cache;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryListenerException;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.query.ContinuousQuery;
-import org.apache.ignite.cache.query.Query;
-import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
-import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.PlatformTarget;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
-
-/**
- * Interop continuous query handle.
- */
-public class PlatformContinuousQueryImpl implements PlatformContinuousQuery {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Context. */
- protected final PlatformContext platformCtx;
-
- /** Whether filter exists. */
- private final boolean hasFilter;
-
- /** Native filter in serialized form. If null, then filter is either not set, or this is local query. */
- protected final Object filter;
-
- /** Pointer to native counterpart; zero if closed. */
- private long ptr;
-
- /** Cursor to handle filter close. */
- private QueryCursor cursor;
-
- /** Lock for concurrency control. */
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- /** Wrapped initial qry cursor. */
- private PlatformQueryCursor initialQryCur;
-
- /**
- * Constructor.
- *
- * @param platformCtx Context.
- * @param ptr Pointer to native counterpart.
- * @param hasFilter Whether filter exists.
- * @param filter Filter.
- */
- public PlatformContinuousQueryImpl(PlatformContext platformCtx, long ptr, boolean hasFilter, Object filter) {
- assert ptr != 0L;
-
- this.platformCtx = platformCtx;
- this.ptr = ptr;
- this.hasFilter = hasFilter;
- this.filter = filter;
- }
-
- /**
- * Start query execution.
- *
- * @param cache Cache.
- * @param loc Local flag.
- * @param bufSize Buffer size.
- * @param timeInterval Time interval.
- * @param autoUnsubscribe Auto-unsubscribe flag.
- * @param initialQry Initial query.
- */
- @SuppressWarnings("unchecked")
- public void start(IgniteCacheProxy cache, boolean loc, int bufSize, long timeInterval, boolean autoUnsubscribe,
- Query initialQry) throws IgniteCheckedException {
- assert !loc || filter == null;
-
- lock.writeLock().lock();
-
- try {
- try {
- ContinuousQuery qry = new ContinuousQuery();
-
- qry.setLocalListener(this);
- qry.setRemoteFilter(this); // Filter must be set always for correct resource release.
- qry.setPageSize(bufSize);
- qry.setTimeInterval(timeInterval);
- qry.setAutoUnsubscribe(autoUnsubscribe);
- qry.setInitialQuery(initialQry);
-
- cursor = cache.query(qry.setLocal(loc));
-
- if (initialQry != null)
- initialQryCur = new PlatformQueryCursor(platformCtx, new QueryCursorEx<Cache.Entry>() {
- @Override public Iterator<Cache.Entry> iterator() {
- return cursor.iterator();
- }
-
- @Override public List<Cache.Entry> getAll() {
- return cursor.getAll();
- }
-
- @Override public void close() {
- // No-op: do not close whole continuous query when initial query cursor closes.
- }
-
- @Override public void getAll(Consumer<Cache.Entry> clo) throws IgniteCheckedException {
- for (Cache.Entry t : this)
- clo.consume(t);
- }
-
- @Override public List<GridQueryFieldMetadata> fieldsMeta() {
- return null;
- }
- }, initialQry.getPageSize() > 0 ? initialQry.getPageSize() : Query.DFLT_PAGE_SIZE);
- }
- catch (Exception e) {
- try
- {
- close0();
- }
- catch (Exception ignored)
- {
- // Ignore
- }
-
- throw PlatformUtils.unwrapQueryException(e);
- }
- }
- finally {
- lock.writeLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void onUpdated(Iterable evts) throws CacheEntryListenerException {
- lock.readLock().lock();
-
- try {
- if (ptr == 0)
- throw new CacheEntryListenerException("Failed to notify listener because it has been closed.");
-
- PlatformUtils.applyContinuousQueryEvents(platformCtx, ptr, evts);
- }
- finally {
- lock.readLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException {
- lock.readLock().lock();
-
- try {
- if (ptr == 0)
- throw new CacheEntryListenerException("Failed to evaluate the filter because it has been closed.");
-
- return !hasFilter || PlatformUtils.evaluateContinuousQueryEvent(platformCtx, ptr, evt);
- }
- finally {
- lock.readLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onQueryUnregister() {
- close();
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- lock.writeLock().lock();
-
- try {
- close0();
- }
- finally {
- lock.writeLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"UnusedDeclaration", "unchecked"})
- @Override public PlatformTarget getInitialQueryCursor() {
- return initialQryCur;
- }
-
- /**
- * Internal close routine.
- */
- private void close0() {
- if (ptr != 0) {
- long ptr0 = ptr;
-
- ptr = 0;
-
- if (cursor != null)
- cursor.close();
-
- platformCtx.gateway().continuousQueryFilterRelease(ptr0);
- }
- }
-
- /**
- * Replacer for remote filter.
- *
- * @return Filter to be deployed on remote node.
- * @throws ObjectStreamException If failed.
- */
- Object writeReplace() throws ObjectStreamException {
- return filter == null ? null : platformCtx.createContinuousQueryFilter(filter);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java
deleted file mode 100644
index 71aa38c..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformContinuousQueryRemoteFilter.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cache.query;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.resources.IgniteInstanceResource;
-
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryListenerException;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * Continuous query filter deployed on remote nodes.
- */
-public class PlatformContinuousQueryRemoteFilter implements PlatformContinuousQueryFilter, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Lock for concurrency control. */
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- /** Native filter in serialized form. */
- private Object filter;
-
- /** Grid hosting the filter. */
- @IgniteInstanceResource
- private transient Ignite grid;
-
- /** Native platform pointer. */
- private transient volatile long ptr;
-
- /** Close flag. Once set, none requests to native platform is possible. */
- private transient boolean closed;
-
- /**
- * {@link java.io.Externalizable} support.
- */
- public PlatformContinuousQueryRemoteFilter() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param filter Serialized native filter.
- */
- public PlatformContinuousQueryRemoteFilter(Object filter) {
- assert filter != null;
-
- this.filter = filter;
- }
-
- /** {@inheritDoc} */
- @Override public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException {
- long ptr0 = ptr;
-
- if (ptr0 == 0)
- deploy();
-
- lock.readLock().lock();
-
- try {
- if (closed)
- throw new CacheEntryListenerException("Failed to evaluate the filter because it has been closed.");
-
- PlatformContext platformCtx = PlatformUtils.platformContext(grid);
-
- return PlatformUtils.evaluateContinuousQueryEvent(platformCtx, ptr, evt);
- }
- finally {
- lock.readLock().unlock();
- }
- }
-
- /**
- * Deploy filter to native platform.
- */
- private void deploy() {
- lock.writeLock().lock();
-
- try {
- // 1. Do not deploy if the filter has been closed concurrently.
- if (closed)
- throw new CacheEntryListenerException("Failed to deploy the filter because it has been closed.");
-
- // 2. Deploy.
- PlatformContext ctx = PlatformUtils.platformContext(grid);
-
- try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformOutputStream out = mem.output();
-
- PortableRawWriterEx writer = ctx.writer(out);
-
- writer.writeObject(filter);
-
- out.synchronize();
-
- ptr = ctx.gateway().continuousQueryFilterCreate(mem.pointer());
- }
- catch (Exception e) {
- // 3. Close in case of failure.
- close();
-
- throw new CacheEntryListenerException("Failed to deploy the filter.", e);
- }
- }
- finally {
- lock.writeLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onQueryUnregister() {
- lock.writeLock().lock();
-
- try {
- close();
- }
- finally {
- lock.writeLock().unlock();
- }
- }
-
- /**
- * Close the filter.
- */
- private void close() {
- if (!closed) {
- try {
- if (ptr != 0) {
- try {
- PlatformUtils.platformContext(grid).gateway().continuousQueryFilterRelease(ptr);
- }
- finally {
- // Nullify the pointer in any case.
- ptr = 0;
- }
- }
- }
- finally {
- closed = true;
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(filter);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- filter = in.readObject();
-
- assert filter != null;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(PlatformContinuousQueryRemoteFilter.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java
deleted file mode 100644
index 44a4f14..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformFieldsQueryCursor.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cache.query;
-
-import java.util.List;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-
-/**
- * Interop cursor for fields query.
- */
-public class PlatformFieldsQueryCursor extends PlatformAbstractQueryCursor<List<?>> {
- /**
- * Constructor.
- *
- * @param platformCtx Platform context.
- * @param cursor Backing cursor.
- * @param batchSize Batch size.
- */
- public PlatformFieldsQueryCursor(PlatformContext platformCtx, QueryCursorEx<List<?>> cursor, int batchSize) {
- super(platformCtx, cursor, batchSize);
- }
-
- /** {@inheritDoc} */
- @Override protected void write(PortableRawWriterEx writer, List vals) {
- assert vals != null;
-
- writer.writeInt(vals.size());
-
- for (Object val : vals)
- writer.writeObjectDetached(val);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java
deleted file mode 100644
index 410e4de..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/query/PlatformQueryCursor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cache.query;
-
-import javax.cache.Cache;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-
-/**
- * Interop cursor for regular queries.
- */
-public class PlatformQueryCursor extends PlatformAbstractQueryCursor<Cache.Entry> {
- /**
- * Constructor.
- *
- * @param platformCtx Context.
- * @param cursor Backing cursor.
- * @param batchSize Batch size.
- */
- public PlatformQueryCursor(PlatformContext platformCtx, QueryCursorEx<Cache.Entry> cursor, int batchSize) {
- super(platformCtx, cursor, batchSize);
- }
-
- /** {@inheritDoc} */
- @Override protected void write(PortableRawWriterEx writer, Cache.Entry val) {
- writer.writeObjectDetached(val.getKey());
- writer.writeObjectDetached(val.getValue());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/store/PlatformCacheStoreCallback.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/store/PlatformCacheStoreCallback.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/store/PlatformCacheStoreCallback.java
deleted file mode 100644
index a741f0f..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/store/PlatformCacheStoreCallback.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cache.store;
-
-import org.apache.ignite.internal.portable.PortableRawReaderEx;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-
-/**
- * Platform cache store callback. Wraps the memory chunk behind a pointer into a reader and hands it
- * to the subclass-specific routine.
- */
-public abstract class PlatformCacheStoreCallback {
-    /** Context. */
-    protected final PlatformContext ctx;
-
-    /**
-     * Constructor.
-     *
-     * @param ctx Context.
-     */
-    protected PlatformCacheStoreCallback(PlatformContext ctx) {
-        this.ctx = ctx;
-    }
-
-    /**
-     * Invoke the callback. Nothing happens when the pointer is not positive.
-     *
-     * @param memPtr Memory pointer.
-     */
-    public void invoke(long memPtr) {
-        if (memPtr <= 0)
-            return;
-
-        try (PlatformMemory mem = ctx.memory().get(memPtr)) {
-            invoke0(ctx.reader(mem));
-        }
-    }
-
-    /**
-     * Internal invoke routine.
-     *
-     * @param reader Reader positioned over the memory chunk passed to {@link #invoke(long)}.
-     */
-    protected abstract void invoke0(PortableRawReaderEx reader);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
deleted file mode 100644
index a1c8516..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cluster;
-
-import java.util.Collection;
-import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteCluster;
-import org.apache.ignite.cluster.ClusterMetrics;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.cluster.ClusterGroupEx;
-import org.apache.ignite.internal.portable.PortableRawReaderEx;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-
-/**
- * Interop projection.
- */
-@SuppressWarnings({"UnusedDeclaration"})
-public class PlatformClusterGroup extends PlatformAbstractTarget {
-    /** Operation: write all metadata. */
-    private static final int OP_ALL_METADATA = 1;
-
-    /** Operation: create projection by attribute. */
-    private static final int OP_FOR_ATTRIBUTE = 2;
-
-    /** Operation: create projection over cache nodes. */
-    private static final int OP_FOR_CACHE = 3;
-
-    /** Operation: create projection over client nodes of a cache. */
-    private static final int OP_FOR_CLIENT = 4;
-
-    /** Operation: create projection over data nodes of a cache. */
-    private static final int OP_FOR_DATA = 5;
-
-    /** Operation: create projection for the host of a node. */
-    private static final int OP_FOR_HOST = 6;
-
-    /** Operation: create projection by node IDs. */
-    private static final int OP_FOR_NODE_IDS = 7;
-
-    /** Operation: write metadata of a single type. */
-    private static final int OP_METADATA = 8;
-
-    /** Operation: write metrics of this projection. */
-    private static final int OP_METRICS = 9;
-
-    /** Operation: write metrics of the projection narrowed to given node IDs. */
-    private static final int OP_METRICS_FILTERED = 10;
-
-    /** Operation: write metrics of a single node. */
-    private static final int OP_NODE_METRICS = 11;
-
-    /** Operation: write projection nodes if topology version has advanced. */
-    private static final int OP_NODES = 12;
-
-    /** Operation: ping node. */
-    private static final int OP_PING_NODE = 13;
-
-    /** Operation: write nodes of a given topology version. */
-    private static final int OP_TOPOLOGY = 14;
-
-    /** Projection. */
-    private final ClusterGroupEx prj;
-
-    /**
-     * Constructor.
-     *
-     * @param platformCtx Context.
-     * @param prj Projection.
-     */
-    public PlatformClusterGroup(PlatformContext platformCtx, ClusterGroupEx prj) {
-        super(platformCtx);
-
-        this.prj = prj;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("deprecation")
-    @Override protected void processOutStream(int type, PortableRawWriterEx writer) throws IgniteCheckedException {
-        if (type == OP_METRICS)
-            platformCtx.writeClusterMetrics(writer, prj.metrics());
-        else if (type == OP_ALL_METADATA)
-            platformCtx.writeAllMetadata(writer);
-        else
-            super.processOutStream(type, writer);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"ConstantConditions", "deprecation"})
-    @Override protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer)
-        throws IgniteCheckedException {
-        switch (type) {
-            case OP_METRICS_FILTERED: {
-                Collection<UUID> nodeIds = PlatformUtils.readCollection(reader);
-
-                platformCtx.writeClusterMetrics(writer, prj.forNodeIds(nodeIds).metrics());
-
-                break;
-            }
-
-            case OP_NODES: {
-                long knownTopVer = reader.readLong();
-
-                long actualTopVer = platformCtx.kernalContext().discovery().topologyVersion();
-
-                if (actualTopVer <= knownTopVer) {
-                    // No discovery events since last invocation.
-                    writer.writeBoolean(false);
-
-                    break;
-                }
-
-                writer.writeBoolean(true);
-
-                writer.writeLong(actualTopVer);
-
-                // At this moment topology version might have advanced, and due to this race
-                // we return outdated top ver to the callee. But this race is benign, the only
-                // possible side effect is that the user will re-request nodes and we will return
-                // the same set of nodes but with more recent topology version.
-                platformCtx.writeNodes(writer, prj.nodes());
-
-                break;
-            }
-
-            case OP_NODE_METRICS: {
-                UUID nodeId = reader.readUuid();
-
-                long lastUpdateTime = reader.readLong();
-
-                // Ask discovery because node might have been filtered out of current projection.
-                ClusterNode node = platformCtx.kernalContext().discovery().node(nodeId);
-
-                ClusterMetrics res = null;
-
-                if (node != null) {
-                    ClusterMetrics nodeMetrics = node.metrics();
-
-                    long threshold = lastUpdateTime + platformCtx.kernalContext().config().getMetricsUpdateFrequency();
-
-                    // Metrics are reported only when they are newer than the threshold.
-                    if (nodeMetrics.getLastUpdateTime() > threshold)
-                        res = nodeMetrics;
-                }
-
-                platformCtx.writeClusterMetrics(writer, res);
-
-                break;
-            }
-
-            case OP_METADATA: {
-                platformCtx.writeMetadata(writer, reader.readInt());
-
-                break;
-            }
-
-            case OP_TOPOLOGY: {
-                platformCtx.writeNodes(writer, topology(reader.readLong()));
-
-                break;
-            }
-
-            default:
-                super.processInStreamOutStream(type, reader, writer);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
-        if (type == OP_PING_NODE)
-            return pingNode(reader.readUuid()) ? TRUE : FALSE;
-
-        return super.processInStreamOutLong(type, reader);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Object processInStreamOutObject(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
-        switch (type) {
-            case OP_FOR_NODE_IDS: {
-                Collection<UUID> nodeIds = PlatformUtils.readCollection(reader);
-
-                return wrap((ClusterGroupEx)prj.forNodeIds(nodeIds));
-            }
-
-            case OP_FOR_ATTRIBUTE: {
-                // Name is read before the value, in the order they appear in the stream.
-                String attrName = reader.readString();
-                String attrVal = reader.readString();
-
-                return wrap((ClusterGroupEx)prj.forAttribute(attrName, attrVal));
-            }
-
-            case OP_FOR_CACHE:
-                return wrap((ClusterGroupEx)prj.forCacheNodes(reader.readString()));
-
-            case OP_FOR_CLIENT:
-                return wrap((ClusterGroupEx)prj.forClientNodes(reader.readString()));
-
-            case OP_FOR_DATA:
-                return wrap((ClusterGroupEx)prj.forDataNodes(reader.readString()));
-
-            case OP_FOR_HOST: {
-                ClusterNode hostNode = prj.node(reader.readUuid());
-
-                return wrap((ClusterGroupEx)prj.forHost(hostNode));
-            }
-
-            default:
-                return super.processInStreamOutObject(type, reader);
-        }
-    }
-
-    /**
-     * @param exclude Projection to exclude.
-     * @return New projection.
-     */
-    public PlatformClusterGroup forOthers(PlatformClusterGroup exclude) {
-        return wrap((ClusterGroupEx)prj.forOthers(exclude.prj));
-    }
-
-    /**
-     * @return New projection over remote nodes.
-     */
-    public PlatformClusterGroup forRemotes() {
-        return wrap((ClusterGroupEx)prj.forRemotes());
-    }
-
-    /**
-     * @return New projection over daemon nodes.
-     */
-    public PlatformClusterGroup forDaemons() {
-        return wrap((ClusterGroupEx)prj.forDaemons());
-    }
-
-    /**
-     * @return New projection over a random node.
-     */
-    public PlatformClusterGroup forRandom() {
-        return wrap((ClusterGroupEx)prj.forRandom());
-    }
-
-    /**
-     * @return New projection over the oldest node.
-     */
-    public PlatformClusterGroup forOldest() {
-        return wrap((ClusterGroupEx)prj.forOldest());
-    }
-
-    /**
-     * @return New projection over the youngest node.
-     */
-    public PlatformClusterGroup forYoungest() {
-        return wrap((ClusterGroupEx)prj.forYoungest());
-    }
-
-    /**
-     * @return Projection.
-     */
-    public ClusterGroupEx projection() {
-        return prj;
-    }
-
-    /**
-     * Resets local I/O, job, and task execution metrics.
-     */
-    public void resetMetrics() {
-        cluster().resetMetrics();
-    }
-
-    /**
-     * Pings a remote node.
-     *
-     * @param nodeId ID of the node to ping.
-     * @return Result reported by the cluster for the ping.
-     */
-    private boolean pingNode(UUID nodeId) {
-        return cluster().pingNode(nodeId);
-    }
-
-    /**
-     * Gets a topology by version. Returns {@code null} if topology history storage doesn't contain
-     * specified topology version (history currently keeps last {@code 1000} snapshots).
-     *
-     * @param topVer Topology version.
-     * @return Collection of grid nodes which represented by specified topology version,
-     * if it is present in history storage, {@code null} otherwise.
-     * @throws UnsupportedOperationException If underlying SPI implementation does not support
-     * topology history. Currently only {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}
-     * supports topology history.
-     */
-    private Collection<ClusterNode> topology(long topVer) {
-        return cluster().topology(topVer);
-    }
-
-    /**
-     * Wraps a projection into a new interop target sharing this target's context.
-     *
-     * @param grp Projection to wrap.
-     * @return New interop projection.
-     */
-    private PlatformClusterGroup wrap(ClusterGroupEx grp) {
-        return new PlatformClusterGroup(platformCtx, grp);
-    }
-
-    /**
-     * Views the projection as the top-level cluster.
-     *
-     * @return Projection cast to {@link IgniteCluster}.
-     */
-    private IgniteCluster cluster() {
-        assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group.
-
-        return (IgniteCluster)prj;
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java
deleted file mode 100644
index 5ba9a85..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilterImpl.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.cluster;
-
-import org.apache.ignite.Ignite;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.resources.IgniteInstanceResource;
-
-/**
- * Interop cluster node filter.
- */
-public class PlatformClusterNodeFilterImpl extends PlatformAbstractPredicate implements PlatformClusterNodeFilter {
- /** Serial version UID. */
- private static final long serialVersionUID = 0L;
-
- /**
- * {@link java.io.Externalizable} support.
- */
- public PlatformClusterNodeFilterImpl() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param pred .NET portable predicate.
- * @param ctx Platform context.
- */
- public PlatformClusterNodeFilterImpl(Object pred, PlatformContext ctx) {
- super(pred, 0, ctx);
- }
-
- /** {@inheritDoc} */
- @Override public boolean apply(ClusterNode clusterNode) {
- try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformOutputStream out = mem.output();
-
- PortableRawWriterEx writer = ctx.writer(out);
-
- writer.writeObject(pred);
- ctx.writeNode(writer, clusterNode);
-
- out.synchronize();
-
- return ctx.gateway().clusterNodeFilterApply(mem.pointer()) != 0;
- }
- }
-
- /**
- * @param ignite Ignite instance.
- */
- @SuppressWarnings("UnusedDeclaration")
- @IgniteInstanceResource
- public void setIgniteInstance(Ignite ignite) {
- ctx = PlatformUtils.platformContext(ignite);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
deleted file mode 100644
index bf9d9e4..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractJob.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.compute;
-
-import java.io.Externalizable;
-import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.PlatformProcessor;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Base interop job.
- */
-public abstract class PlatformAbstractJob implements PlatformJob, Externalizable {
- /** Marker object denoting the job execution result is stored in native platform. */
- static final Object LOC_JOB_RES = new Object();
-
- /** Ignite instance (injected resource). */
- @IgniteInstanceResource
- protected transient Ignite ignite;
-
- /** Parent task; present only on local job instance. */
- protected transient PlatformAbstractTask task;
-
- /** Pointer to job in the native platform. */
- protected transient long ptr;
-
- /** Job. */
- protected Object job;
-
- /**
- * {@link java.io.Externalizable} support.
- */
- protected PlatformAbstractJob() {
- // No-op.
- }
-
- /**
- * Constructor.
- *
- * @param task Parent task.
- * @param ptr Pointer.
- * @param job Job.
- */
- protected PlatformAbstractJob(PlatformAbstractTask task, long ptr, Object job) {
- this.task = task;
- this.ptr = ptr;
- this.job = job;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Object execute() {
- try {
- PlatformProcessor interopProc = PlatformUtils.platformProcessor(ignite);
-
- interopProc.awaitStart();
-
- return execute0(interopProc.context());
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
- /**
- * Internal job execution routine.
- *
- * @param ctx Platform context.
- * @return Result.
- * @throws org.apache.ignite.IgniteCheckedException If failed.
- */
- protected abstract Object execute0(PlatformContext ctx) throws IgniteCheckedException;
-
- /**
- * Create job in native platform if needed.
- *
- * @param ctx Context.
- * @return {@code True} if job was created, {@code false} if this is local job and creation is not necessary.
- * @throws org.apache.ignite.IgniteCheckedException If failed.
- */
- protected boolean createJob(PlatformContext ctx) throws IgniteCheckedException {
- if (ptr == 0) {
- try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformOutputStream out = mem.output();
-
- PortableRawWriterEx writer = ctx.writer(out);
-
- writer.writeObject(job);
-
- out.synchronize();
-
- ptr = ctx.gateway().computeJobCreate(mem.pointer());
- }
-
- return true;
- }
- else
- return false;
- }
-
- /**
- * Run local job.
- *
- * @param ctx Context.
- * @param cancel Cancel flag.
- * @return Result.
- */
- protected Object runLocal(PlatformContext ctx, boolean cancel) {
- // Local job, must execute it with respect to possible concurrent task completion.
- if (task.onJobLock()) {
- try {
- ctx.gateway().computeJobExecute(ptr, cancel ? 1 : 0, 0);
-
- return LOC_JOB_RES;
- }
- finally {
- task.onJobUnlock();
- }
- }
- else
- // Task has completed concurrently, no need to run the job.
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public long pointer() {
- return ptr;
- }
-
- /** {@inheritDoc} */
- @Override public Object job() {
- return job;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
deleted file mode 100644
index b17dd97..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformAbstractTask.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.compute;
-
-import java.util.List;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.compute.ComputeJobResult;
-import org.apache.ignite.compute.ComputeJobResultPolicy;
-import org.apache.ignite.compute.ComputeTask;
-import org.apache.ignite.internal.portable.PortableRawWriterEx;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.processors.platform.PlatformNativeException;
-import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
-import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
-import org.apache.ignite.internal.util.typedef.X;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Base class for all interop tasks.
- */
-public abstract class PlatformAbstractTask implements ComputeTask<Object, Void> {
- /** Platform context. */
- protected final PlatformContext ctx;
-
- /** Pointer to the task in the native platform. */
- protected final long taskPtr;
-
- /** Lock for safe access to native pointers. */
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- /** Done flag. */
- protected boolean done;
-
- /**
- * Constructor.
- *
- * @param ctx Platform context.
- * @param taskPtr Task pointer.
- */
- protected PlatformAbstractTask(PlatformContext ctx, long taskPtr) {
- this.ctx = ctx;
- this.taskPtr = taskPtr;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"ThrowableResultOfMethodCallIgnored", "unchecked"})
- @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
- assert rcvd.isEmpty() : "Should not cache result in Java for interop task";
-
- int plc;
-
- lock.readLock().lock();
-
- try {
- assert !done;
-
- PlatformAbstractJob job = res.getJob();
-
- assert job.pointer() != 0;
-
- Object res0bj = res.getData();
-
- if (res0bj == PlatformAbstractJob.LOC_JOB_RES)
- // Processing local job execution result.
- plc = ctx.gateway().computeTaskJobResult(taskPtr, job.pointer(), 0);
- else {
- // Processing remote job execution result or exception.
- try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformOutputStream out = mem.output();
-
- PortableRawWriterEx writer = ctx.writer(out);
-
- writer.writeUuid(res.getNode().id());
- writer.writeBoolean(res.isCancelled());
-
- IgniteException err = res.getException();
-
- PlatformUtils.writeInvocationResult(writer, res0bj, err);
-
- out.synchronize();
-
- plc = ctx.gateway().computeTaskJobResult(taskPtr, job.pointer(), mem.pointer());
- }
- }
-
- ComputeJobResultPolicy plc0 = ComputeJobResultPolicy.fromOrdinal((byte) plc);
-
- assert plc0 != null : plc;
-
- return plc0;
- }
- finally {
- lock.readLock().unlock();
- }
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Void reduce(List<ComputeJobResult> results) {
- assert results.isEmpty() : "Should not cache result in java for interop task";
-
- lock.readLock().lock();
-
- try {
- assert !done;
-
- ctx.gateway().computeTaskReduce(taskPtr);
- }
- finally {
- lock.readLock().unlock();
- }
-
- return null;
- }
-
- /**
- * Callback invoked when task future is completed and all resources can be safely cleaned up.
- *
- * @param e Error the task failed with, or {@code null} if it completed normally.
- */
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- public void onDone(Exception e) {
- lock.writeLock().lock();
-
- try {
- assert !done;
-
- if (e == null)
- // Normal completion.
- ctx.gateway().computeTaskComplete(taskPtr, 0);
- else {
- PlatformNativeException e0 = X.cause(e, PlatformNativeException.class);
-
- try (PlatformMemory mem = ctx.memory().allocate()) {
- PlatformOutputStream out = mem.output();
-
- PortableRawWriterEx writer = ctx.writer(out);
-
- if (e0 == null) {
- writer.writeBoolean(false);
- writer.writeString(e.getClass().getName());
- writer.writeString(e.getMessage());
- }
- else {
- writer.writeBoolean(true);
- writer.writeObject(e0.cause());
- }
-
- out.synchronize();
-
- ctx.gateway().computeTaskComplete(taskPtr, mem.pointer());
- }
- }
- }
- finally {
- // Done flag is set irrespective of any exceptions.
- done = true;
-
- lock.writeLock().unlock();
- }
- }
-
- /**
- * Callback invoked by job when it wants to lock the task.
- *
- * @return {@code True} if task is not completed yet, {@code false} otherwise.
- */
- @SuppressWarnings("LockAcquiredButNotSafelyReleased")
- public boolean onJobLock() {
- lock.readLock().lock();
-
- if (done) {
- lock.readLock().unlock();
-
- return false;
- }
- else
- return true;
- }
-
- /**
- * Callback invoked by job when task can be unlocked.
- */
- public void onJobUnlock() {
- assert !done;
-
- lock.readLock().unlock();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java
deleted file mode 100644
index 5570586..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformBalancingMultiClosureTask.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.compute;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.compute.ComputeJob;
-import org.apache.ignite.compute.ComputeLoadBalancer;
-import org.apache.ignite.compute.ComputeTaskNoResultCache;
-import org.apache.ignite.internal.processors.platform.PlatformContext;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.resources.LoadBalancerResource;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Interop multi-closure task with node balancing.
- */
-@ComputeTaskNoResultCache
-public class PlatformBalancingMultiClosureTask extends PlatformAbstractTask {
- /** Serial version UID. */
- private static final long serialVersionUID = 0L;
-
- /** Jobs. */
- private Collection<PlatformJob> jobs;
-
- /** Load balancer. */
- @SuppressWarnings("UnusedDeclaration")
- @LoadBalancerResource
- private ComputeLoadBalancer lb;
-
- /**
- * Constructor.
- *
- * @param ctx Platform context.
- * @param taskPtr Task pointer.
- */
- public PlatformBalancingMultiClosureTask(PlatformContext ctx, long taskPtr) {
- super(ctx, taskPtr);
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
- @Nullable Object arg) {
- assert !F.isEmpty(jobs) : "Jobs emptiness must be checked in native platform.";
-
- if (!F.isEmpty(subgrid)) {
- Map<ComputeJob, ClusterNode> map = new HashMap<>(jobs.size(), 1);
-
- for (PlatformJob job : jobs)
- map.put(job, lb.getBalancedNode(job, null));
-
- return map;
- }
- else
- return Collections.emptyMap();
- }
-
- /**
- * @param jobs Jobs.
- */
- public void jobs(Collection<PlatformJob> jobs) {
- this.jobs = jobs;
- }
-}
\ No newline at end of file