You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/22 16:59:41 UTC

[01/23] ignite git commit: IGNITE-1370 Streamers: Implement multiple tuple extractor.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1513-final 00879c10f -> 876e37446


IGNITE-1370 Streamers: Implement multiple tuple extractor.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4d9734a0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4d9734a0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4d9734a0

Branch: refs/heads/ignite-1513-final
Commit: 4d9734a0842f0a46310c1f6261fdf42371db8705
Parents: b736c46
Author: Raul Kripalani <ra...@apache.org>
Authored: Thu Sep 3 23:31:08 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Sun Sep 13 01:20:24 2015 +0100

----------------------------------------------------------------------
 .../org/apache/ignite/stream/StreamAdapter.java |  48 ++++++++-
 .../stream/StreamMultipleTupleExtractor.java    |  38 +++++++
 .../ignite/stream/StreamTupleExtractor.java     |   5 +
 .../ignite/stream/socket/SocketStreamer.java    |   3 +-
 .../stream/socket/SocketStreamerSelfTest.java   | 104 +++++++++++++++----
 5 files changed, 171 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index 97edcbb..ffa0821 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.stream;
 
 import java.util.Map;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteDataStreamer;
 
@@ -26,11 +27,22 @@ import org.apache.ignite.IgniteDataStreamer;
  * streaming from different data sources. The purpose of adapters is to
  * convert different message formats into Ignite stream key-value tuples
  * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
+ * <p>
+ * Two types of tuple extractors are supported:
+ * <ol>
+ *     <li>A single tuple extractor, which extracts either no or 1 tuple out of a message. See
+ *     {@link #setTupleExtractor(StreamTupleExtractor)}.</li>
+ *     <li>A multiple tuple extractor, which is capable of extracting multiple tuples out of a single message, in the
+ *     form of a {@code Map<K, V>}. See {@link #setMultipleTupleExtractor(StreamMultipleTupleExtractor)}.</li>
+ * </ol>
  */
 public abstract class StreamAdapter<T, K, V> {
     /** Tuple extractor. */
     private StreamTupleExtractor<T, K, V> extractor;
 
+    /** Tuple extractor that supports extracting N tuples from a single event (1:n cardinality). */
+    private StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor;
+
     /** Streamer. */
     private IgniteDataStreamer<K, V> stmr;
 
@@ -84,6 +96,20 @@ public abstract class StreamAdapter<T, K, V> {
     }
 
     /**
+     * @return Provided tuple extractor (for 1:n cardinality).
+     */
+    public StreamMultipleTupleExtractor<T, K, V> getMultipleTupleExtractor() {
+        return multipleTupleExtractor;
+    }
+
+    /**
+     * @param multipleTupleExtractor Extractor for 1:n tuple extraction.
+     */
+    public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor) {
+        this.multipleTupleExtractor = multipleTupleExtractor;
+    }
+
+    /**
      * @return Provided {@link Ignite} instance.
      */
     public Ignite getIgnite() {
@@ -98,14 +124,28 @@ public abstract class StreamAdapter<T, K, V> {
     }
 
     /**
-     * Converts given message to a tuple and adds it to the underlying streamer.
+     * Converts given message to 1 or many tuples (depending on the type of extractor) and adds it/them to the
+     * underlying streamer.
+     * <p>
+     * If both a {@link #multipleTupleExtractor} and a {@link #extractor} have been set, the former will take precedence
+     * and the latter will be ignored.
      *
      * @param msg Message to convert.
      */
     protected void addMessage(T msg) {
-        Map.Entry<K, V> e = extractor.extract(msg);
+        if (multipleTupleExtractor == null) {
+            Map.Entry<K, V> e = extractor.extract(msg);
+
+            if (e != null)
+                stmr.addData(e);
 
-        if (e != null)
-            stmr.addData(e);
+        } else {
+            Map<K, V> m = multipleTupleExtractor.extract(msg);
+
+            if (m != null)
+                stmr.addData(m);
+
+        }
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java
new file mode 100644
index 0000000..71ad45a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import java.util.Map;
+
+/**
+ * Stream tuple extractor to convert a single message to zero, one or many tuples.
+ * <p>
+ * For cases where cardinality will always be 1:1 (or 1:0), you may consider {@link StreamTupleExtractor}.
+ *
+ * @see StreamTupleExtractor
+ */
+public interface StreamMultipleTupleExtractor<T, K, V> {
+
+    /**
+     * Extracts a set of key-values from a message.
+     *
+     * @param msg Message.
+     * @return Map containing resulting tuples.
+     */
+    public Map<K, V> extract(T msg);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
index b6150ab..aed7d8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
@@ -21,6 +21,11 @@ import java.util.Map;
 
 /**
  * Stream tuple extractor to convert messages to Ignite key-value tuples.
+ * <p>
+ * Alternatively, {@link StreamMultipleTupleExtractor} can be employed in cases where a single message/event may
+ * produce more than one tuple.
+ *
+ * @see StreamMultipleTupleExtractor
  */
 public interface StreamTupleExtractor<T, K, V> {
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index 0d27af9..c89952d 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -141,7 +141,8 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
      * @throws IgniteException If failed.
      */
     public void start() {
-        A.notNull(getTupleExtractor(), "tupleExtractor");
+        A.ensure(getTupleExtractor() != null || getMultipleTupleExtractor() != null,
+            "tupleExtractor (single or multiple)");
         A.notNull(getStreamer(), "streamer");
         A.notNull(getIgnite(), "ignite");
         A.ensure(threads > 0, "threads > 0");

http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 185599d..8b05754 100644
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -43,6 +44,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
 import org.apache.ignite.stream.StreamTupleExtractor;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
@@ -111,7 +113,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     Marshaller marsh = new JdkMarshaller();
 
                     for (int i = 0; i < CNT; i++) {
-                        byte[] msg = marsh.marshal(new Tuple(i));
+                        byte[] msg = marsh.marshal(new Message(i));
 
                         os.write(msg.length >>> 24);
                         os.write(msg.length >>> 16);
@@ -125,21 +127,52 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     throw new IgniteException(e);
                 }
             }
-        });
+        }, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultipleEntriesFromOneMessage() throws Exception {
+        test(null, null, new Runnable() {
+            @Override public void run() {
+                try (Socket sock = new Socket(InetAddress.getLocalHost(), port);
+                     OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+                    Marshaller marsh = new JdkMarshaller();
+
+                    int[] values = new int[CNT];
+                    for (int i = 0; i < CNT; i++) {
+                        values[i] = i;
+                    }
+
+                    byte[] msg = marsh.marshal(new Message(values));
+
+                    os.write(msg.length >>> 24);
+                    os.write(msg.length >>> 16);
+                    os.write(msg.length >>> 8);
+                    os.write(msg.length);
+
+                    os.write(msg);
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testSizeBasedCustomConverter() throws Exception {
-        SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
-            @Override public Tuple convert(byte[] msg) {
+        SocketMessageConverter<Message> converter = new SocketMessageConverter<Message>() {
+            @Override public Message convert(byte[] msg) {
                 int i = (msg[0] & 0xFF) << 24;
                 i |= (msg[1] & 0xFF) << 16;
                 i |= (msg[2] & 0xFF) << 8;
                 i |= msg[3] & 0xFF;
 
-                return new Tuple(i);
+                return new Message(i);
             }
         };
 
@@ -164,7 +197,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     throw new IgniteException(e);
                 }
             }
-        });
+        }, true);
     }
 
     /**
@@ -178,7 +211,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     Marshaller marsh = new JdkMarshaller();
 
                     for (int i = 0; i < CNT; i++) {
-                        byte[] msg = marsh.marshal(new Tuple(i));
+                        byte[] msg = marsh.marshal(new Message(i));
 
                         os.write(msg);
                         os.write(DELIM);
@@ -188,7 +221,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     throw new IgniteException(e);
                 }
             }
-        });
+        }, true);
 
     }
 
@@ -196,14 +229,14 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testDelimiterBasedCustomConverter() throws Exception {
-        SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
-            @Override public Tuple convert(byte[] msg) {
+        SocketMessageConverter<Message> converter = new SocketMessageConverter<Message>() {
+            @Override public Message convert(byte[] msg) {
                 int i = (msg[0] & 0xFF) << 24;
                 i |= (msg[1] & 0xFF) << 16;
                 i |= (msg[2] & 0xFF) << 8;
                 i |= msg[3] & 0xFF;
 
-                return new Tuple(i);
+                return new Message(i);
             }
         };
 
@@ -225,16 +258,17 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     throw new IgniteException(e);
                 }
             }
-        });
+        }, true);
     }
 
     /**
      * @param converter Converter.
      * @param r Runnable..
      */
-    private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable byte[] delim, Runnable r) throws Exception
+    private void test(@Nullable SocketMessageConverter<Message> converter, @Nullable byte[] delim, Runnable r,
+        boolean oneMessagePerTuple) throws Exception
     {
-        SocketStreamer<Tuple, Integer, String> sockStmr = null;
+        SocketStreamer<Message, Integer, String> sockStmr = null;
 
         Ignite ignite = grid(0);
 
@@ -257,11 +291,24 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
 
             sockStmr.setDelimiter(delim);
 
-            sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, String>() {
-                @Override public Map.Entry<Integer, String> extract(Tuple msg) {
-                    return new IgniteBiTuple<>(msg.key, msg.val);
-                }
-            });
+            if (oneMessagePerTuple) {
+                sockStmr.setTupleExtractor(new StreamTupleExtractor<Message, Integer, String>() {
+                    @Override public Map.Entry<Integer, String> extract(Message msg) {
+                        return new IgniteBiTuple<>(msg.key, msg.val);
+                    }
+                });
+            }
+            else {
+                sockStmr.setMultipleTupleExtractor(new StreamMultipleTupleExtractor<Message, Integer, String>() {
+                    @Override public Map<Integer, String> extract(Message msg) {
+                        Map<Integer, String> answer = new HashMap<>();
+                        for (int value : msg.values) {
+                            answer.put(value, Integer.toString(value));
+                        }
+                        return answer;
+                    }
+                });
+            }
 
             if (converter != null)
                 sockStmr.setConverter(converter);
@@ -297,9 +344,9 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Tuple.
+     * Message.
      */
-    private static class Tuple implements Serializable {
+    private static class Message implements Serializable {
         /** Serial version uid. */
         private static final long serialVersionUID = 0L;
 
@@ -309,12 +356,25 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
         /** Value. */
         private final String val;
 
+        /** Multiple values. */
+        private final int[] values;
+
         /**
          * @param key Key.
          */
-        Tuple(int key) {
+        Message(int key) {
             this.key = key;
             this.val = Integer.toString(key);
+            this.values = new int[0];
+        }
+
+        /**
+         * @param values Multiple values.
+         */
+        Message(int[] values) {
+            this.key = -1;
+            this.val = null;
+            this.values = values;
         }
     }
 }
\ No newline at end of file


[09/23] ignite git commit: IGNITE-535 Merge MQTT Streamer into master.

Posted by vo...@apache.org.
IGNITE-535 Merge MQTT Streamer into master.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/88acd318
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/88acd318
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/88acd318

Branch: refs/heads/ignite-1513-final
Commit: 88acd318b84ce3bff8c061bb34718e0e5f7127fb
Parents: 421a523 296dd6e
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 17:26:04 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 17:26:04 2015 +0100

----------------------------------------------------------------------
 modules/mqtt/pom.xml                            | 114 ++++
 .../apache/ignite/stream/mqtt/MqttStreamer.java | 611 +++++++++++++++++++
 .../stream/mqtt/IgniteMqttStreamerTest.java     | 553 +++++++++++++++++
 .../mqtt/IgniteMqttStreamerTestSuite.java       |  34 ++
 pom.xml                                         |   1 +
 5 files changed, 1313 insertions(+)
----------------------------------------------------------------------



[16/23] ignite git commit: Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4

Posted by vo...@apache.org.
Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/39dace45
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/39dace45
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/39dace45

Branch: refs/heads/ignite-1513-final
Commit: 39dace45c81aef7cb913fcf4f98a7d71e34beebd
Parents: f0be45e a104087
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Sep 22 13:38:21 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Sep 22 13:38:21 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/cache/CacheAtomicityMode.java |  17 +--
 .../processors/cache/GridCacheProcessor.java    |   2 +-
 ...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 ++++++++++++++++++-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  13 +-
 4 files changed, 125 insertions(+), 25 deletions(-)
----------------------------------------------------------------------



[20/23] ignite git commit: Merge branch 'ignite-1.4'

Posted by vo...@apache.org.
Merge branch 'ignite-1.4'


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0a41ae57
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0a41ae57
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0a41ae57

Branch: refs/heads/ignite-1513-final
Commit: 0a41ae57215e9d2d208d33f7a46653c4be43de9c
Parents: 88acd31 1942d75
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Sep 22 17:53:41 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Sep 22 17:53:41 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/cache/CacheAtomicityMode.java |  17 +--
 .../configuration/CacheConfiguration.java       |  15 +++
 .../processors/cache/GridCacheAdapter.java      |   8 +-
 .../processors/cache/GridCacheMapEntry.java     |  51 ++++----
 .../processors/cache/GridCacheProcessor.java    |  10 +-
 .../cache/GridCacheSwapEntryImpl.java           |  31 ++++-
 .../processors/cache/GridCacheSwapManager.java  |  80 ++++++++-----
 .../datastreamer/DataStreamerImpl.java          |   2 -
 .../IgniteCacheEntryListenerAbstractTest.java   |  65 +++++++++-
 .../IgniteCachePutRetryAbstractSelfTest.java    |  33 ++++++
 ...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 ++++++++++++++++++-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  13 +-
 .../processors/query/h2/IgniteH2Indexing.java   |  19 +--
 .../query/h2/opt/GridH2AbstractKeyValueRow.java |  54 ++++++---
 .../query/h2/opt/GridH2KeyValueRowOffheap.java  |  11 +-
 .../query/h2/opt/GridH2RowDescriptor.java       |   5 +
 .../processors/query/h2/opt/GridH2Table.java    |  10 +-
 .../cache/CacheIndexStreamerTest.java           |  37 ++++--
 .../processors/cache/GridCacheSwapSelfTest.java |   4 +-
 .../IgniteCacheWithIndexingTestSuite.java       |   2 +
 20 files changed, 448 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0a41ae57/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------


[07/23] ignite git commit: Merge branch 'master' into feature/ignite-535-mqtt

Posted by vo...@apache.org.
Merge branch 'master' into feature/ignite-535-mqtt


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f03f3a3b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f03f3a3b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f03f3a3b

Branch: refs/heads/ignite-1513-final
Commit: f03f3a3b48fa105f318e9493440671188770f4ef
Parents: 53683e2 421a523
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 16:36:53 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 16:36:53 2015 +0100

----------------------------------------------------------------------
 .../org/apache/ignite/IgniteAtomicLong.java     |  15 +-
 .../apache/ignite/IgniteAtomicReference.java    |   9 +-
 .../org/apache/ignite/IgniteAtomicSequence.java |   9 +-
 .../org/apache/ignite/IgniteAtomicStamped.java  |  13 +-
 .../configuration/NearCacheConfiguration.java   |  18 +-
 .../apache/ignite/internal/IgniteKernal.java    |   7 -
 .../processors/cache/GridCacheContext.java      |   6 +-
 .../cache/GridCacheEvictionManager.java         |   6 +-
 .../cache/GridCacheEvictionResponse.java        |   2 +-
 .../processors/cache/GridCacheIoManager.java    |  47 ++--
 .../processors/cache/GridCacheMessage.java      |   7 +
 .../processors/cache/GridCacheMvccManager.java  |  34 ++-
 .../GridCachePartitionExchangeManager.java      |  41 +++-
 .../processors/cache/GridCacheProcessor.java    |  28 ++-
 .../GridDistributedLockResponse.java            |   6 +-
 .../GridDistributedTxPrepareResponse.java       |   6 +-
 .../distributed/dht/GridDhtLocalPartition.java  |  26 +-
 .../distributed/dht/GridDhtTopologyFuture.java  |   6 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |   2 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   2 +-
 .../dht/atomic/GridDhtAtomicUpdateResponse.java |  12 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |  16 +-
 .../dht/atomic/GridNearAtomicUpdateRequest.java |   2 +
 .../atomic/GridNearAtomicUpdateResponse.java    |  11 +-
 .../colocated/GridDhtColocatedLockFuture.java   |  44 +++-
 .../dht/preloader/GridDhtForceKeysFuture.java   |   2 +-
 .../dht/preloader/GridDhtForceKeysResponse.java |   6 +-
 .../GridDhtPartitionsExchangeFuture.java        |  19 +-
 .../distributed/near/GridNearGetResponse.java   |   6 +-
 .../distributed/near/GridNearLockFuture.java    |  26 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |  20 +-
 .../near/GridNearTxFinishResponse.java          |   6 +-
 .../query/GridCacheDistributedQueryFuture.java  |  27 +-
 .../cache/query/GridCacheLocalQueryFuture.java  |   5 +
 .../cache/query/GridCacheQueryAdapter.java      | 170 ++++++++-----
 .../query/GridCacheQueryFutureAdapter.java      |  11 +-
 .../cache/query/GridCacheQueryManager.java      |  30 ++-
 .../cache/query/GridCacheQueryResponse.java     |   6 +-
 .../continuous/CacheContinuousQueryHandler.java |  12 +-
 .../transactions/IgniteTxLocalAdapter.java      |   4 +-
 .../ignite/internal/util/GridSpinBusyLock.java  |  10 +
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  12 +-
 .../IgniteCacheEntryProcessorNodeJoinTest.java  |  24 +-
 .../distributed/CacheAffEarlySelfTest.java      | 245 +++++++++++++++++++
 .../GridCacheSwapScanQueryAbstractSelfTest.java | 118 ++++-----
 .../processors/igfs/IgfsAbstractSelfTest.java   |   5 +-
 .../loadtests/hashmap/GridCacheTestContext.java |   4 +-
 .../ignite/testframework/GridTestUtils.java     |  14 +-
 .../cache/CacheIndexStreamerTest.java           | 137 +++++++++++
 ...CacheScanPartitionQueryFallbackSelfTest.java | 244 +++++-------------
 .../IgniteCacheQueryNodeRestartSelfTest2.java   |   2 -
 .../Apache.Ignite.Core/Impl/IgniteManager.cs    |   2 -
 .../ignite/visor/commands/VisorConsole.scala    |  37 ++-
 .../config/benchmark-put-indexed-val.properties |   2 +-
 modules/yardstick/config/ignite-base-config.xml |   2 +-
 .../yardstick/IgniteBenchmarkArguments.java     |  24 +-
 .../org/apache/ignite/yardstick/IgniteNode.java |  12 +-
 57 files changed, 1084 insertions(+), 535 deletions(-)
----------------------------------------------------------------------



[19/23] ignite git commit: Added test.

Posted by vo...@apache.org.
Added test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1942d758
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1942d758
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1942d758

Branch: refs/heads/ignite-1513-final
Commit: 1942d75856ab6d317b743de71b53a29abf81316a
Parents: ca2bce0
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 17:36:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 17:36:18 2015 +0300

----------------------------------------------------------------------
 .../IgniteCachePutRetryAbstractSelfTest.java    | 33 ++++++++++++++++++++
 1 file changed, 33 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1942d758/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 943caeb..76f12c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -168,6 +168,13 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
     /**
      * @throws Exception If failed.
      */
+    public void testGetAndPut() throws Exception {
+        checkRetry(Test.GET_AND_PUT, TestMemoryMode.HEAP, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testPutStoreEnabled() throws Exception {
         checkRetry(Test.PUT, TestMemoryMode.HEAP, true);
     }
@@ -275,6 +282,29 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
                     break;
                 }
 
+                case GET_AND_PUT: {
+                    for (int i = 0; i < keysCnt; i++)
+                        cache.put(i, 0);
+
+                    while (System.currentTimeMillis() < stopTime) {
+                        Integer expOld = iter;
+
+                        Integer val = ++iter;
+
+                        for (int i = 0; i < keysCnt; i++) {
+                            Integer old = cache.getAndPut(i, val);
+
+                            assertTrue("Unexpected old value [old=" + old + ", exp=" + expOld + ']',
+                                expOld.equals(old) || val.equals(old));
+                        }
+
+                        for (int i = 0; i < keysCnt; i++)
+                            assertEquals(val, cache.get(i));
+                    }
+
+                    break;
+                }
+
                 case PUT_ALL: {
                     while (System.currentTimeMillis() < stopTime) {
                         Integer val = ++iter;
@@ -495,6 +525,9 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
         PUT,
 
         /** */
+        GET_AND_PUT,
+
+        /** */
         PUT_ALL,
 
         /** */


[17/23] ignite git commit: ignite-1516 Optimize GridH2AbstractKeyValueRow.getValue

Posted by vo...@apache.org.
ignite-1516 Optimize GridH2AbstractKeyValueRow.getValue


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/72c3eef2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/72c3eef2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/72c3eef2

Branch: refs/heads/ignite-1513-final
Commit: 72c3eef2aa31df4a68b46a8877809cc0f49c1368
Parents: 39dace4
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 13:51:09 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 13:51:09 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  8 +--
 .../processors/cache/GridCacheMapEntry.java     | 14 ++---
 .../processors/cache/GridCacheProcessor.java    |  6 +--
 .../cache/GridCacheSwapEntryImpl.java           | 31 +++++++++--
 .../processors/cache/GridCacheSwapManager.java  | 56 +++++++++++++-------
 .../processors/query/h2/IgniteH2Indexing.java   | 19 ++++---
 .../query/h2/opt/GridH2AbstractKeyValueRow.java | 49 ++++++++++-------
 .../query/h2/opt/GridH2KeyValueRowOffheap.java  | 11 +++-
 .../query/h2/opt/GridH2RowDescriptor.java       |  5 ++
 .../cache/CacheIndexStreamerTest.java           | 33 +++++++++---
 .../processors/cache/GridCacheSwapSelfTest.java |  4 +-
 .../IgniteCacheWithIndexingTestSuite.java       |  2 +
 12 files changed, 158 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 1fc94ec..ae987b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -805,9 +805,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 if (modes.offheap || modes.swap) {
                     GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
 
-                    GridCacheSwapEntry swapEntry = swapMgr.read(cacheKey, modes.offheap, modes.swap);
-
-                    cacheVal = swapEntry != null ? swapEntry.value() : null;
+                    cacheVal = swapMgr.readValue(cacheKey, modes.offheap, modes.swap);
                 }
             }
             else
@@ -856,9 +854,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (offheap || swap) {
             GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
 
-            GridCacheSwapEntry swapEntry = swapMgr.read(key, offheap, swap);
-
-            return swapEntry != null ? swapEntry.value() : null;
+            return swapMgr.readValue(key, offheap, swap);
         }
 
         return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 961c792..4bf0aa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -512,7 +512,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     }
                 }
                 else
-                    e = detached() ? cctx.swap().read(this, true, true, true) : cctx.swap().readAndRemove(this);
+                    e = detached() ? cctx.swap().read(this, true, true, true, false) : cctx.swap().readAndRemove(this);
 
                 if (log.isDebugEnabled())
                     log.debug("Read swap entry [swapEntry=" + e + ", cacheEntry=" + this + ']');
@@ -2840,7 +2840,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             }
 
             if (offheap || swap) {
-                GridCacheSwapEntry e = cctx.swap().read(this, false, offheap, swap);
+                GridCacheSwapEntry e = cctx.swap().read(this, false, offheap, swap, true);
 
                 return e != null ? e.value() : null;
             }
@@ -3581,14 +3581,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         CacheObject val = rawGetOrUnmarshalUnlocked(false);
 
-        if (val == null) {
-            GridCacheSwapEntry swapEntry = cctx.swap().read(key, true, true);
-
-            if (swapEntry == null)
-                return null;
-
-            return swapEntry.value();
-        }
+        if (val == null)
+            val = cctx.swap().readValue(key, true, true);
 
         return val;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 9c325aa..e92ea57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2763,14 +2763,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
                 try {
                     KeyCacheObject key = cctx.toCacheKeyObject(keyBytes);
 
-                    GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes);
+                    GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes, true);
 
                     CacheObject val = swapEntry.value();
 
-                    if (val == null)
-                        val = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), swapEntry.type(),
-                            swapEntry.valueBytes());
-
                     assert val != null;
 
                     qryMgr.remove(key, val);

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
index b7c66d3..6b1266f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
@@ -94,8 +94,6 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
         long expireTime,
         @Nullable IgniteUuid keyClsLdrId,
         @Nullable IgniteUuid valClsLdrId) {
-        assert ver != null;
-
         this.valBytes = valBytes;
         this.type = type;
         this.ver = ver;
@@ -268,9 +266,36 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
 
     /**
      * @param arr Entry bytes.
+     * @param valOnly If {@code true} unmarshals only entry value.
      * @return Entry.
      */
-    public static GridCacheSwapEntryImpl unmarshal(byte[] arr) {
+    public static GridCacheSwapEntryImpl unmarshal(byte[] arr, boolean valOnly) {
+        if (valOnly) {
+            long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
+
+            boolean verEx = UNSAFE.getByte(arr, off++) != 0;
+
+            off += verEx ? VERSION_EX_SIZE : VERSION_SIZE;
+
+            int arrLen = UNSAFE.getInt(arr, off);
+
+            off += 4;
+
+            byte type = UNSAFE.getByte(arr, off++);
+
+            byte[] valBytes = new byte[arrLen];
+
+            UNSAFE.copyMemory(arr, off, valBytes, BYTE_ARR_OFF, arrLen);
+
+            return new GridCacheSwapEntryImpl(ByteBuffer.wrap(valBytes),
+                type,
+                null,
+                0L,
+                0L,
+                null,
+                null);
+        }
+
         long off = BYTE_ARR_OFF;
 
         long ttl = UNSAFE.getLong(arr, off);

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index d9a8b5c..2ab7b5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -569,6 +569,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @param entryLocked {@code True} if cache entry is locked.
      * @param readOffheap Read offheap flag.
      * @param readSwap Read swap flag.
+     * @param valOnly If {@code true} unmarshals only entry value.
      * @return Value from swap or {@code null}.
      * @throws IgniteCheckedException If failed.
      */
@@ -578,7 +579,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         int part,
         boolean entryLocked,
         boolean readOffheap,
-        boolean readSwap)
+        boolean readSwap,
+        boolean valOnly)
         throws IgniteCheckedException
     {
         assert readOffheap || readSwap;
@@ -605,7 +607,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     cctx.cache().metrics0().onOffHeapRead(bytes != null);
 
                 if (bytes != null)
-                    return swapEntry(unmarshalSwapEntry(bytes));
+                    return swapEntry(unmarshalSwapEntry(bytes, valOnly));
             }
 
             if (!swapEnabled || !readSwap)
@@ -620,7 +622,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             if (bytes == null && lsnr != null)
                 return lsnr.entry;
 
-            return bytes != null ? swapEntry(unmarshalSwapEntry(bytes)) : null;
+            return bytes != null ? swapEntry(unmarshalSwapEntry(bytes, valOnly)) : null;
         }
         finally {
             if (lsnr != null)
@@ -706,7 +708,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                 if (rmv != null) {
                     try {
-                        GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
+                        GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv, false));
 
                         if (entry == null)
                             return;
@@ -756,20 +758,22 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @param locked {@code True} if cache entry is locked.
      * @param readOffheap Read offheap flag.
      * @param readSwap Read swap flag.
+     * @param valOnly If {@code true} unmarshals only entry value.
      * @return Read value.
      * @throws IgniteCheckedException If read failed.
      */
     @Nullable GridCacheSwapEntry read(GridCacheEntryEx entry,
         boolean locked,
         boolean readOffheap,
-        boolean readSwap)
+        boolean readSwap,
+        boolean valOnly)
         throws IgniteCheckedException
     {
         if (!offheapEnabled && !swapEnabled)
             return null;
 
         return read(entry.key(), entry.key().valueBytes(cctx.cacheObjectContext()), entry.partition(), locked,
-            readOffheap, readSwap);
+            readOffheap, readSwap, valOnly);
     }
 
     /**
@@ -805,7 +809,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      * @return Read value.
      * @throws IgniteCheckedException If read failed.
      */
