You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/30 14:09:58 UTC

[1/8] ignite git commit: IGNITE-2902: CPP: IgniteError is now extends std::exception. This closes #582.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1786 607609d5e -> 43bc846d0


IGNITE-2902: CPP: IgniteError is now extends std::exception. This closes #582.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7f9ee2d5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7f9ee2d5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7f9ee2d5

Branch: refs/heads/ignite-1786
Commit: 7f9ee2d599b8a531f6b7046a418fd7f947abf8d0
Parents: 732abda
Author: isapego <is...@gridgain.com>
Authored: Tue Mar 29 13:58:36 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Mar 29 13:58:36 2016 +0300

----------------------------------------------------------------------
 modules/platforms/cpp/core-test/Makefile.am     |  1 +
 .../cpp/core-test/project/vs/core-test.vcxproj  |  1 +
 .../project/vs/core-test.vcxproj.filters        |  3 ++
 .../cpp/core-test/src/ignite_error_test.cpp     | 45 ++++++++++++++++++++
 .../cpp/core/include/ignite/ignite_error.h      | 15 +++++--
 modules/platforms/cpp/core/src/ignite_error.cpp |  7 ++-
 6 files changed, 68 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7f9ee2d5/modules/platforms/cpp/core-test/Makefile.am
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/Makefile.am b/modules/platforms/cpp/core-test/Makefile.am
index 531fee0..55b3b98 100644
--- a/modules/platforms/cpp/core-test/Makefile.am
+++ b/modules/platforms/cpp/core-test/Makefile.am
@@ -31,6 +31,7 @@ ignite_tests_SOURCES = src/cache_test.cpp \
                          src/ignition_test.cpp \
                          src/interop_memory_test.cpp \
                          src/handle_registry_test.cpp \
+                         src/ignite_error_test.cpp \
                          src/binary_test_defs.cpp \
                          src/binary_reader_writer_raw_test.cpp \
                          src/binary_reader_writer_test.cpp \

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f9ee2d5/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
index d98d202..9e3d816 100644
--- a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
+++ b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
@@ -33,6 +33,7 @@
   <ItemGroup>
     <ClCompile Include="..\..\src\cache_test.cpp" />
     <ClCompile Include="..\..\src\concurrent_test.cpp" />
+    <ClCompile Include="..\..\src\ignite_error_test.cpp" />
     <ClCompile Include="..\..\src\ignition_test.cpp" />
     <ClCompile Include="..\..\src\handle_registry_test.cpp" />
     <ClCompile Include="..\..\src\binary_reader_writer_raw_test.cpp" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f9ee2d5/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
index 15b9c40..ec7bab5 100644
--- a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
+++ b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
@@ -37,6 +37,9 @@
     <ClCompile Include="..\..\src\interop_memory_test.cpp">
       <Filter>Code</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\ignite_error_test.cpp">
+      <Filter>Code</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="..\..\include\teamcity_messages.h">

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f9ee2d5/modules/platforms/cpp/core-test/src/ignite_error_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/ignite_error_test.cpp b/modules/platforms/cpp/core-test/src/ignite_error_test.cpp
new file mode 100644
index 0000000..c9e3043
--- /dev/null
+++ b/modules/platforms/cpp/core-test/src/ignite_error_test.cpp
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _MSC_VER
+    #define BOOST_TEST_DYN_LINK
+#endif
+
+#include <boost/test/unit_test.hpp>
+
+#include "ignite/ignite_error.h"
+
+using namespace ignite;
+using namespace boost::unit_test;
+
+BOOST_AUTO_TEST_SUITE(IgniteErrorTestSuite)
+
+BOOST_AUTO_TEST_CASE(TestIgniteErrorDerivesStdException)
+{
+    const std::string testMsg = "Lorem ipsum dolor sit amet";
+
+    try
+    {
+        throw IgniteError(IgniteError::IGNITE_ERR_GENERIC, testMsg.c_str());
+    }
+    catch (std::exception& e)
+    {
+        BOOST_REQUIRE_EQUAL(testMsg, std::string(e.what()));
+    }
+}
+
+BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f9ee2d5/modules/platforms/cpp/core/include/ignite/ignite_error.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/ignite_error.h b/modules/platforms/cpp/core/include/ignite/ignite_error.h
index 3b192b1..4097a62 100644
--- a/modules/platforms/cpp/core/include/ignite/ignite_error.h
+++ b/modules/platforms/cpp/core/include/ignite/ignite_error.h
@@ -23,9 +23,11 @@
 #ifndef _IGNITE_ERROR
 #define _IGNITE_ERROR
 
-#include <sstream>
 #include <stdint.h>
 
+#include <exception>
+#include <sstream>
+
 #include <ignite/common/common.h>
 
 #define IGNITE_ERROR_1(code, part1) { \
@@ -75,7 +77,7 @@ namespace ignite
     /**
      * Ignite error information.
      */
-    class IGNITE_IMPORT_EXPORT IgniteError
+    class IGNITE_IMPORT_EXPORT IgniteError : public std::exception
     {
     public:
         /** Success. */
@@ -243,7 +245,14 @@ namespace ignite
          * @return Error message.
          */
         const char* GetText() const;
-        
+
+        /**
+         * Get error message. Synonim for GetText().
+         *
+         * @return Error message.
+         */
+        virtual const char* what() const;
+
         /**
          * Set error.
          *

http://git-wip-us.apache.org/repos/asf/ignite/blob/7f9ee2d5/modules/platforms/cpp/core/src/ignite_error.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/src/ignite_error.cpp b/modules/platforms/cpp/core/src/ignite_error.cpp
index 1545631..ad30e46 100644
--- a/modules/platforms/cpp/core/src/ignite_error.cpp
+++ b/modules/platforms/cpp/core/src/ignite_error.cpp
@@ -90,7 +90,12 @@ namespace ignite
         else
             return  "No additional information available.";
     }
-    
+
+    const char* IgniteError::what() const
+    {
+        return GetText();
+    }
+
     void IgniteError::SetError(const int jniCode, const char* jniCls, const char* jniMsg, IgniteError* err)
     {
         if (jniCode == IGNITE_JNI_ERR_SUCCESS)


[8/8] ignite git commit: Merge branch 'master' into ignite-1786

Posted by vo...@apache.org.
Merge branch 'master' into ignite-1786


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/43bc846d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/43bc846d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/43bc846d

Branch: refs/heads/ignite-1786
Commit: 43bc846d0cede479b897bafc197dd592731ad259
Parents: 607609d bbe5258
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Mar 30 15:09:52 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Mar 30 15:09:52 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheInvokeEntry.java      |  41 ++-
 .../processors/cache/CacheLazyEntry.java        |   9 +-
 .../processors/cache/GridCacheMapEntry.java     |   9 +-
 .../GridCachePartitionExchangeManager.java      |   8 +-
 .../processors/cache/GridCacheSwapManager.java  |  26 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |  12 +-
 .../dht/atomic/GridDhtAtomicCache.java          |   4 +-
 .../local/atomic/GridLocalAtomicCache.java      |   4 +-
 .../cache/transactions/IgniteTxAdapter.java     |   8 +-
 .../cache/transactions/IgniteTxEntry.java       |   4 +-
 .../transactions/IgniteTxLocalAdapter.java      |   5 +-
 .../processors/plugin/CachePluginManager.java   |  25 ++
 .../ignite/internal/visor/cache/VisorCache.java |   9 +
 .../internal/visor/cache/VisorCacheMetrics.java |  18 +
 .../internal/visor/cache/VisorCacheV3.java      | 108 ++++++
 .../visor/node/VisorNodeDataCollectorJob.java   |  31 +-
 .../ignite/plugin/CachePluginProvider.java      |  11 +
 ...cingDelayedPartitionMapExchangeSelfTest.java |  14 +-
 .../GridCacheRebalancingSyncSelfTest.java       | 186 ++++++++++-
 .../junits/common/GridCommonAbstractTest.java   |  53 ++-
 ...CacheDeploymentCachePluginConfiguration.java |   7 +
 modules/kafka/README.txt                        |  81 ++++-
 .../kafka/connect/IgniteSourceConnector.java    |  81 +++++
 .../kafka/connect/IgniteSourceConstants.java    |  44 +++
 .../stream/kafka/connect/IgniteSourceTask.java  | 328 +++++++++++++++++++
 .../serialization/CacheEventConverter.java      |  66 ++++
 .../serialization/CacheEventDeserializer.java   |  54 +++
 .../serialization/CacheEventSerializer.java     |  54 +++
 .../kafka/IgniteKafkaStreamerSelfTestSuite.java |   4 +-
 .../kafka/KafkaIgniteStreamerSelfTest.java      |  11 +-
 .../ignite/stream/kafka/TestKafkaBroker.java    |  27 +-
 .../kafka/connect/IgniteSinkConnectorTest.java  |  13 +-
 .../connect/IgniteSourceConnectorMock.java      |  31 ++
 .../connect/IgniteSourceConnectorTest.java      | 327 ++++++++++++++++++
 .../kafka/connect/IgniteSourceTaskMock.java     |  31 ++
 .../kafka/connect/TestCacheEventFilter.java     |  31 ++
 .../kafka/src/test/resources/example-ignite.xml |   4 +-
 .../common/include/ignite/common/ignite_error.h |  15 +-
 .../platforms/cpp/common/src/ignite_error.cpp   |   7 +-
 modules/platforms/cpp/core-test/Makefile.am     |   1 +
 .../cpp/core-test/project/vs/core-test.vcxproj  |   1 +
 .../project/vs/core-test.vcxproj.filters        |   3 +
 .../cpp/core-test/src/ignite_error_test.cpp     |  45 +++
 .../Examples/ProjectFilesTest.cs                |   2 +-
 44 files changed, 1748 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/43bc846d/modules/platforms/cpp/common/include/ignite/common/ignite_error.h
----------------------------------------------------------------------
diff --cc modules/platforms/cpp/common/include/ignite/common/ignite_error.h
index 3b192b1,0000000..4097a62
mode 100644,000000..100644
--- a/modules/platforms/cpp/common/include/ignite/common/ignite_error.h
+++ b/modules/platforms/cpp/common/include/ignite/common/ignite_error.h
@@@ -1,265 -1,0 +1,274 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +/**
 + * @file
 + * Declares ignite::IgniteError class.
 + */
 +
 +#ifndef _IGNITE_ERROR
 +#define _IGNITE_ERROR
 +
- #include <sstream>
 +#include <stdint.h>
 +
++#include <exception>
++#include <sstream>
++
 +#include <ignite/common/common.h>
 +
 +#define IGNITE_ERROR_1(code, part1) { \
 +    std::stringstream stream; \
 +    stream << (part1); \
 +    throw ignite::IgniteError(code, stream.str().c_str()); \
 +}
 +
 +#define IGNITE_ERROR_2(code, part1, part2) { \
 +    std::stringstream stream; \
 +    stream << (part1) << (part2); \
 +    throw ignite::IgniteError(code, stream.str().c_str()); \
 +}
 +
 +#define IGNITE_ERROR_3(code, part1, part2, part3) { \
 +    std::stringstream stream; \
 +    stream << (part1) << (part2) << (part3); \
 +    throw ignite::IgniteError(code, stream.str().c_str()); \
 +}
 +
 +#define IGNITE_ERROR_FORMATTED_1(code, msg, key1, val1) { \
 +    std::stringstream stream; \
 +    stream << msg << " [" << key1 << "=" << (val1) << "]"; \
 +    throw ignite::IgniteError(code, stream.str().c_str()); \
 +}
 +
 +#define IGNITE_ERROR_FORMATTED_2(code, msg, key1, val1, key2, val2) { \
 +    std::stringstream stream; \
 +    stream << msg << " [" << key1 << "=" << (val1) << ", " << key2 << "=" << (val2) << "]"; \
 +    throw ignite::IgniteError(code, stream.str().c_str()); \
 +}
 +
 +#define IGNITE_ERROR_FORMATTED_3(code, msg, key1, val1, key2, val2, key3, val3) { \
 +    std::stringstream stream; \
 +    stream << msg << " [" << key1 << "=" << (val1) << ", " << key2 << "=" << (val2) << ", " << key3 << "=" << (val3) << "]"; \
 +    throw ignite::IgniteError(code, stream.str().c_str()); \
 +}
 +
 +#define IGNITE_ERROR_FORMATTED_4(code, msg, key1, val1, key2, val2, key3, val3, key4, val4) { \
 +    std::stringstream stream; \
 +    stream << msg << " [" << key1 << "=" << (val1) << ", " << key2 << "=" << (val2) << ", " << key3 << "=" << (val3) << ", " << key4 << "=" << (val4) << "]"; \
 +    throw ignite::IgniteError(code, stream.str().c_str()); \
 +}
 +
 +namespace ignite
 +{
 +    /**
 +     * Ignite error information.
 +     */
-     class IGNITE_IMPORT_EXPORT IgniteError
++    class IGNITE_IMPORT_EXPORT IgniteError : public std::exception
 +    {
 +    public:
 +        /** Success. */
 +        static const int IGNITE_SUCCESS = 0;
 +
 +        /** Failed to initialize JVM. */
 +        static const int IGNITE_ERR_JVM_INIT = 1;
 +
 +        /** Failed to attach to JVM. */
 +        static const int IGNITE_ERR_JVM_ATTACH = 2;
 +        
 +        /** JVM library is not found. */
 +        static const int IGNITE_ERR_JVM_LIB_NOT_FOUND = 3;
 +
 +        /** Failed to load JVM library. */
 +        static const int IGNITE_ERR_JVM_LIB_LOAD_FAILED = 4;
 +        
 +        /** JVM classpath is not provided. */
 +        static const int IGNITE_ERR_JVM_NO_CLASSPATH = 5;
 +
 +        /** JVM error: no class definition found. */
 +        static const int IGNITE_ERR_JVM_NO_CLASS_DEF_FOUND = 6;
 +
 +        /** JVM error: no such method. */
 +        static const int IGNITE_ERR_JVM_NO_SUCH_METHOD = 7;
 +
 +        /** Memory operation error. */
 +        static const int IGNITE_ERR_MEMORY = 1001;
 +
 +        /** Binary error. */
 +        static const int IGNITE_ERR_BINARY = 1002;
 +
 +        /** Generic Ignite error. */
 +        static const int IGNITE_ERR_GENERIC = 2000;
 +
 +        /** Illegal argument passed. */
 +        static const int IGNITE_ERR_ILLEGAL_ARGUMENT = 2001;
 +
 +        /** Illegal state. */
 +        static const int IGNITE_ERR_ILLEGAL_STATE = 2002;
 +
 +        /** Unsupported operation. */
 +        static const int IGNITE_ERR_UNSUPPORTED_OPERATION = 2003;
 +
 +        /** Thread has been interrup. */
 +        static const int IGNITE_ERR_INTERRUPTED = 2004;
 +
 +        /** Cluster group is empty. */
 +        static const int IGNITE_ERR_CLUSTER_GROUP_EMPTY = 2005;
 +
 +        /** Cluster topology problem. */
 +        static const int IGNITE_ERR_CLUSTER_TOPOLOGY = 2006;
 +
 +        /** Compute execution rejected. */
 +        static const int IGNITE_ERR_COMPUTE_EXECUTION_REJECTED = 2007;
 +
 +        /** Compute job failover. */
 +        static const int IGNITE_ERR_COMPUTE_JOB_FAILOVER = 2008;
 +
 +        /** Compute task cancelled. */
 +        static const int IGNITE_ERR_COMPUTE_TASK_CANCELLED = 2009;
 +
 +        /** Compute task timeout. */
 +        static const int IGNITE_ERR_COMPUTE_TASK_TIMEOUT = 2010;
 +
 +        /** Compute user undeclared exception. */
 +        static const int IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION = 2011;
 +
 +        /** Generic cache error. */
 +        static const int IGNITE_ERR_CACHE = 2012;
 +
 +        /** Generic cache loader error. */
 +        static const int IGNITE_ERR_CACHE_LOADER = 2013;
 +
 +        /** Generic cache writer error. */
 +        static const int IGNITE_ERR_CACHE_WRITER = 2014;
 +        
 +        /** Generic cache entry processor error. */
 +        static const int IGNITE_ERR_ENTRY_PROCESSOR = 2015;
 +
 +        /** Cache atomic update timeout. */
 +        static const int IGNITE_ERR_CACHE_ATOMIC_UPDATE_TIMEOUT = 2016;
 +
 +        /** Cache partial update. */
 +        static const int IGNITE_ERR_CACHE_PARTIAL_UPDATE = 2017;
 +        
 +        /** Transaction optimisitc exception. */
 +        static const int IGNITE_ERR_TX_OPTIMISTIC = 2018;
 +
 +        /** Transaction timeout. */
 +        static const int IGNITE_ERR_TX_TIMEOUT = 2019;
 +
 +        /** Transaction rollback. */
 +        static const int IGNITE_ERR_TX_ROLLBACK = 2020;
 +
 +        /** Transaction heuristic exception. */
 +        static const int IGNITE_ERR_TX_HEURISTIC = 2021;
 +
 +        /** Authentication error. */
 +        static const int IGNITE_ERR_AUTHENTICATION = 2022;
 +
 +        /** Security error. */
 +        static const int IGNITE_ERR_SECURITY = 2023;
 +        
 +        /** Unknown error. */
 +        static const int IGNITE_ERR_UNKNOWN = -1;
 +
 +        /**
 +         * Throw an error if code is not IGNITE_SUCCESS.
 +         *
 +         * @param err Error.
 +         */
 +        static void ThrowIfNeeded(IgniteError& err);
 +
 +        /**
 +         * Create empty error.
 +         */
 +        IgniteError();
 +
 +        /**
 +         * Create error with specific code.
 +         *
 +         * @param code Error code.
 +         */
 +        IgniteError(const int32_t code);
 +
 +        /**
 +         * Create error with specific code and message.
 +         *
 +         * @param code Error code.
 +         * @param msg Message.
 +         */
 +        IgniteError(const int32_t code, const char* msg);
 +        
 +        /**
 +         * Copy constructor.
 +         *
 +         * @param other Other instance.
 +         */
 +        IgniteError(const IgniteError& other);
 +
 +        /**
 +         * Assignment operator.
 +         *
 +         * @param other Other instance.
 +         * @return Assignment result.
 +         */
 +        IgniteError& operator=(const IgniteError& other);
 +
 +        /**
 +         * Destructor.
 +         */
 +        ~IgniteError();
 +
 +        /**
 +         * Get error code.
 +         *
 +         * @return Error code.
 +         */
 +        int32_t GetCode() const;
 +
 +        /**
 +         * Get error message.
 +         *
 +         * @return Error message.
 +         */
 +        const char* GetText() const;
-         
++
++        /**
++         * Get error message. Synonim for GetText().
++         *
++         * @return Error message.
++         */
++        virtual const char* what() const;
++
 +        /**
 +         * Set error.
 +         *
 +         * @param jniCode Error code.
 +         * @param jniCls Error class.
 +         * @param jniMsg Error message.
 +         * @param err Error.
 +         */
 +        static void SetError(const int jniCode, const char* jniCls, const char* jniMsg, IgniteError* err);
 +    private:
 +        /** Error code. */
 +        int32_t code;    
 +        
 +        /** Error message. */
 +        char* msg;       
 +    };    
 +}
 +
 +#endif

