You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/27 14:19:37 UTC

[01/27] ignite git commit: Further refactorings necessary for platforms move to Ignite.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1124 336d192b2 -> 43542dfe0


http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
deleted file mode 100644
index 7e9587f..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
+++ /dev/null
@@ -1,468 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.callback;
-
-/**
- * Platform callback utility methods. Implemented in target platform. All methods in this class must be
- * package-visible and invoked only through {@link PlatformCallbackGateway}.
- */
-public class PlatformCallbackUtils {
-    /**
-     * Create cache store.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Memory pointer.
-     * @return Pointer.
-     */
-    static native long cacheStoreCreate(long envPtr, long memPtr);
-
-    /**
-     * @param envPtr Environment pointer.
-     * @param objPtr Object pointer.
-     * @param memPtr Memory pointer.
-     * @param cb Callback.
-     * @return Result.
-     */
-    static native int cacheStoreInvoke(long envPtr, long objPtr, long memPtr, Object cb);
-
-    /**
-     * @param envPtr Environment pointer.
-     * @param objPtr Object pointer.
-     */
-    static native void cacheStoreDestroy(long envPtr, long objPtr);
-
-    /**
-     * Creates cache store session.
-     *
-     * @param envPtr Environment pointer.
-     * @param storePtr Store instance pointer.
-     * @return Session instance pointer.
-     */
-    static native long cacheStoreSessionCreate(long envPtr, long storePtr);
-
-    /**
-     * Creates cache entry filter and returns a pointer.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Memory pointer.
-     * @return Pointer.
-     */
-    static native long cacheEntryFilterCreate(long envPtr, long memPtr);
-
-    /**
-     * @param envPtr Environment pointer.
-     * @param objPtr Pointer.
-     * @param memPtr Memory pointer.
-     * @return Result.
-     */
-    static native int cacheEntryFilterApply(long envPtr, long objPtr, long memPtr);
-
-    /**
-     * @param envPtr Environment pointer.
-     * @param objPtr Pointer.
-     */
-    static native void cacheEntryFilterDestroy(long envPtr, long objPtr);
-
-    /**
-     * Invoke cache entry processor.
-     *
-     * @param envPtr Environment pointer.
-     * @param outMemPtr Output memory pointer.
-     * @param inMemPtr Input memory pointer.
-     */
-    static native void cacheInvoke(long envPtr, long outMemPtr, long inMemPtr);
-
-    /**
-     * Perform native task map. Do not throw exceptions, serializing them to the output stream instead.
-     *
-     * @param envPtr Environment pointer.
-     * @param taskPtr Task pointer.
-     * @param outMemPtr Output memory pointer (exists if topology changed, otherwise {@code 0}).
-     * @param inMemPtr Input memory pointer.
-     */
-    static native void computeTaskMap(long envPtr, long taskPtr, long outMemPtr, long inMemPtr);
-
-    /**
-     * Perform native task job result notification.
-     *
-     * @param envPtr Environment pointer.
-     * @param taskPtr Task pointer.
-     * @param jobPtr Job pointer.
-     * @param memPtr Memory pointer (always zero for local job execution).
-     * @return Job result enum ordinal.
-     */
-    static native int computeTaskJobResult(long envPtr, long taskPtr, long jobPtr, long memPtr);
-
-    /**
-     * Perform native task reduce.
-     *
-     * @param envPtr Environment pointer.
-     * @param taskPtr Task pointer.
-     */
-    static native void computeTaskReduce(long envPtr, long taskPtr);
-
-    /**
-     * Complete task with native error.
-     *
-     * @param envPtr Environment pointer.
-     * @param taskPtr Task pointer.
-     * @param memPtr Memory pointer with exception data or {@code 0} in case of success.
-     */
-    static native void computeTaskComplete(long envPtr, long taskPtr, long memPtr);
-
-    /**
-     * Serialize native job.
-     *
-     * @param envPtr Environment pointer.
-     * @param jobPtr Job pointer.
-     * @param memPtr Memory pointer.
-     * @return {@code True} if serialization succeeded.
-     */
-    static native int computeJobSerialize(long envPtr, long jobPtr, long memPtr);
-
-    /**
-     * Create job in native platform.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Memory pointer.
-     * @return Pointer to job.
-     */
-    static native long computeJobCreate(long envPtr, long memPtr);
-
-    /**
-     * Execute native job on a node other than where it was created.
-     *
-     * @param envPtr Environment pointer.
-     * @param jobPtr Job pointer.
-     * @param cancel Cancel flag.
-     * @param memPtr Memory pointer to write result to for remote job execution or {@code 0} for local job execution.
-     */
-    static native void computeJobExecute(long envPtr, long jobPtr, int cancel, long memPtr);
-
-    /**
-     * Cancel the job.
-     *
-     * @param envPtr Environment pointer.
-     * @param jobPtr Job pointer.
-     */
-    static native void computeJobCancel(long envPtr, long jobPtr);
-
-    /**
-     * Destroy the job.
-     *
-     * @param envPtr Environment pointer.
-     * @param ptr Pointer.
-     */
-    static native void computeJobDestroy(long envPtr, long ptr);
-
-    /**
-     * Invoke local callback.
-     *
-     * @param envPtr Environment pointer.
-     * @param cbPtr Callback pointer.
-     * @param memPtr Memory pointer.
-     */
-    static native void continuousQueryListenerApply(long envPtr, long cbPtr, long memPtr);
-
-    /**
-     * Create filter in native platform.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Memory pointer.
-     * @return Pointer to created filter.
-     */
-    static native long continuousQueryFilterCreate(long envPtr, long memPtr);
-
-    /**
-     * Invoke remote filter.
-     *
-     * @param envPtr Environment pointer.
-     * @param filterPtr Filter pointer.
-     * @param memPtr Memory pointer.
-     * @return Result.
-     */
-    static native int continuousQueryFilterApply(long envPtr, long filterPtr, long memPtr);
-
-    /**
-     * Release remote  filter.
-     *
-     * @param envPtr Environment pointer.
-     * @param filterPtr Filter pointer.
-     */
-    static native void continuousQueryFilterRelease(long envPtr, long filterPtr);
-
-    /**
-     * Notify native data streamer about topology update.
-     *
-     * @param envPtr Environment pointer.
-     * @param ptr Data streamer native pointer.
-     * @param topVer Topology version.
-     * @param topSize Topology size.
-     */
-    static native void dataStreamerTopologyUpdate(long envPtr, long ptr, long topVer, int topSize);
-
-    /**
-     * Invoke stream receiver.
-     *
-     * @param envPtr Environment pointer.
-     * @param ptr Receiver native pointer.
-     * @param cache Cache object.
-     * @param memPtr Stream pointer.
-     * @param keepPortable Portable flag.
-     */
-    static native void dataStreamerStreamReceiverInvoke(long envPtr, long ptr, Object cache, long memPtr,
-        boolean keepPortable);
-
-    /**
-     * Notify future with byte result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    static native void futureByteResult(long envPtr, long futPtr, int res);
-
-    /**
-     * Notify future with boolean result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    static native void futureBoolResult(long envPtr, long futPtr, int res);
-
-    /**
-     * Notify future with short result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    static native void futureShortResult(long envPtr, long futPtr, int res);
-
-    /**
-     * Notify future with byte result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    static native void futureCharResult(long envPtr, long futPtr, int res);
-
-    /**
-     * Notify future with int result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    static native void futureIntResult(long envPtr, long futPtr, int res);
-
-    /**
-     * Notify future with float result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    static native void futureFloatResult(long envPtr, long futPtr, float res);
-
-    /**
-     * Notify future with long result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    static native void futureLongResult(long envPtr, long futPtr, long res);
-
-    /**
-     * Notify future with double result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    static native void futureDoubleResult(long envPtr, long futPtr, double res);
-
-    /**
-     * Notify future with object result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param memPtr Memory pointer.
-     */
-    static native void futureObjectResult(long envPtr, long futPtr, long memPtr);
-
-    /**
-     * Notify future with null result.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     */
-    static native void futureNullResult(long envPtr, long futPtr);
-
-    /**
-     * Notify future with error.
-     *
-     * @param envPtr Environment pointer.
-     * @param futPtr Future pointer.
-     * @param memPtr Pointer to memory with error information.
-     */
-    static native void futureError(long envPtr, long futPtr, long memPtr);
-
-    /**
-     * Creates message filter and returns a pointer.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Memory pointer.
-     * @return Pointer.
-     */
-    static native long messagingFilterCreate(long envPtr, long memPtr);
-
-    /**
-     * @param envPtr Environment pointer.
-     * @param objPtr Pointer.
-     * @param memPtr Memory pointer.
-     * @return Result.
-     */
-    static native int messagingFilterApply(long envPtr, long objPtr, long memPtr);
-
-    /**
-     * @param envPtr Environment pointer.
-     * @param objPtr Pointer.
-     */
-    static native void messagingFilterDestroy(long envPtr, long objPtr);
-
-    /**
-     * Creates event filter and returns a pointer.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Memory pointer.
-     * @return Pointer.
-     */
-    static native long eventFilterCreate(long envPtr, long memPtr);
-
-    /**
-     * @param envPtr Environment pointer.
-     * @param objPtr Pointer.
-     * @param memPtr Memory pointer.
-     * @return Result.
-     */
-    static native int eventFilterApply(long envPtr, long objPtr, long memPtr);
-
-    /**
-     * @param envPtr Environment pointer.
-     * @param objPtr Pointer.
-     */
-    static native void eventFilterDestroy(long envPtr, long objPtr);
-
-    /**
-     * Sends node info to native target.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Ptr to a stream with serialized node.
-     */
-    static native void nodeInfo(long envPtr, long memPtr);
-
-    /**
-     * Kernal start callback.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Memory pointer.
-     */
-    static native void onStart(long envPtr, long memPtr);
-
-    /*
-     * Kernal stop callback.
-     *
-     * @param envPtr Environment pointer.
-     */
-    static native void onStop(long envPtr);
-
-    /**
-     * Lifecycle event callback.
-     *
-     * @param envPtr Environment pointer.
-     * @param ptr Holder pointer.
-     * @param evt Event.
-     */
-    static native void lifecycleEvent(long envPtr, long ptr, int evt);
-
-    /**
-     * Re-allocate external memory chunk.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Cross-platform pointer.
-     * @param cap Capacity.
-     */
-    static native void memoryReallocate(long envPtr, long memPtr, int cap);
-
-    /**
-     * Initializes native service.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Stream pointer.
-     * @return Pointer to the native platform service.
-     */
-    static native long serviceInit(long envPtr, long memPtr);
-
-    /**
-     * Executes native service.
-     *
-     * @param envPtr Environment pointer.
-     * @param svcPtr Pointer to the service in the native platform.
-     * @param memPtr Stream pointer.
-     */
-    static native void serviceExecute(long envPtr, long svcPtr, long memPtr);
-
-    /**
-     * Cancels native service.
-     *
-     * @param envPtr Environment pointer.
-     * @param svcPtr Pointer to the service in the native platform.
-     * @param memPtr Stream pointer.
-     */
-    static native void serviceCancel(long envPtr, long svcPtr, long memPtr);
-
-    /**
-     /**
-     * Invokes service method.
-     *
-     * @param envPtr Environment pointer.
-     * @param svcPtr Pointer to the service in the native platform.
-     * @param outMemPtr Output memory pointer.
-     * @param inMemPtr Input memory pointer.
-     */
-    static native void serviceInvokeMethod(long envPtr, long svcPtr, long outMemPtr, long inMemPtr);
-
-    /**
-     * Invokes cluster node filter.
-     *
-     * @param envPtr Environment pointer.
-     * @param memPtr Stream pointer.
-     */
-    static native int clusterNodeFilterApply(long envPtr, long memPtr);
-
-    /**
-     * Private constructor.
-     */
-    private PlatformCallbackUtils() {
-        // No-op.
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStream.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStream.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStream.java
deleted file mode 100644
index 9273e29..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStream.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.memory;
-
-import org.apache.ignite.internal.portable.streams.*;
-
-/**
- * Interop output stream,
- */
-public interface PlatformInputStream extends PortableInputStream {
-    /**
-     * Synchronize input. Must be called before start reading data from a memory changed by another platform.
-     */
-    public void synchronize();
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemory.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemory.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemory.java
deleted file mode 100644
index 9d8f94e..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemory.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.memory;
-
-/**
- * Interop memory chunk.
- */
-public interface PlatformMemory extends AutoCloseable {
-    /**
-     * Gets input stream.
-     *
-     * @return Input stream.
-     */
-    public PlatformInputStream input();
-
-    /**
-     * Gets output stream.
-     *
-     * @return Output stream.
-     */
-    public PlatformOutputStream output();
-
-    /**
-     * Gets pointer which can be passed between platforms.
-     *
-     * @return Pointer.
-     */
-    public long pointer();
-
-    /**
-     * Gets data pointer.
-     *
-     * @return Data pointer.
-     */
-    public long data();
-
-    /**
-     * Gets capacity.
-     *
-     * @return Capacity.
-     */
-    public int capacity();
-
-    /**
-     * Gets length.
-     *
-     * @return Length.
-     */
-    public int length();
-
-    /**
-     * Reallocate memory chunk.
-     *
-     * @param cap Minimum capacity.
-     */
-    public void reallocate(int cap);
-
-    /**
-     * Close memory releasing it.
-     */
-    @Override void close();
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManager.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManager.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManager.java
deleted file mode 100644
index c2233a8..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManager.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.memory;
-
-/**
- * Interop memory manager interface.
- */
-public interface PlatformMemoryManager {
-    /**
-     * Allocates memory.
-     *
-     * @return Memory.
-     */
-    public PlatformMemory allocate();
-
-    /**
-     * Allocates memory having at least the given capacity.
-     *
-     * @param cap Minimum capacity.
-     * @return Memory.
-     */
-    public PlatformMemory allocate(int cap);
-
-    /**
-     * Gets memory from existing pointer.
-     *
-     * @param memPtr Cross-platform memory pointer.
-     * @return Memory.
-     */
-    public PlatformMemory get(long memPtr);
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStream.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStream.java
deleted file mode 100644
index eb2490a..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStream.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.memory;
-
-import org.apache.ignite.internal.portable.streams.*;
-
-/**
- * Interop output stream.
- */
-public interface PlatformOutputStream extends PortableOutputStream {
-    /**
-     * Synchronize output stream with underlying memory
-     */
-    public void synchronize();
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index 66e87e2..f82fb0f 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.platform.utils;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.portable.*;
 import org.apache.ignite.internal.processors.platform.*;
 import org.apache.ignite.internal.processors.platform.memory.*;
@@ -577,6 +578,28 @@ public class PlatformUtils {
     }
 
     /**
+     * Get GridGain platform processor.
+     *
+     * @param grid Ignite instance.
+     * @return Platform processor.
+     */
+    public static PlatformProcessor platformProcessor(Ignite grid) {
+        GridKernalContext ctx = ((IgniteKernal) grid).context();
+
+        return ctx.platform();
+    }
+
+    /**
+     * Gets interop context for the grid.
+     *
+     * @param grid Grid
+     * @return Context.
+     */
+    public static PlatformContext platformContext(Ignite grid) {
+        return platformProcessor(grid).context();
+    }
+
+    /**
      * Private constructor.
      */
     private PlatformUtils() {


[18/27] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7c2c02bf
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7c2c02bf
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7c2c02bf

Branch: refs/heads/ignite-1124
Commit: 7c2c02bf0fb57e90c22bb28e71796b378f1d755b
Parents: a312934 00b27ce
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Aug 27 13:17:43 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Aug 27 13:17:43 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |   6 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   6 +-
 .../util/future/GridEmbeddedFuture.java         |  55 +++-
 .../distributed/CacheAsyncOperationsTest.java   | 280 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite3.java       |   2 +
 5 files changed, 342 insertions(+), 7 deletions(-)
----------------------------------------------------------------------



[22/27] ignite git commit: ignite-1307 Do not complete prepare future with null result

Posted by sb...@apache.org.
ignite-1307 Do not complete prepare future with null result


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b132006f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b132006f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b132006f

Branch: refs/heads/ignite-1124
Commit: b132006f85df6c1b15e676dd0d8082cf44984b84
Parents: 3d46b62
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 27 13:39:26 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 27 13:39:26 2015 +0300

----------------------------------------------------------------------
 .../cache/distributed/dht/GridDhtTxPrepareFuture.java          | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b132006f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 2071f8e..4c39476 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -729,7 +729,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
      * Completes this future.
      */
     public void complete() {
-        onComplete(null);
+        GridNearTxPrepareResponse res = new GridNearTxPrepareResponse();
+
+        res.error(new IgniteCheckedException("Failed to prepare transaction."));
+
+        onComplete(res);
     }
 
     /**


[05/27] ignite git commit: Use special method for test debug info dumping to avoid waiting for hanging nodes start.

Posted by sb...@apache.org.
Use special method for test debug info dumping to avoid waiting for hanging nodes start.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fc60fee5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fc60fee5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fc60fee5

Branch: refs/heads/ignite-1124
Commit: fc60fee5f727210f5a3a16997e2bd6b7b07a4539
Parents: 5877b30
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 27 12:07:47 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 27 12:07:47 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  | 23 ++++++++++++++++++--
 .../testframework/junits/GridAbstractTest.java  |  2 +-
 2 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/fc60fee5/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index fd74745..cd91fa8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1015,10 +1015,29 @@ public class IgnitionEx {
      * @return List of all grids started so far.
      */
     public static List<Ignite> allGrids() {
+        return allGrids(true);
+    }
+
+    /**
+     * Gets a list of all grids started so far.
+     *
+     * @return List of all grids started so far.
+     */
+    public static List<Ignite> allGridsx() {
+        return allGrids(false);
+    }
+
+    /**
+     * Gets a list of all grids started so far.
+     *
+     * @param wait If {@code true} wait for node start finish.
+     * @return List of all grids started so far.
+     */
+    private static List<Ignite> allGrids(boolean wait) {
         List<Ignite> allIgnites = new ArrayList<>(grids.size() + 1);
 
         for (IgniteNamedInstance grid : grids.values()) {
-            Ignite g = grid.grid();
+            Ignite g = wait ? grid.grid() : grid.gridx();
 
             if (g != null)
                 allIgnites.add(g);
@@ -1027,7 +1046,7 @@ public class IgnitionEx {
         IgniteNamedInstance dfltGrid0 = dfltGrid;
 
         if (dfltGrid0 != null) {
-            IgniteKernal g = dfltGrid0.grid();
+            IgniteKernal g = wait ? dfltGrid0.grid() : dfltGrid0.gridx();
 
             if (g != null)
                 allIgnites.add(g);

http://git-wip-us.apache.org/repos/asf/ignite/blob/fc60fee5/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
index d6591cd..9cd621a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java
@@ -1578,7 +1578,7 @@ public abstract class GridAbstractTest extends TestCase {
                 "Test has been timed out and will be interrupted (threads dump will be taken before interruption) [" +
                 "test=" + getName() + ", timeout=" + getTestTimeout() + ']');
 
-            List<Ignite> nodes = G.allGrids();
+            List<Ignite> nodes = IgnitionEx.allGridsx();
 
             for (Ignite node : nodes)
                 ((IgniteKernal)node).dumpDebugInfo();


[14/27] ignite git commit: Fixed GridEmbeddedFuture used for async cache operations, added test.

Posted by sb...@apache.org.
Fixed GridEmbeddedFuture used for async cache operations, added test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e567f8cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e567f8cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e567f8cb

Branch: refs/heads/ignite-1124
Commit: e567f8cb3dd88c3c0a253c4fde06f8ced9b97fde
Parents: c8bc1f9
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 27 12:39:52 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 27 12:41:05 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |   6 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   6 +-
 .../util/future/GridEmbeddedFuture.java         |  55 +++-
 .../distributed/CacheAsyncOperationsTest.java   | 280 +++++++++++++++++++
 .../testsuites/IgniteCacheTestSuite3.java       |   2 +
 5 files changed, 342 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 7adea2b..54d33e0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4132,9 +4132,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             final IgniteTxLocalAdapter tx0 = tx;
 
             if (fut != null && !fut.isDone()) {
-                IgniteInternalFuture<T> f = new GridEmbeddedFuture<>(fut,
-                    new C2<T, Exception, IgniteInternalFuture<T>>() {
-                        @Override public IgniteInternalFuture<T> apply(T t, Exception e) {
+                IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
+                    new IgniteOutClosure<IgniteInternalFuture>() {
+                        @Override public IgniteInternalFuture<T> apply() {
                             return op.op(tx0).chain(new CX1<IgniteInternalFuture<T>, T>() {
                                 @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
                                     try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index c44b028..9087d20 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -567,9 +567,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             IgniteInternalFuture fut = holder.future();
 
             if (fut != null && !fut.isDone()) {
-                IgniteInternalFuture<T> f = new GridEmbeddedFuture<>(fut,
-                    new C2<T, Exception, IgniteInternalFuture<T>>() {
-                        @Override public IgniteInternalFuture<T> apply(T t, Exception e) {
+                IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
+                    new IgniteOutClosure<IgniteInternalFuture>() {
+                        @Override public IgniteInternalFuture<T> apply() {
                             return op.apply();
                         }
                     });

http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java
index 4475fae..11b28b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridEmbeddedFuture.java
@@ -68,7 +68,8 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> {
 
     /**
      * Embeds futures. Specific change order of arguments to avoid conflicts.
-     *  @param embedded Closure.
+     *
+     * @param embedded Embedded future.
      * @param c Closure which runs upon completion of embedded closure and which returns another future.
      */
     public GridEmbeddedFuture(
@@ -200,6 +201,58 @@ public class GridEmbeddedFuture<A, B> extends GridFutureAdapter<A> {
         });
     }
 
+    /**
+     * @param embedded Embedded future.
+     * @param c Closure to create next future.
+     */
+    public GridEmbeddedFuture(
+        IgniteInternalFuture<B> embedded,
+        final IgniteOutClosure<IgniteInternalFuture<A>> c
+    ) {
+        assert embedded != null;
+        assert c != null;
+
+        this.embedded = embedded;
+
+        embedded.listen(new AL1() {
+            @Override public void applyx(IgniteInternalFuture<B> embedded) {
+                try {
+                    IgniteInternalFuture<A> next = c.apply();
+
+                    if (next == null) {
+                        onDone();
+
+                        return;
+                    }
+
+                    next.listen(new AL2() {
+                        @Override public void applyx(IgniteInternalFuture<A> next) {
+                            try {
+                                onDone(next.get());
+                            }
+                            catch (GridClosureException e) {
+                                onDone(e.unwrap());
+                            }
+                            catch (IgniteCheckedException | RuntimeException e) {
+                                onDone(e);
+                            }
+                            catch (Error e) {
+                                onDone(e);
+
+                                throw e;
+                            }
+                        }
+                    });
+                }
+                catch (Error e) {
+                    onDone(e);
+
+                    throw e;
+                }
+            }
+        });
+    }
+
     /** {@inheritDoc} */
     @Override public boolean cancel() throws IgniteCheckedException {
         return embedded.cancel();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsTest.java
new file mode 100644
index 0000000..2094d0a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.store.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import javax.cache.*;
+import javax.cache.configuration.*;
+import javax.cache.integration.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+
+/**
+ * Tests asynchronous cache operations with a write-through store that fails writes for key 0.
+ */
+public class CacheAsyncOperationsTest extends GridCommonAbstractTest {
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static volatile CountDownLatch latch;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+        if (gridName.equals(getTestGridName(1)))
+            cfg.setClientMode(true);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrids(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAsyncOperationsTx() throws Exception {
+        asyncOperations(TRANSACTIONAL);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAsyncOperationsAtomic() throws Exception {
+        asyncOperations(ATOMIC);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        CountDownLatch latch0 = latch;
+
+        if (latch0 != null)
+            latch0.countDown();
+
+        latch = null;
+    }
+
+    /**
+     * @param atomicityMode Atomicity mode.
+     * @throws Exception If failed.
+     */
+    public void asyncOperations(CacheAtomicityMode atomicityMode) throws Exception {
+        try (IgniteCache<Integer, Integer> cache = ignite(1).getOrCreateCache(cacheConfiguration(atomicityMode))) {
+            async1(cache);
+
+            async2(cache);
+        }
+    }
+
+    /**
+     * @param cache Cache.
+     */
+    private void async1(IgniteCache<Integer, Integer> cache) {
+        cache.put(1, 1);
+
+        latch = new CountDownLatch(1);
+
+        IgniteCache<Integer, Integer> asyncCache = cache.withAsync();
+
+        asyncCache.put(0, 0);
+
+        IgniteFuture<?> fut1 = asyncCache.future();
+
+        asyncCache.getAndPut(1, 2);
+
+        IgniteFuture<?> fut2 = asyncCache.future();
+
+        asyncCache.getAndPut(1, 3);
+
+        IgniteFuture<?> fut3 = asyncCache.future();
+
+        assertFalse(fut1.isDone());
+        assertFalse(fut2.isDone());
+        assertFalse(fut3.isDone());
+
+        latch.countDown();
+
+        try {
+            fut1.get();
+
+            fail();
+        }
+        catch (CacheException e) {
+            log.info("Expected error: " + e);
+        }
+
+        assertEquals(1, fut2.get());
+        assertEquals(2, fut3.get());
+
+        assertNull(cache.get(0));
+        assertEquals((Integer)3, cache.get(1));
+    }
+    /**
+     *
+     * @param cache Cache.
+     */
+    private void async2(IgniteCache<Integer, Integer> cache) {
+        cache.put(1, 1);
+
+        latch = new CountDownLatch(1);
+
+        IgniteCache<Integer, Integer> asyncCache = cache.withAsync();
+
+        asyncCache.put(0, 0);
+
+        IgniteFuture<?> fut1 = asyncCache.future();
+
+        asyncCache.put(0, 0);
+
+        IgniteFuture<?> fut2 = asyncCache.future();
+
+        asyncCache.getAndPut(1, 2);
+
+        IgniteFuture<?> fut3 = asyncCache.future();
+
+        asyncCache.put(0, 0);
+
+        IgniteFuture<?> fut4 = asyncCache.future();
+
+        assertFalse(fut1.isDone());
+        assertFalse(fut2.isDone());
+        assertFalse(fut3.isDone());
+        assertFalse(fut4.isDone());
+
+        latch.countDown();
+
+        try {
+            fut1.get();
+
+            fail();
+        }
+        catch (CacheException e) {
+            log.info("Expected error: " + e);
+        }
+
+        try {
+            fut2.get();
+
+            fail();
+        }
+        catch (CacheException e) {
+            log.info("Expected error: " + e);
+        }
+
+        assertEquals(1, fut3.get());
+
+        try {
+            fut4.get();
+
+            fail();
+        }
+        catch (CacheException e) {
+            log.info("Expected error: " + e);
+        }
+
+        assertNull(cache.get(0));
+        assertEquals((Integer)2, cache.get(1));
+    }
+
+    /**
+     * @param atomicityMode Atomicity mode.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(CacheAtomicityMode atomicityMode) {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setWriteThrough(true);
+        ccfg.setCacheStoreFactory(new StoreFactory());
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    private static class StoreFactory implements Factory<TestStore> {
+        /** {@inheritDoc} */
+        @Override public TestStore create() {
+            return new TestStore();
+        }
+    }
+
+    /**
+     *
+     */
+    private static class TestStore extends CacheStoreAdapter<Integer, Integer> {
+        /** {@inheritDoc} */
+        @Override public Integer load(Integer key) throws CacheLoaderException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) {
+            CountDownLatch latch0 = latch;
+
+            if (latch0 != null)
+                U.awaitQuiet(latch0);
+
+            Integer key = entry.getKey();
+
+            if (key.equals(0)) {
+                System.out.println(Thread.currentThread().getName() + ": fail operation for key: " + key);
+
+                throw new CacheWriterException("Test error.");
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void delete(Object key) {
+            // No-op.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e567f8cb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
index 5947d33..c20e901 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite3.java
@@ -133,6 +133,8 @@ public class IgniteCacheTestSuite3 extends TestSuite {
 
         suite.addTestSuite(IgniteTxGetAfterStopTest.class);
 
+        suite.addTestSuite(CacheAsyncOperationsTest.class);
+
         return suite;
     }
 }


[12/27] ignite git commit: Moved platform cluster node filter to Ignite.

Posted by sb...@apache.org.
Moved platform cluster node filter to Ignite.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/93b29426
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/93b29426
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/93b29426

Branch: refs/heads/ignite-1124
Commit: 93b294260f8081424b86d28ffd4c0232df1d04f0
Parents: daa8796
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Aug 27 12:25:52 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Aug 27 12:25:52 2015 +0300

----------------------------------------------------------------------
 .../processors/platform/PlatformBootstrap.java  |  1 -
 .../cluster/PlatformClusterNodeFilter.java      | 77 ++++++++++++++++++++
 2 files changed, 77 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/93b29426/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrap.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrap.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrap.java
index 319c670..ce26475 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrap.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformBootstrap.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.platform;
 
 import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.processors.platform.*;
 
 /**
  * Platform bootstrap. Responsible for starting Ignite node with non-Java platform.

http://git-wip-us.apache.org/repos/asf/ignite/blob/93b29426/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java
new file mode 100644
index 0000000..e12449f
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterNodeFilter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cluster;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+
+/**
+ * Interop cluster node filter.
+ */
+public class PlatformClusterNodeFilter extends PlatformAbstractPredicate implements IgnitePredicate<ClusterNode> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * {@link java.io.Externalizable} support.
+     */
+    public PlatformClusterNodeFilter() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param pred .Net portable predicate.
+     * @param ctx Kernal context.
+     */
+    public PlatformClusterNodeFilter(Object pred, PlatformContext ctx) {
+        super(pred, 0, ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(ClusterNode clusterNode) {
+        try (PlatformMemory mem = ctx.memory().allocate()) {
+            PlatformOutputStream out = mem.output();
+
+            PortableRawWriterEx writer = ctx.writer(out);
+
+            writer.writeObject(pred);
+            ctx.writeNode(writer, clusterNode);
+
+            out.synchronize();
+
+            return ctx.gateway().clusterNodeFilterApply(mem.pointer()) != 0;
+        }
+    }
+
+    /**
+     * @param ignite Ignite instance.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    @IgniteInstanceResource
+    public void setIgniteInstance(Ignite ignite) {
+        ctx = PlatformUtils.platformContext(ignite);
+    }
+}


[17/27] ignite git commit: ignite-1300: add ability to register class types outside of the portable context

Posted by sb...@apache.org.
ignite-1300: add ability to register class types outside of the portable context


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f575ff11
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f575ff11
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f575ff11

Branch: refs/heads/ignite-1124
Commit: f575ff119d589c04836a3de065e5f958daec8000
Parents: 00b27ce
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Aug 27 13:17:35 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Aug 27 13:17:35 2015 +0300

----------------------------------------------------------------------
 .../internal/portable/PortableContext.java      | 115 +++++++++++--------
 1 file changed, 67 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f575ff11/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
index a9d64d9..723113e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
@@ -114,10 +114,19 @@ public class PortableContext implements Externalizable {
     private String gridName;
 
     /** */
-    private PortableMarshaller marsh;
+    private final OptimizedMarshaller optmMarsh = new OptimizedMarshaller();
 
     /** */
-    private final OptimizedMarshaller optmMarsh = new OptimizedMarshaller();
+    private boolean convertStrings;
+
+    /** */
+    private boolean useTs;
+
+    /** */
+    private boolean metaDataEnabled;
+
+    /** */
+    private boolean keepDeserialized;
 
     /**
      * For {@link Externalizable}.
@@ -199,18 +208,6 @@ public class PortableContext implements Externalizable {
         registerPredefinedType(PortableObjectImpl.class, 63);
 
         registerPredefinedType(PortableMetaDataImpl.class, 64);
-
-// TODO: IGNITE-1258
-//        registerPredefinedType(DrSenderAttributes.class, 65);
-//        registerPredefinedType(DrSenderRemoteAttributes.class, 66);
-//
-//        registerPredefinedType(InteropMetadata.class, 70);
-//
-//        registerPredefinedType(InteropDotNetConfiguration.class, 71);
-//        registerPredefinedType(InteropDotNetPortableConfiguration.class, 72);
-//        registerPredefinedType(InteropDotNetPortableTypeConfiguration.class, 73);
-//        registerPredefinedType(InteropIgniteProxy.class, 74);
-//        registerPredefinedType(InteropProductLicence.class, 78);
     }
 
     /**
@@ -221,40 +218,68 @@ public class PortableContext implements Externalizable {
         if (marsh == null)
             return;
 
-        this.marsh = marsh;
+        convertStrings = marsh.isConvertStringToBytes();
+        useTs = marsh.isUseTimestamp();
+        metaDataEnabled = marsh.isMetaDataEnabled();
+        keepDeserialized = marsh.isKeepDeserialized();
+
         marshCtx = marsh.getContext();
 
         assert marshCtx != null;
 
         optmMarsh.setContext(marshCtx);
 
-        PortableIdMapper globalIdMapper = marsh.getIdMapper();
-        PortableSerializer globalSerializer = marsh.getSerializer();
-        boolean globalUseTs = marsh.isUseTimestamp();
-        boolean globalMetaDataEnabled = marsh.isMetaDataEnabled();
-        boolean globalKeepDeserialized = marsh.isKeepDeserialized();
+        configure(
+            marsh.getIdMapper(),
+            marsh.getSerializer(),
+            marsh.isUseTimestamp(),
+            marsh.isMetaDataEnabled(),
+            marsh.isKeepDeserialized(),
+            marsh.getClassNames(),
+            marsh.getTypeConfigurations()
+        );
+    }
 
+    /**
+     * @param globalIdMapper ID mapper.
+     * @param globalSerializer Serializer.
+     * @param globalUseTs Use timestamp flag.
+     * @param globalMetaDataEnabled Metadata enabled flag.
+     * @param globalKeepDeserialized Keep deserialized flag.
+     * @param clsNames Class names.
+     * @param typeCfgs Type configurations.
+     * @throws PortableException In case of error.
+     */
+    private void configure(
+        PortableIdMapper globalIdMapper,
+        PortableSerializer globalSerializer,
+        boolean globalUseTs,
+        boolean globalMetaDataEnabled,
+        boolean globalKeepDeserialized,
+        Collection<String> clsNames,
+        Collection<PortableTypeConfiguration> typeCfgs
+    ) throws PortableException {
         TypeDescriptors descs = new TypeDescriptors();
 
-        if (marsh.getClassNames() != null) {
+        if (clsNames != null) {
             PortableIdMapper idMapper = new IdMapperWrapper(globalIdMapper);
 
-            for (String clsName : marsh.getClassNames()) {
+            for (String clsName : clsNames) {
                 if (clsName.endsWith(".*")) { // Package wildcard
                     String pkgName = clsName.substring(0, clsName.length() - 2);
 
                     for (String clsName0 : classesInPackage(pkgName))
                         descs.add(clsName0, idMapper, null, null, globalUseTs, globalMetaDataEnabled,
-                                  globalKeepDeserialized, true);
+                            globalKeepDeserialized, true);
                 }
                 else // Regular single class
                     descs.add(clsName, idMapper, null, null, globalUseTs, globalMetaDataEnabled,
-                              globalKeepDeserialized, true);
+                        globalKeepDeserialized, true);
             }
         }
 
-        if (marsh.getTypeConfigurations() != null) {
-            for (PortableTypeConfiguration typeCfg : marsh.getTypeConfigurations()) {
+        if (typeCfgs != null) {
+            for (PortableTypeConfiguration typeCfg : typeCfgs) {
                 String clsName = typeCfg.getClassName();
 
                 if (clsName == null)
@@ -283,17 +308,18 @@ public class PortableContext implements Externalizable {
 
                     for (String clsName0 : classesInPackage(pkgName))
                         descs.add(clsName0, idMapper, serializer, typeCfg.getAffinityKeyFieldName(), useTs,
-                                  metaDataEnabled, keepDeserialized, true);
+                            metaDataEnabled, keepDeserialized, true);
                 }
                 else
                     descs.add(clsName, idMapper, serializer, typeCfg.getAffinityKeyFieldName(), useTs,
-                              metaDataEnabled, keepDeserialized, false);
+                        metaDataEnabled, keepDeserialized, false);
             }
         }
 
-        for (TypeDescriptor desc : descs.descriptors())
+        for (TypeDescriptor desc : descs.descriptors()) {
             registerUserType(desc.clsName, desc.idMapper, desc.serializer, desc.affKeyFieldName, desc.useTs,
-                             desc.metadataEnabled, desc.keepDeserialized);
+                desc.metadataEnabled, desc.keepDeserialized);
+        }
     }
 
     /**
@@ -437,9 +463,9 @@ public class PortableContext implements Externalizable {
                 clsName,
                 BASIC_CLS_ID_MAPPER,
                 null,
-                marsh.isUseTimestamp(),
-                marsh.isMetaDataEnabled(),
-                marsh.isKeepDeserialized());
+                useTs,
+                metaDataEnabled,
+                keepDeserialized);
 
             PortableClassDescriptor old = descByCls.putIfAbsent(cls, desc);
 
@@ -483,9 +509,9 @@ public class PortableContext implements Externalizable {
             typeName,
             idMapper,
             null,
-            marsh.isUseTimestamp(),
-            marsh.isMetaDataEnabled(),
-            marsh.isKeepDeserialized(),
+            useTs,
+            metaDataEnabled,
+            keepDeserialized,
             registered);
 
         // perform put() instead of putIfAbsent() because "registered" flag may have been changed.
@@ -632,9 +658,9 @@ public class PortableContext implements Externalizable {
     /**
      * @param cls Class.
      * @param id Type ID.
-     * @return PortableClassDescriptor.
+     * @return GridPortableClassDescriptor.
      */
-    private PortableClassDescriptor registerPredefinedType(Class<?> cls, int id) {
+    public PortableClassDescriptor registerPredefinedType(Class<?> cls, int id) {
         PortableClassDescriptor desc = new PortableClassDescriptor(
             this,
             cls,
@@ -734,13 +760,6 @@ public class PortableContext implements Externalizable {
     }
 
     /**
-     * @return Whether meta data is globally enabled.
-     */
-    boolean isMetaDataEnabled() {
-        return marsh.isMetaDataEnabled();
-    }
-
-    /**
      * @param typeId Type ID.
      * @return Whether meta data is enabled.
      */
@@ -794,7 +813,7 @@ public class PortableContext implements Externalizable {
      * @return Use timestamp flag.
      */
     public boolean isUseTimestamp() {
-        return marsh.isUseTimestamp();
+        return useTs;
     }
 
     /**
@@ -809,7 +828,7 @@ public class PortableContext implements Externalizable {
      * @return Whether to convert string to UTF8 bytes.
      */
     public boolean isConvertString() {
-        return marsh.isConvertStringToBytes();
+        return convertStrings;
     }
 
     /**
@@ -1082,4 +1101,4 @@ public class PortableContext implements Externalizable {
             return registered;
         }
     }
-}
+}
\ No newline at end of file


[09/27] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6b2ee50b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6b2ee50b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6b2ee50b

Branch: refs/heads/ignite-1124
Commit: 6b2ee50b058e37f71ec1a0aab76454aaef87dec2
Parents: 9fe3e8f c8bc1f9
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Aug 27 12:11:45 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Aug 27 12:11:45 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  | 23 ++++++++++++++++++--
 .../testframework/junits/GridAbstractTest.java  |  2 +-
 2 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[16/27] ignite git commit: Platforms: refactored metadata management methods.

Posted by sb...@apache.org.
Platforms: refactored metadata management methods.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a312934c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a312934c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a312934c

Branch: refs/heads/ignite-1124
Commit: a312934cb82a71145ed6018230a21302f871a1fa
Parents: 58f9fe4
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Aug 27 13:17:20 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Aug 27 13:17:20 2015 +0300

----------------------------------------------------------------------
 .../processors/platform/PlatformContext.java         | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a312934c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index 90ed85d..504f79e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -114,6 +114,21 @@ public interface PlatformContext {
     public void processMetadata(PortableRawReaderEx reader);
 
     /**
+     * Write metadata for the given type ID.
+     *
+     * @param writer Writer.
+     * @param typeId Type ID.
+     */
+    public void writeMetadata(PortableRawWriterEx writer, int typeId);
+
+    /**
+     * Write all available metadata.
+     *
+     * @param writer Writer.
+     */
+    public void writeAllMetadata(PortableRawWriterEx writer);
+
+    /**
      * Write cluster metrics.
      *
      * @param writer Writer.


[26/27] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3e30c863
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3e30c863
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3e30c863

Branch: refs/heads/ignite-1124
Commit: 3e30c863bced6c77a8cffa022f966a50ceac19e1
Parents: 73ab5e2 c279fca
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 27 15:17:52 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 27 15:17:52 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/portable/PortableContext.java  | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[24/27] ignite git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into master-main

Posted by sb...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into master-main


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c279fca9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c279fca9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c279fca9

Branch: refs/heads/ignite-1124
Commit: c279fca96e16b54482725bbca4d9a30b33144c33
Parents: 28c8dc7 b132006
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Aug 27 13:58:34 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Aug 27 13:58:34 2015 +0300

----------------------------------------------------------------------
 .../cache/distributed/dht/GridDhtTxPrepareFuture.java          | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[23/27] ignite git commit: correct predefined classes IDs mapping (portable context)

Posted by sb...@apache.org.
correct predefined classes IDs mapping (portable context)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/28c8dc7e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/28c8dc7e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/28c8dc7e

Branch: refs/heads/ignite-1124
Commit: 28c8dc7e9d92fb95aa1a9bdef82944697aac95e6
Parents: 3d46b62
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Aug 27 13:57:58 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Aug 27 13:57:58 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/portable/PortableContext.java  | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/28c8dc7e/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
index 723113e..cd3abc1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
@@ -159,6 +159,8 @@ public class PortableContext implements Externalizable {
         mapTypes.put(ConcurrentHashMap8.class, GridPortableMarshaller.CONC_HASH_MAP);
         mapTypes.put(Properties.class, GridPortableMarshaller.PROPERTIES_MAP);
 
+        // IDs in the range [0..200] are used by the Java SDK API and the GridGain legacy API.
+
         registerPredefinedType(Byte.class, GridPortableMarshaller.BYTE);
         registerPredefinedType(Boolean.class, GridPortableMarshaller.BOOLEAN);
         registerPredefinedType(Short.class, GridPortableMarshaller.SHORT);
@@ -205,9 +207,10 @@ public class PortableContext implements Externalizable {
         registerPredefinedType(IgniteBiTuple.class, 61);
         registerPredefinedType(T2.class, 62);
 
-        registerPredefinedType(PortableObjectImpl.class, 63);
+        // IDs in the range [200..1000] are used by Ignite internal APIs.
 
-        registerPredefinedType(PortableMetaDataImpl.class, 64);
+        registerPredefinedType(PortableObjectImpl.class, 200);
+        registerPredefinedType(PortableMetaDataImpl.class, 201);
     }
 
     /**


[27/27] ignite git commit: Merge remote-tracking branch 'remotes/origin/master' into ignite-1124

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/master' into ignite-1124


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/43542dfe
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/43542dfe
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/43542dfe

Branch: refs/heads/ignite-1124
Commit: 43542dfe035a192267376feae02defbebaef91af
Parents: 336d192 3e30c86
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 27 15:19:06 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 27 15:19:06 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/IgniteInternalFuture.java   |  10 +
 .../org/apache/ignite/internal/IgnitionEx.java  |  23 +-
 .../internal/portable/PortableContext.java      | 127 +--
 .../processors/cache/GridCacheAdapter.java      |  10 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   6 +-
 .../dht/atomic/GridDhtAtomicCache.java          |  10 +-
 .../query/GridCacheQueryFutureAdapter.java      |   7 +
 .../processors/igfs/IgfsMetaManager.java        |  43 +-
 .../internal/processors/igfs/IgfsUtils.java     |  52 ++
 .../processors/platform/PlatformContext.java    | 138 +++
 .../platform/PlatformNoopProcessor.java         |   5 +
 .../processors/platform/PlatformProcessor.java  |   7 +
 .../callback/PlatformCallbackGateway.java       | 869 +++++++++++++++++++
 .../callback/PlatformCallbackUtils.java         | 468 ++++++++++
 .../platform/memory/PlatformInputStream.java    |  30 +
 .../platform/memory/PlatformMemory.java         |  77 ++
 .../platform/memory/PlatformMemoryManager.java  |  46 +
 .../platform/memory/PlatformOutputStream.java   |  30 +
 .../util/future/GridEmbeddedFuture.java         |  55 +-
 .../util/future/GridFinishedFuture.java         |   5 +
 .../internal/util/future/GridFutureAdapter.java |  58 +-
 ...acheAsyncOperationsFailoverAbstractTest.java | 329 +++++++
 .../CacheAsyncOperationsFailoverAtomicTest.java |  32 +
 .../CacheAsyncOperationsFailoverTxTest.java     |  32 +
 .../distributed/CacheAsyncOperationsTest.java   | 280 ++++++
 .../testframework/junits/GridAbstractTest.java  |   2 +-
 .../IgniteCacheFailoverTestSuite2.java          |   4 +
 .../testsuites/IgniteCacheTestSuite3.java       |   2 +
 .../platform/PlatformAbstractPredicate.java     |  64 ++
 .../processors/platform/PlatformBootstrap.java  |   1 -
 .../processors/platform/PlatformContext.java    | 114 ---
 .../callback/PlatformCallbackGateway.java       | 869 -------------------
 .../callback/PlatformCallbackUtils.java         | 468 ----------
 .../platform/cluster/PlatformClusterGroup.java  | 330 +++++++
 .../cluster/PlatformClusterNodeFilter.java      |  77 ++
 .../platform/memory/PlatformInputStream.java    |  30 -
 .../platform/memory/PlatformMemory.java         |  77 --
 .../platform/memory/PlatformMemoryManager.java  |  46 -
 .../platform/memory/PlatformOutputStream.java   |  30 -
 .../platform/utils/PlatformUtils.java           |  78 ++
 scripts/apply-pull-request.sh                   |   2 +
 41 files changed, 3199 insertions(+), 1744 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/43542dfe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------


[03/27] ignite git commit: Moved two platform utility methods to Ignite.

Posted by sb...@apache.org.
Moved two platform utility methods to Ignite.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/57184803
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/57184803
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/57184803

Branch: refs/heads/ignite-1124
Commit: 57184803fc64049e18d27d04777687e3f1f7e42d
Parents: 5877b30
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Aug 27 10:07:55 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Aug 27 10:07:55 2015 +0300

----------------------------------------------------------------------
 .../platform/utils/PlatformUtils.java           | 55 ++++++++++++++++++++
 1 file changed, 55 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/57184803/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index f82fb0f..0777f9a 100644
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -600,6 +600,61 @@ public class PlatformUtils {
     }
 
     /**
+     * Reallocate arbitrary memory chunk by delegating to {@code PlatformMemoryUtils.reallocate}.
+     *
+     * @param memPtr Memory pointer.
+     * @param cap Capacity.
+     */
+    public static void reallocate(long memPtr, int cap) {
+        PlatformMemoryUtils.reallocate(memPtr, cap);
+    }
+
+    /**
+     * Get error data: writes the error through its platform context and copies the bytes into a new array.
+     *
+     * @param err Error.
+     * @return Error data, or {@code null} if the error is not a {@link PlatformExtendedException}.
+     */
+    @SuppressWarnings("UnusedDeclaration")
+    public static byte[] errorData(Throwable err) {
+        if (err instanceof PlatformExtendedException) {
+            PlatformContext ctx = ((PlatformExtendedException)err).context();
+
+            try (PlatformMemory mem = ctx.memory().allocate()) {
+                // Write error data into platform memory; the output stream is synchronized even if writing fails.
+                PlatformOutputStream out = mem.output();
+
+                PortableRawWriterEx writer = ctx.writer(out);
+
+                try {
+                    PlatformUtils.writeErrorData(err, writer, ctx.kernalContext().log(PlatformContext.class));
+                }
+                finally {
+                    out.synchronize();
+                }
+
+                // Read error data back and copy it into a separate array, since mem is closed when this try exits.
+                PlatformInputStream in = mem.input();
+
+                in.synchronize();
+
+                int len = in.remaining();
+
+                assert len > 0;
+
+                byte[] arr = in.array();
+                byte[] res = new byte[len];
+
+                System.arraycopy(arr, 0, res, 0, len);
+
+                return res;
+            }
+        }
+        else
+            return null;
+    }
+
+    /**
      * Private constructor.
      */
     private PlatformUtils() {


[21/27] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3d46b624
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3d46b624
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3d46b624

Branch: refs/heads/ignite-1124
Commit: 3d46b6248712c67fa1ff695b0a21e33bd9425691
Parents: 712b29c f497e8e
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Aug 27 13:29:31 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Aug 27 13:29:31 2015 +0300

----------------------------------------------------------------------
 .../internal/portable/PortableContext.java      | 115 +++++++++++--------
 1 file changed, 67 insertions(+), 48 deletions(-)
----------------------------------------------------------------------



[15/27] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/00b27cec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/00b27cec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/00b27cec

Branch: refs/heads/ignite-1124
Commit: 00b27cecb917edf61573cf2989e268636d9fb312
Parents: e567f8c 58f9fe4
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 27 12:44:38 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 27 12:44:38 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/IgniteInternalFuture.java   | 10 +++
 .../query/GridCacheQueryFutureAdapter.java      |  7 ++
 .../processors/igfs/IgfsMetaManager.java        | 43 +++++------
 .../internal/processors/igfs/IgfsUtils.java     | 52 +++++++++++++
 .../util/future/GridFinishedFuture.java         |  5 ++
 .../internal/util/future/GridFutureAdapter.java | 58 +++++++++------
 .../platform/PlatformAbstractPredicate.java     | 64 ++++++++++++++++
 .../processors/platform/PlatformBootstrap.java  |  1 -
 .../cluster/PlatformClusterNodeFilter.java      | 77 ++++++++++++++++++++
 scripts/apply-pull-request.sh                   |  2 +
 10 files changed, 274 insertions(+), 45 deletions(-)
----------------------------------------------------------------------



[06/27] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c8bc1f92
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c8bc1f92
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c8bc1f92

Branch: refs/heads/ignite-1124
Commit: c8bc1f92475f7ea4b216de7b024ce4e643db26e3
Parents: fc60fee e4ba2eb
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 27 12:08:17 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 27 12:08:17 2015 +0300

----------------------------------------------------------------------
 .../internal/portable/PortableContext.java      |  3 --
 .../processors/platform/PlatformContext.java    |  9 ++++
 .../platform/utils/PlatformUtils.java           | 55 ++++++++++++++++++++
 3 files changed, 64 insertions(+), 3 deletions(-)
----------------------------------------------------------------------



[07/27] ignite git commit: ignite-1217: add suffic to comment " - Fixes #<id>."

Posted by sb...@apache.org.
ignite-1217: add suffic to comment " - Fixes #<id>."


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cfc4d661
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cfc4d661
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cfc4d661

Branch: refs/heads/ignite-1124
Commit: cfc4d6615234f3f113193f43f8fc3c6c659ef680
Parents: e4ba2eb
Author: ashutak <as...@gridgain.com>
Authored: Wed Aug 26 22:27:28 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Aug 27 12:08:58 2015 +0300

----------------------------------------------------------------------
 scripts/apply-pull-request.sh | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/cfc4d661/scripts/apply-pull-request.sh
----------------------------------------------------------------------
diff --git a/scripts/apply-pull-request.sh b/scripts/apply-pull-request.sh
index baa73b5..d852d78 100755
--- a/scripts/apply-pull-request.sh
+++ b/scripts/apply-pull-request.sh
@@ -128,6 +128,8 @@ if [ "${COMMENT}" == "" ]; then
     COMMENT=${ORIG_COMMENT}
 fi
 
+COMMENT="${COMMENT} - Fixes #${PR_ID}."
+
 git commit --author "${AUTHOR}" -a -s -m "${COMMENT}" &> /dev/null
 
 echo "Squash commit for pull request with id='${PR_ID}' has been added. The commit has been added with comment '${COMMENT}'."


[20/27] ignite git commit: IGNITE-1310: Platforms: moved cluster group to Ignite.

Posted by sb...@apache.org.
IGNITE-1310: Platforms: moved cluster group to Ignite.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/712b29c8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/712b29c8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/712b29c8

Branch: refs/heads/ignite-1124
Commit: 712b29c856614bd914e6bf35c5203aab624c93e7
Parents: 7c2c02b
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Aug 27 13:29:08 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Aug 27 13:29:08 2015 +0300

----------------------------------------------------------------------
 .../platform/cluster/PlatformClusterGroup.java  | 330 +++++++++++++++++++
 1 file changed, 330 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/712b29c8/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
new file mode 100644
index 0000000..1f2a002
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cluster;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+
+import java.util.*;
+
+/**
+ * Interop projection: a platform target that wraps a {@link ClusterGroupEx} and dispatches operations by code.
+ */
+@SuppressWarnings({"UnusedDeclaration"})
+public class PlatformClusterGroup extends PlatformAbstractTarget {
+    /** Operation code: write all available metadata. */
+    private static final int OP_ALL_METADATA = 1;
+
+    /** Operation code: create a projection with {@code forAttribute}. */
+    private static final int OP_FOR_ATTRIBUTE = 2;
+
+    /** Operation code: create a projection with {@code forCacheNodes}. */
+    private static final int OP_FOR_CACHE = 3;
+
+    /** Operation code: create a projection with {@code forClientNodes}. */
+    private static final int OP_FOR_CLIENT = 4;
+
+    /** Operation code: create a projection with {@code forDataNodes}. */
+    private static final int OP_FOR_DATA = 5;
+
+    /** Operation code: create a projection with {@code forHost}. */
+    private static final int OP_FOR_HOST = 6;
+
+    /** Operation code: create a projection with {@code forNodeIds}. */
+    private static final int OP_FOR_NODE_IDS = 7;
+
+    /** Operation code: write metadata for a single type ID. */
+    private static final int OP_METADATA = 8;
+
+    /** Operation code: write metrics of this projection. */
+    private static final int OP_METRICS = 9;
+
+    /** Operation code: write metrics of this projection restricted to the given node IDs. */
+    private static final int OP_METRICS_FILTERED = 10;
+
+    /** Operation code: write metrics of a single node if they are recent enough. */
+    private static final int OP_NODE_METRICS = 11;
+
+    /** Operation code: write projection nodes if the topology version has advanced. */
+    private static final int OP_NODES = 12;
+
+    /** Operation code: ping a node by ID. */
+    private static final int OP_PING_NODE = 13;
+
+    /** Operation code: write nodes of the given topology version. */
+    private static final int OP_TOPOLOGY = 14;
+
+    /** Underlying cluster group (projection) that operations are executed against. */
+    private final ClusterGroupEx prj;
+
+    /**
+     * Constructor.
+     *
+     * @param platformCtx Platform context, passed to the superclass.
+     * @param prj Projection to wrap.
+     */
+    public PlatformClusterGroup(PlatformContext platformCtx, ClusterGroupEx prj) {
+        super(platformCtx);
+
+        this.prj = prj;
+    }
+
+    /** {@inheritDoc} Handles OP_METRICS and OP_ALL_METADATA; any other type goes to throwUnsupported. */
+    @SuppressWarnings("deprecation")
+    @Override protected void processOutOp(int type, PortableRawWriterEx writer) throws IgniteCheckedException {
+        switch (type) {
+            case OP_METRICS:
+                platformCtx.writeClusterMetrics(writer, prj.metrics());
+
+                break;
+
+            case OP_ALL_METADATA:
+                platformCtx.writeAllMetadata(writer);
+
+                break;
+
+            default:
+                throwUnsupported(type);
+        }
+    }
+
+    /** {@inheritDoc} Handles metrics, nodes, metadata and topology requests; others go to throwUnsupported. */
+    @SuppressWarnings({"ConstantConditions", "deprecation"})
+    @Override protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer,
+        Object obj) throws IgniteCheckedException {
+        switch (type) {
+            case OP_METRICS_FILTERED: {
+                Collection<UUID> ids = PlatformUtils.readCollection(reader);
+
+                platformCtx.writeClusterMetrics(writer, prj.forNodeIds(ids).metrics());
+
+                break;
+            }
+
+            case OP_NODES: {
+                long oldTopVer = reader.readLong();
+
+                long curTopVer = platformCtx.kernalContext().discovery().topologyVersion();
+
+                if (curTopVer > oldTopVer) {
+                    writer.writeBoolean(true);
+
+                    writer.writeLong(curTopVer);
+
+                    // At this moment topology version might have advanced, and due to this race
+                    // we return outdated top ver to the caller. But this race is benign, the only
+                    // possible side effect is that the user will re-request nodes and we will return
+                    // the same set of nodes but with more recent topology version.
+                    Collection<ClusterNode> nodes = prj.nodes();
+
+                    platformCtx.writeNodes(writer, nodes);
+                }
+                else
+                    // No discovery events since last invocation.
+                    writer.writeBoolean(false);
+
+                break;
+            }
+
+            case OP_NODE_METRICS: {
+                UUID nodeId = reader.readUuid();
+
+                long lastUpdateTime = reader.readLong();
+
+                // Ask discovery because node might have been filtered out of current projection.
+                ClusterNode node = platformCtx.kernalContext().discovery().node(nodeId);
+
+                ClusterMetrics metrics = null;
+
+                if (node != null) {
+                    ClusterMetrics metrics0 = node.metrics();
+
+                    long triggerTime = lastUpdateTime + platformCtx.kernalContext().config().getMetricsUpdateFrequency();
+
+                    metrics = metrics0.getLastUpdateTime() > triggerTime ? metrics0 : null;
+                }
+
+                platformCtx.writeClusterMetrics(writer, metrics);
+
+                break;
+            }
+
+            case OP_METADATA: {
+                int typeId = reader.readInt();
+
+                platformCtx.writeMetadata(writer, typeId);
+
+                break;
+            }
+
+            case OP_TOPOLOGY: {
+                long topVer = reader.readLong();
+
+                platformCtx.writeNodes(writer, topology(topVer));
+
+                break;
+            }
+
+            default:
+                throwUnsupported(type);
+        }
+    }
+
+    /** {@inheritDoc} Handles OP_PING_NODE only, returning TRUE or FALSE according to the ping result. */
+    @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case OP_PING_NODE:
+                return pingNode(reader.readUuid()) ? TRUE : FALSE;
+        }
+
+        return throwUnsupported(type);
+    }
+
+    /** {@inheritDoc} Handles the OP_FOR_* codes; each returns a new PlatformClusterGroup over a derived projection. */
+    @Override protected Object processInOpObject(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case OP_FOR_NODE_IDS: {
+                Collection<UUID> ids = PlatformUtils.readCollection(reader);
+
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forNodeIds(ids));
+            }
+
+            case OP_FOR_ATTRIBUTE:
+                return new PlatformClusterGroup(platformCtx,
+                    (ClusterGroupEx)prj.forAttribute(reader.readString(), reader.readString()));
+
+            case OP_FOR_CACHE: {
+                String cacheName = reader.readString();
+
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forCacheNodes(cacheName));
+            }
+
+            case OP_FOR_CLIENT: {
+                String cacheName = reader.readString();
+
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forClientNodes(cacheName));
+            }
+
+            case OP_FOR_DATA: {
+                String cacheName = reader.readString();
+
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forDataNodes(cacheName));
+            }
+
+            case OP_FOR_HOST: {
+                UUID nodeId = reader.readUuid();
+
+                ClusterNode node = prj.node(nodeId);
+
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx) prj.forHost(node));
+            }
+
+            default:
+                return throwUnsupported(type);
+        }
+    }
+
+    /**
+     * @param exclude Platform cluster group whose projection is excluded.
+     * @return New platform cluster group over {@code prj.forOthers(exclude.prj)}.
+     */
+    public PlatformClusterGroup forOthers(PlatformClusterGroup exclude) {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forOthers(exclude.prj));
+    }
+
+    /**
+     * @return New platform cluster group over {@code prj.forRemotes()}.
+     */
+    public PlatformClusterGroup forRemotes() {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRemotes());
+    }
+
+    /**
+     * @return New platform cluster group over {@code prj.forDaemons()}.
+     */
+    public PlatformClusterGroup forDaemons() {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forDaemons());
+    }
+
+    /**
+     * @return New platform cluster group over {@code prj.forRandom()}.
+     */
+    public PlatformClusterGroup forRandom() {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRandom());
+    }
+
+    /**
+     * @return New platform cluster group over {@code prj.forOldest()}.
+     */
+    public PlatformClusterGroup forOldest() {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forOldest());
+    }
+
+    /**
+     * @return New platform cluster group over {@code prj.forYoungest()}.
+     */
+    public PlatformClusterGroup forYoungest() {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forYoungest());
+    }
+
+    /**
+     * @return Underlying projection wrapped by this platform cluster group.
+     */
+    public ClusterGroupEx projection() {
+        return prj;
+    }
+
+    /**
+     * Resets local I/O, job, and task execution metrics.
+     */
+    public void resetMetrics() {
+        assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group.
+
+        ((IgniteCluster)prj).resetMetrics();
+    }
+
+    /**
+     * Pings the node with the given ID; returns the result of {@code pingNode} on the top-level cluster.
+     */
+    private boolean pingNode(UUID nodeId) {
+        assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group.
+
+        return ((IgniteCluster)prj).pingNode(nodeId);
+    }
+
+    /**
+     * Gets a topology by version. Returns {@code null} if topology history storage doesn't contain
+     * specified topology version (history currently keeps last {@code 1000} snapshots).
+     *
+     * @param topVer Topology version.
+     * @return Collection of grid nodes which represented by specified topology version,
+     * if it is present in history storage, {@code null} otherwise.
+     * @throws UnsupportedOperationException If underlying SPI implementation does not support
+     *      topology history. Currently only {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}
+     *      supports topology history.
+     */
+    private Collection<ClusterNode> topology(long topVer) {
+        assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group.
+
+        return ((IgniteCluster)prj).topology(topVer);
+    }
+}


[10/27] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a2b7ba1b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a2b7ba1b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a2b7ba1b

Branch: refs/heads/ignite-1124
Commit: a2b7ba1bb70ca67d09479b9a4ca1ab4d87cced08
Parents: cfc4d66 6b2ee50
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Thu Aug 27 12:12:54 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Thu Aug 27 12:12:54 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/IgniteInternalFuture.java   | 10 ++++
 .../org/apache/ignite/internal/IgnitionEx.java  | 23 +++++++-
 .../query/GridCacheQueryFutureAdapter.java      |  7 +++
 .../processors/igfs/IgfsMetaManager.java        | 43 +++++++--------
 .../internal/processors/igfs/IgfsUtils.java     | 52 ++++++++++++++++++
 .../util/future/GridFinishedFuture.java         |  5 ++
 .../internal/util/future/GridFutureAdapter.java | 58 +++++++++++++-------
 .../testframework/junits/GridAbstractTest.java  |  2 +-
 8 files changed, 153 insertions(+), 47 deletions(-)
----------------------------------------------------------------------



[25/27] ignite git commit: Added failover tests for async operations.

Posted by sb...@apache.org.
Added failover tests for async operations.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/73ab5e2f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/73ab5e2f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/73ab5e2f

Branch: refs/heads/ignite-1124
Commit: 73ab5e2f7bc121eaf496096f205547c026c91464
Parents: b132006
Author: sboikov <sb...@gridgain.com>
Authored: Thu Aug 27 15:07:49 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Aug 27 15:07:49 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |   4 +
 .../dht/atomic/GridDhtAtomicCache.java          |   4 +
 ...acheAsyncOperationsFailoverAbstractTest.java | 329 +++++++++++++++++++
 .../CacheAsyncOperationsFailoverAtomicTest.java |  32 ++
 .../CacheAsyncOperationsFailoverTxTest.java     |  32 ++
 .../IgniteCacheFailoverTestSuite2.java          |   4 +
 6 files changed, 405 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 54d33e0..c3bbbe4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -4135,6 +4135,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
                     new IgniteOutClosure<IgniteInternalFuture>() {
                         @Override public IgniteInternalFuture<T> apply() {
+                            if (ctx.kernalContext().isStopping())
+                                return new GridFinishedFuture<>(
+                                    new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
+
                             return op.op(tx0).chain(new CX1<IgniteInternalFuture<T>, T>() {
                                 @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException {
                                     try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 9087d20..4b8585e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -570,6 +570,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                 IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut,
                     new IgniteOutClosure<IgniteInternalFuture>() {
                         @Override public IgniteInternalFuture<T> apply() {
+                            if (ctx.kernalContext().isStopping())
+                                return new GridFinishedFuture<>(
+                                    new IgniteCheckedException("Operation has been cancelled (node is stopping)."));
+
                             return op.apply();
                         }
                     });

http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
new file mode 100644
index 0000000..1669404
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAbstractTest.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
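+/**
+ * Base for tests that issue async {@code putAll} operations while an extra node is started and stopped.
+ */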
+public abstract class CacheAsyncOperationsFailoverAbstractTest extends GridCacheAbstractSelfTest {
+    /** Grid count; also used as the index of the extra node the tests start and stop. */
+    private static final int NODE_CNT = 4;
+
+    /** Duration of the {@code putAll} failover load, in milliseconds. */
+    private static final long TEST_TIME = 60_000;
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return NODE_CNT;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return TEST_TIME + 60_000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+        ccfg.setCacheStoreFactory(null);
+        ccfg.setReadThrough(false);
+        ccfg.setWriteThrough(false);
+
+        return ccfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected NearCacheConfiguration nearConfiguration() {
+        return null;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllAsyncFailover() throws Exception {
+        putAllAsyncFailover(5, 10);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testPutAllAsyncFailoverManyThreads() throws Exception {
+        putAllAsyncFailover(ignite(0).configuration().getSystemThreadPoolSize() * 2, 3);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAsyncFailover() throws Exception {
+        for (int i = 0; i < 3; i++) {
+            log.info("Iteration: " + i);
+
+            startGrid(NODE_CNT);
+
+            final IgniteCache<TestKey, TestValue> cache = ignite(0).<TestKey, TestValue>cache(null).withAsync();
+
+            int ops = cache.getConfiguration(CacheConfiguration.class).getMaxConcurrentAsyncOperations();
+
+            log.info("Max concurrent async operations: " + ops);
+
+            assertTrue(ops > 0);
+
+            final List<IgniteFuture<?>> futs = Collections.synchronizedList(new ArrayList<IgniteFuture<?>>(ops));
+
+            final AtomicInteger left = new AtomicInteger(ops);
+
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    List<IgniteFuture<?>> futs0 = new ArrayList<>();
+
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    while (left.getAndDecrement() > 0) {
+                        TreeMap<TestKey, TestValue> map = new TreeMap<>();
+
+                        int keys = 50;
+
+                        for (int k = 0; k < keys; k++)
+                            map.put(new TestKey(rnd.nextInt(10_000)), new TestValue(k));
+
+                        cache.putAll(map);
+
+                        IgniteFuture<?> fut = cache.future();
+
+                        assertNotNull(fut);
+
+                        futs0.add(fut);
+                    }
+
+                    futs.addAll(futs0);
+
+                    return null;
+                }
+            }, 10, "put-thread");
+
+            stopGrid(NODE_CNT);
+
+            assertEquals(ops, futs.size());
+
+            for (IgniteFuture<?> fut : futs)
+                fut.get();
+        }
+    }
+
+    /**
+     * @param threads Number of threads.
+     * @param opsPerThread Number of concurrent async operations per thread.
+     * @throws Exception If failed.
+     */
+    private void putAllAsyncFailover(final int threads, final int opsPerThread) throws Exception {
+        log.info("Start test [threads=" + threads + ", opsPerThread=" + opsPerThread + ']');
+
+        final AtomicBoolean finished = new AtomicBoolean();
+
+        final long endTime = System.currentTimeMillis() + TEST_TIME;
+
+        IgniteInternalFuture<Object> restartFut = GridTestUtils.runAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                Thread.currentThread().setName("restart-thread");
+
+                while (!finished.get() && System.currentTimeMillis() < endTime) {
+                    startGrid(NODE_CNT);
+
+                    U.sleep(500);
+
+                    stopGrid(NODE_CNT);
+                }
+
+                return null;
+            }
+        });
+
+        try {
+            final IgniteCache<TestKey, TestValue> cache = ignite(0).<TestKey, TestValue>cache(null).withAsync();
+
+            GridTestUtils.runMultiThreaded(new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    int iter = 0;
+
+                    ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                    long time;
+
+                    long lastInfo = 0;
+
+                    while ((time = System.currentTimeMillis()) < endTime) {
+                        if (time - lastInfo > 5000)
+                            log.info("Starting operations [iter=" + iter + ']');
+
+                        List<IgniteFuture<?>> futs = new ArrayList<>(opsPerThread);
+
+                        for (int i = 0; i < opsPerThread; i++) {
+                            TreeMap<TestKey, TestValue> map = new TreeMap<>();
+
+                            int keys = rnd.nextInt(1, 50);
+
+                            for (int k = 0; k < keys; k++)
+                                map.put(new TestKey(rnd.nextInt(10_000)), new TestValue(iter));
+
+                            cache.putAll(map);
+
+                            IgniteFuture<?> fut = cache.future();
+
+                            assertNotNull(fut);
+
+                            futs.add(fut);
+                        }
+
+                        if (time - lastInfo > 5000) {
+                            log.info("Waiting for futures [iter=" + iter + ']');
+
+                            lastInfo = time;
+                        }
+
+                        for (IgniteFuture<?> fut : futs)
+                            fut.get();
+
+                        iter++;
+                    }
+
+                    return null;
+                }
+            }, threads, "update-thread");
+
+            finished.set(true);
+
+            restartFut.get();
+        }
+        finally {
+            finished.set(true);
+        }
+    }
+
+    /**
+     * Serializable cache key wrapping a {@code long}; ordered by that value so it can be used in a {@link TreeMap}.
+     */
+    private static class TestKey implements Serializable, Comparable<TestKey> {
+        /** Wrapped key value. */
+        private long key;
+
+        /**
+         * @param key Key.
+         */
+        public TestKey(long key) {
+            this.key = key;
+        }
+
+        /**
+         * @return Key.
+         */
+        public long key() {
+            return key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int compareTo(@NotNull TestKey other) {
+            return Long.compare(key, other.key);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey other = (TestKey)o;
+
+            return key == other.key;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return (int)(key ^ (key >>> 32));
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestKey.class, this);
+        }
+    }
+
+    /**
+     * Serializable cache value wrapping a {@code long}.
+     */
+    private static class TestValue implements Serializable {
+        /** Wrapped value. */
+        private long val;
+
+        /**
+         * @param val Value.
+         */
+        public TestValue(long val) {
+            this.val = val;
+        }
+
+        /**
+         * @return Value.
+         */
+        public long value() {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            return o != null && getClass() == o.getClass() && val == ((TestValue)o).val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return (int)(val ^ (val >>> 32));
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(TestValue.class, this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAtomicTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAtomicTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAtomicTest.java
new file mode 100644
index 0000000..6e01a4a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverAtomicTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ * Runs the async cache operations failover tests with {@code ATOMIC} cache atomicity mode.
+ */
+public class CacheAsyncOperationsFailoverAtomicTest extends CacheAsyncOperationsFailoverAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverTxTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverTxTest.java
new file mode 100644
index 0000000..ba3ad7a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheAsyncOperationsFailoverTxTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+
+/**
+ * Runs the async cache operations failover tests with {@code TRANSACTIONAL} cache atomicity mode.
+ */
+public class CacheAsyncOperationsFailoverTxTest extends CacheAsyncOperationsFailoverAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return TRANSACTIONAL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/73ab5e2f/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
index f3fac23..ba510f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite2.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import junit.framework.*;
 import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.*;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
 import org.apache.ignite.internal.processors.cache.distributed.near.*;
@@ -48,6 +49,9 @@ public class IgniteCacheFailoverTestSuite2 {
 
         suite.addTestSuite(IgniteCacheCrossCacheTxFailoverTest.class);
 
+        suite.addTestSuite(CacheAsyncOperationsFailoverAtomicTest.class);
+        suite.addTestSuite(CacheAsyncOperationsFailoverTxTest.class);
+
         return suite;
     }
 }


[13/27] ignite git commit: Merge remote-tracking branch 'origin/master'

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/58f9fe42
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/58f9fe42
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/58f9fe42

Branch: refs/heads/ignite-1124
Commit: 58f9fe425d4eff652d459bec722dba3c22e5e03a
Parents: 93b2942 a2b7ba1
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Aug 27 12:26:23 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Aug 27 12:26:23 2015 +0300

----------------------------------------------------------------------
 scripts/apply-pull-request.sh | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------



[02/27] ignite git commit: Further refactorings necessary for platforms move to Ignite.

Posted by sb...@apache.org.
Further refactorings necessary for platforms move to Ignite.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5877b301
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5877b301
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5877b301

Branch: refs/heads/ignite-1124
Commit: 5877b301a89ed87531d503fe92474d8a8568134a
Parents: 136c099
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Aug 27 09:53:17 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Aug 27 09:53:17 2015 +0300

----------------------------------------------------------------------
 .../processors/platform/PlatformContext.java    | 114 +++
 .../platform/PlatformNoopProcessor.java         |   5 +
 .../processors/platform/PlatformProcessor.java  |   7 +
 .../callback/PlatformCallbackGateway.java       | 869 +++++++++++++++++++
 .../callback/PlatformCallbackUtils.java         | 468 ++++++++++
 .../platform/memory/PlatformInputStream.java    |  30 +
 .../platform/memory/PlatformMemory.java         |  77 ++
 .../platform/memory/PlatformMemoryManager.java  |  46 +
 .../platform/memory/PlatformOutputStream.java   |  30 +
 .../processors/platform/PlatformContext.java    | 114 ---
 .../callback/PlatformCallbackGateway.java       | 869 -------------------
 .../callback/PlatformCallbackUtils.java         | 468 ----------
 .../platform/memory/PlatformInputStream.java    |  30 -
 .../platform/memory/PlatformMemory.java         |  77 --
 .../platform/memory/PlatformMemoryManager.java  |  46 -
 .../platform/memory/PlatformOutputStream.java   |  30 -
 .../platform/utils/PlatformUtils.java           |  23 +
 17 files changed, 1669 insertions(+), 1634 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
new file mode 100644
index 0000000..fb1eaa2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform;
+
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.callback.*;
+import org.apache.ignite.internal.processors.platform.memory.*;
+
+import java.util.*;
+
+/**
+ * Platform context. Acts as an entry point for platform operations.
+ */
+public interface PlatformContext {
+    /**
+     * Gets kernal context.
+     *
+     * @return Kernal context.
+     */
+    public GridKernalContext kernalContext();
+
+    /**
+     * Gets platform memory manager.
+     *
+     * @return Memory manager.
+     */
+    public PlatformMemoryManager memory();
+
+    /**
+     * Gets platform callback gateway.
+     *
+     * @return Callback gateway.
+     */
+    public PlatformCallbackGateway gateway();
+
+    /**
+     * Get memory reader.
+     *
+     * @param mem Memory.
+     * @return Reader.
+     */
+    public PortableRawReaderEx reader(PlatformMemory mem);
+
+    /**
+     * Get memory reader.
+     *
+     * @param in Input.
+     * @return Reader.
+     */
+    public PortableRawReaderEx reader(PlatformInputStream in);
+
+    /**
+     * Get memory writer.
+     *
+     * @param mem Memory.
+     * @return Writer.
+     */
+    public PortableRawWriterEx writer(PlatformMemory mem);
+
+    /**
+     * Get memory writer.
+     *
+     * @param out Output.
+     * @return Writer.
+     */
+    public PortableRawWriterEx writer(PlatformOutputStream out);
+
+    /**
+     * Sends node info to native platform, if necessary.
+     *
+     * @param node Node.
+     */
+    public void addNode(ClusterNode node);
+
+    /**
+     * Writes a node id to a stream and sends node info to native platform, if necessary.
+     *
+     * @param writer Writer.
+     * @param node Node.
+     */
+    public void writeNode(PortableRawWriterEx writer, ClusterNode node);
+
+    /**
+     * Writes multiple node ids to a stream and sends node info to native platform, if necessary.
+     *
+     * @param writer Writer.
+     * @param nodes Nodes.
+     */
+    public void writeNodes(PortableRawWriterEx writer, Collection<ClusterNode> nodes);
+
+    /**
+     * Process metadata from the platform.
+     *
+     * @param reader Reader.
+     */
+    public void processMetadata(PortableRawReaderEx reader);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
index 245d4d7..9bdc3be 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java
@@ -38,4 +38,9 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf
     @Override public long environmentPointer() {
         return 0;
     }
+
+    /** {@inheritDoc} */
+    @Override public PlatformContext context() {
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
index 137c31b..782db4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java
@@ -37,4 +37,11 @@ public interface PlatformProcessor extends GridProcessor {
      * @return Environment pointer.
      */
     public long environmentPointer();
+
+    /**
+     * Gets platform context.
+     *
+     * @return Platform context.
+     */
+    public PlatformContext context();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
new file mode 100644
index 0000000..a8e7879
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
@@ -0,0 +1,869 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.callback;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.*;
+
+/**
+ * Gateway to all platform-dependent callbacks. Implementers might extend this class and provide additional callbacks.
+ */
+@SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"})
+public class PlatformCallbackGateway {
+    /** Environment pointer. */
+    protected final long envPtr;
+
+    /** Lock. */
+    private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
+
+    /**
+     * Native gateway.
+     *
+     * @param envPtr Environment pointer.
+     */
+    public PlatformCallbackGateway(long envPtr) {
+        this.envPtr = envPtr;
+    }
+
+    /**
+     * Get environment pointer.
+     *
+     * @return Environment pointer.
+     */
+    public long environmentPointer() {
+        return envPtr;
+    }
+
+    /**
+     * Create cache store.
+     *
+     * @param memPtr Memory pointer.
+     * @return Pointer.
+     */
+    public long cacheStoreCreate(long memPtr) {
+        enter();
+
+        try {
+            return PlatformCallbackUtils.cacheStoreCreate(envPtr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Invokes the native cache store callback while the gateway read lock is held.
+     *
+     * @param objPtr Object pointer.
+     * @param memPtr Memory pointer.
+     * @param cb Callback.
+     * @return Result.
+     */
+    public int cacheStoreInvoke(long objPtr, long memPtr, Object cb) {
+        enter();
+
+        try {
+            return PlatformCallbackUtils.cacheStoreInvoke(envPtr, objPtr, memPtr, cb);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Invokes the native cache store destroy callback while the gateway read lock is held.
+     *
+     * @param objPtr Object pointer.
+     */
+    public void cacheStoreDestroy(long objPtr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.cacheStoreDestroy(envPtr, objPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Creates cache store session.
+     *
+     * @param storePtr Store instance pointer.
+     * @return Session instance pointer.
+     */
+    public long cacheStoreSessionCreate(long storePtr) {
+        enter();
+
+        try {
+            return PlatformCallbackUtils.cacheStoreSessionCreate(envPtr, storePtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Creates cache entry filter and returns a pointer.
+     *
+     * @param memPtr Memory pointer.
+     * @return Pointer.
+     */
+    public long cacheEntryFilterCreate(long memPtr) {
+        enter();
+
+        try {
+            return PlatformCallbackUtils.cacheEntryFilterCreate(envPtr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Invokes the native cache entry filter apply callback while the gateway read lock is held.
+     *
+     * @param ptr Pointer.
+     * @param memPtr Memory pointer.
+     * @return Result.
+     */
+    public int cacheEntryFilterApply(long ptr, long memPtr) {
+        enter();
+
+        try {
+            return PlatformCallbackUtils.cacheEntryFilterApply(envPtr, ptr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Invokes the native cache entry filter destroy callback while the gateway read lock is held.
+     *
+     * @param ptr Pointer.
+     */
+    public void cacheEntryFilterDestroy(long ptr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.cacheEntryFilterDestroy(envPtr, ptr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Invoke cache entry processor.
+     *
+     * @param outMemPtr Output memory pointer.
+     * @param inMemPtr Input memory pointer.
+     */
+    public void cacheInvoke(long outMemPtr, long inMemPtr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.cacheInvoke(envPtr, outMemPtr, inMemPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Perform native task map. Do not throw exceptions, serializing them to the output stream instead.
+     *
+     * @param taskPtr Task pointer.
+     * @param outMemPtr Output memory pointer (exists if topology changed, otherwise {@code 0}).
+     * @param inMemPtr Input memory pointer.
+     */
+    public void computeTaskMap(long taskPtr, long outMemPtr, long inMemPtr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.computeTaskMap(envPtr, taskPtr, outMemPtr, inMemPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Perform native task job result notification.
+     *
+     * @param taskPtr Task pointer.
+     * @param jobPtr Job pointer.
+     * @param memPtr Memory pointer (always zero for local job execution).
+     * @return Job result enum ordinal.
+     */
+    public int computeTaskJobResult(long taskPtr, long jobPtr, long memPtr) {
+        enter();
+
+        try {
+            return PlatformCallbackUtils.computeTaskJobResult(envPtr, taskPtr, jobPtr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Perform native task reduce.
+     *
+     * @param taskPtr Task pointer.
+     */
+    public void computeTaskReduce(long taskPtr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.computeTaskReduce(envPtr, taskPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Complete task with native error.
+     *
+     * @param taskPtr Task pointer.
+     * @param memPtr Memory pointer with exception data or {@code 0} in case of success.
+     */
+    public void computeTaskComplete(long taskPtr, long memPtr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.computeTaskComplete(envPtr, taskPtr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Serialize native job.
+     *
+     * @param jobPtr Job pointer.
+     * @param memPtr Memory pointer.
+     * @return {@code True} if serialization succeeded.
+     */
+    public int computeJobSerialize(long jobPtr, long memPtr) {
+        enter();
+
+        try {
+            return PlatformCallbackUtils.computeJobSerialize(envPtr, jobPtr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Create job in native platform.
+     *
+     * @param memPtr Memory pointer.
+     * @return Pointer to job.
+     */
+    public long computeJobCreate(long memPtr) {
+        enter();
+
+        try {
+            return PlatformCallbackUtils.computeJobCreate(envPtr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Execute native job on a node other than where it was created.
+     *
+     * @param jobPtr Job pointer.
+     * @param cancel Cancel flag.
+     * @param memPtr Memory pointer to write result to for remote job execution or {@code 0} for local job execution.
+     */
+    public void computeJobExecute(long jobPtr, int cancel, long memPtr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.computeJobExecute(envPtr, jobPtr, cancel, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Cancel the job.
+     *
+     * @param jobPtr Job pointer.
+     */
+    public void computeJobCancel(long jobPtr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.computeJobCancel(envPtr, jobPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Destroy the job.
+     *
+     * @param ptr Pointer.
+     */
+    public void computeJobDestroy(long ptr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.computeJobDestroy(envPtr, ptr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Invoke local callback.
+     *
+     * @param cbPtr Callback pointer.
+     * @param memPtr Memory pointer.
+     */
+    public void continuousQueryListenerApply(long cbPtr, long memPtr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.continuousQueryListenerApply(envPtr, cbPtr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Create filter in native platform.
+     *
+     * @param memPtr Memory pointer.
+     * @return Pointer to created filter.
+     */
+    public long continuousQueryFilterCreate(long memPtr) {
+        enter();
+
+        try {
+            return PlatformCallbackUtils.continuousQueryFilterCreate(envPtr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Invoke remote filter.
+     *
+     * @param filterPtr Filter pointer.
+     * @param memPtr Memory pointer.
+     * @return Result.
+     */
+    public int continuousQueryFilterApply(long filterPtr, long memPtr) {
+        enter();
+
+        try {
+            return PlatformCallbackUtils.continuousQueryFilterApply(envPtr, filterPtr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Release remote filter.
+     *
+     * @param filterPtr Filter pointer.
+     */
+    public void continuousQueryFilterRelease(long filterPtr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.continuousQueryFilterRelease(envPtr, filterPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Notify native data streamer about topology update.
+     *
+     * @param ptr Data streamer native pointer.
+     * @param topVer Topology version.
+     * @param topSize Topology size.
+     */
+    public void dataStreamerTopologyUpdate(long ptr, long topVer, int topSize) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.dataStreamerTopologyUpdate(envPtr, ptr, topVer, topSize);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Invoke stream receiver.
+     *
+     * @param ptr Receiver native pointer.
+     * @param cache Cache object.
+     * @param memPtr Stream pointer.
+     * @param keepPortable Portable flag.
+     */
+    public void dataStreamerStreamReceiverInvoke(long ptr, Object cache, long memPtr, boolean keepPortable) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.dataStreamerStreamReceiverInvoke(envPtr, ptr, cache, memPtr, keepPortable);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Notify future with byte result.
+     *
+     * @param futPtr Future pointer.
+     * @param res Result.
+     */
+    public void futureByteResult(long futPtr, int res) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.futureByteResult(envPtr, futPtr, res);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Notify future with boolean result.
+     *
+     * @param futPtr Future pointer.
+     * @param res Result.
+     */
+    public void futureBoolResult(long futPtr, int res) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.futureBoolResult(envPtr, futPtr, res);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Notify future with short result.
+     *
+     * @param futPtr Future pointer.
+     * @param res Result.
+     */
+    public void futureShortResult(long futPtr, int res) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.futureShortResult(envPtr, futPtr, res);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Notify future with char result.
+     *
+     * @param futPtr Future pointer.
+     * @param res Result.
+     */
+    public void futureCharResult(long futPtr, int res) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.futureCharResult(envPtr, futPtr, res);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Notify future with int result.
+     *
+     * @param futPtr Future pointer.
+     * @param res Result.
+     */
+    public void futureIntResult(long futPtr, int res) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.futureIntResult(envPtr, futPtr, res);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Notify future with float result.
+     *
+     * @param futPtr Future pointer.
+     * @param res Result.
+     */
+    public void futureFloatResult(long futPtr, float res) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.futureFloatResult(envPtr, futPtr, res);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Notify future with long result.
+     *
+     * @param futPtr Future pointer.
+     * @param res Result.
+     */
+    public void futureLongResult(long futPtr, long res) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.futureLongResult(envPtr, futPtr, res);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Notify future with double result.
+     *
+     * @param futPtr Future pointer.
+     * @param res Result.
+     */
+    public void futureDoubleResult(long futPtr, double res) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.futureDoubleResult(envPtr, futPtr, res);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Notify future with object result.
+     *
+     * @param futPtr Future pointer.
+     * @param memPtr Memory pointer.
+     */
+    public void futureObjectResult(long futPtr, long memPtr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.futureObjectResult(envPtr, futPtr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Notify future with null result.
+     *
+     * @param futPtr Future pointer.
+     */
+    public void futureNullResult(long futPtr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.futureNullResult(envPtr, futPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Notify future with error.
+     *
+     * @param futPtr Future pointer.
+     * @param memPtr Pointer to memory with error information.
+     */
+    public void futureError(long futPtr, long memPtr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.futureError(envPtr, futPtr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Creates message filter and returns a pointer.
+     *
+     * @param memPtr Memory pointer.
+     * @return Pointer.
+     */
+    public long messagingFilterCreate(long memPtr) {
+        enter();
+
+        try {
+            return PlatformCallbackUtils.messagingFilterCreate(envPtr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Invokes the native messaging filter apply callback while the gateway read lock is held.
+     *
+     * @param ptr Pointer.
+     * @param memPtr Memory pointer.
+     * @return Result.
+     */
+    public int messagingFilterApply(long ptr, long memPtr) {
+        enter();
+
+        try {
+            return PlatformCallbackUtils.messagingFilterApply(envPtr, ptr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Invokes the native messaging filter destroy callback while the gateway read lock is held.
+     *
+     * @param ptr Pointer.
+     */
+    public void messagingFilterDestroy(long ptr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.messagingFilterDestroy(envPtr, ptr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Creates event filter and returns a pointer.
+     *
+     * @param memPtr Memory pointer.
+     * @return Pointer.
+     */
+    public long eventFilterCreate(long memPtr) {
+        enter();
+
+        try {
+            return PlatformCallbackUtils.eventFilterCreate(envPtr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Invokes the native event filter apply callback while the gateway read lock is held.
+     *
+     * @param ptr Pointer.
+     * @param memPtr Memory pointer.
+     * @return Result.
+     */
+    public int eventFilterApply(long ptr, long memPtr) {
+        enter();
+
+        try {
+            return PlatformCallbackUtils.eventFilterApply(envPtr, ptr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Invokes the native event filter destroy callback while the gateway read lock is held.
+     *
+     * @param ptr Pointer.
+     */
+    public void eventFilterDestroy(long ptr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.eventFilterDestroy(envPtr, ptr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Sends node info to native target.
+     *
+     * @param memPtr Ptr to a stream with serialized node.
+     */
+    public void nodeInfo(long memPtr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.nodeInfo(envPtr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Kernal start callback.
+     *
+     * @param memPtr Memory pointer.
+     */
+    public void onStart(long memPtr) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.onStart(envPtr, memPtr);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Lifecycle event callback.
+     *
+     * @param ptr Holder pointer.
+     * @param evt Event.
+     */
+    public void lifecycleEvent(long ptr, int evt) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.lifecycleEvent(envPtr, ptr, evt);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Re-allocate external memory chunk.
+     *
+     * @param memPtr Cross-platform pointer.
+     * @param cap Capacity.
+     */
+    public void memoryReallocate(long memPtr, int cap) {
+        enter();
+
+        try {
+            PlatformCallbackUtils.memoryReallocate(envPtr, memPtr, cap);
+        }
+        finally {
+            leave();
+        }
+    }
+
+    /**
+     * Initializes native service.
+     *
+     * @param memPtr Pointer.
+     * @throws org.apache.ignite.IgniteCheckedException In case of error.
+     */
+    public long serviceInit(long memPtr) throws IgniteCheckedException {
+        return PlatformCallbackUtils.serviceInit(envPtr, memPtr);
+    }
+
+    /**
+     * Executes native service.
+     *
+     * @param svcPtr Pointer to the service in the native platform.
+     * @param memPtr Stream pointer.
+     * @throws org.apache.ignite.IgniteCheckedException In case of error.
+     */
+    public void serviceExecute(long svcPtr, long memPtr) throws IgniteCheckedException {
+        PlatformCallbackUtils.serviceExecute(envPtr, svcPtr, memPtr);
+    }
+
+    /**
+     * Cancels native service.
+     *
+     * @param svcPtr Pointer to the service in the native platform.
+     * @param memPtr Stream pointer.
+     * @throws org.apache.ignite.IgniteCheckedException In case of error.
+     */
+    public void serviceCancel(long svcPtr, long memPtr) throws IgniteCheckedException {
+        PlatformCallbackUtils.serviceCancel(envPtr, svcPtr, memPtr);
+    }
+
+    /**
+     * Invokes service method.
+     *
+     * @param svcPtr Pointer to the service in the native platform.
+     * @param outMemPtr Output memory pointer.
+     * @param inMemPtr Input memory pointer.
+     * @throws org.apache.ignite.IgniteCheckedException In case of error.
+     */
+    public void serviceInvokeMethod(long svcPtr, long outMemPtr, long inMemPtr) throws IgniteCheckedException {
+        PlatformCallbackUtils.serviceInvokeMethod(envPtr, svcPtr, outMemPtr, inMemPtr);
+    }
+
+    /**
+     * Invokes cluster node filter.
+     *
+     * @param memPtr Stream pointer.
+     */
+    public int clusterNodeFilterApply(long memPtr) {
+        return PlatformCallbackUtils.clusterNodeFilterApply(envPtr, memPtr);
+    }
+
+    /**
+     * Kernal stop callback. Blocks the gateway first and then notifies the native platform.
+     * <p>
+     * The write lock acquired through {@link #block()} is not released by this method, so later
+     * {@link #enter()} attempts are expected to fail.
+     */
+    public void onStop() {
+        block();
+
+        PlatformCallbackUtils.onStop(envPtr);
+    }
+
+    /**
+     * Enter gateway: attempts to acquire the read lock via {@code tryReadLock()}.
+     *
+     * @throws IgniteException If the read lock could not be acquired (reported as grid stopping).
+     */
+    protected void enter() {
+        if (!lock.tryReadLock())
+            throw new IgniteException("Failed to execute native callback because grid is stopping.");
+    }
+
+    /**
+     * Leave gateway: releases the read lock. Must be paired with a preceding successful {@link #enter()}.
+     */
+    protected void leave() {
+        lock.readUnlock();
+    }
+
+    /**
+     * Block gateway: acquires the write lock. Nothing in this class releases it afterwards.
+     */
+    protected void block() {
+        lock.writeLock();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
new file mode 100644
index 0000000..7e9587f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.callback;
+
+/**
+ * Platform callback utility methods. Implemented in target platform. All methods in this class must be
+ * package-visible and invoked only through {@link PlatformCallbackGateway}.
+ */
+public class PlatformCallbackUtils {
+    /**
+     * Create cache store.
+     *
+     * @param envPtr Environment pointer.
+     * @param memPtr Memory pointer.
+     * @return Pointer.
+     */
+    static native long cacheStoreCreate(long envPtr, long memPtr);
+
+    /**
+     * Native callback for invoking cache store.
+     *
+     * @param envPtr Environment pointer.
+     * @param objPtr Object pointer.
+     * @param memPtr Memory pointer.
+     * @param cb Callback.
+     * @return Result.
+     */
+    static native int cacheStoreInvoke(long envPtr, long objPtr, long memPtr, Object cb);
+
+    /**
+     * Native callback for destroying cache store.
+     *
+     * @param envPtr Environment pointer.
+     * @param objPtr Object pointer.
+     */
+    static native void cacheStoreDestroy(long envPtr, long objPtr);
+
+    /**
+     * Creates cache store session.
+     *
+     * @param envPtr Environment pointer.
+     * @param storePtr Store instance pointer.
+     * @return Session instance pointer.
+     */
+    static native long cacheStoreSessionCreate(long envPtr, long storePtr);
+
+    /**
+     * Creates cache entry filter and returns a pointer.
+     *
+     * @param envPtr Environment pointer.
+     * @param memPtr Memory pointer.
+     * @return Pointer.
+     */
+    static native long cacheEntryFilterCreate(long envPtr, long memPtr);
+
+    /**
+     * Native callback for applying cache entry filter.
+     *
+     * @param envPtr Environment pointer.
+     * @param objPtr Pointer.
+     * @param memPtr Memory pointer.
+     * @return Result.
+     */
+    static native int cacheEntryFilterApply(long envPtr, long objPtr, long memPtr);
+
+    /**
+     * Native callback for destroying cache entry filter.
+     *
+     * @param envPtr Environment pointer.
+     * @param objPtr Pointer.
+     */
+    static native void cacheEntryFilterDestroy(long envPtr, long objPtr);
+
+    /**
+     * Invoke cache entry processor.
+     *
+     * @param envPtr Environment pointer.
+     * @param outMemPtr Output memory pointer.
+     * @param inMemPtr Input memory pointer.
+     */
+    static native void cacheInvoke(long envPtr, long outMemPtr, long inMemPtr);
+
+    /**
+     * Perform native task map. Do not throw exceptions, serializing them to the output stream instead.
+     *
+     * @param envPtr Environment pointer.
+     * @param taskPtr Task pointer.
+     * @param outMemPtr Output memory pointer (exists if topology changed, otherwise {@code 0}).
+     * @param inMemPtr Input memory pointer.
+     */
+    static native void computeTaskMap(long envPtr, long taskPtr, long outMemPtr, long inMemPtr);
+
+    /**
+     * Perform native task job result notification.
+     *
+     * @param envPtr Environment pointer.
+     * @param taskPtr Task pointer.
+     * @param jobPtr Job pointer.
+     * @param memPtr Memory pointer (always zero for local job execution).
+     * @return Job result enum ordinal.
+     */
+    static native int computeTaskJobResult(long envPtr, long taskPtr, long jobPtr, long memPtr);
+
+    /**
+     * Perform native task reduce.
+     *
+     * @param envPtr Environment pointer.
+     * @param taskPtr Task pointer.
+     */
+    static native void computeTaskReduce(long envPtr, long taskPtr);
+
+    /**
+     * Complete task with native error.
+     *
+     * @param envPtr Environment pointer.
+     * @param taskPtr Task pointer.
+     * @param memPtr Memory pointer with exception data or {@code 0} in case of success.
+     */
+    static native void computeTaskComplete(long envPtr, long taskPtr, long memPtr);
+
+    /**
+     * Serialize native job.
+     *
+     * @param envPtr Environment pointer.
+     * @param jobPtr Job pointer.
+     * @param memPtr Memory pointer.
+     * @return {@code True} if serialization succeeded.
+     */
+    static native int computeJobSerialize(long envPtr, long jobPtr, long memPtr);
+
+    /**
+     * Create job in native platform.
+     *
+     * @param envPtr Environment pointer.
+     * @param memPtr Memory pointer.
+     * @return Pointer to job.
+     */
+    static native long computeJobCreate(long envPtr, long memPtr);
+
+    /**
+     * Execute native job on a node other than where it was created.
+     *
+     * @param envPtr Environment pointer.
+     * @param jobPtr Job pointer.
+     * @param cancel Cancel flag.
+     * @param memPtr Memory pointer to write result to for remote job execution or {@code 0} for local job execution.
+     */
+    static native void computeJobExecute(long envPtr, long jobPtr, int cancel, long memPtr);
+
+    /**
+     * Cancel the job.
+     *
+     * @param envPtr Environment pointer.
+     * @param jobPtr Job pointer.
+     */
+    static native void computeJobCancel(long envPtr, long jobPtr);
+
+    /**
+     * Destroy the job.
+     *
+     * @param envPtr Environment pointer.
+     * @param ptr Pointer.
+     */
+    static native void computeJobDestroy(long envPtr, long ptr);
+
+    /**
+     * Invoke local callback.
+     *
+     * @param envPtr Environment pointer.
+     * @param cbPtr Callback pointer.
+     * @param memPtr Memory pointer.
+     */
+    static native void continuousQueryListenerApply(long envPtr, long cbPtr, long memPtr);
+
+    /**
+     * Create filter in native platform.
+     *
+     * @param envPtr Environment pointer.
+     * @param memPtr Memory pointer.
+     * @return Pointer to created filter.
+     */
+    static native long continuousQueryFilterCreate(long envPtr, long memPtr);
+
+    /**
+     * Invoke remote filter.
+     *
+     * @param envPtr Environment pointer.
+     * @param filterPtr Filter pointer.
+     * @param memPtr Memory pointer.
+     * @return Result.
+     */
+    static native int continuousQueryFilterApply(long envPtr, long filterPtr, long memPtr);
+
+    /**
+     * Release remote filter.
+     *
+     * @param envPtr Environment pointer.
+     * @param filterPtr Filter pointer.
+     */
+    static native void continuousQueryFilterRelease(long envPtr, long filterPtr);
+
+    /**
+     * Notify native data streamer about topology update.
+     *
+     * @param envPtr Environment pointer.
+     * @param ptr Data streamer native pointer.
+     * @param topVer Topology version.
+     * @param topSize Topology size.
+     */
+    static native void dataStreamerTopologyUpdate(long envPtr, long ptr, long topVer, int topSize);
+
+    /**
+     * Invoke stream receiver.
+     *
+     * @param envPtr Environment pointer.
+     * @param ptr Receiver native pointer.
+     * @param cache Cache object.
+     * @param memPtr Stream pointer.
+     * @param keepPortable Portable flag.
+     */
+    static native void dataStreamerStreamReceiverInvoke(long envPtr, long ptr, Object cache, long memPtr,
+        boolean keepPortable);
+
+    /**
+     * Notify future with byte result.
+     *
+     * @param envPtr Environment pointer.
+     * @param futPtr Future pointer.
+     * @param res Result.
+     */
+    static native void futureByteResult(long envPtr, long futPtr, int res);
+
+    /**
+     * Notify future with boolean result.
+     *
+     * @param envPtr Environment pointer.
+     * @param futPtr Future pointer.
+     * @param res Result.
+     */
+    static native void futureBoolResult(long envPtr, long futPtr, int res);
+
+    /**
+     * Notify future with short result.
+     *
+     * @param envPtr Environment pointer.
+     * @param futPtr Future pointer.
+     * @param res Result.
+     */
+    static native void futureShortResult(long envPtr, long futPtr, int res);
+
+    /**
+     * Notify future with char result.
+     *
+     * @param envPtr Environment pointer.
+     * @param futPtr Future pointer.
+     * @param res Result.
+     */
+    static native void futureCharResult(long envPtr, long futPtr, int res);
+
+    /**
+     * Notify future with int result.
+     *
+     * @param envPtr Environment pointer.
+     * @param futPtr Future pointer.
+     * @param res Result.
+     */
+    static native void futureIntResult(long envPtr, long futPtr, int res);
+
+    /**
+     * Notify future with float result.
+     *
+     * @param envPtr Environment pointer.
+     * @param futPtr Future pointer.
+     * @param res Result.
+     */
+    static native void futureFloatResult(long envPtr, long futPtr, float res);
+
+    /**
+     * Notify future with long result.
+     *
+     * @param envPtr Environment pointer.
+     * @param futPtr Future pointer.
+     * @param res Result.
+     */
+    static native void futureLongResult(long envPtr, long futPtr, long res);
+
+    /**
+     * Notify future with double result.
+     *
+     * @param envPtr Environment pointer.
+     * @param futPtr Future pointer.
+     * @param res Result.
+     */
+    static native void futureDoubleResult(long envPtr, long futPtr, double res);
+
+    /**
+     * Notify future with object result.
+     *
+     * @param envPtr Environment pointer.
+     * @param futPtr Future pointer.
+     * @param memPtr Memory pointer.
+     */
+    static native void futureObjectResult(long envPtr, long futPtr, long memPtr);
+
+    /**
+     * Notify future with null result.
+     *
+     * @param envPtr Environment pointer.
+     * @param futPtr Future pointer.
+     */
+    static native void futureNullResult(long envPtr, long futPtr);
+
+    /**
+     * Notify future with error.
+     *
+     * @param envPtr Environment pointer.
+     * @param futPtr Future pointer.
+     * @param memPtr Pointer to memory with error information.
+     */
+    static native void futureError(long envPtr, long futPtr, long memPtr);
+
+    /**
+     * Creates message filter and returns a pointer.
+     *
+     * @param envPtr Environment pointer.
+     * @param memPtr Memory pointer.
+     * @return Pointer.
+     */
+    static native long messagingFilterCreate(long envPtr, long memPtr);
+
+    /**
+     * @param envPtr Environment pointer.
+     * @param objPtr Pointer.
+     * @param memPtr Memory pointer.
+     * @return Result.
+     */
+    static native int messagingFilterApply(long envPtr, long objPtr, long memPtr);
+
+    /**
+     * @param envPtr Environment pointer.
+     * @param objPtr Pointer.
+     */
+    static native void messagingFilterDestroy(long envPtr, long objPtr);
+
+    /**
+     * Creates event filter and returns a pointer.
+     *
+     * @param envPtr Environment pointer.
+     * @param memPtr Memory pointer.
+     * @return Pointer.
+     */
+    static native long eventFilterCreate(long envPtr, long memPtr);
+
+    /**
+     * @param envPtr Environment pointer.
+     * @param objPtr Pointer.
+     * @param memPtr Memory pointer.
+     * @return Result.
+     */
+    static native int eventFilterApply(long envPtr, long objPtr, long memPtr);
+
+    /**
+     * @param envPtr Environment pointer.
+     * @param objPtr Pointer.
+     */
+    static native void eventFilterDestroy(long envPtr, long objPtr);
+
+    /**
+     * Sends node info to native target.
+     *
+     * @param envPtr Environment pointer.
+     * @param memPtr Ptr to a stream with serialized node.
+     */
+    static native void nodeInfo(long envPtr, long memPtr);
+
+    /**
+     * Kernal start callback.
+     *
+     * @param envPtr Environment pointer.
+     * @param memPtr Memory pointer.
+     */
+    static native void onStart(long envPtr, long memPtr);
+
+    /**
+     * Kernal stop callback.
+     *
+     * @param envPtr Environment pointer.
+     */
+    static native void onStop(long envPtr);
+
+    /**
+     * Lifecycle event callback.
+     *
+     * @param envPtr Environment pointer.
+     * @param ptr Holder pointer.
+     * @param evt Event.
+     */
+    static native void lifecycleEvent(long envPtr, long ptr, int evt);
+
+    /**
+     * Re-allocate external memory chunk.
+     *
+     * @param envPtr Environment pointer.
+     * @param memPtr Cross-platform pointer.
+     * @param cap Capacity.
+     */
+    static native void memoryReallocate(long envPtr, long memPtr, int cap);
+
+    /**
+     * Initializes native service.
+     *
+     * @param envPtr Environment pointer.
+     * @param memPtr Stream pointer.
+     * @return Pointer to the native platform service.
+     */
+    static native long serviceInit(long envPtr, long memPtr);
+
+    /**
+     * Executes native service.
+     *
+     * @param envPtr Environment pointer.
+     * @param svcPtr Pointer to the service in the native platform.
+     * @param memPtr Stream pointer.
+     */
+    static native void serviceExecute(long envPtr, long svcPtr, long memPtr);
+
+    /**
+     * Cancels native service.
+     *
+     * @param envPtr Environment pointer.
+     * @param svcPtr Pointer to the service in the native platform.
+     * @param memPtr Stream pointer.
+     */
+    static native void serviceCancel(long envPtr, long svcPtr, long memPtr);
+
+    /**
+     /**
+     * Invokes service method.
+     *
+     * @param envPtr Environment pointer.
+     * @param svcPtr Pointer to the service in the native platform.
+     * @param outMemPtr Output memory pointer.
+     * @param inMemPtr Input memory pointer.
+     */
+    static native void serviceInvokeMethod(long envPtr, long svcPtr, long outMemPtr, long inMemPtr);
+
+    /**
+     * Invokes cluster node filter.
+     *
+     * @param envPtr Environment pointer.
+     * @param memPtr Stream pointer.
+     */
+    static native int clusterNodeFilterApply(long envPtr, long memPtr);
+
+    /**
+     * Private constructor.
+     */
+    private PlatformCallbackUtils() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStream.java
new file mode 100644
index 0000000..9273e29
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStream.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import org.apache.ignite.internal.portable.streams.*;
+
+/**
+ * Interop input stream.
+ */
+public interface PlatformInputStream extends PortableInputStream {
+    /**
+     * Synchronize input. Must be called before starting to read data from memory changed by another platform.
+     */
+    public void synchronize();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemory.java
new file mode 100644
index 0000000..9d8f94e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+/**
+ * Interop memory chunk.
+ */
+public interface PlatformMemory extends AutoCloseable {
+    /**
+     * Gets input stream.
+     *
+     * @return Input stream.
+     */
+    public PlatformInputStream input();
+
+    /**
+     * Gets output stream.
+     *
+     * @return Output stream.
+     */
+    public PlatformOutputStream output();
+
+    /**
+     * Gets pointer which can be passed between platforms.
+     *
+     * @return Pointer.
+     */
+    public long pointer();
+
+    /**
+     * Gets data pointer.
+     *
+     * @return Data pointer.
+     */
+    public long data();
+
+    /**
+     * Gets capacity.
+     *
+     * @return Capacity.
+     */
+    public int capacity();
+
+    /**
+     * Gets length.
+     *
+     * @return Length.
+     */
+    public int length();
+
+    /**
+     * Reallocate memory chunk.
+     *
+     * @param cap Minimum capacity.
+     */
+    public void reallocate(int cap);
+
+    /**
+     * Close memory releasing it.
+     */
+    @Override void close();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManager.java
new file mode 100644
index 0000000..c2233a8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformMemoryManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+/**
+ * Interop memory manager interface.
+ */
+public interface PlatformMemoryManager {
+    /**
+     * Allocates memory.
+     *
+     * @return Memory.
+     */
+    public PlatformMemory allocate();
+
+    /**
+     * Allocates memory having at least the given capacity.
+     *
+     * @param cap Minimum capacity.
+     * @return Memory.
+     */
+    public PlatformMemory allocate(int cap);
+
+    /**
+     * Gets memory from existing pointer.
+     *
+     * @param memPtr Cross-platform memory pointer.
+     * @return Memory.
+     */
+    public PlatformMemory get(long memPtr);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStream.java
new file mode 100644
index 0000000..eb2490a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStream.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.memory;
+
+import org.apache.ignite.internal.portable.streams.*;
+
+/**
+ * Interop output stream.
+ */
+public interface PlatformOutputStream extends PortableOutputStream {
+    /**
+     * Synchronize output stream with underlying memory.
+     */
+    public void synchronize();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
deleted file mode 100644
index fb1eaa2..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform;
-
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.portable.*;
-import org.apache.ignite.internal.processors.platform.callback.*;
-import org.apache.ignite.internal.processors.platform.memory.*;
-
-import java.util.*;
-
-/**
- * Platform context. Acts as an entry point for platform operations.
- */
-public interface PlatformContext {
-    /**
-     * Gets kernal context.
-     *
-     * @return Kernal context.
-     */
-    public GridKernalContext kernalContext();
-
-    /**
-     * Gets platform memory manager.
-     *
-     * @return Memory manager.
-     */
-    public PlatformMemoryManager memory();
-
-    /**
-     * Gets platform callback gateway.
-     *
-     * @return Callback gateway.
-     */
-    public PlatformCallbackGateway gateway();
-
-    /**
-     * Get memory reader.
-     *
-     * @param mem Memory.
-     * @return Reader.
-     */
-    public PortableRawReaderEx reader(PlatformMemory mem);
-
-    /**
-     * Get memory reader.
-     *
-     * @param in Input.
-     * @return Reader.
-     */
-    public PortableRawReaderEx reader(PlatformInputStream in);
-
-    /**
-     * Get memory writer.
-     *
-     * @param mem Memory.
-     * @return Writer.
-     */
-    public PortableRawWriterEx writer(PlatformMemory mem);
-
-    /**
-     * Get memory writer.
-     *
-     * @param out Output.
-     * @return Writer.
-     */
-    public PortableRawWriterEx writer(PlatformOutputStream out);
-
-    /**
-     * Sends node info to native platform, if necessary.
-     *
-     * @param node Node.
-     */
-    public void addNode(ClusterNode node);
-
-    /**
-     * Writes a node id to a stream and sends node info to native platform, if necessary.
-     *
-     * @param writer Writer.
-     * @param node Node.
-     */
-    public void writeNode(PortableRawWriterEx writer, ClusterNode node);
-
-    /**
-     * Writes multiple node ids to a stream and sends node info to native platform, if necessary.
-     *
-     * @param writer Writer.
-     * @param nodes Nodes.
-     */
-    public void writeNodes(PortableRawWriterEx writer, Collection<ClusterNode> nodes);
-
-    /**
-     * Process metadata from the platform.
-     *
-     * @param reader Reader.
-     */
-    public void processMetadata(PortableRawReaderEx reader);
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/5877b301/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
deleted file mode 100644
index a8e7879..0000000
--- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java
+++ /dev/null
@@ -1,869 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.platform.callback;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.*;
-
-/**
- * Gateway to all platform-dependent callbacks. Implementers might extend this class and provide additional callbacks.
- */
-@SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"})
-public class PlatformCallbackGateway {
-    /** Environment pointer. */
-    protected final long envPtr;
-
-    /** Lock. */
-    private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
-
-    /**
-     * Native gateway.
-     *
-     * @param envPtr Environment pointer.
-     */
-    public PlatformCallbackGateway(long envPtr) {
-        this.envPtr = envPtr;
-    }
-
-    /**
-     * Get environment pointer.
-     *
-     * @return Environment pointer.
-     */
-    public long environmentPointer() {
-        return envPtr;
-    }
-
-    /**
-     * Create cache store.
-     *
-     * @param memPtr Memory pointer.
-     * @return Pointer.
-     */
-    public long cacheStoreCreate(long memPtr) {
-        enter();
-
-        try {
-            return PlatformCallbackUtils.cacheStoreCreate(envPtr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * @param objPtr Object pointer.
-     * @param memPtr Memory pointer.
-     * @param cb Callback.
-     * @return Result.
-     */
-    public int cacheStoreInvoke(long objPtr, long memPtr, Object cb) {
-        enter();
-
-        try {
-            return PlatformCallbackUtils.cacheStoreInvoke(envPtr, objPtr, memPtr, cb);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * @param objPtr Object pointer.
-     */
-    public void cacheStoreDestroy(long objPtr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.cacheStoreDestroy(envPtr, objPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Creates cache store session.
-     *
-     * @param storePtr Store instance pointer.
-     * @return Session instance pointer.
-     */
-    public long cacheStoreSessionCreate(long storePtr) {
-        enter();
-
-        try {
-            return PlatformCallbackUtils.cacheStoreSessionCreate(envPtr, storePtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Creates cache entry filter and returns a pointer.
-     *
-     * @param memPtr Memory pointer.
-     * @return Pointer.
-     */
-    public long cacheEntryFilterCreate(long memPtr) {
-        enter();
-
-        try {
-            return PlatformCallbackUtils.cacheEntryFilterCreate(envPtr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * @param ptr Pointer.
-     * @param memPtr Memory pointer.
-     * @return Result.
-     */
-    public int cacheEntryFilterApply(long ptr, long memPtr) {
-        enter();
-
-        try {
-            return PlatformCallbackUtils.cacheEntryFilterApply(envPtr, ptr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * @param ptr Pointer.
-     */
-    public void cacheEntryFilterDestroy(long ptr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.cacheEntryFilterDestroy(envPtr, ptr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Invoke cache entry processor.
-     *
-     * @param outMemPtr Output memory pointer.
-     * @param inMemPtr Input memory pointer.
-     */
-    public void cacheInvoke(long outMemPtr, long inMemPtr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.cacheInvoke(envPtr, outMemPtr, inMemPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Perform native task map. Do not throw exceptions, serializing them to the output stream instead.
-     *
-     * @param taskPtr Task pointer.
-     * @param outMemPtr Output memory pointer (exists if topology changed, otherwise {@code 0}).
-     * @param inMemPtr Input memory pointer.
-     */
-    public void computeTaskMap(long taskPtr, long outMemPtr, long inMemPtr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.computeTaskMap(envPtr, taskPtr, outMemPtr, inMemPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Perform native task job result notification.
-     *
-     * @param taskPtr Task pointer.
-     * @param jobPtr Job pointer.
-     * @param memPtr Memory pointer (always zero for local job execution).
-     * @return Job result enum ordinal.
-     */
-    public int computeTaskJobResult(long taskPtr, long jobPtr, long memPtr) {
-        enter();
-
-        try {
-            return PlatformCallbackUtils.computeTaskJobResult(envPtr, taskPtr, jobPtr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Perform native task reduce.
-     *
-     * @param taskPtr Task pointer.
-     */
-    public void computeTaskReduce(long taskPtr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.computeTaskReduce(envPtr, taskPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Complete task with native error.
-     *
-     * @param taskPtr Task pointer.
-     * @param memPtr Memory pointer with exception data or {@code 0} in case of success.
-     */
-    public void computeTaskComplete(long taskPtr, long memPtr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.computeTaskComplete(envPtr, taskPtr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Serialize native job.
-     *
-     * @param jobPtr Job pointer.
-     * @param memPtr Memory pointer.
-     * @return {@code True} if serialization succeeded.
-     */
-    public int computeJobSerialize(long jobPtr, long memPtr) {
-        enter();
-
-        try {
-            return PlatformCallbackUtils.computeJobSerialize(envPtr, jobPtr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Create job in native platform.
-     *
-     * @param memPtr Memory pointer.
-     * @return Pointer to job.
-     */
-    public long computeJobCreate(long memPtr) {
-        enter();
-
-        try {
-            return PlatformCallbackUtils.computeJobCreate(envPtr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Execute native job on a node other than where it was created.
-     *
-     * @param jobPtr Job pointer.
-     * @param cancel Cancel flag.
-     * @param memPtr Memory pointer to write result to for remote job execution or {@code 0} for local job execution.
-     */
-    public void computeJobExecute(long jobPtr, int cancel, long memPtr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.computeJobExecute(envPtr, jobPtr, cancel, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Cancel the job.
-     *
-     * @param jobPtr Job pointer.
-     */
-    public void computeJobCancel(long jobPtr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.computeJobCancel(envPtr, jobPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Destroy the job.
-     *
-     * @param ptr Pointer.
-     */
-    public void computeJobDestroy(long ptr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.computeJobDestroy(envPtr, ptr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Invoke local callback.
-     *
-     * @param cbPtr Callback pointer.
-     * @param memPtr Memory pointer.
-     */
-    public void continuousQueryListenerApply(long cbPtr, long memPtr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.continuousQueryListenerApply(envPtr, cbPtr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Create filter in native platform.
-     *
-     * @param memPtr Memory pointer.
-     * @return Pointer to created filter.
-     */
-    public long continuousQueryFilterCreate(long memPtr) {
-        enter();
-
-        try {
-            return PlatformCallbackUtils.continuousQueryFilterCreate(envPtr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Invoke remote filter.
-     *
-     * @param filterPtr Filter pointer.
-     * @param memPtr Memory pointer.
-     * @return Result.
-     */
-    public int continuousQueryFilterApply(long filterPtr, long memPtr) {
-        enter();
-
-        try {
-            return PlatformCallbackUtils.continuousQueryFilterApply(envPtr, filterPtr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Release remote  filter.
-     *
-     * @param filterPtr Filter pointer.
-     */
-    public void continuousQueryFilterRelease(long filterPtr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.continuousQueryFilterRelease(envPtr, filterPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Notify native data streamer about topology update.
-     *
-     * @param ptr Data streamer native pointer.
-     * @param topVer Topology version.
-     * @param topSize Topology size.
-     */
-    public void dataStreamerTopologyUpdate(long ptr, long topVer, int topSize) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.dataStreamerTopologyUpdate(envPtr, ptr, topVer, topSize);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Invoke stream receiver.
-     *
-     * @param ptr Receiver native pointer.
-     * @param cache Cache object.
-     * @param memPtr Stream pointer.
-     * @param keepPortable Portable flag.
-     */
-    public void dataStreamerStreamReceiverInvoke(long ptr, Object cache, long memPtr, boolean keepPortable) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.dataStreamerStreamReceiverInvoke(envPtr, ptr, cache, memPtr, keepPortable);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Notify future with byte result.
-     *
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    public void futureByteResult(long futPtr, int res) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.futureByteResult(envPtr, futPtr, res);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Notify future with boolean result.
-     *
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    public void futureBoolResult(long futPtr, int res) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.futureBoolResult(envPtr, futPtr, res);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Notify future with short result.
-     *
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    public void futureShortResult(long futPtr, int res) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.futureShortResult(envPtr, futPtr, res);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Notify future with byte result.
-     *
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    public void futureCharResult(long futPtr, int res) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.futureCharResult(envPtr, futPtr, res);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Notify future with int result.
-     *
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    public void futureIntResult(long futPtr, int res) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.futureIntResult(envPtr, futPtr, res);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Notify future with float result.
-     *
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    public void futureFloatResult(long futPtr, float res) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.futureFloatResult(envPtr, futPtr, res);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Notify future with long result.
-     *
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    public void futureLongResult(long futPtr, long res) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.futureLongResult(envPtr, futPtr, res);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Notify future with double result.
-     *
-     * @param futPtr Future pointer.
-     * @param res Result.
-     */
-    public void futureDoubleResult(long futPtr, double res) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.futureDoubleResult(envPtr, futPtr, res);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Notify future with object result.
-     *
-     * @param futPtr Future pointer.
-     * @param memPtr Memory pointer.
-     */
-    public void futureObjectResult(long futPtr, long memPtr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.futureObjectResult(envPtr, futPtr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Notify future with null result.
-     *
-     * @param futPtr Future pointer.
-     */
-    public void futureNullResult(long futPtr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.futureNullResult(envPtr, futPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Notify future with error.
-     *
-     * @param futPtr Future pointer.
-     * @param memPtr Pointer to memory with error information.
-     */
-    public void futureError(long futPtr, long memPtr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.futureError(envPtr, futPtr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Creates message filter and returns a pointer.
-     *
-     * @param memPtr Memory pointer.
-     * @return Pointer.
-     */
-    public long messagingFilterCreate(long memPtr) {
-        enter();
-
-        try {
-            return PlatformCallbackUtils.messagingFilterCreate(envPtr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * @param ptr Pointer.
-     * @param memPtr Memory pointer.
-     * @return Result.
-     */
-    public int messagingFilterApply(long ptr, long memPtr) {
-        enter();
-
-        try {
-            return PlatformCallbackUtils.messagingFilterApply(envPtr, ptr, memPtr);
-        }
-        finally {
-            leave();
-        }}
-
-    /**
-     * @param ptr Pointer.
-     */
-    public void messagingFilterDestroy(long ptr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.messagingFilterDestroy(envPtr, ptr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Creates event filter and returns a pointer.
-     *
-     * @param memPtr Memory pointer.
-     * @return Pointer.
-     */
-    public long eventFilterCreate(long memPtr) {
-        enter();
-
-        try {
-            return PlatformCallbackUtils.eventFilterCreate(envPtr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * @param ptr Pointer.
-     * @param memPtr Memory pointer.
-     * @return Result.
-     */
-    public int eventFilterApply(long ptr, long memPtr) {
-        enter();
-
-        try {
-            return PlatformCallbackUtils.eventFilterApply(envPtr, ptr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * @param ptr Pointer.
-     */
-    public void eventFilterDestroy(long ptr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.eventFilterDestroy(envPtr, ptr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Sends node info to native target.
-     *
-     * @param memPtr Ptr to a stream with serialized node.
-     */
-    public void nodeInfo(long memPtr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.nodeInfo(envPtr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Kernal start callback.
-     *
-     * @param memPtr Memory pointer.
-     */
-    public void onStart(long memPtr) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.onStart(envPtr, memPtr);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Lifecycle event callback.
-     *
-     * @param ptr Holder pointer.
-     * @param evt Event.
-     */
-    public void lifecycleEvent(long ptr, int evt) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.lifecycleEvent(envPtr, ptr, evt);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Re-allocate external memory chunk.
-     *
-     * @param memPtr Cross-platform pointer.
-     * @param cap Capacity.
-     */
-    public void memoryReallocate(long memPtr, int cap) {
-        enter();
-
-        try {
-            PlatformCallbackUtils.memoryReallocate(envPtr, memPtr, cap);
-        }
-        finally {
-            leave();
-        }
-    }
-
-    /**
-     * Initializes native service.
-     *
-     * @param memPtr Pointer.
-     * @throws org.apache.ignite.IgniteCheckedException In case of error.
-     */
-    public long serviceInit(long memPtr) throws IgniteCheckedException {
-        return PlatformCallbackUtils.serviceInit(envPtr, memPtr);
-    }
-
-    /**
-     * Executes native service.
-     *
-     * @param svcPtr Pointer to the service in the native platform.
-     * @param memPtr Stream pointer.
-     * @throws org.apache.ignite.IgniteCheckedException In case of error.
-     */
-    public void serviceExecute(long svcPtr, long memPtr) throws IgniteCheckedException {
-        PlatformCallbackUtils.serviceExecute(envPtr, svcPtr, memPtr);
-    }
-
-    /**
-     * Cancels native service.
-     *
-     * @param svcPtr Pointer to the service in the native platform.
-     * @param memPtr Stream pointer.
-     * @throws org.apache.ignite.IgniteCheckedException In case of error.
-     */
-    public void serviceCancel(long svcPtr, long memPtr) throws IgniteCheckedException {
-        PlatformCallbackUtils.serviceCancel(envPtr, svcPtr, memPtr);
-    }
-
-    /**
-     * Invokes service method.
-     *
-     * @param svcPtr Pointer to the service in the native platform.
-     * @param outMemPtr Output memory pointer.
-     * @param inMemPtr Input memory pointer.
-     * @throws org.apache.ignite.IgniteCheckedException In case of error.
-     */
-    public void serviceInvokeMethod(long svcPtr, long outMemPtr, long inMemPtr) throws IgniteCheckedException {
-        PlatformCallbackUtils.serviceInvokeMethod(envPtr, svcPtr, outMemPtr, inMemPtr);
-    }
-
-    /**
-     * Invokes cluster node filter.
-     *
-     * @param memPtr Stream pointer.
-     */
-    public int clusterNodeFilterApply(long memPtr) {
-        return PlatformCallbackUtils.clusterNodeFilterApply(envPtr, memPtr);
-    }
-
-    /**
-     * Kernal stop callback.
-     */
-    public void onStop() {
-        block();
-
-        PlatformCallbackUtils.onStop(envPtr);
-    }
-
-    /**
-     * Enter gateway.
-     */
-    protected void enter() {
-        if (!lock.tryReadLock())
-            throw new IgniteException("Failed to execute native callback because grid is stopping.");
-    }
-
-    /**
-     * Leave gateway.
-     */
-    protected void leave() {
-        lock.readUnlock();
-    }
-
-    /**
-     * Block gateway.
-     */
-    protected void block() {
-        lock.writeLock();
-    }
-}


[08/27] ignite git commit: IGNITE-1299: Implemented IGFS file unlock with retries.

Posted by sb...@apache.org.
IGNITE-1299: Implemented IGFS file unlock with retries.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9fe3e8fd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9fe3e8fd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9fe3e8fd

Branch: refs/heads/ignite-1124
Commit: 9fe3e8fd884f2f19bfe3fb39c9cda89c7ae495d8
Parents: e4ba2eb
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu Aug 27 12:11:13 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Aug 27 12:11:13 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/IgniteInternalFuture.java   | 10 ++++
 .../query/GridCacheQueryFutureAdapter.java      |  7 +++
 .../processors/igfs/IgfsMetaManager.java        | 43 +++++++--------
 .../internal/processors/igfs/IgfsUtils.java     | 52 ++++++++++++++++++
 .../util/future/GridFinishedFuture.java         |  5 ++
 .../internal/util/future/GridFutureAdapter.java | 58 +++++++++++++-------
 6 files changed, 131 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
index 2b7b821..74cfb06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
@@ -69,6 +69,16 @@ public interface IgniteInternalFuture<R> {
     public R get(long timeout, TimeUnit unit) throws IgniteCheckedException;
 
     /**
+     * Synchronously waits for completion of the computation and returns computation result ignoring interrupts.
+     *
+     * @return Computation result.
+     * @throws IgniteFutureCancelledCheckedException Subclass of {@link IgniteCheckedException} thrown if computation
+     *     was cancelled.
+     * @throws IgniteCheckedException If computation failed.
+     */
+    public R getUninterruptibly() throws IgniteCheckedException;
+
+    /**
      * Cancels this future.
      *
      * @return {@code True} if future was canceled (i.e. was not finished prior to this call).

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index 53017c9..ed5ad77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -479,6 +479,13 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
         return super.get(timeout, unit);
     }
 
+    /** {@inheritDoc} */
+    @Override public Collection<R> getUninterruptibly() throws IgniteCheckedException {
+        if (!isDone())
+            loadAllPages();
+
+        return super.getUninterruptibly();
+    }
 
     /**
      * @param nodeId Sender node id.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index b98c5d8..aabe503 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -478,49 +478,46 @@ public class IgfsMetaManager extends IgfsManager {
      * @param modificationTime Modification time to write to file info.
      * @throws IgniteCheckedException If failed.
      */
-    public void unlock(IgfsFileInfo info, long modificationTime) throws IgniteCheckedException {
+    public void unlock(final IgfsFileInfo info, final long modificationTime) throws IgniteCheckedException {
         assert validTxState(false);
         assert info != null;
 
         if (busyLock.enterBusy()) {
             try {
-                IgniteUuid lockId = info.lockId();
+                final IgniteUuid lockId = info.lockId();
 
                 if (lockId == null)
                     return;
 
                 // Temporary clear interrupted state for unlocking.
-                boolean interrupted = Thread.interrupted();
-
-                IgniteUuid fileId = info.id();
-
-                IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+                final boolean interrupted = Thread.interrupted();
 
                 try {
-                    // Lock file ID for this transaction.
-                    IgfsFileInfo oldInfo = info(fileId);
+                    IgfsUtils.doInTransactionWithRetries(metaCache, new IgniteOutClosureX<Void>() {
+                        @Override public Void applyx() throws IgniteCheckedException {
+                            IgniteUuid fileId = info.id();
 
-                    if (oldInfo == null)
-                        throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not found): " + fileId));
+                            // Lock file ID for this transaction.
+                            IgfsFileInfo oldInfo = info(fileId);
 
-                    if (!info.lockId().equals(oldInfo.lockId()))
-                        throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) [fileId=" + fileId +
-                            ", lockId=" + info.lockId() + ", actualLockId=" + oldInfo.lockId() + ']');
+                            if (oldInfo == null)
+                                throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not found): " + fileId));
 
-                    IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, null, modificationTime);
+                            if (!info.lockId().equals(oldInfo.lockId()))
+                                throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) [fileId=" + fileId +
+                                    ", lockId=" + info.lockId() + ", actualLockId=" + oldInfo.lockId() + ']');
 
-                    boolean put = metaCache.put(fileId, newInfo);
+                            IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, null, modificationTime);
 
-                    assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']';
+                            boolean put = metaCache.put(fileId, newInfo);
 
-                    tx.commit();
-                }
-                catch (GridClosureException e) {
-                    throw U.cast(e);
+                            assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']';
+
+                            return null;
+                        }
+                    });
                 }
                 finally {
-                    tx.close();
-
                     assert validTxState(false);
 
                     if (interrupted)

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 8026a44..7449f31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -18,18 +18,31 @@
 package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
 import java.lang.reflect.*;
 
+import static org.apache.ignite.IgniteSystemProperties.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
 /**
  * Common IGFS utility methods.
  */
 public class IgfsUtils {
+    /** Maximum number of cache transaction retries when topology changes. */
+    private static final int MAX_CACHE_TX_RETRIES = IgniteSystemProperties.getInteger(IGNITE_CACHE_RETRIES_COUNT, 100);
+
     /**
      * Converts any passed exception to IGFS exception.
      *
@@ -104,4 +117,43 @@ public class IgfsUtils {
 
         return user;
     }
+
+    /**
+     * Performs an operation within a transaction, retrying when the cluster topology changes.
+     *
+     * @param cache Cache to do the transaction on.
+     * @param clo Closure.
+     * @return Result of closure execution.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    public static <T> T doInTransactionWithRetries(IgniteInternalCache cache, IgniteOutClosureX<T> clo)
+        throws IgniteCheckedException {
+        assert cache != null;
+
+        int attempts = 0;
+
+        while (attempts < MAX_CACHE_TX_RETRIES) {
+            try (Transaction tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                T res = clo.applyx();
+
+                tx.commit();
+
+                return res;
+            }
+            catch (IgniteException | IgniteCheckedException e) {
+                ClusterTopologyException cte = X.cause(e, ClusterTopologyException.class);
+
+                if (cte != null)
+                    ((IgniteFutureImpl)cte.retryReadyFuture()).internalFuture().getUninterruptibly();
+                else
+                    throw U.cast(e);
+            }
+
+            attempts++;
+        }
+
+        throw new IgniteCheckedException("Failed to perform operation since max number of attempts " +
+            "exceeded. [maxAttempts=" + MAX_CACHE_TX_RETRIES + ']');
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
index 242e626..2adee90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
@@ -126,6 +126,11 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> {
     }
 
     /** {@inheritDoc} */
+    @Override public T getUninterruptibly() throws IgniteCheckedException {
+        return get();
+    }
+
+    /** {@inheritDoc} */
     @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<T>> lsnr) {
         assert lsnr != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index f8caf22..91ce549 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -107,6 +107,43 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
 
     /** {@inheritDoc} */
     @Override public R get() throws IgniteCheckedException {
+        return get0(ignoreInterrupts);
+    }
+
+    /** {@inheritDoc} */
+    @Override public R getUninterruptibly() throws IgniteCheckedException {
+        return get0(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public R get(long timeout) throws IgniteCheckedException {
+        // Do not replace with static import, as it may not compile.
+        return get(timeout, TimeUnit.MILLISECONDS);
+    }
+
+    /** {@inheritDoc} */
+    @Override public R get(long timeout, TimeUnit unit) throws IgniteCheckedException {
+        A.ensure(timeout >= 0, "timeout cannot be negative: " + timeout);
+        A.notNull(unit, "unit");
+
+        try {
+            return get0(unit.toNanos(timeout));
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.", e);
+        }
+    }
+
+    /**
+     * Internal get routine.
+     *
+     * @param ignoreInterrupts Whether to ignore interrupts.
+     * @return Result.
+     * @throws IgniteCheckedException If failed.
+     */
+    private R get0(boolean ignoreInterrupts) throws IgniteCheckedException {
         try {
             if (endTime == 0) {
                 if (ignoreInterrupts)
@@ -132,27 +169,6 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public R get(long timeout) throws IgniteCheckedException {
-        // Do not replace with static import, as it may not compile.
-        return get(timeout, TimeUnit.MILLISECONDS);
-    }
-
-    /** {@inheritDoc} */
-    @Override public R get(long timeout, TimeUnit unit) throws IgniteCheckedException {
-        A.ensure(timeout >= 0, "timeout cannot be negative: " + timeout);
-        A.notNull(unit, "unit");
-
-        try {
-            return get0(unit.toNanos(timeout));
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.", e);
-        }
-    }
-
     /**
      * @param nanosTimeout Timeout (nanoseconds).
      * @return Result.


[04/27] ignite git commit: WIP on cluster node and cluster metrics.

Posted by sb...@apache.org.
WIP on cluster node and cluster metrics.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e4ba2eba
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e4ba2eba
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e4ba2eba

Branch: refs/heads/ignite-1124
Commit: e4ba2ebaf2544ac978c201a6a02e6330e1d5ac81
Parents: 5718480
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Aug 27 11:28:46 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Aug 27 11:28:46 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/portable/PortableContext.java    | 3 ---
 .../internal/processors/platform/PlatformContext.java       | 9 +++++++++
 2 files changed, 9 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e4ba2eba/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
index 2d3cbf0..a9d64d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
@@ -204,15 +204,12 @@ public class PortableContext implements Externalizable {
 //        registerPredefinedType(DrSenderAttributes.class, 65);
 //        registerPredefinedType(DrSenderRemoteAttributes.class, 66);
 //
-//        registerPredefinedType(InteropClusterNode.class, 67);
-//        registerPredefinedType(InteropClusterMetrics.class, 68);
 //        registerPredefinedType(InteropMetadata.class, 70);
 //
 //        registerPredefinedType(InteropDotNetConfiguration.class, 71);
 //        registerPredefinedType(InteropDotNetPortableConfiguration.class, 72);
 //        registerPredefinedType(InteropDotNetPortableTypeConfiguration.class, 73);
 //        registerPredefinedType(InteropIgniteProxy.class, 74);
-//        registerPredefinedType(InteropCacheMetrics.class, 75);
 //        registerPredefinedType(InteropProductLicence.class, 78);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e4ba2eba/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
index fb1eaa2..90ed85d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.portable.*;
 import org.apache.ignite.internal.processors.platform.callback.*;
 import org.apache.ignite.internal.processors.platform.memory.*;
+import org.jetbrains.annotations.*;
 
 import java.util.*;
 
@@ -111,4 +112,12 @@ public interface PlatformContext {
      * @param reader Reader.
      */
     public void processMetadata(PortableRawReaderEx reader);
+
+    /**
+     * Write cluster metrics.
+     *
+     * @param writer Writer.
+     * @param metrics Metrics.
+     */
+    public void writeClusterMetrics(PortableRawWriterEx writer, @Nullable ClusterMetrics metrics);
 }


[11/27] ignite git commit: Moved platform abstract predicate to Ignite.

Posted by sb...@apache.org.
Moved platform abstract predicate to Ignite.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/daa87962
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/daa87962
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/daa87962

Branch: refs/heads/ignite-1124
Commit: daa87962306aa86f385c825d047febcfb3635aed
Parents: 6b2ee50
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Aug 27 12:16:25 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Aug 27 12:16:25 2015 +0300

----------------------------------------------------------------------
 .../platform/PlatformAbstractPredicate.java     | 64 ++++++++++++++++++++
 1 file changed, 64 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/daa87962/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractPredicate.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractPredicate.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractPredicate.java
new file mode 100644
index 0000000..c5197fd
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformAbstractPredicate.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform;
+
+import java.io.*;
+
+/**
+ * Base interop predicate. Delegates apply to native platform.
+ */
+public abstract class PlatformAbstractPredicate implements Externalizable {
+    /** .Net portable predicate. */
+    protected Object pred;
+
+    /** Pointer to deployed predicate. */
+    protected transient long ptr;
+
+    /** Platform context. */
+    protected transient PlatformContext ctx;
+
+    /**
+     * {@link java.io.Externalizable} support.
+     */
+    public PlatformAbstractPredicate() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param pred .Net portable predicate.
+     * @param ptr Pointer to predicate in the native platform.
+     * @param ctx Platform context.
+     */
+    protected PlatformAbstractPredicate(Object pred, long ptr, PlatformContext ctx) {
+        this.pred = pred;
+        this.ptr = ptr;
+        this.ctx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(pred);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        pred = in.readObject();
+    }
+}


[19/27] ignite git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into master-main

Posted by sb...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/ignite into master-main


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f497e8e2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f497e8e2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f497e8e2

Branch: refs/heads/ignite-1124
Commit: f497e8e2d36b93c9ee4e4b810478b76c0b4ff585
Parents: f575ff1 7c2c02b
Author: Denis Magda <dm...@gridgain.com>
Authored: Thu Aug 27 13:21:46 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Thu Aug 27 13:21:46 2015 +0300

----------------------------------------------------------------------
 .../processors/platform/PlatformContext.java         | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------