-    @Nullable public GridCacheSwapEntry read(KeyCacheObject key,
+    @Nullable public CacheObject readValue(KeyCacheObject key,
         boolean readOffheap,
         boolean readSwap)
         throws IgniteCheckedException
@@ -815,7 +819,17 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         int part = cctx.affinity().partition(key);
 
-        return read(key, key.valueBytes(cctx.cacheObjectContext()), part, false, readOffheap, readSwap);
+        GridCacheSwapEntry swapEntry = read(key,
+            key.valueBytes(cctx.cacheObjectContext()),
+            part,
+            false,
+            readOffheap,
+            readSwap,
+            true);
+
+        assert swapEntry == null || swapEntry.value() != null : swapEntry;
+
+        return swapEntry != null ? swapEntry.value() : null;
     }
 
     /**
@@ -865,7 +879,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     cctx.cache().metrics0().onOffHeapRemove();
             }
 
-            entry = entryBytes == null ? null : swapEntry(unmarshalSwapEntry(entryBytes));
+            entry = entryBytes == null ? null : swapEntry(unmarshalSwapEntry(entryBytes, false));
         }
 
         return entry;
@@ -972,7 +986,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
                     if (rmv != null) {
                         try {
-                            GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
+                            GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv, false));
 
                             if (entry == null)
                                 return;
@@ -1078,7 +1092,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             Collection<GridCacheSwapListener> lsnrs = offheapLsnrs.get(part);
 
             if (lsnrs != null) {
-                GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry));
+                GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry, false));
 
                 for (GridCacheSwapListener lsnr : lsnrs)
                     lsnr.onEntryUnswapped(part, key, e);
@@ -1132,7 +1146,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             cctx.cache().metrics0().onOffHeapRead(entryBytes != null);
 
         if (entryBytes != null) {
-            GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
+            GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes, false));
 
             if (entry != null) {
                 cctx.queries().onUnswap(key, entry.value());
@@ -1165,7 +1179,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (entryBytes == null)
             return false;
 
-        GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
+        GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes, true));
 
         if (entry == null)
             return false;
@@ -2063,7 +2077,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                 try {
                     for (Map.Entry<byte[], byte[]> e : iter) {
                         try {
-                            GridCacheSwapEntry swapEntry = unmarshalSwapEntry(e.getValue());
+                            GridCacheSwapEntry swapEntry = unmarshalSwapEntry(e.getValue(), false);
 
                             IgniteUuid valLdrId = swapEntry.valueClassLoaderId();
 
@@ -2120,10 +2134,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
     /**
      * @param bytes Bytes to unmarshal.
+     * @param valOnly If {@code true} unmarshals only value.
      * @return Unmarshalled entry.
      */
-    private GridCacheSwapEntry unmarshalSwapEntry(byte[] bytes) {
-        return GridCacheSwapEntryImpl.unmarshal(bytes);
+    private GridCacheSwapEntry unmarshalSwapEntry(byte[] bytes, boolean valOnly) {
+        return GridCacheSwapEntryImpl.unmarshal(bytes, valOnly);
     }
 
     /**
@@ -2169,7 +2184,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         @Override protected Map.Entry<byte[], GridCacheSwapEntry> onNext() throws IgniteCheckedException {
             Map.Entry<byte[], byte[]> e = iter.nextX();
 
-            GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue());
+            GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue(), false);
 
             return F.t(e.getKey(), swapEntry(unmarshalled));
         }
@@ -2446,6 +2461,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         abstract protected GridCloseableIterator<T1> partitionIterator(int part) throws IgniteCheckedException;
     }
 
+    /**
+     *
+     */
     private class GridVersionedMapEntry<K,V> implements Map.Entry<K,V>, GridCacheVersionAware {
         /** */
         private Map.Entry<byte[], byte[]> entry;
@@ -2474,7 +2492,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         /** {@inheritDoc} */
         @Override public V getValue() {
             try {
-                GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue());
+                GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false);
 
                 swapEntry(e);
 
@@ -2487,7 +2505,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         /** {@inheritDoc} */
         @Override public GridCacheVersion version() {
-            GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue());
+            GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false);
 
             return e.version();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 2af1386..8595187 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -71,7 +71,6 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
@@ -2108,6 +2107,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         /** */
         private final GridUnsafeGuard guard;
 
+        /** */
+        private final boolean preferSwapVal;
+
         /**
          * @param type Type descriptor.
          * @param schema Schema.
@@ -2136,6 +2138,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             keyType = DataType.getTypeFromClass(type.keyClass());
             valType = DataType.getTypeFromClass(type.valueClass());
+
+            preferSwapVal = schema.ccfg.getMemoryMode() == CacheMemoryMode.OFFHEAP_TIERED;
         }
 
         /** {@inheritDoc} */
@@ -2263,15 +2267,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             if (cctx.isNear())
                 cctx = cctx.near().dht().context();
 
-            GridCacheSwapEntry e = cctx.swap().read(cctx.toCacheKeyObject(key), true, true);
+            CacheObject v = cctx.swap().readValue(cctx.toCacheKeyObject(key), true, true);
 
-            if (e == null)
+            if (v == null)
                 return null;
 
-            CacheObject v = e.value();
-
-            assert v != null : "swap must unmarshall it for us";
-
             return v.value(cctx.cacheObjectContext(), false);
         }
 
@@ -2312,5 +2312,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
 
             return new GridH2KeyValueRowOffheap(this, ptr);
         }