http://git-wip-us.apache.org/repos/asf/ignite/blob/43bc846d/modules/platforms/cpp/common/src/ignite_error.cpp
----------------------------------------------------------------------
diff --cc modules/platforms/cpp/common/src/ignite_error.cpp
index cd1b9c4,0000000..6981a86
mode 100644,000000..100644
--- a/modules/platforms/cpp/common/src/ignite_error.cpp
+++ b/modules/platforms/cpp/common/src/ignite_error.cpp
@@@ -1,221 -1,0 +1,226 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +#include <ignite/common/java.h>
 +#include <ignite/common/ignite_error.h>
 +#include <ignite/common/utils.h>
 +
 +using namespace ignite::common::java;
 +using namespace ignite::common::utils;
 +
 +namespace ignite
 +{
 +    void IgniteError::ThrowIfNeeded(IgniteError& err)
 +    {
 +        if (err.code != IGNITE_SUCCESS)
 +            throw err;
 +    }
 +
 +    IgniteError::IgniteError() : code(IGNITE_SUCCESS), msg(NULL)
 +    {
 +        // No-op.
 +    }
 +
 +    IgniteError::IgniteError(int32_t code) : code(code), msg(NULL)
 +    {
 +        // No-op.
 +    }
 +
 +    IgniteError::IgniteError(int32_t code, const char* msg)
 +    {
 +        this->code = code;
 +        this->msg = CopyChars(msg);
 +    }
 +
 +    IgniteError::IgniteError(const IgniteError& other)
 +    {
 +        this->code = other.code;
 +        this->msg = CopyChars(other.msg);
 +    }
 +
 +    IgniteError& IgniteError::operator=(const IgniteError& other)
 +    {
 +        if (this != &other)
 +        {
 +            IgniteError tmp(other);
 +
 +            int tmpCode = code;
 +            char* tmpMsg = msg;
 +            
 +            code = tmp.code;
 +            msg = tmp.msg;
 +
 +            tmp.code = tmpCode;
 +            tmp.msg = tmpMsg;
 +        }
 +
 +        return *this;
 +    }
 +
 +    IgniteError::~IgniteError()
 +    {
 +        ReleaseChars(msg);
 +    }
 +
 +    int32_t IgniteError::GetCode() const
 +    {
 +        return code;
 +    }
 +
 +    const char* IgniteError::GetText() const
 +    {
 +        if (code == IGNITE_SUCCESS)
 +            return "Operation completed successfully.";
 +        else if (msg)
 +            return msg;
 +        else
 +            return  "No additional information available.";
 +    }
-     
++
++    const char* IgniteError::what() const
++    {
++        return GetText();
++    }
++
 +    void IgniteError::SetError(const int jniCode, const char* jniCls, const char* jniMsg, IgniteError* err)
 +    {
 +        if (jniCode == IGNITE_JNI_ERR_SUCCESS)
 +            *err = IgniteError();
 +        else if (jniCode == IGNITE_JNI_ERR_GENERIC)
 +        {
 +            // The most common case when we have Java exception "in hands" and must map it to respective code.
 +            if (jniCls)
 +            {
 +                std::string jniCls0 = jniCls;
 +
 +                if (jniCls0.compare("java.lang.NoClassDefFoundError") == 0)
 +                {
 +                    std::stringstream stream; 
 +
 +                    stream << "Java class is not found (did you set IGNITE_HOME environment variable?)";
 +
 +                    if (jniMsg)
 +                        stream << ": " << jniMsg;
 +                    
 +                    *err = IgniteError(IGNITE_ERR_JVM_NO_CLASS_DEF_FOUND, stream.str().c_str());
 +                }
 +                else if (jniCls0.compare("java.lang.NoSuchMethodError") == 0)
 +                {
 +                    std::stringstream stream;
 +
 +                    stream << "Java method is not found (did you set IGNITE_HOME environment variable?)";
 +
 +                    if (jniMsg)
 +                        stream << ": " << jniMsg;
 +
 +                    *err = IgniteError(IGNITE_ERR_JVM_NO_SUCH_METHOD, stream.str().c_str());
 +                }
 +                else if (jniCls0.compare("java.lang.IllegalArgumentException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_ILLEGAL_ARGUMENT, jniMsg);
 +                else if (jniCls0.compare("java.lang.IllegalStateException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_ILLEGAL_STATE, jniMsg);
 +                else if (jniCls0.compare("java.lang.UnsupportedOperationException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_UNSUPPORTED_OPERATION, jniMsg);
 +                else if (jniCls0.compare("java.lang.InterruptedException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_INTERRUPTED, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.cluster.ClusterGroupEmptyException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_CLUSTER_GROUP_EMPTY, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.cluster.ClusterTopologyException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_CLUSTER_TOPOLOGY, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.compute.ComputeExecutionRejectedException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_COMPUTE_EXECUTION_REJECTED, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.compute.ComputeJobFailoverException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_COMPUTE_JOB_FAILOVER, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.compute.ComputeTaskCancelledException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_COMPUTE_TASK_CANCELLED, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.compute.ComputeTaskTimeoutException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_COMPUTE_TASK_TIMEOUT, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.compute.ComputeUserUndeclaredException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_COMPUTE_USER_UNDECLARED_EXCEPTION, jniMsg);
 +                else if (jniCls0.compare("javax.cache.CacheException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_CACHE, jniMsg);
 +                else if (jniCls0.compare("javax.cache.integration.CacheLoaderException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_CACHE_LOADER, jniMsg);
 +                else if (jniCls0.compare("javax.cache.integration.CacheWriterException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_CACHE_WRITER, jniMsg);
 +                else if (jniCls0.compare("javax.cache.processor.EntryProcessorException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_ENTRY_PROCESSOR, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.cache.CacheAtomicUpdateTimeoutException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_CACHE_ATOMIC_UPDATE_TIMEOUT, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.cache.CachePartialUpdateException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_CACHE_PARTIAL_UPDATE, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.transactions.TransactionOptimisticException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_TX_OPTIMISTIC, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.transactions.TransactionTimeoutException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_TX_TIMEOUT, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.transactions.TransactionRollbackException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_TX_ROLLBACK, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.transactions.TransactionHeuristicException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_TX_HEURISTIC, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.IgniteAuthenticationException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_AUTHENTICATION, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.plugin.security.GridSecurityException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_SECURITY, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.IgniteException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_GENERIC, jniMsg);
 +                else if (jniCls0.compare("org.apache.ignite.IgniteCheckedException") == 0)
 +                    *err = IgniteError(IGNITE_ERR_GENERIC, jniMsg);
 +                else
 +                {
 +                    std::stringstream stream;
 +                    
 +                    stream << "Java exception occurred [cls=" << jniCls0;
 +
 +                    if (jniMsg)
 +                        stream << ", msg=" << jniMsg;
 +
 +                    stream << "]";
 +
 +                    *err = IgniteError(IGNITE_ERR_UNKNOWN, stream.str().c_str());
 +                }                    
 +            }
 +            else
 +            {
 +                // JNI class name is not available. Something really weird.
 +                *err = IgniteError(IGNITE_ERR_UNKNOWN);
 +            }
 +        }
 +        else if (jniCode == IGNITE_JNI_ERR_JVM_INIT)
 +        {
 +            std::stringstream stream;
 +
 +            stream << "Failed to initialize JVM [errCls=";
 +
 +            if (jniCls)
 +                stream << jniCls;
 +            else
 +                stream << "N/A";
 +
 +            stream << ", errMsg=";
 +
 +            if (jniMsg)
 +                stream << jniMsg;
 +            else
 +                stream << "N/A";
 +
 +            stream << "]";
 +
 +            *err = IgniteError(IGNITE_ERR_JVM_INIT, stream.str().c_str());
 +        }
 +        else if (jniCode == IGNITE_JNI_ERR_JVM_ATTACH)
 +            *err = IgniteError(IGNITE_ERR_JVM_ATTACH, "Failed to attach to JVM.");
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/ignite/blob/43bc846d/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj
----------------------------------------------------------------------


[2/8] ignite git commit: IGNITE-2801 Coordinator floods network with partitions full map exchange messages

Posted by vo...@apache.org.
IGNITE-2801 Coordinator floods network with partitions full map exchange messages


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a3d7248
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a3d7248
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a3d7248

Branch: refs/heads/ignite-1786
Commit: 6a3d724805e231edca2d8d72891f15a8a729bbc2
Parents: 7f9ee2d
Author: Anton Vinogradov <av...@apache.org>
Authored: Tue Mar 29 14:56:21 2016 +0300
Committer: Anton Vinogradov <av...@apache.org>
Committed: Tue Mar 29 14:58:14 2016 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      |   8 +-
 ...cingDelayedPartitionMapExchangeSelfTest.java |  14 +-
 .../GridCacheRebalancingSyncSelfTest.java       | 186 ++++++++++++++++++-
 .../junits/common/GridCommonAbstractTest.java   |  53 +++++-
 4 files changed, 237 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6a3d7248/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 54580fd..6de10c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1264,13 +1264,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             break;
                     }
 
-                    // If not first preloading and no more topology events present,
-                    // then we periodically refresh partition map.
-                    if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished) {
-                        refreshPartitions(timeout);
-
+                    // If not first preloading and no more topology events present.
+                    if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished)
                         timeout = cctx.gridConfig().getNetworkTimeout();
-                    }
 
                     // After workers line up and before preloading starts we initialize all futures.
                     if (log.isDebugEnabled())

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a3d7248/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
index 2890fcb..2c47a1c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRabalancingDelayedPartitionMapExchangeSelfTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -99,7 +100,7 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
      * @throws Exception e.
      */
     public void test() throws Exception {
-        startGrid(0);
+        IgniteKernal ignite = (IgniteKernal)startGrid(0);
 
         CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();
 
@@ -114,26 +115,29 @@ public class GridCacheRabalancingDelayedPartitionMapExchangeSelfTest extends Gri
         startGrid(2);
         startGrid(3);
 
-        awaitPartitionMapExchange(true);
+        awaitPartitionMapExchange(true, true);
 
         for (int i = 0; i < 2; i++) {
             stopGrid(3);
 
-            awaitPartitionMapExchange(true);
+            awaitPartitionMapExchange(true, true);
 
             startGrid(3);
 
-            awaitPartitionMapExchange(true);
+            awaitPartitionMapExchange(true, true);
         }
 
         startGrid(4);
 
-        awaitPartitionMapExchange(true);
+        awaitPartitionMapExchange(true, true);
 
         assert rs.isEmpty();
 
         record = true;
 
+        // Emulate latest GridDhtPartitionsFullMessages.
+        ignite.context().cache().context().exchange().scheduleResendPartitions();
+
         while (rs.size() < 3) { // N - 1 nodes.
             U.sleep(10);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a3d7248/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
index e4ad66b..f1e5687 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/GridCacheRebalancingSyncSelfTest.java
@@ -17,20 +17,40 @@
 
 package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
 
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -38,6 +58,9 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import static org.apache.ignite.cache.CacheMode.LOCAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
+
 /**
  *
  */
@@ -69,6 +92,12 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
     /** */
     private volatile boolean concurrentStartFinished3;
 
+    /** */
+    private volatile boolean record = false;
+
+    /** */
+    private final ConcurrentHashMap<Class, AtomicInteger> map = new ConcurrentHashMap<>();
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration iCfg = super.getConfiguration(gridName);
@@ -76,6 +105,13 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setIpFinder(ipFinder);
         ((TcpDiscoverySpi)iCfg.getDiscoverySpi()).setForceServerMode(true);
 
+        TcpCommunicationSpi commSpi = new CountingCommunicationSpi();
+
+        commSpi.setLocalPort(GridTestUtils.getNextCommPort(getClass()));
+        commSpi.setTcpNoDelay(true);
+
+        iCfg.setCommunicationSpi(commSpi);
+
         if (getTestGridName(10).equals(gridName))
             iCfg.setClientMode(true);
 
@@ -173,8 +209,9 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
                 log.info("<" + name + "> Checked " + (i + 1) * 100 / (TEST_SIZE) + "% entries. [count=" + TEST_SIZE +
                     ", iteration=" + iter + ", cache=" + name + "]");
 
-            assert ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode() + iter) :
-                i + " value " + (i + name.hashCode() + iter) + " does not match (" + ignite.cache(name).get(i) + ")";
+            assertTrue(i + " value " + (i + name.hashCode() + iter) + " does not match (" + ignite.cache(name).get(i) + ")",
+                ignite.cache(name).get(i) != null && ignite.cache(name).get(i).equals(i + name.hashCode() + iter));
+
         }
     }
 
@@ -189,7 +226,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testSimpleRebalancing() throws Exception {
-        Ignite ignite = startGrid(0);
+        IgniteKernal ignite = (IgniteKernal)startGrid(0);
 
         generateData(ignite, 0, 0);
 
@@ -202,19 +239,43 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         waitForRebalancing(0, 2);
         waitForRebalancing(1, 2);
 
+        awaitPartitionMapExchange(true, true);
+
+        checkPartitionMapExchangeFinished();
+
+        checkPartitionMapMessagesAbsent();
+
         stopGrid(0);
 
         waitForRebalancing(1, 3);
 
+        awaitPartitionMapExchange(true, true);
+
+        checkPartitionMapExchangeFinished();
+
+        checkPartitionMapMessagesAbsent();
+
         startGrid(2);
 
         waitForRebalancing(1, 4);
         waitForRebalancing(2, 4);
 
+        awaitPartitionMapExchange(true, true);
+
+        checkPartitionMapExchangeFinished();
+
+        checkPartitionMapMessagesAbsent();
+
         stopGrid(2);
 
         waitForRebalancing(1, 5);
 
+        awaitPartitionMapExchange(true, true);
+
+        checkPartitionMapExchangeFinished();
+
+        checkPartitionMapMessagesAbsent();
+
         long spend = (System.currentTimeMillis() - start) / 1000;
 
         checkData(grid(1), 0, 0);
@@ -277,7 +338,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         concurrentStartFinished = true;
 
-        awaitPartitionMapExchange(true);
+        awaitPartitionMapExchange(true, true);
 
         checkSupplyContextMapIsEmpty();
 
@@ -348,12 +409,79 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
                 Map map = U.field(supplier, "scMap");
 
                 synchronized (map) {
-                    assert map.isEmpty();
+                    assertTrue(map.isEmpty());
                 }
             }
         }
     }
 
+    protected void checkPartitionMapExchangeFinished() {
+        for (Ignite g : G.allGrids()) {
+            IgniteKernal g0 = (IgniteKernal)g;
+
+            for (IgniteCacheProxy<?, ?> c : g0.context().cache().jcaches()) {
+                CacheConfiguration cfg = c.context().config();
+
+                if (cfg.getCacheMode() != LOCAL && cfg.getRebalanceMode() != NONE) {
+                    GridDhtCacheAdapter<?, ?> dht = dht(c);
+
+                    GridDhtPartitionTopology top = dht.topology();
+
+                    List<GridDhtLocalPartition> locs = top.localPartitions();
+
+                    for (GridDhtLocalPartition loc : locs) {
+                        assertTrue("Wrong partition state, should be OWNING [state=" + loc.state() + "]",
+                            loc.state() == GridDhtPartitionState.OWNING);
+
+                        Collection<ClusterNode> affNodes =
+                            g0.affinity(cfg.getName()).mapPartitionToPrimaryAndBackups(loc.id());
+
+                        assertTrue(affNodes.contains(g0.localNode()));
+                    }
+
+                    for (Ignite remote : G.allGrids()) {
+                        IgniteKernal remote0 = (IgniteKernal)remote;
+
+                        IgniteCacheProxy<?, ?> remoteC = remote0.context().cache().jcache(cfg.getName());
+
+                        GridDhtCacheAdapter<?, ?> remoteDht = dht(remoteC);
+
+                        GridDhtPartitionTopology remoteTop = remoteDht.topology();
+
+                        GridDhtPartitionMap2 pMap = remoteTop.partitionMap(true).get(((IgniteKernal)g).getLocalNodeId());
+
+                        assertEquals(pMap.size(), locs.size());
+
+                        for (Map.Entry entry : pMap.entrySet()) {
+                            assertTrue("Wrong partition state, should be OWNING [state=" + entry.getValue() + "]",
+                                entry.getValue() == GridDhtPartitionState.OWNING);
+                        }
+
+                        for (GridDhtLocalPartition loc : locs) {
+                            assertTrue(pMap.containsKey(loc.id()));
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    protected void checkPartitionMapMessagesAbsent() throws IgniteInterruptedCheckedException {
+        map.clear();
+
+        record = true;
+
+        U.sleep(30_000);
+
+        record = false;
+
+        AtomicInteger iF = map.get(GridDhtPartitionsFullMessage.class);
+        AtomicInteger iS = map.get(GridDhtPartitionsSingleMessage.class);
+
+        assertTrue(iF == null || iF.get() == 1); // 1 message can be sent right after all checks passed.
+        assertTrue(iS == null);
+    }
+
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
         return 10 * 60_000;
@@ -446,7 +574,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         waitForRebalancing(3, 5, 1);
         waitForRebalancing(4, 5, 1);
 
-        awaitPartitionMapExchange(true);
+        awaitPartitionMapExchange(true, true);
 
         checkSupplyContextMapIsEmpty();
 
@@ -470,7 +598,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         waitForRebalancing(3, 6);
         waitForRebalancing(4, 6);
 
-        awaitPartitionMapExchange(true);
+        awaitPartitionMapExchange(true, true);
 
         checkSupplyContextMapIsEmpty();
 
@@ -480,7 +608,7 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         waitForRebalancing(3, 7);
         waitForRebalancing(4, 7);
 
-        awaitPartitionMapExchange(true);
+        awaitPartitionMapExchange(true, true);
 
         checkSupplyContextMapIsEmpty();
 
@@ -489,7 +617,11 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
         waitForRebalancing(3, 8);
         waitForRebalancing(4, 8);
 
-        awaitPartitionMapExchange(true);
+        awaitPartitionMapExchange(true, true);
+
+        checkPartitionMapExchangeFinished();
+
+        checkPartitionMapMessagesAbsent();
 
         checkSupplyContextMapIsEmpty();
 
@@ -514,4 +646,40 @@ public class GridCacheRebalancingSyncSelfTest extends GridCommonAbstractTest {
 
         stopAllGrids();
     }
+
+    /**
+     *
+     */
+    private class CountingCommunicationSpi extends TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(final ClusterNode node, final Message msg,
+            final IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            final Object msg0 = ((GridIoMessage)msg).message();
+
+            recordMessage(msg0);
+
+            super.sendMessage(node, msg, ackC);
+        }
+
+        /**
+         * @param msg
+         */
+        private void recordMessage(Object msg) {
+            if (record) {
+                Class id = msg.getClass();
+
+                AtomicInteger ai = map.get(id);
+
+                if (ai == null) {
+                    ai = new AtomicInteger();
+
+                    AtomicInteger oldAi = map.putIfAbsent(id, ai);
+
+                    (oldAi != null ? oldAi : ai).incrementAndGet();
+                }
+                else
+                    ai.incrementAndGet();
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6a3d7248/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 4fcc1ed..e53ec56 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -68,6 +69,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedCache;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap2;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
 import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -416,15 +418,18 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
      */
     @SuppressWarnings("BusyWait")
     protected void awaitPartitionMapExchange() throws InterruptedException {
-        awaitPartitionMapExchange(false);
+        awaitPartitionMapExchange(false, false);
     }
 
     /**
      * @param waitEvicts If {@code true} will wait for evictions finished.
+     * @param waitNode2PartUpdate If {@code true} will wait for nodes node2part info update finished.
      * @throws InterruptedException If interrupted.
      */
     @SuppressWarnings("BusyWait")
-    protected void awaitPartitionMapExchange(boolean waitEvicts) throws InterruptedException {
+    protected void awaitPartitionMapExchange(boolean waitEvicts, boolean waitNode2PartUpdate) throws InterruptedException {
+        long timeout = 30_000;
+
         for (Ignite g : G.allGrids()) {
             IgniteKernal g0 = (IgniteKernal)g;
 
@@ -468,7 +473,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
                                 GridDhtLocalPartition loc = top.localPartition(p, readyVer, false);
 
                                 if (affNodes.size() != owners.size() || !affNodes.containsAll(owners) ||
-                                    (waitEvicts && loc != null && loc.state() == GridDhtPartitionState.RENTING)) {
+                                    (waitEvicts && loc != null && loc.state() != GridDhtPartitionState.OWNING)) {
                                     LT.warn(log(), null, "Waiting for topology map update [" +
                                         "grid=" + g.name() +
                                         ", cache=" + cfg.getName() +
@@ -501,7 +506,7 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
                                 if (i == 0)
                                     start = System.currentTimeMillis();
 
-                                if (System.currentTimeMillis() - start > 30_000) {
+                                if (System.currentTimeMillis() - start > timeout) {
                                     U.dumpThreads(log);
 
                                     throw new IgniteException("Timeout of waiting for topology map update [" +
@@ -526,6 +531,46 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
                             break;
                         }
                     }
+
+                    if (waitNode2PartUpdate) {
+                        long start = System.currentTimeMillis();
+
+                        boolean failed = true;
+
+                        while (failed) {
+                            failed = false;
+
+                            for (GridDhtPartitionMap2 pMap : top.partitionMap(true).values()) {
+                                if (failed)
+                                    break;
+
+                                for (Map.Entry entry : pMap.entrySet()) {
+                                    if (System.currentTimeMillis() - start > timeout) {
+                                        U.dumpThreads(log);
+
+                                        throw new IgniteException("Timeout of waiting for partition state update [" +
+                                            "grid=" + g.name() +
+                                            ", cache=" + cfg.getName() +
+                                            ", cacheId=" + dht.context().cacheId() +
+                                            ", topVer=" + top.topologyVersion() +
+                                            ", locNode=" + g.cluster().localNode() + ']');
+                                    }
+
+                                    if (entry.getValue() != GridDhtPartitionState.OWNING) {
+                                        LT.warn(log(), null,
+                                            "Waiting for correct partition state, should be OWNING [state=" +
+                                                entry.getValue() + "]");
+
+                                        Thread.sleep(200); // Busy wait.
+
+                                        failed = true;
+
+                                        break;
+                                    }
+                                }
+                            }
+                        }
+                    }
                 }
             }
         }


[7/8] ignite git commit: Now cache plugins are able to unwrap entries passed to EntryProcessor.

Posted by vo...@apache.org.
Now cache plugins are able to unwrap entries passed to EntryProcessor.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bbe5258b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bbe5258b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bbe5258b

Branch: refs/heads/ignite-1786
Commit: bbe5258b8ffd899ccddc223b5f40632a9f624e40
Parents: a4b922c
Author: dkarachentsev <dk...@gridgain.com>
Authored: Wed Mar 30 13:29:36 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Mar 30 13:29:36 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheInvokeEntry.java      | 41 +++++++++++++++-----
 .../processors/cache/CacheLazyEntry.java        |  9 ++++-
 .../processors/cache/GridCacheMapEntry.java     |  9 +++--
 .../distributed/dht/GridDhtTxPrepareFuture.java | 12 +++---
 .../dht/atomic/GridDhtAtomicCache.java          |  4 +-
 .../local/atomic/GridLocalAtomicCache.java      |  4 +-
 .../cache/transactions/IgniteTxAdapter.java     |  8 ++--
 .../cache/transactions/IgniteTxEntry.java       |  4 +-
 .../transactions/IgniteTxLocalAdapter.java      |  5 +--
 .../processors/plugin/CachePluginManager.java   | 25 ++++++++++++
 .../ignite/plugin/CachePluginProvider.java      | 11 ++++++
 ...CacheDeploymentCachePluginConfiguration.java |  7 ++++
 12 files changed, 107 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java
index 2ecfdbf..2526146 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeEntry.java
@@ -40,44 +40,53 @@ public class CacheInvokeEntry<K, V> extends CacheLazyEntry<K, V> implements Muta
     /** Entry version. */
     private GridCacheVersion ver;
 
+    /** Cache entry instance. */
+    private GridCacheEntryEx entry;
+
     /**
-     * @param cctx Cache context.
+     * Constructor.
+     *
      * @param keyObj Key cache object.
      * @param valObj Cache object value.
      * @param ver Entry version.
+     * @param keepBinary Keep binary flag.
+     * @param entry Original entry.
      */
-    public CacheInvokeEntry(GridCacheContext cctx,
-        KeyCacheObject keyObj,
+    public CacheInvokeEntry(KeyCacheObject keyObj,
         @Nullable CacheObject valObj,
         GridCacheVersion ver,
-        boolean keepBinary
+        boolean keepBinary,
+        GridCacheEntryEx entry
     ) {
-        super(cctx, keyObj, valObj, keepBinary);
+        super(entry.context(), keyObj, valObj, keepBinary);
 
         this.hadVal = valObj != null;
         this.ver = ver;
+        this.entry = entry;
     }
 
     /**
-     * @param ctx Cache context.
      * @param keyObj Key cache object.
      * @param key Key value.
      * @param valObj Value cache object.
      * @param val Value.
      * @param ver Entry version.
+     * @param keepBinary Keep binary flag.
+     * @param entry Grid cache entry.
      */
-    public CacheInvokeEntry(GridCacheContext<K, V> ctx,
-        KeyCacheObject keyObj,
+    public CacheInvokeEntry(KeyCacheObject keyObj,
         @Nullable K key,
         @Nullable CacheObject valObj,
         @Nullable V val,
         GridCacheVersion ver,
-        boolean keepBinary
+        boolean keepBinary,
+        GridCacheEntryEx entry
     ) {
-        super(ctx, keyObj, key, valObj, val, keepBinary);
+        super(entry.context(), keyObj, key, valObj, val, keepBinary);
 
         this.hadVal = valObj != null || val != null;
         this.ver = ver;
+        this.entry = entry;
     }
 
     /** {@inheritDoc} */
@@ -122,12 +131,24 @@ public class CacheInvokeEntry<K, V> extends CacheLazyEntry<K, V> implements Muta
         return op != Operation.NONE;
     }
 
+    /**
+     * @return Cache entry instance.
+     */
+    public GridCacheEntryEx entry() {
+        return entry;
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public <T> T unwrap(Class<T> cls) {
         if (cls.isAssignableFrom(CacheEntry.class) && ver != null)
             return (T)new CacheEntryImplEx<>(getKey(), getValue(), ver);
 
+        final T res = cctx.plugin().unwrapCacheEntry(this, cls);
+
+        if (res != null)
+            return res;
+
         return super.unwrap(cls);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
index 6ec17c0..c1fcb77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java
@@ -80,7 +80,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
      * @param keepBinary Keep binary flag.
      * @param val Cache value.
      */
-    public CacheLazyEntry(GridCacheContext<K, V> ctx,
+    public CacheLazyEntry(GridCacheContext ctx,
         KeyCacheObject keyObj,
         K key,
         CacheObject valObj,
@@ -136,6 +136,13 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> {
         return key;
     }
 
+    /**
+     * @return Keep binary flag.
+     */
+    public boolean keepBinary() {
+        return keepBinary;
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public <T> T unwrap(Class<T> cls) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index c5df29b..08941ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1620,7 +1620,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 assert entryProcessor != null;
 
-                CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(cctx, key, old, version(), keepBinary);
+                CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry<>(key, old, version(), keepBinary, this);
 
                 try {
                     Object computed = entryProcessor.process(entry, invokeArgs);
@@ -1913,7 +1913,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                         oldVal = rawGetOrUnmarshalUnlocked(true);
 
-                        CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal, version(), keepBinary);
+                        CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(key, oldVal, version(),
+                            keepBinary, this);
 
                         try {
                             Object computed = entryProcessor.process(entry, invokeArgs);
@@ -2051,7 +2052,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                                     (EntryProcessor<Object, Object, ?>)writeObj;
 
                                 CacheInvokeEntry<Object, Object> entry =
-                                    new CacheInvokeEntry<>(cctx, key, prevVal, version(), keepBinary);
+                                    new CacheInvokeEntry<>(key, prevVal, version(), keepBinary, this);
 
                                 try {
                                     entryProcessor.process(entry, invokeArgs);
@@ -2181,7 +2182,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 EntryProcessor<Object, Object, ?> entryProcessor = (EntryProcessor<Object, Object, ?>)writeObj;
 
-                CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(cctx, key, oldVal, version(), keepBinary);
+                CacheInvokeEntry<Object, Object> entry = new CacheInvokeEntry(key, oldVal, version(), keepBinary, this);
 
                 try {
                     Object computed = entryProcessor.process(entry, invokeArgs);

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 732c298..445c70a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -360,6 +360,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                     if (evt && txEntry.op() == TRANSFORM)
                         entryProc = F.first(txEntry.entryProcessors()).get1();
 
+                    final boolean keepBinary = txEntry.keepBinary();
+
                     CacheObject val = cached.innerGet(
                         tx,
                         /*swap*/true,
@@ -373,7 +375,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                         entryProc,
                         tx.resolveTaskName(),
                         null,
-                        txEntry.keepBinary());
+                        keepBinary);
 
                     if (retVal || txEntry.op() == TRANSFORM) {
                         if (!F.isEmpty(txEntry.entryProcessors())) {
@@ -389,9 +391,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
 
                             boolean modified = false;
 
-                             for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
-                                 CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(
-                                     txEntry.context(), key, val, txEntry.cached().version(), txEntry.keepBinary());
+                            for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
+                                 CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(key, val,
+                                     txEntry.cached().version(), keepBinary, txEntry.cached());
 
                                  try {
                                     EntryProcessor<Object, Object, Object> processor = t.get1();
@@ -435,7 +437,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
                             }
                         }
                         else if (retVal)
-                            ret.value(cacheCtx, val, txEntry.keepBinary());
+                            ret.value(cacheCtx, val, keepBinary);
                     }
 
                     if (hasFilters && !cacheCtx.isAll(cached, txEntry.filters())) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e8a2200..1f5c817 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1656,8 +1656,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     Object oldVal = null;
                     Object updatedVal = null;
 
-                    CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(ctx, entry.key(), old,
-                        entry.version(), req.keepBinary());
+                    CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(entry.key(), old,
+                        entry.version(), req.keepBinary(), entry);
 
                     CacheObject updated;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 8e5fe9e..07b70cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -1192,8 +1192,8 @@ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
 
                         Object oldVal = null;
 
-                        CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(ctx, entry.key(), old,
-                            entry.version(), keepBinary);
+                        CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(entry.key(), old,
+                            entry.version(), keepBinary, entry);
 
                         CacheObject updated;
                         Object updatedVal = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index f6dfd32..9e5d626 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -1260,6 +1260,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
 
             boolean recordEvt = cctx.gridEvents().isRecordable(EVT_CACHE_OBJECT_READ);
 
+            final boolean keepBinary = txEntry.keepBinary();
+
             CacheObject cacheVal = txEntry.hasValue() ? txEntry.value() :
                 txEntry.cached().innerGet(this,
                     /*swap*/false,
@@ -1273,7 +1275,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
                     /**closure name */recordEvt ? F.first(txEntry.entryProcessors()).get1() : null,
                     resolveTaskName(),
                     null,
-                    txEntry.keepBinary());
+                    keepBinary);
 
             boolean modified = false;
 
@@ -1296,8 +1298,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement
             }
 
             for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
-                CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(txEntry.context(),
-                    txEntry.key(), key, cacheVal, val, ver, txEntry.keepBinary());
+                CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(
+                    txEntry.key(), key, cacheVal, val, ver, keepBinary, txEntry.cached());
 
                 try {
                     EntryProcessor<Object, Object, Object> processor = t.get1();

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index f682605..9060fa7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -691,8 +691,8 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message {
 
         for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : entryProcessors()) {
             try {
-                CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(ctx, key, keyVal, cacheVal, val,
-                    ver, keepBinary());
+                CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(key, keyVal, cacheVal, val,
+                    ver, keepBinary(), cached());
 
                 EntryProcessor processor = t.get1();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 1d77da5..0337145 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -2971,9 +2971,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
             Object res = null;
 
             for (T2<EntryProcessor<Object, Object, Object>, Object[]> t : txEntry.entryProcessors()) {
-                CacheInvokeEntry<Object, Object> invokeEntry =
-                    new CacheInvokeEntry(txEntry.context(), txEntry.key(), key0, cacheVal, val0, ver,
-                        txEntry.keepBinary());
+                CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(txEntry.key(), key0, cacheVal,
+                    val0, ver, txEntry.keepBinary(), txEntry.cached());
 
                 EntryProcessor<Object, Object, ?> entryProcessor = t.get1();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
index 23f1f3f..d0efc0a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/CachePluginManager.java
@@ -38,6 +38,9 @@ import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.plugin.CachePluginConfiguration;
 import org.apache.ignite.plugin.CachePluginContext;
 import org.apache.ignite.plugin.CachePluginProvider;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.Cache;
 
 /**
  * Cache plugin manager.
@@ -131,6 +134,28 @@ public class CachePluginManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * Unwrap entry to specified type. For details see {@code javax.cache.Cache.Entry.unwrap(Class)}.
+     *
+     * @param entry Entry to unwrap.
+     * @param cls Type of the expected component.
+     * @param <T> Return type.
+     * @param <K> Key type.
+     * @param <V> Value type.
+     * @return New instance of underlying type or {@code null} if it's not available.
+     */
+    @SuppressWarnings({"unchecked", "ForLoopReplaceableByForEach"})
+    @Nullable public <T, K, V> T unwrapCacheEntry(Cache.Entry<K, V> entry, Class<T> cls) {
+        for (int i = 0; i < providersList.size(); i++) {
+            final T res = (T)providersList.get(i).unwrapCacheEntry(entry, cls);
+
+            if (res != null)
+                return res;
+        }
+
+        return null;
+    }
+
+    /**
      * Validates cache plugin configurations. Throw exception if validation failed.
      *
      * @throws IgniteCheckedException If validation failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java b/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java
index 11550ec..b7ed0b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/CachePluginProvider.java
@@ -22,6 +22,8 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.jetbrains.annotations.Nullable;
 
+import javax.cache.Cache;
+
 /**
  * Cache plugin provider is a point for processing of properties 
  * which provide specific {@link CachePluginConfiguration}.
@@ -65,6 +67,15 @@ public interface CachePluginProvider<C extends CachePluginConfiguration> {
     @Nullable public <T> T createComponent(Class<T> cls);
 
     /**
+     * Unwrap entry to specified type. For details see {@code javax.cache.Cache.Entry.unwrap(Class)}.
+     *
+     * @param entry Mutable entry to unwrap.
+     * @param cls Type of the expected component.
+     * @return New instance of underlying type or {@code null} if it's not available.
+     */
+    @Nullable public <T, K, V> T unwrapCacheEntry(Cache.Entry<K, V> entry, Class<T> cls);
+
+    /**
      * Validates cache plugin configuration in process of cache creation. Throw exception if validation failed.
      *
      * @throws IgniteCheckedException If validation failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/bbe5258b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java
index bb37c25..ff2e674 100644
--- a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java
+++ b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/CacheDeploymentCachePluginConfiguration.java
@@ -25,6 +25,8 @@ import org.apache.ignite.plugin.CachePluginContext;
 import org.apache.ignite.plugin.CachePluginProvider;
 import org.jetbrains.annotations.Nullable;
 
+import javax.cache.Cache;
+
 /**
  * Test cache plugin configuration for cache deployment tests.
  */
@@ -41,6 +43,11 @@ public class CacheDeploymentCachePluginConfiguration<K, V> implements CachePlugi
         }
 
         /** {@inheritDoc} */
+        @Nullable @Override public Object unwrapCacheEntry(final Cache.Entry mutableEntry, final Class cls) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
         @Override public void start() throws IgniteCheckedException {
 
         }


[3/8] ignite git commit: IGNITE-2910 Added detail info about keys count in partitions for offheap and swap.

Posted by vo...@apache.org.
IGNITE-2910 Added detail info about keys count in partitions for offheap and swap.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7081c019
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7081c019
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7081c019

Branch: refs/heads/ignite-1786
Commit: 7081c0197ea146700f31dd222dba73d0a7428e5d
Parents: 6a3d724
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Mar 30 13:46:37 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Mar 30 13:46:37 2016 +0700

----------------------------------------------------------------------
 .../processors/cache/GridCacheSwapManager.java  |  26 ++++-
 .../internal/visor/cache/VisorCacheV3.java      | 108 +++++++++++++++++++
 .../visor/node/VisorNodeDataCollectorJob.java   |  31 +++---
 3 files changed, 152 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7081c019/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 37c7958..d50bf0b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -306,6 +306,18 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param partId Partition ID to get swap entries count for.
+     * @return Number of swap entries.
+     * @throws IgniteCheckedException If failed.
+     */
+    public long swapEntriesCount(int partId) throws IgniteCheckedException {
+        if (!swapEnabled)
+            return 0;
+
+        return swapMgr.swapKeys(spaceName, Collections.singleton(partId));
+    }
+
+    /**
      * @param primary If {@code true} includes primary entries.
      * @param backup If {@code true} includes backup entries.
      * @param topVer Topology version.
@@ -329,6 +341,18 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
     }
 
     /**
+     * @param partId Partition ID to get entries count for.
+     * @return Number of offheap entries.
+     * @throws IgniteCheckedException If failed.
+     */
+    public long offheapEntriesCount(int partId) throws IgniteCheckedException {
+        if (!offheapEnabled)
+            return 0;
+
+        return offheap.entriesCount(spaceName, Collections.singleton(partId));
+    }
+
+    /**
      * Gets number of swap entries (keys).
      *
      * @return Swap keys count.
@@ -2538,4 +2562,4 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             throw new UnsupportedOperationException();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7081c019/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
new file mode 100644
index 0000000..bd9a3ce
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheV3.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cache;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheSwapManager;
+import org.apache.ignite.internal.util.lang.GridTuple3;
+import org.apache.ignite.internal.util.lang.IgnitePair;
+
+/**
+ * Data transfer object for {@link IgniteCache}.
+ */
+public class VisorCacheV3 extends VisorCacheV2 {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Primary partitions IDs with offheap and swap entries count. */
+    private Collection<GridTuple3<Integer, Long, Long>> primaryPartsOffheapSwap;
+
+    /** Backup partitions IDs with offheap and swap entries count. */
+    private Collection<GridTuple3<Integer, Long, Long>> backupPartsOffheapSwap;
+
+    /** {@inheritDoc} */
+    @Override public VisorCache from(IgniteEx ignite, String cacheName, int sample) throws IgniteCheckedException {
+        VisorCache c = super.from(ignite, cacheName, sample);
+
+        if (c != null && c instanceof VisorCacheV3) {
+            VisorCacheV3 cacheV3 = (VisorCacheV3)c;
+
+            GridCacheAdapter ca = ignite.context().cache().internalCache(cacheName);
+
+            // Process only started caches.
+            if (ca != null && ca.context().started()) {
+                GridCacheSwapManager swap = ca.context().swap();
+
+                cacheV3.primaryPartsOffheapSwap = new ArrayList<>(c.primaryPartitions().size());
+
+                for (IgnitePair<Integer> part: c.primaryPartitions()) {
+                    int p = part.get1();
+
+                    cacheV3.primaryPartsOffheapSwap.add(new GridTuple3<>(p, swap.offheapEntriesCount(p), swap.swapEntriesCount(p)));
+                }
+
+                cacheV3.backupPartsOffheapSwap = new ArrayList<>(c.backupPartitions().size());
+
+                for (IgnitePair<Integer> part: c.backupPartitions()) {
+                    int p = part.get1();
+
+                    cacheV3.backupPartsOffheapSwap.add(new GridTuple3<>(p, swap.offheapEntriesCount(p), swap.swapEntriesCount(p)));
+                }
+            }
+        }
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected VisorCache initHistory(VisorCache c) {
+        super.initHistory(c);
+
+        if (c instanceof VisorCacheV3) {
+            ((VisorCacheV3)c).primaryPartsOffheapSwap = Collections.emptyList();
+            ((VisorCacheV3)c).backupPartsOffheapSwap = Collections.emptyList();
+        }
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override public VisorCache history() {
+        return initHistory(new VisorCacheV3());
+    }
+
+    /**
+     * @return Collection with primary partitions IDs and offheap and swap entries count.
+     */
+    public Collection<GridTuple3<Integer, Long, Long>> primaryPartitionsOffheapSwap() {
+        return primaryPartsOffheapSwap;
+    }
+
+    /**
+     * @return Collection with backup partitions IDs and offheap and swap entries count.
+     */
+    public Collection<GridTuple3<Integer, Long, Long>> backupPartitionsOffheapSwap() {
+        return backupPartsOffheapSwap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7081c019/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
index f996d9a..79760ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.VisorJob;
 import org.apache.ignite.internal.visor.cache.VisorCache;
 import org.apache.ignite.internal.visor.cache.VisorCacheV2;
+import org.apache.ignite.internal.visor.cache.VisorCacheV3;
 import org.apache.ignite.internal.visor.compute.VisorComputeMonitoringHolder;
 import org.apache.ignite.internal.visor.igfs.VisorIgfs;
 import org.apache.ignite.internal.visor.igfs.VisorIgfsEndpoint;
@@ -53,6 +54,9 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
     /** */
     private static final IgniteProductVersion VER_1_4_1 = IgniteProductVersion.fromString("1.4.1");
 
+    /** */
+    private static final IgniteProductVersion VER_1_5_9 = IgniteProductVersion.fromString("1.5.9");
+
     /**
      * Create job with given argument.
      *
@@ -120,6 +124,18 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
     }
 
     /**
+     * @param ver Version to check.
+     * @return {@code true} if compatible.
+     */
+    private boolean compatibleWith(IgniteProductVersion ver) {
+        for (ClusterNode node : ignite.cluster().nodes())
+            if (node.version().compareToIgnoreTimestamp(ver) <= 0)
+                return true;
+
+        return false;
+    }
+
+    /**
      * Collect caches.
      *
      * @param res Job result.
@@ -136,18 +152,9 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
                     long start0 = U.currentTimeMillis();
 
                     try {
-                        boolean compatibility = false;
-
-                        for (ClusterNode node : ignite.cluster().nodes()) {
-                            if (node.version().compareToIgnoreTimestamp(VER_1_4_1) <= 0) {
-                                compatibility = true;
-
-                                break;
-                            }
-                        }
-
-                        VisorCache cache = (compatibility ? new VisorCache() : new VisorCacheV2())
-                                .from(ignite, cacheName, arg.sample());
+                        VisorCache cache = (compatibleWith(VER_1_4_1) ? new VisorCache() :
+                                compatibleWith(VER_1_5_9) ? new VisorCacheV2() : new VisorCacheV3())
+                                    .from(ignite, cacheName, arg.sample());
 
                         if (cache != null)
                             res.caches().add(cache);


[4/8] ignite git commit: Minor change to Visor classes: added setters for name and queryMetrics properties.

Posted by vo...@apache.org.
Minor change to Visor classes: added setters for name and queryMetrics properties.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1a6bf25
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1a6bf25
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1a6bf25

Branch: refs/heads/ignite-1786
Commit: a1a6bf25a586cbd491b697319a39fafdc3198ba9
Parents: 7081c01
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Wed Mar 30 14:44:24 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Wed Mar 30 14:44:24 2016 +0700

----------------------------------------------------------------------
 .../ignite/internal/visor/cache/VisorCache.java   |  9 +++++++++
 .../internal/visor/cache/VisorCacheMetrics.java   | 18 ++++++++++++++++++
 2 files changed, 27 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a1a6bf25/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
index 7cd0669..b5151c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCache.java
@@ -301,6 +301,15 @@ public class VisorCache implements Serializable {
     }
 
     /**
+     * Sets new value for cache name.
+     *
+     * @param name New cache name.
+     */
+    public void name(String name) {
+        this.name = name;
+    }
+
+    /**
      * @return Dynamic deployment ID.
      */
     public IgniteUuid dynamicDeploymentId() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/a1a6bf25/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
index 0a77dc4..5d75e25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
@@ -238,6 +238,15 @@ public class VisorCacheMetrics implements Serializable {
     }
 
     /**
+     * Sets cache name.
+     *
+     * @param name New value for cache name.
+     */
+    public void name(String name) {
+        this.name = name;
+    }
+
+    /**
      * @return Cache mode.
      */
     public CacheMode mode() {
@@ -406,6 +415,15 @@ public class VisorCacheMetrics implements Serializable {
     }
 
     /**
+     * Sets cache query metrics.
+     *
+     * @param qryMetrics New value for query metrics.
+     */
+    public void queryMetrics(VisorCacheQueryMetrics qryMetrics) {
+        this.qryMetrics = qryMetrics;
+    }
+
+    /**
      * @return Current size of evict queue used to batch up evictions.
      */
     public int dhtEvictQueueCurrentSize() {


[6/8] ignite git commit: .NET: Minor fix to a test.

Posted by vo...@apache.org.
.NET: Minor fix to a test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a4b922ca
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a4b922ca
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a4b922ca

Branch: refs/heads/ignite-1786
Commit: a4b922caa251e8ffef0868e9ee3eaa1928564524
Parents: 12c707c
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Mar 30 12:31:31 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Mar 30 12:31:31 2016 +0300

----------------------------------------------------------------------
 .../dotnet/Apache.Ignite.Core.Tests/Examples/ProjectFilesTest.cs   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a4b922ca/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Examples/ProjectFilesTest.cs
----------------------------------------------------------------------
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Examples/ProjectFilesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Examples/ProjectFilesTest.cs
index 509618e..b99b54e 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Examples/ProjectFilesTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Examples/ProjectFilesTest.cs
@@ -41,7 +41,7 @@ namespace Apache.Ignite.Core.Tests.Examples
                 .Distinct()
                 .ToList();
 
-            Assert.AreEqual(4, paths.Count);
+            Assert.AreEqual(1, paths.Count);
 
             paths.ForEach(path => Assert.IsTrue(File.Exists(path), "Config file does not exist: " + path));
         }


[5/8] ignite git commit: IGNITE-2730: Ignite Events Source Streaming to Kafka. - Fixes #560.

Posted by vo...@apache.org.
IGNITE-2730: Ignite Events Source Streaming to Kafka. - Fixes #560.

Signed-off-by: shtykh_roman <rs...@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/12c707c8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/12c707c8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/12c707c8

Branch: refs/heads/ignite-1786
Commit: 12c707c81792e20c10cece742512412a7f24dcfb
Parents: a1a6bf2
Author: shtykh_roman <rs...@yahoo.com>
Authored: Wed Mar 30 17:09:33 2016 +0900
Committer: shtykh_roman <rs...@yahoo.com>
Committed: Wed Mar 30 17:09:33 2016 +0900

----------------------------------------------------------------------
 modules/kafka/README.txt                        |  81 ++++-
 .../kafka/connect/IgniteSourceConnector.java    |  81 +++++
 .../kafka/connect/IgniteSourceConstants.java    |  44 +++
 .../stream/kafka/connect/IgniteSourceTask.java  | 328 +++++++++++++++++++
 .../serialization/CacheEventConverter.java      |  66 ++++
 .../serialization/CacheEventDeserializer.java   |  54 +++
 .../serialization/CacheEventSerializer.java     |  54 +++
 .../kafka/IgniteKafkaStreamerSelfTestSuite.java |   4 +-
 .../kafka/KafkaIgniteStreamerSelfTest.java      |  11 +-
 .../ignite/stream/kafka/TestKafkaBroker.java    |  27 +-
 .../kafka/connect/IgniteSinkConnectorTest.java  |  13 +-
 .../connect/IgniteSourceConnectorMock.java      |  31 ++
 .../connect/IgniteSourceConnectorTest.java      | 327 ++++++++++++++++++
 .../kafka/connect/IgniteSourceTaskMock.java     |  31 ++
 .../kafka/connect/TestCacheEventFilter.java     |  31 ++
 .../kafka/src/test/resources/example-ignite.xml |   4 +-
 16 files changed, 1156 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/README.txt
----------------------------------------------------------------------
diff --git a/modules/kafka/README.txt b/modules/kafka/README.txt
index f4e56bd..3a1a5aa 100644
--- a/modules/kafka/README.txt
+++ b/modules/kafka/README.txt
@@ -33,7 +33,7 @@ interested in):
 </project>
 
 
-## Streaming Data via Kafka Connect
+## Streaming Data to Ignite via Kafka Connect
 
 Sink Connector will help you export data from Kafka to Ignite cache. It polls data from Kafka topics and writes it to the user-specified cache.
 For more information on Kafka Connect, see [Kafka Documentation](http://kafka.apache.org/documentation.html#connect).
@@ -46,8 +46,8 @@ as described in the following subsection.
 1. Put the following jar files on Kafka's classpath
 - ignite-kafka-connect-x.x.x-SNAPSHOT.jar
 - ignite-core-x.x.x-SNAPSHOT.jar
+- ignite-spring-x.x.x-SNAPSHOT.jar
 - cache-api-1.0.0.jar
-- ignite-spring-1.5.0-SNAPSHOT.jar
 - spring-aop-4.1.0.RELEASE.jar
 - spring-beans-4.1.0.RELEASE.jar
 - spring-context-4.1.0.RELEASE.jar
@@ -127,3 +127,80 @@ k1,v1
 ```
 http://node1:8080/ignite?cmd=size&cacheName=cache1
 ```
+
+## Streaming Cache Event Data to Kafka via Kafka Connect
+
+Source connector enables listening to Ignite cache events and, upon filtering, stream them to Kafka.
+
+Connector can be found in 'optional/ignite-kafka.' It and its dependencies have to be on the classpath of a Kafka running instance,
+as described in the following subsection.
+
+### Setting up and Running
+
+1. Put the following jar files on Kafka's classpath
+- ignite-kafka-connect-x.x.x-SNAPSHOT.jar
+- ignite-core-x.x.x-SNAPSHOT.jar
+- cache-api-1.0.0.jar
+- ignite-spring-1.5.0-SNAPSHOT.jar
+- spring-aop-4.1.0.RELEASE.jar
+- spring-beans-4.1.0.RELEASE.jar
+- spring-context-4.1.0.RELEASE.jar
+- spring-core-4.1.0.RELEASE.jar
+- spring-expression-4.1.0.RELEASE.jar
+- commons-logging-1.1.1.jar
+
+2. Prepare worker configurations, e.g.,
+```
+bootstrap.servers=localhost:9092
+
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter
+key.converter.schemas.enable=false
+value.converter.schemas.enable=false
+
+internal.key.converter=org.apache.kafka.connect.storage.StringConverter
+internal.value.converter=org.apache.kafka.connect.storage.StringConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.file.filename=/tmp/connect.offsets
+offset.flush.interval.ms=10000
+```
+
+Note that the current implementation ignores key and schema of Kafka Connect, and stores marshalled cache events
+using org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter.
+
+3. Prepare connector configurations, e.g.,
+```
+# connector
+name=ignite-src-connector
+connector.class=IgniteSourceConnector
+tasks.max=2
+
+# cache
+topicNames=testTopic1,testTopic2
+cacheEvts=put,remove
+## if you decide to filter remotely (recommended)
+cacheFilterCls=MyFilter
+cacheName=cache1
+igniteCfg=/some-path/ignite.xml
+```
+where 'cacheName' is the name of the cache you specify in '/some-path/ignite.xml' and the data from 'testTopic1,testTopic2'
+will be pulled and stored. Also consider using 'evtBufferSize' and 'evtBatchSize' for tuning the internal queue
+used to safely transfer data from Ignite cache to Kafka.
+
+The following cache events can be specified in the connector configurations:
+- CREATED
+- DESTROYED
+- PUT
+- READ
+- REMOVED
+- LOCKED
+- UNLOCKED
+- SWAPPED
+- UNSWAPPED
+- EXPIRED
+
+For a simple cache configuration file example, see example-ignite.xml in tests.
+
+4. Start the connector, as described in [Kafka Documentation](http://kafka.apache.org/documentation.html#connect).

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java
new file mode 100644
index 0000000..59e2ed0
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnector.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceConnector;
+
+/**
+ * Source connector to manage source tasks that listens to registered Ignite grid events and forward them to Kafka.
+ *
+ * Note that only cache events are enabled for streaming.
+ */
+public class IgniteSourceConnector extends SourceConnector {
+    /** Source properties. */
+    private Map<String, String> configProps;
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start(Map<String, String> props) {
+        try {
+            A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_NAME), "cache name");
+            A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_CFG_PATH), "path to cache config file");
+            A.notNullOrEmpty(props.get(IgniteSourceConstants.CACHE_EVENTS), "Registered cache events");
+            A.notNullOrEmpty(props.get(IgniteSourceConstants.TOPIC_NAMES), "Kafka topics");
+        }
+        catch (IllegalArgumentException e) {
+            throw new ConnectException("Cannot start IgniteSourceConnector due to configuration error", e);
+        }
+
+        configProps = props;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Class<? extends Task> taskClass() {
+        return IgniteSourceTask.class;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
+        List<Map<String, String>> taskConfigs = new ArrayList<>();
+        Map<String, String> taskProps = new HashMap<>();
+
+        taskProps.putAll(configProps);
+
+        for (int i = 0; i < maxTasks; i++)
+            taskConfigs.add(taskProps);
+
+        return taskConfigs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConstants.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConstants.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConstants.java
new file mode 100644
index 0000000..7d590e5
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConstants.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Sink configuration strings.
+ */
+public class IgniteSourceConstants {
+    /** Ignite configuration file path. */
+    public static final String CACHE_CFG_PATH = "igniteCfg";
+
+    /** Cache name. */
+    public static final String CACHE_NAME = "cacheName";
+
+    /** Events to be listened to. Names corresponds to {@link IgniteSourceTask.CacheEvt}. */
+    public static final String CACHE_EVENTS = "cacheEvts";
+
+    /** Internal buffer size. */
+    public static final String INTL_BUF_SIZE = "evtBufferSize";
+
+    /** Size of one chunk drained from the internal buffer. */
+    public static final String INTL_BATCH_SIZE = "evtBatchSize";
+
+    /** User-defined filter class. */
+    public static final String CACHE_FILTER_CLASS = "cacheFilterCls";
+
+    /** Kafka topic. */
+    public static final String TOPIC_NAMES = "topicNames";
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
new file mode 100644
index 0000000..9eb183c
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Task to consume remote cluster cache events from the grid and inject them into Kafka.
+ * <p>
+ * Note that a task will create a bounded queue in the grid for more reliable data transfer.
+ * Queue size can be changed by {@link IgniteSourceConstants#INTL_BUF_SIZE}.
+ */
+public class IgniteSourceTask extends SourceTask {
+    /** Logger. */
+    private static final Logger log = LoggerFactory.getLogger(IgniteSourceTask.class);
+
+    /** Event buffer size. */
+    private static int evtBufSize = 100000;
+
+    /** Event buffer. */
+    private static BlockingQueue<CacheEvent> evtBuf = new LinkedBlockingQueue<>(evtBufSize);
+
+    /** Max number of events taken from the buffer at once. */
+    private static int evtBatchSize = 100;
+
+    /** Flag for stopped state. */
+    private static volatile boolean stopped = true;
+
+    /** Ignite grid configuration file. */
+    private static String igniteConfigFile;
+
+    /** Cache name. */
+    private static String cacheName;
+
+    /** Remote Listener id. */
+    private static UUID rmtLsnrId;
+
+    /** Local listener. */
+    private static TaskLocalListener locLsnr = new TaskLocalListener();
+
+    /** Remote filter. */
+    private static TaskRemoteFilter rmtLsnr = new TaskRemoteFilter();
+
+    /** User-defined filter. */
+    private static IgnitePredicate<CacheEvent> filter;
+
+    /** Topic. */
+    private static String topics[];
+
+    /** Offset. */
+    private static final Map<String, Long> offset = Collections.singletonMap("offset", 0L);
+
+    /** Partition. */
+    private static final Map<String, String> srcPartition = Collections.singletonMap("cache", null);
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return new IgniteSinkConnector().version();
+    }
+
+    /**
+     * Filtering is done remotely. Local listener buffers data for injection into Kafka.
+     *
+     * @param props Task properties.
+     */
+    @Override public void start(Map<String, String> props) {
+        // Each task has the same parameters -- avoid setting more than once.
+        if (cacheName != null)
+            return;
+
+        cacheName = props.get(IgniteSourceConstants.CACHE_NAME);
+        igniteConfigFile = props.get(IgniteSourceConstants.CACHE_CFG_PATH);
+        topics = props.get(IgniteSourceConstants.TOPIC_NAMES).split("\\s*,\\s*");
+
+        if (props.containsKey(IgniteSourceConstants.INTL_BUF_SIZE))
+            evtBufSize = Integer.parseInt(props.get(IgniteSourceConstants.INTL_BUF_SIZE));
+
+        if (props.containsKey(IgniteSourceConstants.INTL_BATCH_SIZE))
+            evtBatchSize = Integer.parseInt(props.get(IgniteSourceConstants.INTL_BATCH_SIZE));
+
+        if (props.containsKey(IgniteSourceConstants.CACHE_FILTER_CLASS)) {
+            String filterCls = props.get(IgniteSourceConstants.CACHE_FILTER_CLASS);
+            if (filterCls != null && !filterCls.isEmpty()) {
+                try {
+                    Class<? extends IgnitePredicate<CacheEvent>> clazz =
+                        (Class<? extends IgnitePredicate<CacheEvent>>)Class.forName(filterCls);
+
+                    filter = clazz.newInstance();
+                }
+                catch (Exception e) {
+                    log.error("Failed to instantiate the provided filter! " +
+                        "User-enabled filtering is ignored!", e);
+                }
+            }
+        }
+
+        try {
+            int[] evts = cacheEvents(props.get(IgniteSourceConstants.CACHE_EVENTS));
+
+            rmtLsnrId = IgniteGrid.getIgnite().events(IgniteGrid.getIgnite().cluster().forCacheNodes(cacheName))
+                .remoteListen(locLsnr, rmtLsnr, evts);
+        }
+        catch (Exception e) {
+            log.error("Failed to register event listener!", e);
+
+            throw new ConnectException(e);
+        }
+        finally {
+            stopped = false;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<SourceRecord> poll() throws InterruptedException {
+        ArrayList<SourceRecord> records = new ArrayList<>(evtBatchSize);
+        ArrayList<CacheEvent> evts = new ArrayList<>(evtBatchSize);
+
+        if (stopped)
+            return records;
+
+        try {
+            if (evtBuf.drainTo(evts, evtBatchSize) > 0) {
+                for (CacheEvent evt : evts) {
+                    // schema and keys are ignored.
+                    for (String topic : topics)
+                        records.add(new SourceRecord(srcPartition, offset, topic, null, evt));
+                }
+
+                return records;
+            }
+        }
+        catch (IgniteException e) {
+            log.error("Error when polling event queue!", e);
+        }
+
+        // for shutdown.
+        return null;
+    }
+
+    /**
+     * Converts comma-delimited cache events strings to Ignite internal representation.
+     *
+     * @param evtPropsStr Comma-delimited cache event names.
+     * @return Ignite internal representation of cache events to be registered with the remote listener.
+     * @throws Exception If error.
+     */
+    private int[] cacheEvents(String evtPropsStr) throws Exception {
+        String[] evtStr = evtPropsStr.split("\\s*,\\s*");
+
+        if (evtStr.length == 0)
+            return EventType.EVTS_CACHE;
+
+        int[] evts = new int[evtStr.length];
+
+        try {
+            for (int i = 0; i < evtStr.length; i++)
+                evts[i] = CacheEvt.valueOf(evtStr[i].toUpperCase()).getId();
+        }
+        catch (Exception e) {
+            log.error("Failed to recognize the provided cache event!", e);
+
+            throw new Exception(e);
+        }
+        return evts;
+    }
+
+    /**
+     * Stops the grid client.
+     */
+    @Override public synchronized void stop() {
+        if (stopped)
+            return;
+
+        stopped = true;
+
+        stopRemoteListen();
+
+        IgniteGrid.getIgnite().close();
+    }
+
+    /**
+     * Stops the remote listener.
+     */
+    protected void stopRemoteListen() {
+        if (rmtLsnrId != null)
+            IgniteGrid.getIgnite().events(IgniteGrid.getIgnite().cluster().forCacheNodes(cacheName))
+                .stopRemoteListen(rmtLsnrId);
+
+        rmtLsnrId = null;
+    }
+
+    /**
+     * Local listener buffering cache events to be further sent to Kafka.
+     */
+    private static class TaskLocalListener implements IgniteBiPredicate<UUID, CacheEvent> {
+
+        @Override public boolean apply(UUID id, CacheEvent evt) {
+            try {
+                if (!evtBuf.offer(evt, 10, TimeUnit.MILLISECONDS))
+                    log.error("Failed to buffer event {}", evt.name());
+            }
+            catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
+            return true;
+        }
+    }
+
+    /**
+     * Remote filter.
+     */
+    private static class TaskRemoteFilter implements IgnitePredicate<CacheEvent> {
+        @IgniteInstanceResource
+        Ignite ignite;
+
+        @Override public boolean apply(CacheEvent evt) {
+
+            Affinity affinity = ignite.affinity(cacheName);
+            ClusterNode evtNode = evt.eventNode();
+
+            if (affinity.isPrimary(evtNode, evt.key())) {
+                // Process this event. Ignored on backups.
+                if (filter != null && filter.apply(evt))
+                    return false;
+
+                return true;
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * Grid instance initialized on demand.
+     */
+    private static class IgniteGrid {
+        /** Constructor. */
+        private IgniteGrid() {
+        }
+
+        /** Instance holder. */
+        private static class Holder {
+            private static final Ignite IGNITE = Ignition.start(igniteConfigFile);
+        }
+
+        /**
+         * Obtains grid instance.
+         *
+         * @return Grid instance.
+         */
+        private static Ignite getIgnite() {
+            return Holder.IGNITE;
+        }
+    }
+
+    /** Cache events available for listening. */
+    private enum CacheEvt {
+        CREATED(EventType.EVT_CACHE_ENTRY_CREATED),
+        DESTROYED(EventType.EVT_CACHE_ENTRY_DESTROYED),
+        PUT(EventType.EVT_CACHE_OBJECT_PUT),
+        READ(EventType.EVT_CACHE_OBJECT_READ),
+        REMOVED(EventType.EVT_CACHE_OBJECT_REMOVED),
+        LOCKED(EventType.EVT_CACHE_OBJECT_LOCKED),
+        UNLOCKED(EventType.EVT_CACHE_OBJECT_UNLOCKED),
+        SWAPPED(EventType.EVT_CACHE_OBJECT_SWAPPED),
+        UNSWAPPED(EventType.EVT_CACHE_OBJECT_UNSWAPPED),
+        EXPIRED(EventType.EVT_CACHE_OBJECT_EXPIRED);
+
+        /** Internal Ignite event id. */
+        private final int id;
+
+        /**
+         * Constructor.
+         *
+         * @param id Internal Ignite event id.
+         */
+        CacheEvt(int id) {
+            this.id = id;
+        }
+
+        /**
+         * Gets Ignite event id.
+         *
+         * @return Ignite event id.
+         */
+        int getId() {
+            return id;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventConverter.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventConverter.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventConverter.java
new file mode 100644
index 0000000..57eb7de
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventConverter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect.serialization;
+
+import java.util.Map;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.storage.Converter;
+
+/**
+ * {@link CacheEvent} converter for Connect API.
+ */
+public class CacheEventConverter implements Converter {
+    private final CacheEventDeserializer deserializer = new CacheEventDeserializer();
+    private final CacheEventSerializer serializer = new CacheEventSerializer();
+
+    /** {@inheritDoc} */
+    @Override public void configure(Map<String, ?> map, boolean b) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] fromConnectData(String topic, Schema schema, Object o) {
+        try {
+            return serializer.serialize(topic, (CacheEvent)o);
+        }
+        catch (SerializationException e) {
+            throw new DataException("Failed to convert to byte[] due to a serialization error", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public SchemaAndValue toConnectData(String topic, byte[] bytes) {
+        CacheEvent evt;
+
+        try {
+            evt = deserializer.deserialize(topic, bytes);
+        }
+        catch (SerializationException e) {
+            throw new DataException("Failed to convert to Kafka Connect data due to a serialization error", e);
+        }
+
+        if (evt == null) {
+            return SchemaAndValue.NULL;
+        }
+        return new SchemaAndValue(null, evt);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventDeserializer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventDeserializer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventDeserializer.java
new file mode 100644
index 0000000..47ce1ca
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventDeserializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect.serialization;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * Deserializer based on {@link JdkMarshaller}.
+ */
+public class CacheEventDeserializer implements Deserializer<CacheEvent> {
+    /** Marshaller. */
+    private static final Marshaller marsh = new JdkMarshaller();
+
+    /** {@inheritDoc} */
+    @Override public void configure(Map<String, ?> map, boolean b) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public CacheEvent deserialize(String topic, byte[] bytes) {
+        try {
+            return marsh.unmarshal(bytes, getClass().getClassLoader());
+        }
+        catch (IgniteCheckedException e) {
+            throw new SerializationException("Failed to deserialize cache event!", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventSerializer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventSerializer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventSerializer.java
new file mode 100644
index 0000000..2f2d668
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/serialization/CacheEventSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect.serialization;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * Serializer based on {@link JdkMarshaller}.
+ */
+public class CacheEventSerializer implements Serializer<CacheEvent> {
+    /** Marshaller. */
+    private static final Marshaller marsh = new JdkMarshaller();
+
+    /** {@inheritDoc} */
+    @Override public void configure(Map<String, ?> map, boolean b) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] serialize(String topic, CacheEvent event) {
+        try {
+            return marsh.marshal(event);
+        }
+        catch (IgniteCheckedException e) {
+            throw new SerializationException("Failed to serialize cache event!", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
index 731f540..c8d413a 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.stream.kafka;
 
 import junit.framework.TestSuite;
 import org.apache.ignite.stream.kafka.connect.IgniteSinkConnectorTest;
+import org.apache.ignite.stream.kafka.connect.IgniteSourceConnectorTest;
 
 /**
  * Apache Kafka streamers tests.
@@ -34,8 +35,9 @@ public class IgniteKafkaStreamerSelfTestSuite extends TestSuite {
         // Kafka streamer.
         suite.addTest(new TestSuite(KafkaIgniteStreamerSelfTest.class));
 
-        // Kafka streamer via Connect API.
+        // Kafka streamers via Connect API.
         suite.addTest(new TestSuite(IgniteSinkConnectorTest.class));
+        suite.addTest(new TestSuite(IgniteSourceConnectorTest.class));
 
         return suite;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
index 829c877..4918f87 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -28,8 +28,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import kafka.consumer.ConsumerConfig;
-import kafka.producer.KeyedMessage;
-import kafka.producer.Producer;
 import kafka.serializer.StringDecoder;
 import kafka.utils.VerifiableProperties;
 import org.apache.ignite.Ignite;
@@ -39,6 +37,7 @@ import org.apache.ignite.events.CacheEvent;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
 
@@ -116,7 +115,7 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
 
         Collections.shuffle(subnet);
 
-        List<KeyedMessage<String, String>> messages = new ArrayList<>(CNT);
+        List<ProducerRecord<String, String>> messages = new ArrayList<>(CNT);
 
         Map<String, String> keyValMap = new HashMap<>();
 
@@ -127,14 +126,12 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
 
             String msg = runtime + VALUE_URL + ip;
 
-            messages.add(new KeyedMessage<>(topic, ip, msg));
+            messages.add(new ProducerRecord<>(topic, ip, msg));
 
             keyValMap.put(ip, msg);
         }
 
-        Producer<String, String> producer = embeddedBroker.sendMessages(messages);
-
-        producer.close();
+        embeddedBroker.sendMessages(messages);
 
         return keyValMap;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
index 70acb78..4c5dc51 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
@@ -25,9 +25,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
-import kafka.producer.KeyedMessage;
-import kafka.producer.Producer;
-import kafka.producer.ProducerConfig;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 import kafka.utils.SystemTime$;
@@ -37,6 +34,9 @@ import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.apache.curator.test.TestingServer;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import scala.Tuple2;
 
 /**
@@ -106,15 +106,17 @@ public class TestKafkaBroker {
     /**
      * Sends a message to Kafka broker.
      *
-     * @param keyedMessages List of keyed messages.
+     * @param records List of records.
      * @return Producer used to send the message.
      */
-    public Producer<String, String> sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
-        Producer<String, String> producer = new Producer<>(getProducerConfig());
+    public void sendMessages(List<ProducerRecord<String, String>> records) {
+        Producer<String, String> producer = new KafkaProducer<>(getProducerConfig());
 
-        producer.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));
+        for (ProducerRecord<String, String> rec : records)
+            producer.send(rec);
 
-        return producer;
+        producer.flush();
+        producer.close();
     }
 
     /**
@@ -185,6 +187,7 @@ public class TestKafkaBroker {
         props.put("offsets.topic.replication.factor", "1");
         props.put("log.dir", createTmpDir("_cfg").getAbsolutePath());
         props.put("log.flush.interval.messages", "1");
+        props.put("log.flush.interval.ms", "10");
 
         return props;
     }
@@ -212,14 +215,14 @@ public class TestKafkaBroker {
      *
      * @return Kafka Producer config.
      */
-    private ProducerConfig getProducerConfig() {
+    private Properties getProducerConfig() {
         Properties props = new Properties();
 
-        props.put("metadata.broker.list", getBrokerAddress());
         props.put("bootstrap.servers", getBrokerAddress());
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 
-        return new ProducerConfig(props);
+        return props;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
index a8583d0..6e6d65d 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -23,8 +23,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import kafka.producer.KeyedMessage;
-import kafka.producer.Producer;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CachePeekMode;
@@ -33,6 +31,7 @@ import org.apache.ignite.events.Event;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.stream.kafka.TestKafkaBroker;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
@@ -183,7 +182,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
      * @return Map of key value messages.
      */
     private Map<String, String> produceStream(String topic) {
-        List<KeyedMessage<String, String>> messages = new ArrayList<>(EVENT_CNT);
+        List<ProducerRecord<String, String>> messages = new ArrayList<>(EVENT_CNT);
 
         Map<String, String> keyValMap = new HashMap<>();
 
@@ -193,14 +192,12 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
             String key = topic + "_" + String.valueOf(evt);
             String msg = runtime + String.valueOf(evt);
 
-            messages.add(new KeyedMessage<>(topic, key, msg));
+            messages.add(new ProducerRecord<>(topic, key, msg));
 
             keyValMap.put(key, msg);
         }
 
-        Producer<String, String> producer = kafkaBroker.sendMessages(messages);
-
-        producer.close();
+        kafkaBroker.sendMessages(messages);
 
         return keyValMap;
     }
@@ -216,7 +213,7 @@ public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
 
         props.put(ConnectorConfig.TOPICS_CONFIG, topics);
         props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
-        props.put(ConnectorConfig.NAME_CONFIG, "test-connector");
+        props.put(ConnectorConfig.NAME_CONFIG, "test-sink-connector");
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSinkConnector.class.getName());
         props.put(IgniteSinkConstants.CACHE_NAME, "testCache");
         props.put(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE, "true");

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java
new file mode 100644
index 0000000..d983c67
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorMock.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import org.apache.kafka.connect.connector.Task;
+
+/**
+ * Source connector mock for tests for using the task mock.
+ */
+public class IgniteSourceConnectorMock extends IgniteSourceConnector {
+
+    /** {@inheritDoc} */
+    @Override public Class<? extends Task> taskClass() {
+        return IgniteSourceTaskMock.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
new file mode 100644
index 0000000..13b6887
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceConnectorTest.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.stream.kafka.TestKafkaBroker;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.FutureCallback;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
+/**
+ * Tests for {@link IgniteSourceConnector}.
+ */
+public class IgniteSourceConnectorTest extends GridCommonAbstractTest {
+    /** Number of input messages. */
+    private static final int EVENT_CNT = 100;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "testCache";
+
+    /** Test topics created by connector. */
+    private static final String[] TOPICS = {"test1", "test2"};
+
+    /** Test Kafka broker. */
+    private TestKafkaBroker kafkaBroker;
+
+    /** Worker to run tasks. */
+    private Worker worker;
+
+    /** Workers' herder. */
+    private Herder herder;
+
+    /** Ignite server node shared among tests. */
+    private static Ignite grid;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void beforeTestsStarted() throws Exception {
+        IgniteConfiguration cfg = loadConfiguration("modules/kafka/src/test/resources/example-ignite.xml");
+
+        cfg.setClientMode(false);
+
+        grid = startGrid("igniteServerNode", cfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        kafkaBroker = new TestKafkaBroker();
+
+        WorkerConfig workerConfig = new StandaloneConfig(makeWorkerProps());
+
+        MemoryOffsetBackingStore offsetBackingStore = new MemoryOffsetBackingStore();
+        offsetBackingStore.configure(workerConfig.originals());
+
+        worker = new Worker(workerConfig, offsetBackingStore);
+        worker.start();
+
+        herder = new StandaloneHerder(worker);
+        herder.start();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        herder.stop();
+
+        worker.stop();
+
+        kafkaBroker.shutdown();
+
+        grid.cache(CACHE_NAME).clear();
+
+        // reset cache name to overwrite task configurations.
+        Field field = IgniteSourceTask.class.getDeclaredField("cacheName");
+
+        field.setAccessible(true);
+        field.set(IgniteSourceTask.class, null);
+    }
+
+    /**
+     * Tests data flow from injecting data into grid and transferring it to Kafka cluster
+     * without user-specified filter.
+     *
+     * @throws Exception Thrown in case of the failure.
+     */
+    public void testEventsInjectedIntoKafkaWithoutFilter() throws Exception {
+        Map<String, String> srcProps = makeSourceProps(Utils.join(TOPICS, ","));
+
+        srcProps.remove(IgniteSourceConstants.CACHE_FILTER_CLASS);
+
+        doTest(srcProps, false);
+    }
+
+    /**
+     * Tests data flow from injecting data into grid and transferring it to Kafka cluster.
+     *
+     * @throws Exception Thrown in case of the failure.
+     */
+    public void testEventsInjectedIntoKafka() throws Exception {
+        doTest(makeSourceProps(Utils.join(TOPICS, ",")), true);
+    }
+
+    /**
+     * Tests the source with the specified source configurations.
+     *
+     * @param srcProps Source properties.
+     * @param conditioned Flag indicating whether filtering is enabled.
+     * @throws Exception Fails if error.
+     */
+    private void doTest(Map<String, String> srcProps, boolean conditioned) throws Exception {
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
+            @Override
+            public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
+                if (error != null)
+                    throw new RuntimeException("Failed to create a job!", error);
+            }
+        });
+
+        herder.putConnectorConfig(
+            srcProps.get(ConnectorConfig.NAME_CONFIG),
+            srcProps, true, cb);
+
+        cb.get();
+
+        // Ugh! To be sure Kafka Connect's worker thread is properly started...
+        Thread.sleep(5000);
+
+        final CountDownLatch latch = new CountDownLatch(EVENT_CNT);
+
+        final IgnitePredicate<CacheEvent> locLsnr = new IgnitePredicate<CacheEvent>() {
+            @Override public boolean apply(CacheEvent evt) {
+                assert evt != null;
+
+                latch.countDown();
+
+                return true;
+            }
+        };
+
+        grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).localListen(locLsnr, EVT_CACHE_OBJECT_PUT);
+
+        IgniteCache<String, String> cache = grid.cache(CACHE_NAME);
+
+        assertEquals(0, cache.size(CachePeekMode.PRIMARY));
+
+        Map<String, String> keyValMap = new HashMap<>(EVENT_CNT);
+
+        keyValMap.putAll(sendData());
+
+        // Checks all events are processed.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).stopLocalListen(locLsnr);
+
+        assertEquals(EVENT_CNT, cache.size(CachePeekMode.PRIMARY));
+
+        // Checks the events are transferred to Kafka broker.
+        checkDataDelivered(conditioned);
+    }
+
+    /**
+     * Sends messages to the grid.
+     *
+     * @return Map of key value messages.
+     */
+    private Map<String, String> sendData() throws IOException {
+        Map<String, String> keyValMap = new HashMap<>();
+
+        for (int evt = 0; evt < EVENT_CNT; evt++) {
+            long runtime = System.currentTimeMillis();
+
+            String key = "test_" + String.valueOf(evt);
+            String msg = runtime + String.valueOf(evt);
+
+            if (evt >= EVENT_CNT / 2)
+                key = "conditioned_" + key;
+
+            grid.cache(CACHE_NAME).put(key, msg);
+
+            keyValMap.put(key, msg);
+        }
+
+        return keyValMap;
+    }
+
+    /**
+     * Checks if events were delivered to Kafka server.
+     *
+     * @param conditioned Flag indicating whether filtering is enabled.
+     */
+    private void checkDataDelivered(boolean conditioned) {
+        Properties props = new Properties();
+
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-grp");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
+        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 10000);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+            "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+            "org.apache.ignite.stream.kafka.connect.serialization.CacheEventDeserializer");
+
+        KafkaConsumer<String, CacheEvent> consumer = new KafkaConsumer<>(props);
+
+        consumer.subscribe(Arrays.asList(TOPICS));
+
+        int evtCnt = 0;
+        long start = System.currentTimeMillis();
+
+        try {
+            while (false || (System.currentTimeMillis() - start) < 10000) {
+                ConsumerRecords<String, CacheEvent> records = consumer.poll(10);
+                for (ConsumerRecord<String, CacheEvent> record : records) {
+                    System.out.println("Event: offset = " + record.offset() + ", key = " + record.key()
+                        + ", value = " + record.value().toString());
+
+                    evtCnt++;
+                }
+            }
+        }
+        catch (WakeupException e) {
+            // ignore for shutdown.
+        }
+        finally {
+            consumer.close();
+
+            if (conditioned)
+                assertTrue(evtCnt == (EVENT_CNT * TOPICS.length) / 2);
+            else
+                assertTrue(evtCnt == EVENT_CNT * TOPICS.length);
+        }
+    }
+
+    /**
+     * Creates properties for test source connector.
+     *
+     * @param topics Topics.
+     * @return Test source connector properties.
+     */
+    private Map<String, String> makeSourceProps(String topics) {
+        Map<String, String> props = new HashMap<>();
+
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
+        props.put(ConnectorConfig.NAME_CONFIG, "test-src-connector");
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSourceConnectorMock.class.getName());
+        props.put(IgniteSourceConstants.CACHE_NAME, "testCache");
+        props.put(IgniteSourceConstants.CACHE_CFG_PATH, "example-ignite.xml");
+        props.put(IgniteSourceConstants.TOPIC_NAMES, topics);
+        props.put(IgniteSourceConstants.CACHE_EVENTS, "put");
+        props.put(IgniteSourceConstants.CACHE_FILTER_CLASS, TestCacheEventFilter.class.getName());
+        props.put(IgniteSourceConstants.INTL_BUF_SIZE, "1000000");
+
+        return props;
+    }
+
+    /**
+     * Creates properties for Kafka Connect workers.
+     *
+     * @return Worker configurations.
+     */
+    private Map<String, String> makeWorkerProps() throws IOException {
+        Map<String, String> props = new HashMap<>();
+
+        props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put("internal.key.converter.schemas.enable", "false");
+        props.put("internal.value.converter.schemas.enable", "false");
+        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter");
+        props.put("key.converter.schemas.enable", "false");
+        props.put("value.converter.schemas.enable", "false");
+        props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
+        // fast flushing for testing.
+        props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
+
+        return props;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java
new file mode 100644
index 0000000..5237e98
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTaskMock.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Source task mock for tests. It avoids closing the grid from test to test.
+ */
+public class IgniteSourceTaskMock extends IgniteSourceTask {
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        stopRemoteListen();
+
+        // don't stop the grid for tests.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java
new file mode 100644
index 0000000..8978db0
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/TestCacheEventFilter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/**
+ * Test user-defined filter.
+ */
+class TestCacheEventFilter implements IgnitePredicate<CacheEvent> {
+
+    @Override public boolean apply(CacheEvent event) {
+        return ((String)event.key()).startsWith("conditioned_");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/12c707c8/modules/kafka/src/test/resources/example-ignite.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/resources/example-ignite.xml b/modules/kafka/src/test/resources/example-ignite.xml
index fbb05d3..f23a306 100644
--- a/modules/kafka/src/test/resources/example-ignite.xml
+++ b/modules/kafka/src/test/resources/example-ignite.xml
@@ -30,6 +30,8 @@
         http://www.springframework.org/schema/util
         http://www.springframework.org/schema/util/spring-util.xsd">
     <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <!-- Enable peer class loading for remote events. -->
+        <property name="peerClassLoadingEnabled" value="true"/>
         <!-- Enable client mode. -->
         <property name="clientMode" value="true"/>
 
@@ -48,7 +50,7 @@
         <!-- Enable cache events. -->
         <property name="includeEventTypes">
             <list>
-                <!-- Cache events (only EVT_CACHE_OBJECT_PUT for tests). -->
+                <!-- Cache events. -->
                 <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
             </list>
         </property>