+
+        /** {@inheritDoc} */
+        @Override public boolean preferSwapValue() {
+            return preferSwapVal;
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index 4a16284..c11f541 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -130,20 +130,23 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
     /**
      * Atomically updates weak value.
      *
-     * @param upd New value.
-     * @return {@code null} If update succeeded, unexpected value otherwise.
+     * @param valObj New value.
+     * @return New value if old value is empty, old value otherwise.
+     * @throws IgniteCheckedException If failed.
      */
-    protected synchronized Value updateWeakValue(Value upd) {
+    protected synchronized Value updateWeakValue(Object valObj) throws IgniteCheckedException {
         Value res = peekValue(VAL_COL);
 
         if (res != null && !(res instanceof WeakValue))
             return res;
 
+        Value upd = desc.wrap(valObj, desc.valueType());
+
         setValue(VAL_COL, new WeakValue(upd));
 
         notifyAll();
 
-        return null;
+        return upd;
     }
 
     /**
@@ -188,21 +191,23 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
             Value v;
 
             if (col == VAL_COL) {
-                v = syncValue(0);
+                v = peekValue(VAL_COL);
 
                 long start = 0;
                 int attempt = 0;
 
                 while ((v = WeakValue.unwrap(v)) == null) {
-                    v = getOffheapValue(VAL_COL);
+                    if (!desc.preferSwapValue()) {
+                        v = getOffheapValue(VAL_COL);
 
-                    if (v != null) {
-                        setValue(VAL_COL, v);
+                        if (v != null) {
+                            setValue(VAL_COL, v);
 
-                        if (peekValue(KEY_COL) == null)
-                            cache();
+                            if (peekValue(KEY_COL) == null)
+                                cache();
 
-                        return v;
+                            return v;
+                        }
                     }
 
                     Object k = getValue(KEY_COL).getObject();
@@ -213,16 +218,24 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
                         if (valObj != null) {
                             // Even if we've found valObj in swap, it is may be some new value,
                             // while the needed value was already unswapped, so we have to recheck it.
-                            if ((v = WeakValue.unwrap(syncValue(0))) == null && (v = getOffheapValue(VAL_COL)) == null) {
-                                Value upd = desc.wrap(valObj, desc.valueType());
-
-                                v = updateWeakValue(upd);
-
-                                return v == null ? upd : v;
-                            }
+                            if ((v = getOffheapValue(VAL_COL)) == null)
+                                return updateWeakValue(valObj);
                         }
                         else {
                             // If nothing found in swap then we should be already unswapped.
+                            if (desc.preferSwapValue()) {
+                                v = getOffheapValue(VAL_COL);
+
+                                if (v != null) {
+                                    setValue(VAL_COL, v);
+
+                                    if (peekValue(KEY_COL) == null)
+                                        cache();
+
+                                    return v;
+                                }
+                            }
+
                             v = syncValue(attempt);
                         }
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
index de31fe1..2dd9f25 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
@@ -216,12 +216,19 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
 
     /** {@inheritDoc} */
     @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
-    @Override protected synchronized Value updateWeakValue(Value upd) {
+    @Override protected synchronized Value updateWeakValue(Object valObj) throws IgniteCheckedException {
+        Value val = peekValue(VAL_COL);
+
+        if (val != null)
+            return val;
+
+        Value upd = desc.wrap(valObj, desc.valueType());
+
         setValue(VAL_COL, upd);
 
         notifyAll();
 
-        return null;
+        return upd;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
index 0edd102..ed3ff7a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -111,4 +111,9 @@ public interface GridH2RowDescriptor extends GridOffHeapSmartPointerFactory<Grid
      * @throws IgniteCheckedException If failed.
      */
     public Value wrap(Object o, int type) throws IgniteCheckedException;
+
+    /**
+     * @return {@code True} if the swap value should be checked before the offheap value.
+     */
+    public boolean preferSwapValue();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
index 23f4e91..e6bf22b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -36,6 +36,8 @@ import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 
 /**
@@ -45,7 +47,6 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
     /** */
     private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
 
-
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -60,14 +61,29 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testStreamer() throws Exception {
+    public void testStreamerAtomic() throws Exception {
+        checkStreamer(ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStreamerTx() throws Exception {
+        checkStreamer(TRANSACTIONAL);
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    public void checkStreamer(CacheAtomicityMode atomicityMode) throws Exception {
         final Ignite ignite = startGrid(0);
 
-        final IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration());
+        final IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration(atomicityMode));
 
         final AtomicBoolean stop = new AtomicBoolean();
 
-        final int KEYS= 10_000;
+        final int KEYS = 10_000;
 
         try {
             IgniteInternalFuture streamerFut = GridTestUtils.runAsync(new Callable() {
@@ -118,14 +134,15 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @param atomicityMode Cache atomicity mode.
      * @return Cache configuration.
      */
-    private CacheConfiguration cacheConfiguration() {
+    private CacheConfiguration cacheConfiguration(CacheAtomicityMode atomicityMode) {
         CacheConfiguration ccfg = new CacheConfiguration();
 
-        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setAtomicityMode(atomicityMode);
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
-        ccfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
+        ccfg.setMemoryMode(OFFHEAP_TIERED);
         ccfg.setOffHeapMaxMemory(0);
         ccfg.setBackups(1);
         ccfg.setIndexedTypes(Integer.class, String.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
index e0e6ff0..cd1fc93 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
@@ -244,12 +244,12 @@ public class GridCacheSwapSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * TODO: IGNITE-599.
-     *
      * @throws Exception If failed.
      */
     public void testSwapEviction() throws Exception {
         try {
+            fail("https://issues.apache.org/jira/browse/IGNITE-599");
+
             final CountDownLatch evicted = new CountDownLatch(10);
 
             startGrids(1);

http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index f30f70e..550c69f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
 import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest;
+import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffHeapAndSwapSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffHeapSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffheapIndexEntryEvictTest;
@@ -63,6 +64,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
 
         suite.addTestSuite(GridCacheOffheapIndexGetSelfTest.class);
         suite.addTestSuite(GridCacheOffheapIndexEntryEvictTest.class);
+        suite.addTestSuite(CacheIndexStreamerTest.class);
 
         suite.addTestSuite(CacheConfigurationP2PTest.class);
 


[21/23] ignite git commit: Merge branch 'master' into ignite-1282

Posted by vo...@apache.org.
Merge branch 'master' into ignite-1282

Conflicts:
	modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/281b9f2d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/281b9f2d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/281b9f2d

Branch: refs/heads/ignite-1513-final
Commit: 281b9f2d652ce9109523734ae3843f93feec285f
Parents: b711a5a 0a41ae5
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Sep 22 17:58:20 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Sep 22 17:58:20 2015 +0300

----------------------------------------------------------------------
 .../socket/WordsSocketStreamerServer.java       |   5 +-
 .../apache/ignite/cache/CacheAtomicityMode.java |  17 +-
 .../configuration/CacheConfiguration.java       |  21 +
 .../processors/cache/GridCacheAdapter.java      |   8 +-
 .../processors/cache/GridCacheMapEntry.java     |  51 +-
 .../processors/cache/GridCacheProcessor.java    |  10 +-
 .../cache/GridCacheSwapEntryImpl.java           |  31 +-
 .../processors/cache/GridCacheSwapManager.java  |  80 ++-
 .../datastreamer/DataStreamerImpl.java          |   2 -
 .../org/apache/ignite/stream/StreamAdapter.java | 104 +++-
 .../stream/StreamMultipleTupleExtractor.java    |  38 ++
 .../stream/StreamSingleTupleExtractor.java      |  40 ++
 .../ignite/stream/StreamTupleExtractor.java     |  20 +-
 .../ignite/stream/socket/SocketStreamer.java    |   3 +-
 .../IgniteCacheEntryListenerAbstractTest.java   |  65 +-
 .../IgniteCachePutRetryAbstractSelfTest.java    |  33 +
 ...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 +++-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  13 +-
 .../stream/socket/SocketStreamerSelfTest.java   | 112 +++-
 .../processors/query/h2/IgniteH2Indexing.java   |  19 +-
 .../query/h2/opt/GridH2AbstractKeyValueRow.java |  54 +-
 .../query/h2/opt/GridH2KeyValueRowOffheap.java  |  11 +-
 .../query/h2/opt/GridH2RowDescriptor.java       |   5 +
 .../processors/query/h2/opt/GridH2Table.java    |  10 +-
 .../cache/CacheIndexStreamerTest.java           |  37 +-
 .../processors/cache/GridCacheSwapSelfTest.java |   4 +-
 .../IgniteCacheWithIndexingTestSuite.java       |   2 +
 modules/mqtt/pom.xml                            | 114 ++++
 .../apache/ignite/stream/mqtt/MqttStreamer.java | 611 +++++++++++++++++++
 .../stream/mqtt/IgniteMqttStreamerTest.java     | 553 +++++++++++++++++
 .../mqtt/IgniteMqttStreamerTestSuite.java       |  34 ++
 pom.xml                                         |   1 +
 32 files changed, 2037 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/281b9f2d/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 9fb56bc,44a3fa9..44f7e1b
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@@ -17,10 -17,25 +17,17 @@@
  
  package org.apache.ignite.configuration;
  
+ import java.io.Serializable;
+ import java.util.Collection;
 -import java.util.HashSet;
+ import javax.cache.Cache;
+ import javax.cache.configuration.CompleteConfiguration;
+ import javax.cache.configuration.Factory;
+ import javax.cache.configuration.MutableConfiguration;
+ import javax.cache.expiry.ExpiryPolicy;
  import org.apache.ignite.Ignite;
  import org.apache.ignite.IgniteCache;
 -import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
 -import org.apache.ignite.cache.CacheAtomicityMode;
 -import org.apache.ignite.cache.CacheEntryProcessor;
 -import org.apache.ignite.cache.CacheInterceptor;
 -import org.apache.ignite.cache.CacheMemoryMode;
 -import org.apache.ignite.cache.CacheMode;
 -import org.apache.ignite.cache.CacheRebalanceMode;
 -import org.apache.ignite.cache.CacheTypeMetadata;
 -import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 +import org.apache.ignite.IgniteException;
 +import org.apache.ignite.cache.*;
  import org.apache.ignite.cache.affinity.AffinityFunction;
  import org.apache.ignite.cache.affinity.AffinityKeyMapper;
  import org.apache.ignite.cache.eviction.EvictionFilter;

http://git-wip-us.apache.org/repos/asf/ignite/blob/281b9f2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/281b9f2d/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 268221e,c19a9b7..df383ef
--- a/pom.xml
+++ b/pom.xml
@@@ -75,7 -75,9 +75,8 @@@
          <module>modules/kafka</module>
          <module>modules/yarn</module>
          <module>modules/jms11</module>
+         <module>modules/mqtt</module>
          <module>modules/zookeeper</module>
 -        <module>modules/platform</module>
      </modules>
  
      <profiles>


[03/23] ignite git commit: Merge branch 'master' into feature/ignite-1370.

Posted by vo...@apache.org.
Merge branch 'master' into feature/ignite-1370.

# Conflicts:
#	modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b80b1719
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b80b1719
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b80b1719

Branch: refs/heads/ignite-1513-final
Commit: b80b17196269a036da65d781eb772a317039b014
Parents: b1dee96 6e48c9c
Author: Raul Kripalani <ra...@apache.org>
Authored: Tue Sep 15 18:28:09 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Tue Sep 15 18:28:09 2015 +0100

----------------------------------------------------------------------
 RELEASE_NOTES.txt                               |    2 +-
 examples/config/example-default.xml             |   76 -
 examples/config/example-ignite.xml              |   56 +-
 examples/config/filesystem/README.txt           |    2 +-
 examples/config/filesystem/example-igfs.xml     |    7 -
 .../config/portable/example-ignite-portable.xml |   44 -
 .../ignite/examples/portable/Address.java       |   72 -
 .../ignite/examples/portable/Employee.java      |   93 -
 .../ignite/examples/portable/EmployeeKey.java   |   90 -
 .../portable/ExamplePortableNodeStartup.java    |   36 -
 .../ignite/examples/portable/Organization.java  |   93 -
 .../examples/portable/OrganizationType.java     |   32 -
 ...mputeClientPortableTaskExecutionExample.java |  154 -
 .../portable/computegrid/ComputeClientTask.java |  116 -
 .../portable/computegrid/package-info.java      |   21 -
 .../CacheClientPortablePutGetExample.java       |  230 --
 .../CacheClientPortableQueryExample.java        |  325 --
 .../portable/datagrid/package-info.java         |   21 -
 .../ignite/examples/portable/package-info.java  |   21 -
 .../CacheClientPortableExampleTest.java         |   46 -
 .../ComputeClientPortableExampleTest.java       |   37 -
 .../testsuites/IgniteExamplesSelfTestSuite.java |    6 -
 modules/clients/pom.xml                         |    7 +
 modules/clients/src/test/config/jdbc-config.xml |   55 +
 .../jdbc2/JdbcComplexQuerySelfTest.java         |  316 ++
 .../internal/jdbc2/JdbcConnectionSelfTest.java  |  268 ++
 .../internal/jdbc2/JdbcEmptyCacheSelfTest.java  |  140 +
 .../internal/jdbc2/JdbcLocalCachesSelfTest.java |  156 +
 .../internal/jdbc2/JdbcMetadataSelfTest.java    |  334 ++
 .../jdbc2/JdbcPreparedStatementSelfTest.java    |  730 ++++
 .../internal/jdbc2/JdbcResultSetSelfTest.java   |  751 ++++
 .../internal/jdbc2/JdbcStatementSelfTest.java   |  292 ++
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java   |   11 +
 modules/core/pom.xml                            |   21 -
 .../src/main/java/org/apache/ignite/Ignite.java |    7 -
 .../java/org/apache/ignite/IgniteCache.java     |   44 +-
 .../org/apache/ignite/IgniteJdbcDriver.java     |  281 +-
 .../java/org/apache/ignite/IgnitePortables.java |  370 --
 .../apache/ignite/IgniteSystemProperties.java   |    5 +-
 .../configuration/CacheConfiguration.java       |   70 +-
 .../ignite/internal/GridKernalContext.java      |    7 +-
 .../ignite/internal/GridKernalContextImpl.java  |   10 +-
 .../apache/ignite/internal/GridLoggerProxy.java |    6 +-
 .../org/apache/ignite/internal/IgniteEx.java    |    9 +
 .../apache/ignite/internal/IgniteKernal.java    |   14 +-
 .../ignite/internal/IgniteNodeAttributes.java   |    5 +-
 .../internal/executor/GridExecutorService.java  |    4 +-
 .../ignite/internal/jdbc/JdbcConnection.java    |    4 +
 .../internal/jdbc/JdbcConnectionInfo.java       |   91 -
 .../internal/jdbc/JdbcDatabaseMetadata.java     |    4 +
 .../internal/jdbc/JdbcPreparedStatement.java    |    4 +
 .../ignite/internal/jdbc/JdbcResultSet.java     |    4 +
 .../internal/jdbc/JdbcResultSetMetadata.java    |    4 +
 .../ignite/internal/jdbc/JdbcStatement.java     |    4 +
 .../apache/ignite/internal/jdbc/JdbcUtils.java  |    4 +
 .../ignite/internal/jdbc2/JdbcConnection.java   |  777 ++++
 .../internal/jdbc2/JdbcDatabaseMetadata.java    | 1401 +++++++
 .../internal/jdbc2/JdbcPreparedStatement.java   |  411 ++
 .../ignite/internal/jdbc2/JdbcQueryTask.java    |  361 ++
 .../ignite/internal/jdbc2/JdbcResultSet.java    | 1520 +++++++
 .../internal/jdbc2/JdbcResultSetMetadata.java   |  171 +
 .../ignite/internal/jdbc2/JdbcStatement.java    |  456 +++
 .../apache/ignite/internal/jdbc2/JdbcUtils.java |  155 +
 .../deployment/GridDeploymentStoreAdapter.java  |    4 +-
 .../discovery/GridDiscoveryManager.java         |   10 -
 .../portable/GridPortableMarshaller.java        |    2 +-
 .../portable/PortableClassDescriptor.java       |   10 +-
 .../internal/portable/PortableContext.java      |   32 +-
 .../portable/PortableMetaDataCollector.java     |    6 +-
 .../portable/PortableMetaDataHandler.java       |    4 +-
 .../internal/portable/PortableMetaDataImpl.java |   14 +-
 .../internal/portable/PortableObjectEx.java     |    6 +-
 .../internal/portable/PortableObjectImpl.java   |    6 +-
 .../portable/PortableObjectOffheapImpl.java     |    6 +-
 .../internal/portable/PortableRawReaderEx.java  |    4 +-
 .../internal/portable/PortableRawWriterEx.java  |    4 +-
 .../portable/PortableReaderContext.java         |    2 +-
 .../internal/portable/PortableReaderExImpl.java |   10 +-
 .../ignite/internal/portable/PortableUtils.java |    2 +-
 .../internal/portable/PortableWriterExImpl.java |   11 +-
 .../internal/portable/api/IgnitePortables.java  |  362 ++
 .../internal/portable/api/PortableBuilder.java  |  136 +
 .../portable/api/PortableException.java         |   57 +
 .../internal/portable/api/PortableIdMapper.java |   54 +
 .../api/PortableInvalidClassException.java      |   58 +
 .../portable/api/PortableMarshalAware.java      |   48 +
 .../portable/api/PortableMarshaller.java        |  358 ++
 .../internal/portable/api/PortableMetadata.java |   60 +
 .../internal/portable/api/PortableObject.java   |  152 +
 .../portable/api/PortableProtocolVersion.java   |   41 +
 .../portable/api/PortableRawReader.java         |  234 ++
 .../portable/api/PortableRawWriter.java         |  219 +
 .../internal/portable/api/PortableReader.java   |  284 ++
 .../portable/api/PortableSerializer.java        |   47 +
 .../portable/api/PortableTypeConfiguration.java |  195 +
 .../internal/portable/api/PortableWriter.java   |  266 ++
 .../portable/builder/PortableBuilderEnum.java   |    2 +-
 .../portable/builder/PortableBuilderImpl.java   |   14 +-
 .../portable/builder/PortableBuilderReader.java |    2 +-
 .../builder/PortableBuilderSerializer.java      |    2 +-
 .../builder/PortableEnumArrayLazyValue.java     |    4 +-
 .../builder/PortableObjectArrayLazyValue.java   |    2 +-
 .../builder/PortablePlainPortableObject.java    |    2 +-
 .../streams/PortableAbstractInputStream.java    |    2 +-
 .../processors/cache/GridCacheAdapter.java      |   31 +-
 .../cache/GridCacheClearAllRunnable.java        |    4 +-
 .../processors/cache/GridCacheContext.java      |    4 +-
 .../processors/cache/GridCacheIoManager.java    |    4 +-
 .../processors/cache/GridCacheLogger.java       |    4 +-
 .../processors/cache/GridCacheMapEntry.java     |   15 +-
 .../processors/cache/GridCacheMvcc.java         |    5 +-
 .../processors/cache/GridCacheProcessor.java    |   37 +-
 .../cache/GridCacheSharedContext.java           |    4 +-
 .../processors/cache/GridCacheSwapManager.java  |   46 +-
 .../processors/cache/IgniteCacheProxy.java      |    5 -
 .../distributed/GridCacheTxRecoveryFuture.java  |   11 +-
 .../distributed/GridDistributedCacheEntry.java  |    6 +-
 .../GridDistributedTxFinishRequest.java         |   13 +-
 .../GridDistributedTxRemoteAdapter.java         |   18 +-
 .../dht/GridClientPartitionTopology.java        |  104 +-
 .../distributed/dht/GridDhtLocalPartition.java  |    1 +
 .../dht/GridDhtPartitionTopology.java           |    4 +-
 .../dht/GridDhtPartitionTopologyImpl.java       |   16 +-
 .../dht/GridDhtTransactionalCacheAdapter.java   |  514 ++-
 .../distributed/dht/GridDhtTxFinishFuture.java  |   15 +-
 .../distributed/dht/GridDhtTxFinishRequest.java |   84 +-
 .../dht/GridDhtTxFinishResponse.java            |   89 +-
 .../cache/distributed/dht/GridDhtTxLocal.java   |    4 +-
 .../distributed/dht/GridDhtTxLocalAdapter.java  |   67 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   34 +-
 .../cache/distributed/dht/GridDhtTxRemote.java  |   40 +-
 .../dht/GridPartitionedGetFuture.java           |    4 +-
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |    4 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   11 +-
 .../distributed/near/GridNearLockFuture.java    |   11 +-
 .../distributed/near/GridNearLockRequest.java   |   18 +-
 .../near/GridNearOptimisticTxPrepareFuture.java |   55 +-
 .../GridNearPessimisticTxPrepareFuture.java     |   11 +-
 .../near/GridNearTxFinishFuture.java            |  345 +-
 .../near/GridNearTxFinishRequest.java           |   20 +-
 .../cache/distributed/near/GridNearTxLocal.java |   64 +-
 .../distributed/near/GridNearTxRemote.java      |   38 +-
 .../CacheDefaultPortableAffinityKeyMapper.java  |    2 +-
 .../portable/CacheObjectPortableContext.java    |    2 +-
 .../portable/CacheObjectPortableProcessor.java  |    8 +-
 .../CacheObjectPortableProcessorImpl.java       |   12 +-
 .../cache/portable/IgnitePortablesImpl.java     |   10 +-
 .../cache/store/CacheOsStoreManager.java        |    4 +-
 .../cache/transactions/IgniteTxAdapter.java     |    5 +-
 .../cache/transactions/IgniteTxHandler.java     |  286 +-
 .../transactions/IgniteTxLocalAdapter.java      |   37 +-
 .../cache/transactions/IgniteTxManager.java     |   48 +-
 .../continuous/GridContinuousProcessor.java     |   22 +-
 .../datastructures/DataStructuresProcessor.java |  102 +-
 .../datastructures/GridCacheAtomicLongImpl.java |    4 +-
 .../GridCacheAtomicReferenceImpl.java           |    4 +-
 .../GridCacheAtomicSequenceImpl.java            |    4 +-
 .../GridCacheAtomicStampedImpl.java             |    4 +-
 .../GridCacheCountDownLatchImpl.java            |    4 +-
 .../GridTransactionalCacheQueueImpl.java        |   15 +-
 .../processors/igfs/IgfsFileAffinityRange.java  |    4 +-
 .../processors/igfs/IgfsFileWorkerBatch.java    |   15 +-
 .../igfs/IgfsFragmentizerManager.java           |    8 +-
 .../processors/igfs/IgfsServerManager.java      |    5 +-
 .../internal/processors/job/GridJobWorker.java  |    4 +-
 .../dotnet/PlatformDotNetConfiguration.java     |   12 +-
 .../PlatformDotNetPortableConfiguration.java    |   12 +-
 ...PlatformDotNetPortableTypeConfiguration.java |   12 +-
 .../processors/task/GridTaskWorker.java         |    4 +-
 .../util/GridSpiCloseableIteratorWrapper.java   |    5 +
 .../ignite/internal/util/IgniteUtils.java       |   16 +
 .../util/offheap/GridOffHeapEvictListener.java  |    2 +-
 .../util/offheap/GridOffHeapPartitionedMap.java |    1 +
 .../util/offheap/unsafe/GridUnsafeMap.java      |    4 +-
 .../marshaller/portable/PortableMarshaller.java |  358 --
 .../marshaller/portable/package-info.java       |   22 -
 .../apache/ignite/portable/PortableBuilder.java |  137 -
 .../ignite/portable/PortableException.java      |   57 -
 .../ignite/portable/PortableIdMapper.java       |   56 -
 .../portable/PortableInvalidClassException.java |   58 -
 .../ignite/portable/PortableMarshalAware.java   |   48 -
 .../ignite/portable/PortableMetadata.java       |   61 -
 .../apache/ignite/portable/PortableObject.java  |  154 -
 .../portable/PortableProtocolVersion.java       |   41 -
 .../ignite/portable/PortableRawReader.java      |  234 --
 .../ignite/portable/PortableRawWriter.java      |  219 -
 .../apache/ignite/portable/PortableReader.java  |  284 --
 .../ignite/portable/PortableSerializer.java     |   49 -
 .../portable/PortableTypeConfiguration.java     |  196 -
 .../apache/ignite/portable/PortableWriter.java  |  266 --
 .../apache/ignite/portable/package-info.java    |   22 -
 .../resources/META-INF/classnames.properties    |   20 +-
 .../GridDiscoveryManagerAttributesSelfTest.java |   45 -
 .../GridPortableAffinityKeySelfTest.java        |  218 -
 .../GridPortableBuilderAdditionalSelfTest.java  | 1226 ------
 .../portable/GridPortableBuilderSelfTest.java   | 1021 -----
 ...eBuilderStringAsCharsAdditionalSelfTest.java |   28 -
 ...ridPortableBuilderStringAsCharsSelfTest.java |   28 -
 ...idPortableMarshallerCtxDisabledSelfTest.java |  256 --
 .../GridPortableMarshallerSelfTest.java         | 3807 ------------------
 .../GridPortableMetaDataDisabledSelfTest.java   |  238 --
 .../portable/GridPortableMetaDataSelfTest.java  |  369 --
 .../portable/GridPortableWildcardsSelfTest.java |  482 ---
 .../GridPortableMarshalerAwareTestClass.java    |   67 -
 .../mutabletest/GridPortableTestClasses.java    |  434 --
 .../portable/mutabletest/package-info.java      |   22 -
 .../ignite/internal/portable/package-info.java  |   22 -
 .../portable/test/GridPortableTestClass1.java   |   28 -
 .../portable/test/GridPortableTestClass2.java   |   24 -
 .../internal/portable/test/package-info.java    |   22 -
 .../test/subpackage/GridPortableTestClass3.java |   24 -
 .../portable/test/subpackage/package-info.java  |   22 -
 .../CacheStoreUsageMultinodeAbstractTest.java   |   16 +-
 .../cache/GridCacheAbstractFullApiSelfTest.java |   57 +-
 .../GridCacheAbstractRemoveFailureTest.java     |  199 +-
 .../cache/GridCacheMemoryModeSelfTest.java      |    9 +-
 .../processors/cache/GridCacheMvccSelfTest.java |    4 +-
 .../cache/GridCacheP2PUndeploySelfTest.java     |   30 +-
 .../cache/GridCachePutAllFailoverSelfTest.java  |   88 +-
 .../GridCacheVariableTopologySelfTest.java      |    5 +-
 .../cache/IgniteCacheCreateRestartSelfTest.java |  106 +
 .../IgniteCacheEntryProcessorNodeJoinTest.java  |   73 +
 .../cache/IgniteCachePutAllRestartTest.java     |    2 +
 .../cache/IgniteInternalCacheTypesTest.java     |    4 +-
 .../cache/IgniteOnePhaseCommitNearSelfTest.java |  243 ++
 .../IgniteTxExceptionAbstractSelfTest.java      |   29 +-
 .../GridCacheAbstractNodeRestartSelfTest.java   |  149 +-
 ...ridCachePartitionNotLoadedEventSelfTest.java |   27 +-
 .../GridCacheTransformEventSelfTest.java        |    5 +-
 .../GridCacheColocatedTxExceptionSelfTest.java  |    2 +-
 .../GridCacheDhtAtomicRemoveFailureTest.java    |   16 +-
 .../dht/GridCacheDhtRemoveFailureTest.java      |   16 +-
 .../dht/GridCacheTxNodeFailureSelfTest.java     |  405 ++
 .../dht/GridNearCacheTxNodeFailureSelfTest.java |   31 +
 ...gniteAtomicLongChangingTopologySelfTest.java |  278 ++
 .../IgniteCacheCrossCacheTxFailoverTest.java    |   53 +-
 .../IgniteCachePutRetryAbstractSelfTest.java    |  166 +-
 .../dht/IgniteCachePutRetryAtomicSelfTest.java  |    2 +
 ...gniteCachePutRetryTransactionalSelfTest.java |   50 +-
 ...eAtomicInvalidPartitionHandlingSelfTest.java |   98 +-
 ...tomicPrimaryWriteOrderRemoveFailureTest.java |   15 +-
 .../GridCacheAtomicRemoveFailureTest.java       |   15 +-
 .../GridCacheAtomicNearRemoveFailureTest.java   |   15 +-
 ...cPrimaryWriteOrderNearRemoveFailureTest.java |   15 +-
 .../near/GridCacheNearRemoveFailureTest.java    |   15 +-
 .../near/GridCacheNearTxExceptionSelfTest.java  |    2 +-
 .../GridCachePartitionedNodeRestartTest.java    |    9 +-
 ...ePartitionedOptimisticTxNodeRestartTest.java |    9 +-
 .../near/IgniteCacheNearOnlyTxTest.java         |   14 +-
 .../GridCacheReplicatedNodeRestartSelfTest.java |    8 +-
 .../GridCacheReplicatedTxExceptionSelfTest.java |    2 +-
 .../GridCacheLocalTxExceptionSelfTest.java      |    2 +-
 ...ClientNodePortableMetadataMultinodeTest.java |  295 --
 ...GridCacheClientNodePortableMetadataTest.java |  286 --
 ...ableObjectsAbstractDataStreamerSelfTest.java |  190 -
 ...bleObjectsAbstractMultiThreadedSelfTest.java |  231 --
 ...ridCachePortableObjectsAbstractSelfTest.java |  978 -----
 .../GridCachePortableStoreAbstractSelfTest.java |  297 --
 .../GridCachePortableStoreObjectsSelfTest.java  |   55 -
 ...GridCachePortableStorePortablesSelfTest.java |   66 -
 ...ridPortableCacheEntryMemorySizeSelfTest.java |   55 -
 ...leDuplicateIndexObjectsAbstractSelfTest.java |  158 -
 .../DataStreamProcessorPortableSelfTest.java    |   66 -
 .../GridDataStreamerImplSelfTest.java           |  345 --
 ...ridCacheAffinityRoutingPortableSelfTest.java |   47 -
 ...lyPortableDataStreamerMultiNodeSelfTest.java |   29 -
 ...rtableDataStreamerMultithreadedSelfTest.java |   47 -
 ...artitionedOnlyPortableMultiNodeSelfTest.java |   28 -
 ...tionedOnlyPortableMultithreadedSelfTest.java |   47 -
 .../GridCacheMemoryModePortableSelfTest.java    |   36 -
 ...acheOffHeapTieredAtomicPortableSelfTest.java |   47 -
 ...eapTieredEvictionAtomicPortableSelfTest.java |   95 -
 ...heOffHeapTieredEvictionPortableSelfTest.java |   95 -
 .../GridCacheOffHeapTieredPortableSelfTest.java |   47 -
 ...ateIndexObjectPartitionedAtomicSelfTest.java |   38 -
 ...xObjectPartitionedTransactionalSelfTest.java |   41 -
 ...AtomicNearDisabledOffheapTieredSelfTest.java |   29 -
 ...rtableObjectsAtomicNearDisabledSelfTest.java |   51 -
 ...tableObjectsAtomicOffheapTieredSelfTest.java |   29 -
 .../GridCachePortableObjectsAtomicSelfTest.java |   51 -
 ...tionedNearDisabledOffheapTieredSelfTest.java |   30 -
 ...eObjectsPartitionedNearDisabledSelfTest.java |   51 -
 ...ObjectsPartitionedOffheapTieredSelfTest.java |   30 -
 ...CachePortableObjectsPartitionedSelfTest.java |   51 -
 ...sNearPartitionedByteArrayValuesSelfTest.java |   41 -
 ...sPartitionedOnlyByteArrayValuesSelfTest.java |   42 -
 ...dCachePortableObjectsReplicatedSelfTest.java |   51 -
 ...CachePortableObjectsAtomicLocalSelfTest.java |   32 -
 ...rtableObjectsLocalOffheapTieredSelfTest.java |   29 -
 .../GridCachePortableObjectsLocalSelfTest.java  |   51 -
 .../processors/igfs/IgfsAbstractSelfTest.java   |   61 +-
 .../igfs/IgfsDualAbstractSelfTest.java          |   10 +-
 .../processors/igfs/IgfsStartCacheTest.java     |    2 +-
 .../offheap/GridOffHeapMapAbstractSelfTest.java |    6 +-
 ...idOffHeapPartitionedMapAbstractSelfTest.java |   10 +-
 .../stream/socket/SocketStreamerSelfTest.java   |   29 +-
 .../ignite/testframework/GridTestUtils.java     |  117 +
 .../ignite/testframework/junits/IgniteMock.java |    8 +-
 .../multijvm/IgniteCacheProcessProxy.java       |    5 -
 .../junits/multijvm/IgniteProcessProxy.java     |    2 +-
 .../IgniteCacheFailoverTestSuite.java           |   15 +-
 .../IgniteCacheFailoverTestSuite3.java          |   62 +
 .../testsuites/IgniteCacheRestartTestSuite.java |   15 +-
 .../IgniteCacheRestartTestSuite2.java           |   47 +
 .../IgnitePortableCacheFullApiTestSuite.java    |   37 -
 .../IgnitePortableCacheTestSuite.java           |  103 -
 .../IgnitePortableObjectsTestSuite.java         |   92 -
 .../ignite/portable/test1/1.1/test1-1.1.jar     |  Bin 2548 -> 0 bytes
 .../ignite/portable/test1/1.1/test1-1.1.pom     |    9 -
 .../portable/test1/maven-metadata-local.xml     |   12 -
 .../ignite/portable/test2/1.1/test2-1.1.jar     |  Bin 1361 -> 0 bytes
 .../ignite/portable/test2/1.1/test2-1.1.pom     |    9 -
 .../portable/test2/maven-metadata-local.xml     |   12 -
 .../hadoop/SecondaryFileSystemProvider.java     |    4 +-
 .../hadoop/igfs/HadoopIgfsWrapper.java          |  100 +-
 .../HadoopDefaultMapReducePlannerSelfTest.java  |    6 +
 .../testsuites/IgniteHadoopTestSuite.java       |    6 +-
 .../query/h2/opt/GridH2AbstractKeyValueRow.java |   31 +-
 ...CacheScanPartitionQueryFallbackSelfTest.java |  105 +-
 ...QueryOffheapEvictsMultiThreadedSelfTest.java |    5 -
 ...lientQueryReplicatedNodeRestartSelfTest.java |    8 +-
 .../IgniteCacheQueryNodeRestartSelfTest.java    |    4 +-
 .../IgniteCacheQueryNodeRestartSelfTest2.java   |   10 +-
 .../IgniteCacheReplicatedQuerySelfTest.java     |   49 +-
 .../IgnitePortableCacheQueryTestSuite.java      |  117 -
 .../platform/PlatformContextImpl.java           |    4 +-
 .../platform/compute/PlatformCompute.java       |    2 +-
 .../cpp/PlatformCppConfigurationClosure.java    |    2 +-
 .../PlatformDotNetConfigurationClosure.java     |    6 +-
 .../platform/events/PlatformEvents.java         |    2 +-
 .../services/PlatformAbstractService.java       |    3 +-
 .../Cache/CacheAbstractTest.cs                  |   71 +-
 .../Config/Compute/compute-grid1.xml            |    8 +-
 .../PlatformComputePortableArgTask.java         |    8 +-
 .../ignite/schema/generator/CodeGenerator.java  |    4 +-
 .../ignite/schema/model/PojoDescriptor.java     |    6 +-
 .../parser/dialect/OracleMetadataDialect.java   |    7 +-
 .../org/apache/ignite/IgniteSpringBean.java     |    7 -
 .../yardstick/config/benchmark-query.properties |    5 +-
 modules/yardstick/config/ignite-base-config.xml |    2 +-
 modules/yardstick/config/ignite-jdbc-config.xml |   55 +
 parent/pom.xml                                  |   10 -
 pom.xml                                         |   10 +
 343 files changed, 15499 insertions(+), 18992 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b80b1719/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 6786b7e,da15de3..1056990
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@@ -267,10 -233,10 +268,11 @@@ public class SocketStreamerSelfTest ext
       * @param converter Converter.
       * @param r Runnable..
       */
-     private void test(@Nullable SocketMessageConverter<Message> converter, @Nullable byte[] delim, Runnable r,
-         boolean oneMessagePerTuple) throws Exception
-     {
 -    private void test(@Nullable SocketMessageConverter<Tuple> converter,
 -        @Nullable byte[] delim,
 -        Runnable r) throws Exception {
 -        SocketStreamer<Tuple, Integer, String> sockStmr = null;
++    private void test(@Nullable SocketMessageConverter<Message> converter, 
++        @Nullable byte[] delim, 
++        Runnable r,
++        boolean oneMessagePerTuple) throws Exception {
 +        SocketStreamer<Message, Integer, String> sockStmr = null;
  
          Ignite ignite = grid(0);
  


[08/23] ignite git commit: IGNITE-535 Finish MQTT Streamer docs and tests. Upgrade latter to AMQ 5.12.0.

Posted by vo...@apache.org.
IGNITE-535 Finish MQTT Streamer docs and tests. Upgrade latter to AMQ 5.12.0.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/296dd6e7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/296dd6e7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/296dd6e7

Branch: refs/heads/ignite-1513-final
Commit: 296dd6e7d86fe6d0914a9fbf8062632c04e4d22c
Parents: f03f3a3
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 17:24:44 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 17:24:44 2015 +0100

----------------------------------------------------------------------
 modules/mqtt/pom.xml                            |   9 +-
 .../apache/ignite/stream/mqtt/MqttStreamer.java | 156 ++++++++++++++++++-
 .../stream/mqtt/IgniteMqttStreamerTest.java     |  80 +++++++++-
 3 files changed, 224 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml
index 4b0b46c..21511e8 100644
--- a/modules/mqtt/pom.xml
+++ b/modules/mqtt/pom.xml
@@ -37,7 +37,7 @@
 
     <properties>
         <paho.version>1.0.2</paho.version>
-        <activemq.version>5.11.1</activemq.version>
+        <activemq.version>5.12.0</activemq.version>
         <guava-retryier.version>2.0.0</guava-retryier.version>
     </properties>
 
@@ -69,13 +69,6 @@
 
         <dependency>
             <groupId>org.apache.activemq</groupId>
-            <artifactId>activemq-kahadb-store</artifactId>
-            <version>${activemq.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.activemq</groupId>
             <artifactId>activemq-mqtt</artifactId>
             <version>${activemq.version}</version>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
index b86d385..f18ae42 100644
--- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -62,12 +62,17 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
  *     <li>Specifying the subscriber's QoS for a single topic or for multiple topics.</li>
  *     <li>Allows setting {@link MqttConnectOptions} to support features like last will testament, persistent
  *         sessions, etc.</li>
- *     <li>Specifying the client ID.</li>
+ *     <li>Specifying the client ID. A random one will be generated and maintained throughout reconnections if the user
+ *         does not provide one.</li>
+ *     <li>(Re-)Connection retries based on the <i>guava-retrying</i> library. Retry wait and retry stop policies
+ *         can be configured.</li>
+ *     <li>Blocking the start() method until connected for the first time.</li>
  * </ul>
  *
- * Note: features like durable subscriptions, last will testament, etc. must be configured via the
+ * Note: features like durable subscriptions, last will testament, etc. can be configured via the
  * {@link #connectOptions} property.
  *
+ * @see <a href="https://github.com/rholder/guava-retrying">guava-retrying library</a>
  * @author Raul Kripalani
  */
 public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> implements MqttCallback {
@@ -75,46 +80,65 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     /** Logger. */
     private IgniteLogger log;
 
+    /** The MQTT client object for internal use. */
     private MqttClient client;
 
+    /** The broker URL, set by the user. */
     private String brokerUrl;
 
+    /** The topic to subscribe to, if a single topic. */
     private String topic;
 
+    /** The quality of service to use for a single topic subscription (optional). */
     private Integer qualityOfService;
 
+    /** The topics to subscribe to, if many. */
     private List<String> topics;
 
+    /** The qualities of service to use for multiple topic subscriptions. If specified, it must contain the same
+     *  number of elements as {@link #topics}. */
     private List<Integer> qualitiesOfService;
 
-    /** Client ID in case we're using durable subscribers. */
+    /** The MQTT client ID (optional). */
     private String clientId;
 
+    /** A configurable persistence mechanism. If not set, Paho will use its default. */
     private MqttClientPersistence persistence;
 
+    /** The MQTT client connect options, where users can configure the last will and testament, durability, etc. */
     private MqttConnectOptions connectOptions;
 
-    // disconnect parameters
+    /** Quiesce timeout on disconnection. */
     private Integer disconnectQuiesceTimeout;
 
+    /** Whether to disconnect forcibly or not. */
     private boolean disconnectForcibly;
 
+    /** If disconnecting forcibly, the timeout. */
     private Integer disconnectForciblyTimeout;
 
+    /** The strategy to determine how long to wait between retry attempts. By default, this streamer uses a
+     *  Fibonacci-based strategy. */
     private WaitStrategy retryWaitStrategy = WaitStrategies.fibonacciWait();
 
+    /** The strategy to determine when to stop retrying to (re-)connect. By default, we never stop. */
     private StopStrategy retryStopStrategy = StopStrategies.neverStop();
 
+    /** The internal connection retrier object with a thread pool of size 1. */
     private MqttConnectionRetrier connectionRetrier;
 
+    /** Whether to block the start() method until connected for the first time. */
+    private boolean blockUntilConnected;
+
+    /** State keeping. */
     private volatile boolean stopped = true;
 
+    /** State keeping. */
     private volatile boolean connected;
 
+    /** Cached log prefix for cache messages. */
     private String cachedLogPrefix;
 
-    private boolean blockUntilConnected;
-
     /**
      * Starts streamer.
      *
@@ -136,7 +160,11 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
             A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " +
                 "both single and multiple tuple extractor");
             A.notNullOrEmpty(brokerUrl, "broker URL");
-            A.notNullOrEmpty(clientId, "client ID");
+
+            // if the client ID is empty, generate one
+            if (clientId == null || clientId.length() == 0) {
+                clientId = MqttClient.generateClientId();
+            }
 
             // if we have both a single topic and a list of topics (but the list of topic is not of
             // size 1 and == topic, as this would be a case of re-initialization), fail
@@ -257,6 +285,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     //  MQTT Client callback methods
     // -------------------------------
 
+    /**
+     * {@inheritDoc}
+     */
     @Override public void connectionLost(Throwable throwable) {
         connected = false;
 
@@ -268,6 +299,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
         connectionRetrier.connect();
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override public void messageArrived(String topic, MqttMessage message) throws Exception {
         if (getMultipleTupleExtractor() != null) {
             Map<K, V> entries = getMultipleTupleExtractor().extract(message);
@@ -285,6 +319,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
         }
     }
 
+    /**
+     * {@inheritDoc}
+     */
     @Override public void deliveryComplete(IMqttDeliveryToken token) {
         // ignore, as we don't send messages
     }
@@ -293,127 +330,229 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
     //  Getters and setters
     // -------------------------------
 
+    /**
+     * @return
+     */
     public String getBrokerUrl() {
         return brokerUrl;
     }
 
+    /**
+     * @param brokerUrl The Broker URL (compulsory).
+     */
     public void setBrokerUrl(String brokerUrl) {
         this.brokerUrl = brokerUrl;
     }
 
+    /**
+     * @return
+     */
     public String getTopic() {
         return topic;
     }
 
+    /**
+     * @param topic The topic to subscribe to, if a single topic.
+     */
     public void setTopic(String topic) {
         this.topic = topic;
     }
 
+    /**
+     * @return
+     */
     public Integer getQualityOfService() {
         return qualityOfService;
     }
 
+    /**
+     * @param qualityOfService The quality of service to use for a single topic subscription (optional).
+     */
     public void setQualityOfService(Integer qualityOfService) {
         this.qualityOfService = qualityOfService;
     }
 
+    /**
+     * @return
+     */
     public List<String> getTopics() {
         return topics;
     }
 
+    /**
+     * @param topics The topics to subscribe to, if many.
+     */
     public void setTopics(List<String> topics) {
         this.topics = topics;
     }
 
+    /**
+     * @return
+     */
     public List<Integer> getQualitiesOfService() {
         return qualitiesOfService;
     }
 
+    /**
+     * @param qualitiesOfService The qualities of service to use for multiple topic subscriptions.
+     * If specified, the list must contain the same number of elements as {@link #topics}.
+     */
     public void setQualitiesOfService(List<Integer> qualitiesOfService) {
         this.qualitiesOfService = qualitiesOfService;
     }
 
+    /**
+     * @return
+     */
     public String getClientId() {
         return clientId;
     }
 
+    /**
+     * @param clientId The MQTT client ID (optional). If one is not provided, we'll create one for you and maintain
+     * it throughout any reconnection attempts.
+     */
     public void setClientId(String clientId) {
         this.clientId = clientId;
     }
 
+    /**
+     * @return
+     */
     public MqttClientPersistence getPersistence() {
         return persistence;
     }
 
+    /**
+     * @param persistence A configurable persistence mechanism. If not set, Paho will use its default.
+     */
     public void setPersistence(MqttClientPersistence persistence) {
         this.persistence = persistence;
     }
 
+    /**
+     * @return
+     */
     public MqttConnectOptions getConnectOptions() {
         return connectOptions;
     }
 
+    /**
+     * @param connectOptions The MQTT client connect options, where users can configure the last will and testament, durability, etc.
+     */
     public void setConnectOptions(MqttConnectOptions connectOptions) {
         this.connectOptions = connectOptions;
     }
 
+    /**
+     * @return
+     */
     public boolean isDisconnectForcibly() {
         return disconnectForcibly;
     }
 
+    /**
+     * @param disconnectForcibly Whether to disconnect forcibly or not. By default, it's false.
+     */
     public void setDisconnectForcibly(boolean disconnectForcibly) {
         this.disconnectForcibly = disconnectForcibly;
     }
 
+    /**
+     * @return
+     */
     public Integer getDisconnectQuiesceTimeout() {
         return disconnectQuiesceTimeout;
     }
 
+    /**
+     * @param disconnectQuiesceTimeout Quiesce timeout on disconnection. If not provided, this streamer won't use any.
+     */
     public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) {
         this.disconnectQuiesceTimeout = disconnectQuiesceTimeout;
     }
 
+    /**
+     * @return
+     */
     public Integer getDisconnectForciblyTimeout() {
         return disconnectForciblyTimeout;
     }
 
+    /**
+     * @param disconnectForciblyTimeout If disconnecting forcibly, the timeout. Compulsory in that case.
+     */
     public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) {
         this.disconnectForciblyTimeout = disconnectForciblyTimeout;
     }
 
+    /**
+     * @return
+     */
     public WaitStrategy getRetryWaitStrategy() {
         return retryWaitStrategy;
     }
 
+    /**
+     * @param retryWaitStrategy The strategy to determine how long to wait between retry attempts.
+     * By default, this streamer uses a Fibonacci-based strategy.
+     */
     public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) {
         this.retryWaitStrategy = retryWaitStrategy;
     }
 
+    /**
+     * @return
+     */
     public StopStrategy getRetryStopStrategy() {
         return retryStopStrategy;
     }
 
+    /**
+     * @param retryStopStrategy The strategy to determine when to stop retrying to (re-)connect. By default, we never stop.
+     */
     public void setRetryStopStrategy(StopStrategy retryStopStrategy) {
         this.retryStopStrategy = retryStopStrategy;
     }
 
+    /**
+     * @return
+     */
     public boolean isBlockUntilConnected() {
         return blockUntilConnected;
     }
 
+    /**
+     * @param blockUntilConnected Whether to block the start() method until connected for the first time. By default,
+     * false.
+     */
     public void setBlockUntilConnected(boolean blockUntilConnected) {
         this.blockUntilConnected = blockUntilConnected;
     }
 
+    /**
+     * A utility class to help us with (re-)connecting to the MQTT broker. It uses a single-threaded executor to perform
+     * the (re-)connections.
+     */
     private class MqttConnectionRetrier {
 
+        /** The guava-retrying retrier object. */
         private final Retryer<Boolean> retrier;
+
+        /** Single-threaded pool. */
         private ExecutorService executor = Executors.newSingleThreadExecutor();
 
+        /**
+         * Constructor.
+         * @param retrier The retrier object.
+         */
         public MqttConnectionRetrier(Retryer<Boolean> retrier) {
             this.retrier = retrier;
         }
 
+        /**
+         * Method that is called by the streamer to ask us to (re-)connect.
+         */
         public void connect() {
             Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() {
                 @Override public Boolean call() throws Exception {
@@ -460,6 +599,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
             }
         }
 
+        /**
+         * Stops this connection utility class by shutting down the thread pool.
+         */
         public void stop() {
             executor.shutdownNow();
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 012486a..5ac7339 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -65,24 +65,41 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
  */
 public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
+    /** The test data. */
     private static final Map<Integer, String> TEST_DATA = new HashMap<>();
+
+    /** Topic name for single topic tests. */
     private static final String SINGLE_TOPIC_NAME = "abc";
+
+    /** Topic names for multiple topic tests. */
     private static final List<String> MULTIPLE_TOPIC_NAMES = Arrays.asList("def", "ghi", "jkl", "mno");
 
+    /** The AMQ broker with an MQTT interface. */
     private BrokerService broker;
+
+    /** The MQTT client. */
     private MqttClient client;
+
+    /** The broker URL. */
     private String brokerUrl;
+
+    /** The broker port. **/
     private int port;
+
+    /** The MQTT streamer currently under test. */
     private MqttStreamer<Integer, String> streamer;
+
+    /** The UUID of the currently active remote listener. */
     private UUID remoteListener;
 
+    /** The Ignite data streamer. */
+    private IgniteDataStreamer<Integer, String> dataStreamer;
+
     static {
         for (int i = 0; i < 100; i++)
             TEST_DATA.put(i, "v" + i);
     }
 
-    private IgniteDataStreamer<Integer, String> dataStreamer;
-
     /** Constructor. */
     public IgniteMqttStreamerTest() {
         super(true);
@@ -99,14 +116,17 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
         // create the broker
         broker = new BrokerService();
-        broker.deleteAllMessages();
+        broker.setDeleteAllMessagesOnStartup(true);
         broker.setPersistent(false);
+        broker.setPersistenceAdapter(null);
+        broker.setPersistenceFactory(null);
 
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry policy = new PolicyEntry();
         policy.setQueuePrefetch(1);
         broker.setDestinationPolicy(policyMap);
         broker.getDestinationPolicy().setDefaultEntry(policy);
+        broker.setSchedulerSupport(false);
 
         // add the MQTT transport connector to the broker
         broker.addConnector("mqtt://localhost:" + port);
@@ -143,6 +163,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
     }
 
+    /**
+     * @throws Exception
+     */
     public void testSingleTopic_NoQoS_OneEntryPerMessage() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -162,6 +185,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertCacheEntriesLoaded(50);
     }
 
+    /**
+     * @throws Exception
+     */
     public void testMultipleTopics_NoQoS_OneEntryPerMessage() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -185,6 +211,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
     }
 
+    /**
+     * @throws Exception
+     */
     public void testSingleTopic_NoQoS_MultipleEntriesOneMessage() throws Exception {
         // configure streamer
         streamer.setMultipleTupleExtractor(multipleTupleExtractor());
@@ -204,6 +233,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertCacheEntriesLoaded(50);
     }
 
+    /**
+     * @throws Exception
+     */
     public void testMultipleTopics_NoQoS_MultipleEntriesOneMessage() throws Exception {
         // configure streamer
         streamer.setMultipleTupleExtractor(multipleTupleExtractor());
@@ -227,6 +259,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
     }
 
+    /**
+     * @throws Exception
+     */
     public void testSingleTopic_NoQoS_ConnectOptions_Durable() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -265,6 +300,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertCacheEntriesLoaded(100);
     }
 
+    /**
+     * @throws Exception
+     */
     public void testSingleTopic_NoQoS_Reconnect() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -306,6 +344,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertCacheEntriesLoaded(100);
     }
 
+    /**
+     * @throws Exception
+     */
     public void testSingleTopic_NoQoS_RetryOnce() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -339,6 +380,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
     }
 
+    /**
+     * @throws Exception
+     */
     public void testMultipleTopics_MultipleQoS_OneEntryPerMessage() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -363,6 +407,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
     }
 
+    /**
+     * @throws Exception
+     */
     public void testMultipleTopics_MultipleQoS_Mismatch() throws Exception {
         // configure streamer
         streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -379,6 +426,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
     }
 
+    /**
+     * @throws Exception
+     */
     private MqttStreamer<Integer, String> createMqttStreamer(IgniteDataStreamer<Integer, String> dataStreamer) {
         MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
         streamer.setIgnite(grid());
@@ -393,7 +443,10 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         return streamer;
     }
 
-    public void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
+    /**
+     * @throws Exception
+     */
+    private void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
         if (singleMessage) {
             final List<StringBuilder> sbs = new ArrayList<>(topics.size());
             // initialize String Builders for each topic
@@ -423,6 +476,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         }
     }
 
+    /**
+     * @throws Exception
+     */
     private CountDownLatch subscribeToPutEvents(int expect) {
         Ignite ignite = grid();
 
@@ -439,14 +495,16 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         return latch;
     }
 
+    /**
+     * @throws Exception
+     */
     private void assertCacheEntriesLoaded(int count) {
         // get the cache and check that the entries are present
         IgniteCache<Integer, String> cache = grid().cache(null);
 
         // for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache
-        for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count)) {
+        for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count))
             assertEquals(TEST_DATA.get(key), cache.get(key));
-        }
 
         // assert that the cache exactly the specified amount of elements
         assertEquals(count, cache.size(CachePeekMode.ALL));
@@ -455,6 +513,11 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener);
     }
 
+    /**
+     * Returns a {@link StreamSingleTupleExtractor} for testing.
+     *
+     * @throws Exception
+     */
     public static StreamSingleTupleExtractor<MqttMessage, Integer, String> singleTupleExtractor() {
         return new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
             @Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
@@ -464,6 +527,11 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
         };
     }
 
+    /**
+     * Returns a {@link StreamMultipleTupleExtractor} for testing.
+     *
+     * @throws Exception
+     */
     public static StreamMultipleTupleExtractor<MqttMessage, Integer, String> multipleTupleExtractor() {
         return new StreamMultipleTupleExtractor<MqttMessage, Integer, String>() {
             @Override public Map<Integer, String> extract(MqttMessage msg) {


[18/23] ignite git commit: Check for WeakValue in GridH2AbstractKeyValueRow.onUnswap

Posted by vo...@apache.org.
Check for WeakValue in GridH2AbstractKeyValueRow.onUnswap


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca2bce00
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca2bce00
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca2bce00

Branch: refs/heads/ignite-1513-final
Commit: ca2bce00516142a1204fb9226c938174047e72d6
Parents: 72c3eef
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 15:04:27 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 15:04:27 2015 +0300

----------------------------------------------------------------------
 .../processors/query/h2/opt/GridH2AbstractKeyValueRow.java       | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ca2bce00/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index c11f541..ca5442a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -119,7 +119,9 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
      * @throws IgniteCheckedException If failed.
      */
     public synchronized void onUnswap(Object val, boolean beforeRmv) throws IgniteCheckedException {
-        if (peekValue(VAL_COL) != null)
+        Value val0 = peekValue(VAL_COL);
+
+        if (val0 != null && !(val0 instanceof WeakValue))
             return;
 
         setValue(VAL_COL, desc.wrap(val, desc.valueType()));


[06/23] ignite git commit: IGNITE-1370 Refactor StreamTupleExtractor API for 0..1 and 0..n extraction.

Posted by vo...@apache.org.
IGNITE-1370 Refactor StreamTupleExtractor API for 0..1 and 0..n extraction.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/421a5234
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/421a5234
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/421a5234

Branch: refs/heads/ignite-1513-final
Commit: 421a5234b5a7e56e36952a4c1976b3118310073e
Parents: eae4df1 b80b171
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 16:28:12 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 16:28:12 2015 +0100

----------------------------------------------------------------------
 .../socket/WordsSocketStreamerServer.java       |   5 +-
 .../org/apache/ignite/stream/StreamAdapter.java | 104 +++++++++++++++--
 .../stream/StreamMultipleTupleExtractor.java    |  38 +++++++
 .../stream/StreamSingleTupleExtractor.java      |  40 +++++++
 .../ignite/stream/StreamTupleExtractor.java     |  20 ++--
 .../ignite/stream/socket/SocketStreamer.java    |   3 +-
 .../stream/socket/SocketStreamerSelfTest.java   | 112 ++++++++++++++-----
 7 files changed, 270 insertions(+), 52 deletions(-)
----------------------------------------------------------------------



[14/23] ignite git commit: Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4

Posted by vo...@apache.org.
Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1040872
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1040872
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1040872

Branch: refs/heads/ignite-1513-final
Commit: a1040872f37cf4fd1dc20584c68307f420d0d3af
Parents: 33fe30d 50f75bd
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 12:59:14 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 12:59:14 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/cache/CacheAtomicityMode.java    | 17 +++++------------
 .../processors/cache/GridCacheProcessor.java       |  2 +-
 2 files changed, 6 insertions(+), 13 deletions(-)
----------------------------------------------------------------------



[04/23] ignite git commit: IGNITE-535 (WIP) Implement MQTT Streamer.

Posted by vo...@apache.org.
IGNITE-535 (WIP) Implement MQTT Streamer.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6b53f1bb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6b53f1bb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6b53f1bb

Branch: refs/heads/ignite-1513-final
Commit: 6b53f1bb2699ae7a6ea8ae277de1bfc3ecbd7b6a
Parents: b80b171
Author: Raul Kripalani <ra...@apache.org>
Authored: Tue Sep 15 00:45:58 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Tue Sep 15 22:30:09 2015 +0100

----------------------------------------------------------------------
 modules/mqtt/pom.xml                            | 110 +++++++++
 .../apache/ignite/stream/mqtt/MqttStreamer.java | 243 +++++++++++++++++++
 .../stream/mqtt/IgniteMqttStreamerTest.java     |  50 ++++
 .../mqtt/IgniteMqttStreamerTestSuite.java       |  34 +++
 .../ignite/stream/mqtt/TestTupleExtractors.java |  28 +++
 pom.xml                                         |   1 +
 6 files changed, 466 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml
new file mode 100644
index 0000000..b108180
--- /dev/null
+++ b/modules/mqtt/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent</relativePath>
+    </parent>
+
+    <artifactId>ignite-mqtt</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <url>http://ignite.apache.org</url>
+
+    <properties>
+        <paho.version>1.0.2</paho.version>
+        <mosquette.version>0.7</mosquette.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>${paho.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.eclipse.moquette</groupId>
+            <artifactId>moquette-broker</artifactId>
+            <version>${mosquette.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-spring</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <!-- Repository for Moquette (embedded MQTT broker for tests) and for Eclipse Paho (MQTT client) -->
+    <repositories>
+        <repository>
+            <id>bintray</id>
+            <url>http://dl.bintray.com/andsel/maven/</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+        <repository>
+            <id>Eclipse Paho Repo</id>
+            <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+</project>

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
new file mode 100644
index 0000000..00a89ab
--- /dev/null
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.stream.StreamAdapter;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ * Streamer that consumes from an MQTT topic and feeds key-value pairs into an {@link IgniteDataStreamer} instance,
+ * using Eclipse Paho as an MQTT client.
+ * <p>
+ * You must also provide a {@link StreamSingleTupleExtractor} or a {@link StreamMultipleTupleExtractor} to extract
+ * cache tuples out of the incoming message.
+ * <p>
+ * This Streamer has many features:
+ *
+ * <ul>
+ *     <li>Subscribing to a single topic or multiple topics at once.</li>
+ *     <li>Specifying the subscriber's QoS for a single topic or for multiple topics.</li>
+ *     <li>Allows setting {@link MqttConnectOptions} to support features like last will testament, persistent
+ *         sessions, etc.</li>
+ *     <li>Specifying the client ID.</li>
+ * </ul>
+ *
+ * Note: features like durable subscriptions, last will testament, etc. must be configured via the
+ * {@link #connectOptions} property.
+ *
+ * @author Raul Kripalani
+ */
+public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> implements MqttCallback {
+
+    // NOTE(review): no setters for the configuration fields below are visible in this class; confirm how
+    // they are expected to be populated before start() is called.
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Paho client, created in {@link #start()}. */
+    private MqttClient client;
+
+    /** URL of the MQTT broker to connect to (mandatory). */
+    private String brokerUrl;
+
+    /** Single topic to subscribe to. Mutually exclusive with {@link #topics}. */
+    private String topic;
+
+    /** QoS to use with the single topic. Mutually exclusive with {@link #qualitiesOfService}. */
+    private Integer qualityOfService;
+
+    /** List of topics to subscribe to. Mutually exclusive with {@link #topic}. */
+    private List<String> topics;
+
+    /** QoS per topic; must be empty or have the same size as {@link #topics}. */
+    private List<Integer> qualitiesOfService;
+
+    /** Client ID in case we're using durable subscribers. */
+    private String clientId;
+
+    /** Optional persistence mechanism handed to the Paho client. */
+    private MqttClientPersistence persistence;
+
+    /** Optional connection options; when {@code null} the Paho defaults are used. */
+    private MqttConnectOptions connectOptions;
+
+    /** Quiesce timeout to use when disconnecting; {@code null} means use the Paho default. */
+    private Integer disconnectQuiesceTimeout;
+
+    /** Whether {@link #stop()} should disconnect forcibly. */
+    private boolean disconnectForcibly;
+
+    /** Timeout to use when disconnecting forcibly; {@code null} means use the Paho default. */
+    private Integer disconnectForciblyTimeout;
+
+    /** Lifecycle flag: {@code true} until {@link #start()} succeeds and again after {@link #stop()} succeeds. */
+    private volatile boolean stopped = true;
+
+    /**
+     * Starts streamer: validates the configuration, creates the MQTT client, connects and subscribes.
+     *
+     * @throws IgniteException If the streamer is already started, if the configuration is invalid or if
+     *      connecting/subscribing fails (the original failure is preserved as the cause).
+     */
+    public void start() throws IgniteException {
+        if (!stopped)
+            throw new IgniteException("Attempted to start an already started MQTT Streamer");
+
+        // for simplicity, if these are null initialize to empty lists
+        topics = topics == null ? new ArrayList<String>() : topics;
+        qualitiesOfService = qualitiesOfService == null ? new ArrayList<Integer>() : qualitiesOfService;
+
+        try {
+            // parameter validations
+            A.notNull(getStreamer(), "streamer");
+            A.notNull(getIgnite(), "ignite");
+
+            // exactly one of the two extractors must be provided
+            A.ensure(getSingleTupleExtractor() != null || getMultipleTupleExtractor() != null,
+                "tuple extractor missing");
+            A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " +
+                "both single and multiple tuple extractor");
+            A.notNullOrEmpty(brokerUrl, "broker URL");
+            A.notNullOrEmpty(clientId, "client ID");
+
+            // if we have both a single topic and a list of topics, fail
+            if (topic != null && topic.length() > 0 && !topics.isEmpty())
+                throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time");
+
+            // if we have both a single QoS and list, fail
+            if (qualityOfService != null && !qualitiesOfService.isEmpty())
+                throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time");
+
+            // Paho API requires disconnect timeout if providing a quiesce timeout and disconnecting forcibly
+            if (disconnectForcibly && disconnectQuiesceTimeout != null) {
+                A.notNull(disconnectForciblyTimeout, "disconnect timeout cannot be null when disconnecting forcibly " +
+                    "with quiesce");
+            }
+
+            if (!topics.isEmpty()) {
+                // multiple topics: every entry must be usable and the QoS list must line up with it
+                for (String t : topics)
+                    A.notNullOrEmpty(t, "topic in list of topics");
+
+                A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(), "qualities of " +
+                    "service must be either empty or have the same size as topics list");
+            }
+            else {
+                // single topic: fail fast here instead of passing null to the client on subscribe
+                A.notNullOrEmpty(topic, "topic");
+            }
+
+            // create logger
+            log = getIgnite().log();
+
+            // create the mqtt client
+            if (persistence == null)
+                client = new MqttClient(brokerUrl, clientId);
+            else
+                client = new MqttClient(brokerUrl, clientId, persistence);
+
+            // register ourselves before connecting, otherwise messageArrived/connectionLost are never invoked
+            client.setCallback(this);
+
+            connectAndSubscribe();
+
+            stopped = false;
+        }
+        catch (Throwable t) {
+            throw new IgniteException("Exception while initializing MqttStreamer", t);
+        }
+    }
+
+    /**
+     * Connects the client to the broker and subscribes to the configured topic(s).
+     *
+     * @throws MqttException If connecting or subscribing fails.
+     */
+    private void connectAndSubscribe() throws MqttException {
+        // connect, honouring the user-provided options when present
+        if (connectOptions == null)
+            client.connect();
+        else
+            client.connect(connectOptions);
+
+        if (!topics.isEmpty()) {
+            // subscribe to multiple topics
+            String[] topicArr = topics.toArray(new String[0]);
+
+            if (qualitiesOfService.isEmpty())
+                client.subscribe(topicArr);
+            else {
+                int[] qoses = new int[qualitiesOfService.size()];
+
+                for (int i = 0; i < qoses.length; i++)
+                    qoses[i] = qualitiesOfService.get(i);
+
+                client.subscribe(topicArr, qoses);
+            }
+        }
+        else {
+            // subscribe to a single topic
+            if (qualityOfService == null)
+                client.subscribe(topic);
+            else
+                client.subscribe(topic, qualityOfService);
+        }
+    }
+
+    /**
+     * Stops streamer by disconnecting the MQTT client, forcibly or gracefully depending on configuration.
+     *
+     * @throws IgniteException If the streamer is already stopped or if disconnecting fails.
+     */
+    public void stop() throws IgniteException {
+        if (stopped)
+            throw new IgniteException("Attempted to stop an already stopped MQTT Streamer");
+
+        try {
+            if (disconnectForcibly) {
+                if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null)
+                    client.disconnectForcibly();
+                else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null)
+                    client.disconnectForcibly(disconnectForciblyTimeout);
+                else
+                    // start() guarantees the forcible timeout is set whenever a quiesce timeout is
+                    client.disconnectForcibly(disconnectQuiesceTimeout, disconnectForciblyTimeout);
+            }
+            else {
+                if (disconnectQuiesceTimeout == null)
+                    client.disconnect();
+                else
+                    client.disconnect(disconnectQuiesceTimeout);
+            }
+
+            // record the new state so that a repeated stop() is rejected and start() is allowed again
+            stopped = true;
+        }
+        catch (Throwable t) {
+            throw new IgniteException("Exception while stopping MqttStreamer", t);
+        }
+    }
+
+    /**
+     * Logs the loss of connection and makes a single attempt to reconnect and resubscribe.
+     *
+     * @param throwable Cause of the connection loss.
+     */
+    @Override public void connectionLost(Throwable throwable) {
+        log.warning(String.format("MQTT Connection to server %s was lost due to", brokerUrl), throwable);
+        // TODO: handle reconnect attempts with an optional backoff mechanism (linear, exponential, fibonacci)
+        try {
+            connectAndSubscribe();
+        }
+        catch (MqttException e) {
+            // report through the Ignite logger rather than dumping the stack trace to stderr
+            log.error(String.format("Failed to reconnect to MQTT server %s", brokerUrl), e);
+        }
+    }
+
+    /**
+     * Extracts one or many cache tuples from the message, depending on which extractor is configured, and
+     * feeds them to the data streamer.
+     *
+     * @param topic Topic the message arrived on (not used).
+     * @param message Received message.
+     * @throws Exception If extraction or streaming fails.
+     */
+    @Override public void messageArrived(String topic, MqttMessage message) throws Exception {
+        if (getMultipleTupleExtractor() != null) {
+            Map<K, V> entries = getMultipleTupleExtractor().extract(message);
+            getStreamer().addData(entries);
+        }
+        else {
+            Map.Entry<K, V> entry = getSingleTupleExtractor().extract(message);
+            getStreamer().addData(entry);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void deliveryComplete(IMqttDeliveryToken token) {
+        // ignore, we don't send messages
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
new file mode 100644
index 0000000..59730fa
--- /dev/null
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Test for {@link MqttStreamer}.
+ *
+ * @author Raul Kripalani
+ */
+public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
+    /** Constructor. */
+    public IgniteMqttStreamerTest() {
+        // NOTE(review): the meaning of the boolean flag is defined by GridCommonAbstractTest - confirm there.
+        super(true);
+    }
+
+    /**
+     * Makes sure the cache described by the default cache configuration exists.
+     *
+     * @throws Exception If failed.
+     */
+    @Before
+    @SuppressWarnings("unchecked")
+    public void beforeTest() throws Exception {
+        grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+    }
+
+    /**
+     * Clears the cache with the {@code null} name.
+     *
+     * @throws Exception If failed.
+     */
+    @After
+    public void afterTest() throws Exception {
+        grid().cache(null).clear();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
new file mode 100644
index 0000000..ff25145
--- /dev/null
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * MQTT streamer tests.
+ *
+ * @author Raul Kripalani
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    IgniteMqttStreamerTest.class
+})
+public class IgniteMqttStreamerTestSuite {
+    // Intentionally empty: the suite members are declared by the annotations on this class.
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
new file mode 100644
index 0000000..e2ed0f0
--- /dev/null
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+/**
+ * Test transformers for MqttStreamer tests.
+ *
+ * @author Raul Kripalani
+ */
+public class TestTupleExtractors {
+    // Placeholder: this class does not define any extractors yet.
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b47958f..c19a9b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
         <module>modules/kafka</module>
         <module>modules/yarn</module>
         <module>modules/jms11</module>
+        <module>modules/mqtt</module>
         <module>modules/zookeeper</module>
         <module>modules/platform</module>
     </modules>


[12/23] ignite git commit: Cleaned documentation. Set ATOMIC mode as default using specific constant.

Posted by vo...@apache.org.
Cleaned documentation. Set ATOMIC mode as default using specific constant.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/50f75bd6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/50f75bd6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/50f75bd6

Branch: refs/heads/ignite-1513-final
Commit: 50f75bd6111b5b9163391e4c0913ff5b696a2862
Parents: e51fb42
Author: Denis Magda <dm...@gridgain.com>
Authored: Tue Sep 22 11:44:22 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Tue Sep 22 11:44:22 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/cache/CacheAtomicityMode.java    | 17 +++++------------
 .../processors/cache/GridCacheProcessor.java       |  2 +-
 2 files changed, 6 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/50f75bd6/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
index 9e0f81e..92b5aa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
@@ -33,11 +33,6 @@ public enum CacheAtomicityMode {
     /**
      * Specified fully {@code ACID}-compliant transactional cache behavior. See
      * {@link Transaction} for more information about transactions.
-     * <p>
-     * This mode is currently the default cache atomicity mode. However, cache
-     * atomicity mode will be changed to {@link #ATOMIC} starting from version {@code 5.2},
-     * so it is recommended that desired atomicity mode is explicitly configured
-     * instead of relying on default value.
      */
     TRANSACTIONAL,
 
@@ -49,18 +44,16 @@ public enum CacheAtomicityMode {
      * In addition to transactions and locking, one of the main differences in {@code ATOMIC} mode
      * is that bulk writes, such as {@code putAll(...)}, {@code removeAll(...)}, and {@code transformAll(...)}
      * methods, become simple batch operations which can partially fail. In case of partial
-     * failure {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException} will be thrown which will contain a list of keys
-     * for which the update failed. It is recommended that bulk writes are used whenever multiple keys
-     * need to be inserted or updated in cache, as they reduce number of network trips and provide
-     * better performance.
+     * failure {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException} will be thrown
+     * which will contain a list of keys for which the update failed. It is recommended that bulk writes are used
+     * whenever multiple keys need to be inserted or updated in cache, as they reduce number of network trips and
+     * provide better performance.
      * <p>
      * Note that even without locking and transactions, {@code ATOMIC} mode still provides
      * full consistency guarantees across all cache nodes.
      * <p>
      * Also note that all data modifications in {@code ATOMIC} mode are guaranteed to be atomic
      * and consistent with writes to the underlying persistent store, if one is configured.
-     * <p>
-     * This mode is currently implemented for {@link CacheMode#PARTITIONED} caches only.
      */
     ATOMIC;
 
@@ -76,4 +69,4 @@ public enum CacheAtomicityMode {
     @Nullable public static CacheAtomicityMode fromOrdinal(int ord) {
         return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/50f75bd6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 7c16136..9c325aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -271,7 +271,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
             cfg.setRebalanceMode(ASYNC);
 
         if (cfg.getAtomicityMode() == null)
-            cfg.setAtomicityMode(ATOMIC);
+            cfg.setAtomicityMode(CacheConfiguration.DFLT_CACHE_ATOMICITY_MODE);
 
         if (cfg.getWriteSynchronizationMode() == null)
             cfg.setWriteSynchronizationMode(PRIMARY_SYNC);


[13/23] ignite git commit: Added test.

Posted by vo...@apache.org.
Added test.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33fe30da
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33fe30da
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33fe30da

Branch: refs/heads/ignite-1513-final
Commit: 33fe30da620e4f08cee959104805f3527b597700
Parents: e51fb42
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 12:55:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 12:55:18 2015 +0300

----------------------------------------------------------------------
 ...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 ++++++++++++++++++-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  13 +-
 2 files changed, 119 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/33fe30da/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 66275b3..14417c1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -21,12 +21,25 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+
 /**
  * Client-based discovery SPI test with failure detection timeout enabled.
  */
@@ -60,7 +73,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
 
     /** {@inheritDoc} */
     @Override protected TcpDiscoverySpi getDiscoverySpi() {
-        return useTestSpi ? new TestTcpDiscoverySpi() : super.getDiscoverySpi();
+        return useTestSpi ? new TestTcpDiscoverySpi2() : super.getDiscoverySpi();
     }
 
     /**
@@ -117,16 +130,16 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
     private void checkFailureThresholdWorkability() throws Exception {
         useTestSpi = true;
 
-        TestTcpDiscoverySpi firstSpi = null;
-        TestTcpDiscoverySpi secondSpi = null;
+        TestTcpDiscoverySpi2 firstSpi = null;
+        TestTcpDiscoverySpi2 secondSpi = null;
 
         try {
             startServerNodes(2);
 
             checkNodes(2, 0);
 
-            firstSpi = (TestTcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi());
-            secondSpi = (TestTcpDiscoverySpi)(G.ignite("server-1").configuration().getDiscoverySpi());
+            firstSpi = (TestTcpDiscoverySpi2)(G.ignite("server-0").configuration().getDiscoverySpi());
+            secondSpi = (TestTcpDiscoverySpi2)(G.ignite("server-1").configuration().getDiscoverySpi());
 
             assert firstSpi.err == null;
 
@@ -157,9 +170,102 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
     }
 
     /**
+     * Test tries to provoke scenario when client sends reconnect message before router failure detected.
+     *
+     * @throws Exception If failed.
+     */
+    public void _testClientReconnectOnCoordinatorRouterFail() throws Exception {
+        startServerNodes(1);
+
+        Ignite coordinator = G.ignite("server-0");
+
+        final TcpDiscoveryNode coordinatorNode = (TcpDiscoveryNode)coordinator.cluster().localNode();
+
+        final UUID srvNodeId = coordinatorNode.id();
+
+        // Limit the client's IP finder to the coordinator's discovery port and the one right after it.
+        int discoPort = coordinatorNode.discoveryPort();
+
+        clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+        clientIpFinder.setAddresses(Collections.singleton("localhost:" + discoPort + ".." + (discoPort + 1)));
+
+        // Short timeouts for the client node only.
+        failureThreshold = 1000L;
+        netTimeout = 500L;
+
+        startClientNodes(1); // Client should connect to coordinator.
+
+        // Longer timeouts for the two servers started afterwards.
+        failureThreshold = 10_000L;
+        netTimeout = 5000L;
+
+        for (int started = 0; started < 2; started++) {
+            Ignite srv = startGrid("server-" + srvIdx.getAndIncrement());
+
+            srvNodeIds.add(srv.cluster().localNode().id());
+        }
+
+        checkNodes(3, 1);
+
+        // One count per listening node: each must observe the coordinator failure.
+        final CountDownLatch failedLatch = new CountDownLatch(3);
+
+        final AtomicBoolean unexpectedEvt = new AtomicBoolean();
+
+        String[] listeningNodes = {"server-1", "server-2", "client-0"};
+
+        for (String nodeName : listeningNodes) {
+            G.ignite(nodeName).events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event evt) {
+                    DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+                    if (!discoEvt.eventNode().id().equals(srvNodeId)) {
+                        log.info("Unexpected node failed event: " + evt);
+
+                        unexpectedEvt.set(true);
+                    }
+                    else {
+                        info("Expected node failed event: " + discoEvt.eventNode());
+
+                        failedLatch.countDown();
+                    }
+
+                    return true;
+                }
+            }, EVT_NODE_FAILED);
+        }
+
+        Thread.sleep(5000);
+
+        Ignite client = G.ignite("client-0");
+
+        // Remembered to verify that the client keeps its node ID after the coordinator fails.
+        UUID clientIdBefore = client.cluster().localNode().id();
+
+        log.info("Fail coordinator: " + srvNodeId);
+
+        TestTcpDiscoverySpi coordinatorSpi = (TestTcpDiscoverySpi)coordinator.configuration().getDiscoverySpi();
+
+        coordinatorSpi.pauseAll(false);
+
+        try {
+            Thread.sleep(2000);
+        }
+        finally {
+            coordinatorSpi.simulateNodeFailure();
+            coordinatorSpi.resumeAll();
+        }
+
+        try {
+            assertTrue(failedLatch.await(10_000, TimeUnit.MILLISECONDS));
+
+            assertFalse("Unexpected event, see log for details.", unexpectedEvt.get());
+            assertEquals(clientIdBefore, client.cluster().localNode().id());
+        }
+        finally {
+            coordinatorSpi.resumeAll();
+        }
+    }
+
+    /**
      *
      */
-    private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+    private static class TestTcpDiscoverySpi2 extends TcpDiscoverySpi {
         /** */
         private long readDelay;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/33fe30da/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index c86f06a..9fbf5b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -89,13 +89,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
     /** */
-    private static final AtomicInteger srvIdx = new AtomicInteger();
+    protected static final AtomicInteger srvIdx = new AtomicInteger();
 
     /** */
     private static final AtomicInteger clientIdx = new AtomicInteger();
 
     /** */
-    private static Collection<UUID> srvNodeIds;
+    protected static Collection<UUID> srvNodeIds;
 
     /** */
     private static Collection<UUID> clientNodeIds;
@@ -128,13 +128,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     private UUID nodeId;
 
     /** */
-    private TcpDiscoveryVmIpFinder clientIpFinder;
+    protected TcpDiscoveryVmIpFinder clientIpFinder;
 
     /** */
     private long joinTimeout = TcpDiscoverySpi.DFLT_JOIN_TIMEOUT;
 
     /** */
-    private long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
+    protected long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
 
     /** */
     private boolean longSockTimeouts;
@@ -466,7 +466,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
             @Override public void apply(Socket sock) {
                 try {
                     latch.await();
-                } catch (InterruptedException e) {
+                }
+                catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 }
             }
@@ -2056,7 +2057,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
     /**
      *
      */
-    private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+    protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
         /** */
         private final Object mux = new Object();
 


[11/23] ignite git commit: IGNITE-1522 - Made cache entry listener configurations transient in cache configuration

Posted by vo...@apache.org.
IGNITE-1522 - Made cache entry listener configurations transient in cache configuration


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e51fb420
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e51fb420
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e51fb420

Branch: refs/heads/ignite-1513-final
Commit: e51fb420d1284465c7cbe55a28c2374ddf67d495
Parents: 621eb0f
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Mon Sep 21 23:29:20 2015 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Mon Sep 21 23:29:20 2015 -0700

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       | 15 +++++
 .../IgniteCacheEntryListenerAbstractTest.java   | 65 +++++++++++++++++++-
 2 files changed, 79 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e51fb420/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 7d1e14d..44a3fa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.ignite.configuration;
 
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.HashSet;
 import javax.cache.Cache;
 import javax.cache.configuration.CompleteConfiguration;
 import javax.cache.configuration.Factory;
@@ -1799,6 +1800,20 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
         return this;
     }
 
+    /**
+     * Creates a copy of current configuration and removes all cache entry listeners.
+     * They are executed only locally and should never be sent to remote nodes.
+     *
+     * @return Configuration object that will be serialized.
+     */
+    protected Object writeReplace() {
+        CacheConfiguration<K, V> cfg = new CacheConfiguration<>(this);
+
+        cfg.listenerConfigurations = new HashSet<>();
+
+        return cfg;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(CacheConfiguration.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e51fb420/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 78a6700..3fdd7fc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -17,6 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -32,11 +36,13 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.cache.configuration.CacheEntryListenerConfiguration;
 import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
 import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
 import javax.cache.event.CacheEntryCreatedListener;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryExpiredListener;
 import javax.cache.event.CacheEntryListener;
+import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryRemovedListener;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.event.EventType;
@@ -358,6 +364,34 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testSerialization() throws Exception {
+        if (cacheMode() == LOCAL)
+            return;
+
+        AtomicBoolean serialized = new AtomicBoolean();
+
+        NonSerializableListener lsnr = new NonSerializableListener(serialized);
+
+        jcache(0).registerCacheEntryListener(new MutableCacheEntryListenerConfiguration<>(
+            FactoryBuilder.factoryOf(lsnr),
+            null,
+            true,
+            false
+        ));
+
+        try {
+            startGrid(gridCount());
+        }
+        finally {
+            stopGrid(gridCount());
+        }
+
+        assertFalse(serialized.get());
+    }
+
+    /**
      * @param key Key.
      * @param val Value.
      * @param cache Cache.
@@ -1190,4 +1224,33 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
         }
     }
 
-}
\ No newline at end of file
+    /**
+     */
+    public static class NonSerializableListener implements CacheEntryCreatedListener<Object, Object>, Externalizable {
+        /** */
+        private final AtomicBoolean serialized;
+
+        /**
+         * @param serialized Serialized flag.
+         */
+        public NonSerializableListener(AtomicBoolean serialized) {
+            this.serialized = serialized;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCreated(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> evts)
+            throws CacheEntryListenerException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            serialized.set(true);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            // No-op.
+        }
+    }
+}


[10/23] ignite git commit: ignite-973 Fixed atomic cache 'remove' to always provide old value for indexing

Posted by vo...@apache.org.
ignite-973 Fixed atomic cache 'remove' to always provide old value for indexing


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/621eb0f7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/621eb0f7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/621eb0f7

Branch: refs/heads/ignite-1513-final
Commit: 621eb0f75bbe1a0a623229dded38a3549309eead
Parents: 8b94494
Author: sboikov <se...@inria.fr>
Authored: Mon Sep 21 21:37:52 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Mon Sep 21 21:37:52 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 37 +++++++++++++-------
 .../processors/cache/GridCacheProcessor.java    |  2 +-
 .../processors/cache/GridCacheSwapManager.java  | 24 ++++++-------
 .../datastreamer/DataStreamerImpl.java          |  2 --
 4 files changed, 37 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index f2bb646..961c792 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1588,6 +1588,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 boolean hasValPtr = hasOffHeapPointer();
 
+                if (old == null)
+                    old = saveValueForIndexUnlocked();
+
                 // Update index inside synchronization since it can be updated
                 // in load methods without actually holding entry lock.
                 clearIndex(old);
@@ -2163,6 +2166,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     // Must persist inside synchronization in non-tx mode.
                     cctx.store().remove(null, keyValue(false));
 
+                if (oldVal == null)
+                    oldVal = saveValueForIndexUnlocked();
+
                 // Update index inside synchronization since it can be updated
                 // in load methods without actually holding entry lock.
                 clearIndex(oldVal);
@@ -3342,7 +3348,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
         try {
             synchronized (this) {
-                CacheObject expiredVal = saveValueForIndexUnlocked();
+                CacheObject expiredVal = saveOldValueUnlocked(false);
 
                 boolean hasOldBytes = hasOffHeapPointer();
 
@@ -3523,12 +3529,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         try {
             GridCacheQueryManager qryMgr = cctx.queries();
 
-            if (qryMgr != null && qryMgr.enabled()) {
-                qryMgr.store(key,
-                    val,
-                    ver,
-                    expireTime);
-            }
+            if (qryMgr.enabled())
+                qryMgr.store(key, val, ver, expireTime);
         }
         catch (IgniteCheckedException e) {
             throw new GridCacheIndexUpdateException(e);
@@ -3547,8 +3549,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         try {
             GridCacheQueryManager<?, ?> qryMgr = cctx.queries();
 
-            if (qryMgr != null)
-                qryMgr.remove(key(), prevVal == null ? null : prevVal);
+            if (qryMgr.enabled())
+                qryMgr.remove(key(), prevVal);
         }
         catch (IgniteCheckedException e) {
             throw new GridCacheIndexUpdateException(e);
@@ -3562,10 +3564,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
      * @return Previous value or {@code null}.
      * @throws IgniteCheckedException If failed to retrieve previous value.
      */
-    protected CacheObject saveValueForIndexUnlocked() throws IgniteCheckedException {
+    protected final CacheObject saveValueForIndexUnlocked() throws IgniteCheckedException {
+        return saveOldValueUnlocked(true);
+    }
+
+    /**
+     * @param qryOnly If {@code true} reads old value only if query indexing is enabled.
+     * @return Previous value or {@code null}.
+     * @throws IgniteCheckedException If failed to retrieve previous value.
+     */
+    private CacheObject saveOldValueUnlocked(boolean qryOnly) throws IgniteCheckedException {
         assert Thread.holdsLock(this);
 
-        if (cctx.queries() == null)
+        if (qryOnly && !cctx.queries().enabled())
             return null;
 
         CacheObject val = rawGetOrUnmarshalUnlocked(false);
@@ -3681,7 +3692,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                     if (obsoleteVersionExtras() != null)
                         return true;
 
-                    CacheObject prev = saveValueForIndexUnlocked();
+                    CacheObject prev = saveOldValueUnlocked(false);
 
                     if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
                         if (swap) {
@@ -3791,7 +3802,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 GridCacheQueryManager qryMgr = cctx.queries();
 
-                if (qryMgr != null)
+                if (qryMgr.enabled())
                     qryMgr.onUnswap(key, prevVal);
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c92de7d..7c16136 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2759,7 +2759,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
         if (spaceName.equals(CU.swapSpaceName(cctx))) {
             GridCacheQueryManager qryMgr = cctx.queries();
 
-            if (qryMgr != null) {
+            if (qryMgr.enabled()) {
                 try {
                     KeyCacheObject key = cctx.toCacheKeyObject(keyBytes);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 9b6381e..d9a8b5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -696,12 +696,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         final GridCacheQueryManager qryMgr = cctx.queries();
 
-        if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr))
+        if (qryMgr.enabled() && !readSwapBeforeRemove(key, swapKey, ldr))
             return null; // Not found.
 
         swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
             @Override public void apply(byte[] rmv) {
-                if (qryMgr == null && cctx.config().isStatisticsEnabled())
+                if (!qryMgr.enabled() && cctx.config().isStatisticsEnabled())
                     cctx.cache().metrics0().onSwapRead(rmv != null);
 
                 if (rmv != null) {
@@ -843,7 +843,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         GridCacheSwapEntry entry;
 
-        if (qryMgr != null) {
+        if (qryMgr.enabled()) {
             entry = readOffheapBeforeRemove(key, keyBytes, part);
 
             if (entry != null) {
@@ -952,7 +952,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         ClassLoader ldr = cctx.deploy().globalLoader();
 
-        if (qryMgr != null) { // Unswap for indexing.
+        if (qryMgr.enabled()) { // Unswap for indexing.
             Iterator<SwapKey> iter = unprocessedKeys.iterator();
 
             while (iter.hasNext()) {
@@ -967,7 +967,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
             unprocessedKeys,
             new IgniteBiInClosure<SwapKey, byte[]>() {
                 @Override public void apply(SwapKey swapKey, byte[] rmv) {
-                    if (qryMgr == null && cctx.config().isStatisticsEnabled())
+                    if (!qryMgr.enabled() && cctx.config().isStatisticsEnabled())
                         cctx.cache().metrics0().onSwapRead(rmv != null);
 
                     if (rmv != null) {
@@ -1124,7 +1124,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      */
     public GridCacheSwapEntry readOffheapBeforeRemove(KeyCacheObject key, byte[] keyBytes, int part)
         throws IgniteCheckedException {
-        assert cctx.queries() != null;
+        assert cctx.queries().enabled();
 
         byte[] entryBytes = offheap.get(spaceName, part, key, keyBytes);
 
@@ -1155,7 +1155,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
      */
     private boolean readSwapBeforeRemove(@Nullable KeyCacheObject key, SwapKey swapKey, ClassLoader ldr)
         throws IgniteCheckedException {
-        assert cctx.queries() != null;
+        assert cctx.queries().enabled();
 
         byte[] entryBytes = swapMgr.read(spaceName, swapKey, ldr);
 
@@ -1196,7 +1196,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
         if (offheapEnabled) {
             byte[] keyBytes = key.valueBytes(cctx.cacheObjectContext());
 
-            if ((qryMgr == null || readOffheapBeforeRemove(key, keyBytes, part) != null) &&
+            if ((!qryMgr.enabled() || readOffheapBeforeRemove(key, keyBytes, part) != null) &&
                 offheap.removex(spaceName, part, key, keyBytes)) {
                 if (cctx.config().isStatisticsEnabled())
                     cctx.cache().metrics0().onOffHeapRemove();
@@ -1212,7 +1212,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
             ClassLoader ldr = cctx.deploy().globalLoader();
 
-            if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr))
+            if (qryMgr.enabled() && !readSwapBeforeRemove(key, swapKey, ldr))
                 return; // Not found.
 
             swapMgr.remove(spaceName,
@@ -1279,7 +1279,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
 
         GridCacheQueryManager qryMgr = cctx.queries();
 
-        if (qryMgr != null)
+        if (qryMgr.enabled())
             qryMgr.onSwap(key);
     }
 
@@ -1308,7 +1308,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     cctx.events().addEvent(swapEntry.partition(), swapEntry.key(), cctx.nodeId(),
                         (IgniteUuid)null, null, EVT_CACHE_OBJECT_TO_OFFHEAP, null, false, null, true, null, null, null);
 
-                if (qryMgr != null)
+                if (qryMgr.enabled())
                     qryMgr.onSwap(swapEntry.key());
             }
         }
@@ -1330,7 +1330,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
                     cctx.events().addEvent(batchSwapEntry.partition(), batchSwapEntry.key(), cctx.nodeId(),
                         (IgniteUuid)null, null, EVT_CACHE_OBJECT_SWAPPED, null, false, null, true, null, null, null);
 
-                    if (qryMgr != null)
+                    if (qryMgr.enabled())
                         qryMgr.onSwap(batchSwapEntry.key());
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index b5d9a7d..ab2a6e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1569,8 +1569,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
                     GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
 
-                    entry.unswap(false);
-
                     if (plc != null) {
                         ttl = CU.toTtl(plc.getExpiryForCreation());
 


[02/23] ignite git commit: IGNITE-1370 Deprecate StreamTupleExtractor in favor of new Stream*Single*TupleExtractor.

Posted by vo...@apache.org.
IGNITE-1370 Deprecate StreamTupleExtractor in favor of new Stream*Single*TupleExtractor.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1dee96e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1dee96e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1dee96e

Branch: refs/heads/ignite-1513-final
Commit: b1dee96e0d2b6130959400615b29b02721075392
Parents: 4d9734a
Author: Raul Kripalani <ra...@apache.org>
Authored: Tue Sep 15 00:48:46 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Tue Sep 15 00:48:46 2015 +0100

----------------------------------------------------------------------
 .../socket/WordsSocketStreamerServer.java       |  5 +-
 .../org/apache/ignite/stream/StreamAdapter.java | 64 ++++++++++++++++----
 .../stream/StreamSingleTupleExtractor.java      | 40 ++++++++++++
 .../ignite/stream/StreamTupleExtractor.java     | 23 +++----
 .../ignite/stream/socket/SocketStreamer.java    |  2 +-
 .../stream/socket/SocketStreamerSelfTest.java   |  6 +-
 6 files changed, 107 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
index 8e961b9..814d235 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
@@ -20,6 +20,7 @@ package org.apache.ignite.examples.streaming.wordcount.socket;
 import java.io.UnsupportedEncodingException;
 import java.net.InetAddress;
 import java.util.Map;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteDataStreamer;
@@ -31,7 +32,7 @@ import org.apache.ignite.examples.ExamplesUtils;
 import org.apache.ignite.examples.streaming.wordcount.CacheConfig;
 import org.apache.ignite.examples.streaming.wordcount.QueryWords;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.stream.StreamTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
 import org.apache.ignite.stream.socket.SocketMessageConverter;
 import org.apache.ignite.stream.socket.SocketStreamer;
 
@@ -108,7 +109,7 @@ public class WordsSocketStreamerServer {
             }
         });
 
-        sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+        sockStmr.setSingleTupleExtractor(new StreamSingleTupleExtractor<String, AffinityUuid, String>() {
             @Override public Map.Entry<AffinityUuid, String> extract(String word) {
                 // By using AffinityUuid we ensure that identical
                 // words are processed on the same cluster node.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index ffa0821..e7d224c 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -37,8 +37,9 @@ import org.apache.ignite.IgniteDataStreamer;
  * </ol>
  */
 public abstract class StreamAdapter<T, K, V> {
-    /** Tuple extractor. */
-    private StreamTupleExtractor<T, K, V> extractor;
+
+    /** Tuple extractor extracting a single tuple from an event */
+    private StreamSingleTupleExtractor<T, K, V> singleTupleExtractor;
 
     /** Tuple extractor that supports extracting N tuples from a single event (1:n cardinality). */
     private StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor;
@@ -60,11 +61,22 @@ public abstract class StreamAdapter<T, K, V> {
      * Stream adapter.
      *
      * @param stmr Streamer.
-     * @param extractor Tuple extractor.
+     * @param extractor Tuple extractor (1:1).
      */
-    protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
+    protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamSingleTupleExtractor<T, K, V> extractor) {
         this.stmr = stmr;
-        this.extractor = extractor;
+        this.singleTupleExtractor = extractor;
+    }
+
+    /**
+     * Stream adapter.
+     *
+     * @param stmr Streamer.
+     * @param extractor Tuple extractor (1:n).
+     */
+    protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamMultipleTupleExtractor<T, K, V> extractor) {
+        this.stmr = stmr;
+        this.multipleTupleExtractor = extractor;
     }
 
     /**
@@ -83,16 +95,44 @@ public abstract class StreamAdapter<T, K, V> {
 
     /**
      * @return Provided tuple extractor.
+     * @see #getSingleTupleExtractor()
      */
+    @Deprecated
     public StreamTupleExtractor<T, K, V> getTupleExtractor() {
-        return extractor;
+        if (singleTupleExtractor instanceof StreamTupleExtractor) {
+            return (StreamTupleExtractor) singleTupleExtractor;
+        }
+        throw new IllegalArgumentException("This method is deprecated and only relevant if using an old " +
+            "StreamTupleExtractor; use getSingleTupleExtractor instead");
     }
 
     /**
-     * @param extractor Extractor for key-value tuples from messages.
+     * @param extractor Extractor for a single key-value tuple from the message.
+     * @see #setSingleTupleExtractor(StreamSingleTupleExtractor)
      */
+    @Deprecated
     public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
-        this.extractor = extractor;
+        if (multipleTupleExtractor != null) {
+            throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
+        }
+        this.singleTupleExtractor = extractor;
+    }
+
+    /**
+     * @return Provided single tuple extractor.
+     */
+    public StreamSingleTupleExtractor<T, K, V> getSingleTupleExtractor() {
+        return singleTupleExtractor;
+    }
+
+    /**
+     * @param singleTupleExtractor Extractor for key-value tuples from messages.
+     */
+    public void setSingleTupleExtractor(StreamSingleTupleExtractor<T, K, V> singleTupleExtractor) {
+        if (multipleTupleExtractor != null) {
+            throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
+        }
+        this.singleTupleExtractor = singleTupleExtractor;
     }
 
     /**
@@ -106,6 +146,9 @@ public abstract class StreamAdapter<T, K, V> {
      * @param multipleTupleExtractor Extractor for 1:n tuple extraction.
      */
     public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor) {
+        if (singleTupleExtractor != null) {
+            throw new IllegalArgumentException("Single tuple extractor already set; cannot set both types at once.");
+        }
         this.multipleTupleExtractor = multipleTupleExtractor;
     }
 
@@ -126,15 +169,12 @@ public abstract class StreamAdapter<T, K, V> {
     /**
      * Converts given message to 1 or many tuples (depending on the type of extractor) and adds it/them to the
      * underlying streamer.
-     * <p>
-     * If both a {@link #multipleTupleExtractor} and a {@link #extractor} have been set, the former will take precedence
-     * and the latter will be ignored.
      *
      * @param msg Message to convert.
      */
     protected void addMessage(T msg) {
         if (multipleTupleExtractor == null) {
-            Map.Entry<K, V> e = extractor.extract(msg);
+            Map.Entry<K, V> e = singleTupleExtractor.extract(msg);
 
             if (e != null)
                 stmr.addData(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
new file mode 100644
index 0000000..fd50e93
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import java.util.Map;
+
+/**
+ * Stream tuple extractor to convert a message to a single Ignite key-value tuple.
+ * <p>
+ * Alternatively, {@link StreamMultipleTupleExtractor} can be used in cases where a single message/event may
+ * produce more than one tuple.
+ * <p>
+ * NOTE: This interface supersedes the former {@link StreamTupleExtractor} which is now deprecated.
+ *
+ * @see StreamMultipleTupleExtractor
+ */
+public interface StreamSingleTupleExtractor<T, K, V> {
+    /**
+     * Extracts a key-value tuple from a message.
+     *
+     * @param msg Message.
+     * @return Key-value tuple.
+     */
+    public Map.Entry<K, V> extract(T msg);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
index aed7d8a..5cd8d55 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
@@ -17,22 +17,13 @@
 
 package org.apache.ignite.stream;
 
-import java.util.Map;
-
 /**
- * Stream tuple extractor to convert messages to Ignite key-value tuples.
- * <p>
- * Alternatively, {@link StreamMultipleTupleExtractor} can be employed in cases where a single message/event may
- * produce more than one tuple.
+ * This interface is provided for backwards compatibility with {@link StreamSingleTupleExtractor}.
  *
- * @see StreamMultipleTupleExtractor
+ * @deprecated Will be removed in 2.0.0.
+ * @see StreamSingleTupleExtractor
  */
-public interface StreamTupleExtractor<T, K, V> {
-    /**
-     * Extracts a key-value tuple from a message.
-     *
-     * @param msg Message.
-     * @return Key-value tuple.
-     */
-    public Map.Entry<K, V> extract(T msg);
-}
\ No newline at end of file
+@Deprecated
+public interface StreamTupleExtractor<T, K, V> extends StreamSingleTupleExtractor<T, K, V> {
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index c89952d..066a5fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -141,7 +141,7 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
      * @throws IgniteException If failed.
      */
     public void start() {
-        A.ensure(getTupleExtractor() != null || getMultipleTupleExtractor() != null,
+        A.ensure(getSingleTupleExtractor() != null || getMultipleTupleExtractor() != null,
             "tupleExtractor (single or multiple)");
         A.notNull(getStreamer(), "streamer");
         A.notNull(getIgnite(), "ignite");

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 8b05754..6786b7e 100644
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -44,9 +45,10 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
 import org.apache.ignite.stream.StreamMultipleTupleExtractor;
-import org.apache.ignite.stream.StreamTupleExtractor;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
@@ -292,7 +294,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
             sockStmr.setDelimiter(delim);
 
             if (oneMessagePerTuple) {
-                sockStmr.setTupleExtractor(new StreamTupleExtractor<Message, Integer, String>() {
+                sockStmr.setSingleTupleExtractor(new StreamSingleTupleExtractor<Message, Integer, String>() {
                     @Override public Map.Entry<Integer, String> extract(Message msg) {
                         return new IgniteBiTuple<>(msg.key, msg.val);
                     }


[23/23] ignite git commit: Merge branch 'ignite-1282' into ignite-1513-final

Posted by vo...@apache.org.
Merge branch 'ignite-1282' into ignite-1513-final


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/876e3744
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/876e3744
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/876e3744

Branch: refs/heads/ignite-1513-final
Commit: 876e37446c679b874230ddadb15bf932d311f474
Parents: 00879c1 220ecb3
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Sep 22 18:00:10 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Sep 22 18:00:10 2015 +0300

----------------------------------------------------------------------
 .../socket/WordsSocketStreamerServer.java       |   5 +-
 .../apache/ignite/cache/CacheAtomicityMode.java |  17 +-
 .../configuration/CacheConfiguration.java       |  27 +-
 .../processors/cache/GridCacheAdapter.java      |   8 +-
 .../processors/cache/GridCacheMapEntry.java     |  51 +-
 .../processors/cache/GridCacheProcessor.java    |  10 +-
 .../cache/GridCacheSwapEntryImpl.java           |  31 +-
 .../processors/cache/GridCacheSwapManager.java  |  80 ++-
 .../datastreamer/DataStreamerImpl.java          |   2 -
 .../org/apache/ignite/stream/StreamAdapter.java | 104 +++-
 .../stream/StreamMultipleTupleExtractor.java    |  38 ++
 .../stream/StreamSingleTupleExtractor.java      |  40 ++
 .../ignite/stream/StreamTupleExtractor.java     |  20 +-
 .../ignite/stream/socket/SocketStreamer.java    |   3 +-
 .../IgniteCacheEntryListenerAbstractTest.java   |  65 +-
 .../IgniteCachePutRetryAbstractSelfTest.java    |  33 +
 ...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 +++-
 .../tcp/TcpClientDiscoverySpiSelfTest.java      |  13 +-
 .../stream/socket/SocketStreamerSelfTest.java   | 112 +++-
 .../processors/query/h2/IgniteH2Indexing.java   |  19 +-
 .../query/h2/opt/GridH2AbstractKeyValueRow.java |  54 +-
 .../query/h2/opt/GridH2KeyValueRowOffheap.java  |  11 +-
 .../query/h2/opt/GridH2RowDescriptor.java       |   5 +
 .../processors/query/h2/opt/GridH2Table.java    |  10 +-
 .../cache/CacheIndexStreamerTest.java           |  37 +-
 .../processors/cache/GridCacheSwapSelfTest.java |   4 +-
 .../IgniteCacheWithIndexingTestSuite.java       |   2 +
 modules/mqtt/pom.xml                            | 114 ++++
 .../apache/ignite/stream/mqtt/MqttStreamer.java | 611 +++++++++++++++++++
 .../stream/mqtt/IgniteMqttStreamerTest.java     | 553 +++++++++++++++++
 .../mqtt/IgniteMqttStreamerTestSuite.java       |  34 ++
 pom.xml                                         |   1 +
 32 files changed, 2040 insertions(+), 192 deletions(-)
----------------------------------------------------------------------



[22/23] ignite git commit: IGNITE-1282: Fix after merge from master.

Posted by vo...@apache.org.
IGNITE-1282: Fix after merge from master.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/220ecb30
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/220ecb30
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/220ecb30

Branch: refs/heads/ignite-1513-final
Commit: 220ecb3064e6714988d95950ec1670f6850811a1
Parents: 281b9f2
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Sep 22 17:58:56 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Sep 22 17:58:56 2015 +0300

----------------------------------------------------------------------
 .../configuration/CacheConfiguration.java       | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/220ecb30/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 44f7e1b..98f9fe1 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -17,17 +17,17 @@
 
 package org.apache.ignite.configuration;
 
-import java.io.Serializable;
-import java.util.Collection;
-import javax.cache.Cache;
-import javax.cache.configuration.CompleteConfiguration;
-import javax.cache.configuration.Factory;
-import javax.cache.configuration.MutableConfiguration;
-import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheInterceptor;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheTypeMetadata;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.AffinityFunction;
 import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.cache.eviction.EvictionFilter;
@@ -44,13 +44,13 @@ import org.apache.ignite.plugin.CachePluginConfiguration;
 import org.jetbrains.annotations.Nullable;
 
 import javax.cache.Cache;
-import javax.cache.CacheException;
 import javax.cache.configuration.CompleteConfiguration;
 import javax.cache.configuration.Factory;
 import javax.cache.configuration.MutableConfiguration;
 import javax.cache.expiry.ExpiryPolicy;
 import java.io.Serializable;
 import java.util.Collection;
+import java.util.HashSet;
 
 /**
  * This class defines grid cache configuration. This configuration is passed to


[05/23] ignite git commit: IGNITE-535 (WIP) MQTT Streamer.

Posted by vo...@apache.org.
IGNITE-535 (WIP) MQTT Streamer.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53683e20
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53683e20
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53683e20

Branch: refs/heads/ignite-1513-final
Commit: 53683e20d304dfd96d544f286e8460d3829598d8
Parents: 6b53f1b
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 16:23:28 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 16:23:28 2015 +0100

----------------------------------------------------------------------
 modules/mqtt/pom.xml                            |  39 +-
 .../apache/ignite/stream/mqtt/MqttStreamer.java | 336 +++++++++++---
 .../stream/mqtt/IgniteMqttStreamerTest.java     | 435 +++++++++++++++++++
 .../ignite/stream/mqtt/TestTupleExtractors.java |  28 --
 4 files changed, 741 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml
index b108180..4b0b46c 100644
--- a/modules/mqtt/pom.xml
+++ b/modules/mqtt/pom.xml
@@ -37,7 +37,8 @@
 
     <properties>
         <paho.version>1.0.2</paho.version>
-        <mosquette.version>0.7</mosquette.version>
+        <activemq.version>5.11.1</activemq.version>
+        <guava-retryier.version>2.0.0</guava-retryier.version>
     </properties>
 
     <dependencies>
@@ -54,9 +55,29 @@
         </dependency>
 
         <dependency>
-            <groupId>org.eclipse.moquette</groupId>
-            <artifactId>moquette-broker</artifactId>
-            <version>${mosquette.version}</version>
+            <groupId>com.github.rholder</groupId>
+            <artifactId>guava-retrying</artifactId>
+            <version>${guava-retryier.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-broker</artifactId>
+            <version>${activemq.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-kahadb-store</artifactId>
+            <version>${activemq.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.activemq</groupId>
+            <artifactId>activemq-mqtt</artifactId>
+            <version>${activemq.version}</version>
             <scope>test</scope>
         </dependency>
 
@@ -86,16 +107,6 @@
     <!-- Repository for Mosquette (embedded MQTT broker for tests) and for Eclipse Paho (MQTT client) -->
     <repositories>
         <repository>
-            <id>bintray</id>
-            <url>http://dl.bintray.com/andsel/maven/</url>
-            <releases>
-                <enabled>true</enabled>
-            </releases>
-            <snapshots>
-                <enabled>false</enabled>
-            </snapshots>
-        </repository>
-        <repository>
             <id>Eclipse Paho Repo</id>
             <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
             <releases>

http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
index 00a89ab..b86d385 100644
--- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -20,6 +20,10 @@ package org.apache.ignite.stream.mqtt;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
@@ -29,12 +33,19 @@ import org.apache.ignite.stream.StreamAdapter;
 import org.apache.ignite.stream.StreamMultipleTupleExtractor;
 import org.apache.ignite.stream.StreamSingleTupleExtractor;
 
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.StopStrategy;
+import com.github.rholder.retry.WaitStrategies;
+import com.github.rholder.retry.WaitStrategy;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
 
 /**
@@ -90,8 +101,20 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
 
     private Integer disconnectForciblyTimeout;
 
+    private WaitStrategy retryWaitStrategy = WaitStrategies.fibonacciWait();
+
+    private StopStrategy retryStopStrategy = StopStrategies.neverStop();
+
+    private MqttConnectionRetrier connectionRetrier;
+
     private volatile boolean stopped = true;
 
+    private volatile boolean connected;
+
+    private String cachedLogPrefix;
+
+    private boolean blockUntilConnected;
+
     /**
      * Starts streamer.
      *
@@ -109,18 +132,21 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
             // parameter validations
             A.notNull(getStreamer(), "streamer");
             A.notNull(getIgnite(), "ignite");
-            A.ensure(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null, "tuple extractor missing");
+            A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null), "tuple extractor missing");
             A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " +
                 "both single and multiple tuple extractor");
             A.notNullOrEmpty(brokerUrl, "broker URL");
             A.notNullOrEmpty(clientId, "client ID");
 
-            // if we have both a single topic and a list of topics, fail
-            if (topic != null && topic.length() > 0 && !topics.isEmpty())
+            // if we have both a single topic and a list of topics (but the list of topic is not of
+            // size 1 and == topic, as this would be a case of re-initialization), fail
+            if (topic != null && topic.length() > 0 && !topics.isEmpty() &&
+                topics.size() != 1 && !topics.get(0).equals(topic))
                 throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time");
 
-            // if we have both a single QoS and list, fail
-            if (qualityOfService != null && !qualitiesOfService.isEmpty()) {
+            // same as above but for QoS
+            if (qualityOfService != null && !qualitiesOfService.isEmpty() && qualitiesOfService.size() != 1 &&
+                !qualitiesOfService.get(0).equals(qualityOfService)) {
                 throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time");
             }
 
@@ -131,12 +157,22 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
             }
 
             // if we have multiple topics
-            if (topics != null && !topics.isEmpty()) {
-                for (String t : topics) {
+            if (!topics.isEmpty()) {
+                for (String t : topics)
                     A.notNullOrEmpty(t, "topic in list of topics");
-                }
+
                 A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(), "qualities of " +
                     "service must be either empty or have the same size as topics list");
+
+                cachedLogPrefix = "[" + Joiner.on(",").join(topics) + "]";
+            }
+            else {  // just the single topic
+                topics.add(topic);
+
+                if (qualityOfService != null)
+                    qualitiesOfService.add(qualityOfService);
+
+                cachedLogPrefix = "[" + topic + "]";
             }
 
             // create logger
@@ -148,10 +184,28 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
             else
                 client = new MqttClient(brokerUrl, clientId, persistence);
 
-            connectAndSubscribe();
+            // set this as a callback
+            client.setCallback(this);
 
+            // set stopped to false, as the connection will start async
             stopped = false;
 
+            // build retrier
+            Retryer<Boolean> retrier = RetryerBuilder.<Boolean>newBuilder()
+                .retryIfResult(new Predicate<Boolean>() {
+                    @Override public boolean apply(Boolean connected) {
+                        return !connected;
+                    }
+                })
+                .retryIfException().retryIfRuntimeException()
+                .withWaitStrategy(retryWaitStrategy)
+                .withStopStrategy(retryStopStrategy)
+                .build();
+
+            // create the connection retrier
+            connectionRetrier = new MqttConnectionRetrier(retrier);
+            connectionRetrier.connect();
+
         }
         catch (Throwable t) {
             throw new IgniteException("Exception while initializing MqttStreamer", t);
@@ -159,34 +213,6 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
 
     }
 
-    private void connectAndSubscribe() throws MqttException {
-        // connect
-        if (connectOptions != null)
-            client.connect();
-        else
-            client.connect(connectOptions);
-
-        // subscribe to multiple topics
-        if (!topics.isEmpty()) {
-            if (qualitiesOfService.isEmpty()) {
-                client.subscribe(topics.toArray(new String[0]));
-            } else {
-                int[] qoses = new int[qualitiesOfService.size()];
-                for (int i = 0; i < qualitiesOfService.size(); i++)
-                    qoses[i] = qualitiesOfService.get(i);
-
-                client.subscribe(topics.toArray(new String[0]), qoses);
-            }
-        } else {
-            // subscribe to a single topic
-            if (qualityOfService == null) {
-                client.subscribe(topic);
-            } else {
-                client.subscribe(topic, qualityOfService);
-            }
-        }
-    }
-
     /**
      * Stops streamer.
      */
@@ -194,50 +220,250 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
         if (stopped)
             throw new IgniteException("Attempted to stop an already stopped MQTT Streamer");
 
+        // stop the retrier
+        connectionRetrier.stop();
+
         try {
             if (disconnectForcibly) {
-                if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null) {
+                if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null)
                     client.disconnectForcibly();
-                } else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null) {
+
+                else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null)
                     client.disconnectForcibly(disconnectForciblyTimeout);
-                } else {
+
+                else
                     client.disconnectForcibly(disconnectQuiesceTimeout, disconnectForciblyTimeout);
-                }
+
             } else {
-                if (disconnectQuiesceTimeout == null) {
+                if (disconnectQuiesceTimeout == null)
                     client.disconnect();
-                } else {
+
+                else
                     client.disconnect(disconnectQuiesceTimeout);
-                }
+
             }
+
+            client.close();
+            connected = false;
+            stopped = true;
+
         }
         catch (Throwable t) {
             throw new IgniteException("Exception while stopping MqttStreamer", t);
         }
     }
 
+    // -------------------------------
+    //  MQTT Client callback methods
+    // -------------------------------
+
     @Override public void connectionLost(Throwable throwable) {
-        log.warning(String.format("MQTT Connection to server %s was lost due to", brokerUrl), throwable);
-        // TODO: handle reconnect attempts with an optional backoff mechanism (linear, exponential, finonacci)
-        try {
-            connectAndSubscribe();
-        }
-        catch (MqttException e) {
-            e.printStackTrace();
-        }
+        connected = false;
+
+        // if we have been stopped, we do not try to establish the connection again
+        if (stopped)
+            return;
+
+        log.warning(String.format("MQTT Connection to server %s was lost.", brokerUrl), throwable);
+        connectionRetrier.connect();
     }
 
     @Override public void messageArrived(String topic, MqttMessage message) throws Exception {
         if (getMultipleTupleExtractor() != null) {
             Map<K, V> entries = getMultipleTupleExtractor().extract(message);
+            if (log.isTraceEnabled()) {
+                log.trace("Adding cache entries: " + entries);
+            }
             getStreamer().addData(entries);
-        } else {
+        }
+        else {
             Map.Entry<K, V> entry = getSingleTupleExtractor().extract(message);
+            if (log.isTraceEnabled()) {
+                log.trace("Adding cache entry: " + entry);
+            }
             getStreamer().addData(entry);
         }
     }
 
     @Override public void deliveryComplete(IMqttDeliveryToken token) {
-        // ignore, we don't send messages
+        // ignore, as we don't send messages
+    }
+
+    // -------------------------------
+    //  Getters and setters
+    // -------------------------------
+
+    public String getBrokerUrl() {
+        return brokerUrl;
+    }
+
+    public void setBrokerUrl(String brokerUrl) {
+        this.brokerUrl = brokerUrl;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public Integer getQualityOfService() {
+        return qualityOfService;
+    }
+
+    public void setQualityOfService(Integer qualityOfService) {
+        this.qualityOfService = qualityOfService;
+    }
+
+    public List<String> getTopics() {
+        return topics;
+    }
+
+    public void setTopics(List<String> topics) {
+        this.topics = topics;
+    }
+
+    public List<Integer> getQualitiesOfService() {
+        return qualitiesOfService;
+    }
+
+    public void setQualitiesOfService(List<Integer> qualitiesOfService) {
+        this.qualitiesOfService = qualitiesOfService;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public MqttClientPersistence getPersistence() {
+        return persistence;
+    }
+
+    public void setPersistence(MqttClientPersistence persistence) {
+        this.persistence = persistence;
+    }
+
+    public MqttConnectOptions getConnectOptions() {
+        return connectOptions;
+    }
+
+    public void setConnectOptions(MqttConnectOptions connectOptions) {
+        this.connectOptions = connectOptions;
+    }
+
+    public boolean isDisconnectForcibly() {
+        return disconnectForcibly;
+    }
+
+    public void setDisconnectForcibly(boolean disconnectForcibly) {
+        this.disconnectForcibly = disconnectForcibly;
+    }
+
+    public Integer getDisconnectQuiesceTimeout() {
+        return disconnectQuiesceTimeout;
+    }
+
+    public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) {
+        this.disconnectQuiesceTimeout = disconnectQuiesceTimeout;
+    }
+
+    public Integer getDisconnectForciblyTimeout() {
+        return disconnectForciblyTimeout;
+    }
+
+    public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) {
+        this.disconnectForciblyTimeout = disconnectForciblyTimeout;
     }
+
+    public WaitStrategy getRetryWaitStrategy() {
+        return retryWaitStrategy;
+    }
+
+    public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) {
+        this.retryWaitStrategy = retryWaitStrategy;
+    }
+
+    public StopStrategy getRetryStopStrategy() {
+        return retryStopStrategy;
+    }
+
+    public void setRetryStopStrategy(StopStrategy retryStopStrategy) {
+        this.retryStopStrategy = retryStopStrategy;
+    }
+
+    public boolean isBlockUntilConnected() {
+        return blockUntilConnected;
+    }
+
+    public void setBlockUntilConnected(boolean blockUntilConnected) {
+        this.blockUntilConnected = blockUntilConnected;
+    }
+
+    private class MqttConnectionRetrier {
+
+        private final Retryer<Boolean> retrier;
+        private ExecutorService executor = Executors.newSingleThreadExecutor();
+
+        public MqttConnectionRetrier(Retryer<Boolean> retrier) {
+            this.retrier = retrier;
+        }
+
+        public void connect() {
+            Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() {
+                @Override public Boolean call() throws Exception {
+                    // if we're already connected, return immediately
+                    if (connected)
+                        return true;
+
+                    if (stopped)
+                        return false;
+
+                    // connect to broker
+                    if (connectOptions == null)
+                        client.connect();
+                    else
+                        client.connect(connectOptions);
+
+                    // always use the multiple topics variant of the mqtt client; even if the user specified a single
+                    // topic and/or QoS, the initialization code would have placed it inside the 1..n structures
+                    if (qualitiesOfService.isEmpty())
+                        client.subscribe(topics.toArray(new String[0]));
+
+                    else {
+                        int[] qoses = new int[qualitiesOfService.size()];
+                        for (int i = 0; i < qualitiesOfService.size(); i++)
+                            qoses[i] = qualitiesOfService.get(i);
+
+                        client.subscribe(topics.toArray(new String[0]), qoses);
+                    }
+
+                    connected = true;
+                    return connected;
+                }
+            });
+
+            Future<Boolean> result = executor.submit(callable);
+
+            if (blockUntilConnected) {
+                try {
+                    result.get();
+                }
+                catch (Throwable e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        public void stop() {
+            executor.shutdownNow();
+        }
+
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 59730fa..012486a 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -17,11 +17,47 @@
 
 package org.apache.ignite.stream.mqtt;
 
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.lang.GridMapEntry;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import com.google.common.base.Splitter;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.junit.After;
 import org.junit.Before;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
 /**
  * Test for {@link MqttStreamer}.
  *
@@ -29,6 +65,24 @@ import org.junit.Before;
  */
 public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
 
+    private static final Map<Integer, String> TEST_DATA = new HashMap<>();
+    private static final String SINGLE_TOPIC_NAME = "abc";
+    private static final List<String> MULTIPLE_TOPIC_NAMES = Arrays.asList("def", "ghi", "jkl", "mno");
+
+    private BrokerService broker;
+    private MqttClient client;
+    private String brokerUrl;
+    private int port;
+    private MqttStreamer<Integer, String> streamer;
+    private UUID remoteListener;
+
+    static {
+        for (int i = 0; i < 100; i++)
+            TEST_DATA.put(i, "v" + i);
+    }
+
+    private IgniteDataStreamer<Integer, String> dataStreamer;
+
     /** Constructor. */
     public IgniteMqttStreamerTest() {
         super(true);
@@ -38,13 +92,394 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
     public void beforeTest() throws Exception {
         grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
 
+        // find an available local port
+        try (ServerSocket ss = new ServerSocket(0)) {
+            port = ss.getLocalPort();
+        }
+
+        // create the broker
+        broker = new BrokerService();
+        broker.deleteAllMessages();
+        broker.setPersistent(false);
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policy = new PolicyEntry();
+        policy.setQueuePrefetch(1);
+        broker.setDestinationPolicy(policyMap);
+        broker.getDestinationPolicy().setDefaultEntry(policy);
+
+        // add the MQTT transport connector to the broker
+        broker.addConnector("mqtt://localhost:" + port);
+        broker.setStartAsync(false);
+        broker.start(true);
+
+        // create the broker URL
+        brokerUrl = "tcp://localhost:" + port;
+
+        // create the client and connect
+        client = new MqttClient(brokerUrl, UUID.randomUUID().toString(), new MemoryPersistence());
+        client.connect();
+
+        // create mqtt streamer
+        dataStreamer = grid().dataStreamer(null);
+        streamer = createMqttStreamer(dataStreamer);
     }
 
     @After
     public void afterTest() throws Exception {
+        try {
+            streamer.stop();
+        }
+        catch (Exception e) {
+            // ignore if already stopped
+        }
+
+        dataStreamer.close();
+
         grid().cache(null).clear();
 
+        broker.stop();
+        broker.deleteAllMessages();
+
+    }
+
+    public void testSingleTopic_NoQoS_OneEntryPerMessage() throws Exception {
+        // configure streamer
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setTopic(SINGLE_TOPIC_NAME);
+
+        // subscribe to cache PUT events
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // action time
+        streamer.start();
+
+        // send messages
+        sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+    }
+
+    public void testMultipleTopics_NoQoS_OneEntryPerMessage() throws Exception {
+        // configure streamer
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setTopics(MULTIPLE_TOPIC_NAMES);
+
+        // subscribe to cache PUT events
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // action time
+        streamer.start();
+
+        // send messages
+        sendMessages(MULTIPLE_TOPIC_NAMES, 0, 50, false);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+
+        assertTrue(broker.getBroker().getDestinationMap().size() >= 4);
+        assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("def")));
+        assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
+    }
+
+    public void testSingleTopic_NoQoS_MultipleEntriesOneMessage() throws Exception {
+        // configure streamer
+        streamer.setMultipleTupleExtractor(multipleTupleExtractor());
+        streamer.setTopic(SINGLE_TOPIC_NAME);
+
+        // subscribe to cache PUT events
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // action time
+        streamer.start();
+
+        // send messages
+        sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, true);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+    }
+
+    public void testMultipleTopics_NoQoS_MultipleEntriesOneMessage() throws Exception {
+        // configure streamer
+        streamer.setMultipleTupleExtractor(multipleTupleExtractor());
+        streamer.setTopics(MULTIPLE_TOPIC_NAMES);
+
+        // subscribe to cache PUT events
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // action time
+        streamer.start();
+
+        // send messages
+        sendMessages(MULTIPLE_TOPIC_NAMES, 0, 50, true);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+
+        assertTrue(broker.getBroker().getDestinationMap().size() >= 4);
+        assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("def")));
+        assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
+    }
+
+    public void testSingleTopic_NoQoS_ConnectOptions_Durable() throws Exception {
+        // configure streamer
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setTopic(SINGLE_TOPIC_NAME);
+
+        MqttConnectOptions connOptions = new MqttConnectOptions();
+        connOptions.setCleanSession(false);
+        streamer.setConnectOptions(connOptions);
+
+        // subscribe to cache PUT events
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // action time
+        streamer.start();
+
+        // send messages
+        sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+
+        // explicitly stop the streamer
+        streamer.stop();
+
+        // send messages while stopped
+        sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false);
+
+        latch = subscribeToPutEvents(50);
+
+        // start the streamer again
+        streamer.start();
+
+        // assertions - make sure that messages sent during disconnection were also received
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(100);
+    }
+
+    public void testSingleTopic_NoQoS_Reconnect() throws Exception {
+        // configure streamer
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setRetryWaitStrategy(WaitStrategies.noWait());
+        streamer.setRetryStopStrategy(StopStrategies.neverStop());
+        streamer.setTopic(SINGLE_TOPIC_NAME);
+
+        // subscribe to cache PUT events
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // action time
+        streamer.start();
+
+        // send messages
+        sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+
+        // now restart the broker, pause for 2 seconds, and then reconnect the test client
+        broker.stop();
+        broker.start(true);
+        broker.waitUntilStarted();
+        Thread.sleep(2000);
+        client.connect();
+
+        // let's ensure we have 2 connections: Ignite and our test
+        assertEquals(2, broker.getTransportConnectorByScheme("mqtt").getConnections().size());
+
+        // subscribe to cache PUT events again
+        latch = subscribeToPutEvents(50);
+
+        // send messages
+        sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(100);
+    }
+
+    public void testSingleTopic_NoQoS_RetryOnce() throws Exception {
+        // configure streamer
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setRetryWaitStrategy(WaitStrategies.noWait());
+        streamer.setRetryStopStrategy(StopStrategies.stopAfterAttempt(1));
+        streamer.setTopic(SINGLE_TOPIC_NAME);
+
+        // subscribe to cache PUT events
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // action time
+        streamer.start();
+
+        // send messages
+        sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+
+        // now restart the broker and reconnect the test client
+        broker.stop();
+        broker.start(true);
+        broker.waitUntilStarted();
+        client.connect();
+
+        // let's send messages and ensure they are not received, because our retrier gave up after a single attempt
+        sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false);
+        Thread.sleep(3000);
+        assertNull(grid().cache(null).get(50));
+
+    }
+
+    public void testMultipleTopics_MultipleQoS_OneEntryPerMessage() throws Exception {
+        // configure streamer
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setTopics(MULTIPLE_TOPIC_NAMES);
+        streamer.setQualitiesOfService(Arrays.asList(1, 1, 1, 1));
+
+        // subscribe to cache PUT events
+        CountDownLatch latch = subscribeToPutEvents(50);
+
+        // action time
+        streamer.start();
+
+        // send messages
+        sendMessages(MULTIPLE_TOPIC_NAMES, 0, 50, false);
+
+        // assertions
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+        assertCacheEntriesLoaded(50);
+
+        assertTrue(broker.getBroker().getDestinationMap().size() >= 4);
+        assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("def")));
+        assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
+    }
+
+    public void testMultipleTopics_MultipleQoS_Mismatch() throws Exception {
+        // configure streamer
+        streamer.setSingleTupleExtractor(singleTupleExtractor());
+        streamer.setTopics(MULTIPLE_TOPIC_NAMES);
+        streamer.setQualitiesOfService(Arrays.asList(1, 1, 1));
+
+        try {
+            streamer.start();
+        }
+        catch (Exception e) {
+            return;
+        }
+        fail("Expected an exception reporting invalid parameters");
+
+    }
+
+    private MqttStreamer<Integer, String> createMqttStreamer(IgniteDataStreamer<Integer, String> dataStreamer) {
+        MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
+        streamer.setIgnite(grid());
+        streamer.setStreamer(dataStreamer);
+        streamer.setBrokerUrl(brokerUrl);
+        streamer.setClientId(UUID.randomUUID().toString());
+        streamer.setBlockUntilConnected(true);
+
+        dataStreamer.allowOverwrite(true);
+        dataStreamer.autoFlushFrequency(1);
+
+        return streamer;
+    }
+
+    public void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
+        if (singleMessage) {
+            final List<StringBuilder> sbs = new ArrayList<>(topics.size());
+            // initialize String Builders for each topic
+            F.forEach(topics, new IgniteInClosure<String>() {
+                @Override public void apply(String s) {
+                    sbs.add(new StringBuilder());
+                }
+            });
+            // fill String Builders for each topic
+            F.forEach(F.range(fromIdx, fromIdx + count), new IgniteInClosure<Integer>() {
+                @Override public void apply(Integer integer) {
+                    sbs.get(integer % topics.size()).append(integer.toString() + "," + TEST_DATA.get(integer) + "\n");
+                }
+            });
+            // send each buffer out
+            for (int i = 0; i < topics.size(); i++) {
+                MqttMessage msg = new MqttMessage(sbs.get(i).toString().getBytes());
+                client.publish(topics.get(i % topics.size()), msg);
+            }
+        }
+        else {
+            for (int i = fromIdx; i < fromIdx + count; i++) {
+                byte[] payload = (i + "," + TEST_DATA.get(i)).getBytes();
+                MqttMessage msg = new MqttMessage(payload);
+                client.publish(topics.get(i % topics.size()), msg);
+            }
+        }
+    }
+
+    private CountDownLatch subscribeToPutEvents(int expect) {
+        Ignite ignite = grid();
+
+        // Listen to cache PUT events; the returned latch is released once 'expect' events have been observed
+        final CountDownLatch latch = new CountDownLatch(expect);
+        @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback = new IgniteBiPredicate<UUID, CacheEvent>() {
+            @Override public boolean apply(UUID uuid, CacheEvent evt) {
+                latch.countDown();
+                return true;
+            }
+        };
+
+        remoteListener = ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(callback, null, EVT_CACHE_OBJECT_PUT);
+        return latch;
+    }
+
+    private void assertCacheEntriesLoaded(int count) {
+        // get the cache and check that the entries are present
+        IgniteCache<Integer, String> cache = grid().cache(null);
+
+        // for each of the first 'count' keys of TEST_DATA (ordered by key), check that the entry is present in cache
+        for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count)) {
+            assertEquals(TEST_DATA.get(key), cache.get(key));
+        }
+
+        // assert that the cache contains exactly the specified number of elements
+        assertEquals(count, cache.size(CachePeekMode.ALL));
+
+        // remove the event listener
+        grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener);
+    }
+
+    public static StreamSingleTupleExtractor<MqttMessage, Integer, String> singleTupleExtractor() {
+        return new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
+            @Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
+                List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload()));
+                return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
+            }
+        };
+    }
 
+    public static StreamMultipleTupleExtractor<MqttMessage, Integer, String> multipleTupleExtractor() {
+        return new StreamMultipleTupleExtractor<MqttMessage, Integer, String>() {
+            @Override public Map<Integer, String> extract(MqttMessage msg) {
+                final Map<String, String> map = Splitter.on("\n")
+                    .omitEmptyStrings()
+                    .withKeyValueSeparator(",")
+                    .split(new String(msg.getPayload()));
+                final Map<Integer, String> answer = new HashMap<>();
+                F.forEach(map.keySet(), new IgniteInClosure<String>() {
+                    @Override public void apply(String s) {
+                        answer.put(Integer.parseInt(s), map.get(s));
+                    }
+                });
+                return answer;
+            }
+        };
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
deleted file mode 100644
index e2ed0f0..0000000
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.mqtt;
-
-/**
- * Test transformers for MqttStreamer tests.
- *
- * @author Raul Kripalani
- */
-public class TestTupleExtractors {
-
-
-}
\ No newline at end of file


[15/23] ignite git commit: ignite-973 - fix

Posted by vo...@apache.org.
ignite-973 - fix


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f0be45e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f0be45e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f0be45e3

Branch: refs/heads/ignite-1513-final
Commit: f0be45e309f9a594334209a251c069f9ba3db120
Parents: e51fb42
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Sep 22 13:36:40 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Sep 22 13:36:40 2015 +0300

----------------------------------------------------------------------
 .../query/h2/opt/GridH2AbstractKeyValueRow.java           |  3 +++
 .../internal/processors/query/h2/opt/GridH2Table.java     | 10 +++++++++-
 .../internal/processors/cache/CacheIndexStreamerTest.java |  4 ++--
 3 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f0be45e3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index 07c49a5..4a16284 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -119,6 +119,9 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
      * @throws IgniteCheckedException If failed.
      */
     public synchronized void onUnswap(Object val, boolean beforeRmv) throws IgniteCheckedException {
+        if (peekValue(VAL_COL) != null)
+            return;
+
         setValue(VAL_COL, desc.wrap(val, desc.valueType()));
 
         notifyAll();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0be45e3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 66241b4..bf318b2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -55,6 +55,8 @@ import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
+
 /**
  * H2 Table implementation.
  */
@@ -372,6 +374,12 @@ public class GridH2Table extends TableBase {
             if (!del) {
                 GridH2Row old = pk.put(row); // Put to PK.
 
+                if (old instanceof GridH2AbstractKeyValueRow) { // Unswap value on replace.
+                    GridH2AbstractKeyValueRow kvOld = (GridH2AbstractKeyValueRow)old;
+
+                    kvOld.onUnswap(kvOld.getValue(VAL_COL), true);
+                }
+
                 int len = idxs.size();
 
                 int i = 1;
@@ -399,7 +407,7 @@ public class GridH2Table extends TableBase {
                 GridH2Row old = pk.remove(row);
 
                 if (old instanceof GridH2AbstractKeyValueRow) { // Unswap value.
-                    Value v = row.getValue(GridH2AbstractKeyValueRow.VAL_COL);
+                    Value v = row.getValue(VAL_COL);
 
                     if (v != null)
                         ((GridH2AbstractKeyValueRow)old).onUnswap(v.getObject(), true);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f0be45e3/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
index 25c3b81..23f4e91 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
+import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
@@ -52,7 +52,7 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
 
         ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
 
-        cfg.setSwapSpaceSpi(new FileSwapSpaceSpi());
+        cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi());
 
         return cfg;
     }