You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/09/22 16:58:44 UTC
[01/22] ignite git commit: IGNITE-1370 Streamers: Implement multiple
tuple extractor.
Repository: ignite
Updated Branches:
refs/heads/ignite-1282 b711a5a5f -> 220ecb306
IGNITE-1370 Streamers: Implement multiple tuple extractor.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4d9734a0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4d9734a0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4d9734a0
Branch: refs/heads/ignite-1282
Commit: 4d9734a0842f0a46310c1f6261fdf42371db8705
Parents: b736c46
Author: Raul Kripalani <ra...@apache.org>
Authored: Thu Sep 3 23:31:08 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Sun Sep 13 01:20:24 2015 +0100
----------------------------------------------------------------------
.../org/apache/ignite/stream/StreamAdapter.java | 48 ++++++++-
.../stream/StreamMultipleTupleExtractor.java | 38 +++++++
.../ignite/stream/StreamTupleExtractor.java | 5 +
.../ignite/stream/socket/SocketStreamer.java | 3 +-
.../stream/socket/SocketStreamerSelfTest.java | 104 +++++++++++++++----
5 files changed, 171 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index 97edcbb..ffa0821 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -18,6 +18,7 @@
package org.apache.ignite.stream;
import java.util.Map;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
@@ -26,11 +27,22 @@ import org.apache.ignite.IgniteDataStreamer;
* streaming from different data sources. The purpose of adapters is to
* convert different message formats into Ignite stream key-value tuples
* and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
+ * <p>
+ * Two types of tuple extractors are supported:
+ * <ol>
+ * <li>A single tuple extractor, which extracts either no or 1 tuple out of a message. See
+ * see {@link #setTupleExtractor(StreamTupleExtractor)}.</li>
+ * <li>A multiple tuple extractor, which is capable of extracting multiple tuples out of a single message, in the
+ * form of a {@link Map<K, V>}. See {@link #setMultipleTupleExtractor(StreamMultipleTupleExtractor)}.</li>
+ * </ol>
*/
public abstract class StreamAdapter<T, K, V> {
/** Tuple extractor. */
private StreamTupleExtractor<T, K, V> extractor;
+ /** Tuple extractor that supports extracting N tuples from a single event (1:n cardinality). */
+ private StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor;
+
/** Streamer. */
private IgniteDataStreamer<K, V> stmr;
@@ -84,6 +96,20 @@ public abstract class StreamAdapter<T, K, V> {
}
/**
+ * @return Provided tuple extractor (for 1:n cardinality).
+ */
+ public StreamMultipleTupleExtractor<T, K, V> getMultipleTupleExtractor() {
+ return multipleTupleExtractor;
+ }
+
+ /**
+ * @param multipleTupleExtractor Extractor for 1:n tuple extraction.
+ */
+ public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor) {
+ this.multipleTupleExtractor = multipleTupleExtractor;
+ }
+
+ /**
* @return Provided {@link Ignite} instance.
*/
public Ignite getIgnite() {
@@ -98,14 +124,28 @@ public abstract class StreamAdapter<T, K, V> {
}
/**
- * Converts given message to a tuple and adds it to the underlying streamer.
+ * Converts given message to 1 or many tuples (depending on the type of extractor) and adds it/them to the
+ * underlying streamer.
+ * <p>
+ * If both a {@link #multipleTupleExtractor} and a {@link #extractor} have been set, the former will take precedence
+ * and the latter will be ignored.
*
* @param msg Message to convert.
*/
protected void addMessage(T msg) {
- Map.Entry<K, V> e = extractor.extract(msg);
+ if (multipleTupleExtractor == null) {
+ Map.Entry<K, V> e = extractor.extract(msg);
+
+ if (e != null)
+ stmr.addData(e);
- if (e != null)
- stmr.addData(e);
+ } else {
+ Map<K, V> m = multipleTupleExtractor.extract(msg);
+
+ if (m != null)
+ stmr.addData(m);
+
+ }
}
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java
new file mode 100644
index 0000000..71ad45a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import java.util.Map;
+
+/**
+ * Stream tuple extractor to convert a single message to zero, one or many tuples.
+ * <p>
+ * For cases where cardinality will always be 1:1 (or 0:1), you may consider {@link StreamTupleExtractor}.
+ *
+ * @see StreamTupleExtractor
+ */
+public interface StreamMultipleTupleExtractor<T, K, V> {
+
+ /**
+ * Extracts a set of key-values from a message.
+ *
+ * @param msg Message.
+ * @return Map containing resulting tuples.
+ */
+ public Map<K, V> extract(T msg);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
index b6150ab..aed7d8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
@@ -21,6 +21,11 @@ import java.util.Map;
/**
* Stream tuple extractor to convert messages to Ignite key-value tuples.
+ * <p>
+ * Alternatively, {@link StreamMultipleTupleExtractor} can be employed in cases where a single message/event may
+ * produce more than one tuple.
+ *
+ * @see StreamMultipleTupleExtractor
*/
public interface StreamTupleExtractor<T, K, V> {
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index 0d27af9..c89952d 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -141,7 +141,8 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
* @throws IgniteException If failed.
*/
public void start() {
- A.notNull(getTupleExtractor(), "tupleExtractor");
+ A.ensure(getTupleExtractor() != null || getMultipleTupleExtractor() != null,
+ "tupleExtractor (single or multiple)");
A.notNull(getStreamer(), "streamer");
A.notNull(getIgnite(), "ignite");
A.ensure(threads > 0, "threads > 0");
http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 185599d..8b05754 100644
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -43,6 +44,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
import org.apache.ignite.stream.StreamTupleExtractor;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
@@ -111,7 +113,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
Marshaller marsh = new JdkMarshaller();
for (int i = 0; i < CNT; i++) {
- byte[] msg = marsh.marshal(new Tuple(i));
+ byte[] msg = marsh.marshal(new Message(i));
os.write(msg.length >>> 24);
os.write(msg.length >>> 16);
@@ -125,21 +127,52 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
throw new IgniteException(e);
}
}
- });
+ }, true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMultipleEntriesFromOneMessage() throws Exception {
+ test(null, null, new Runnable() {
+ @Override public void run() {
+ try (Socket sock = new Socket(InetAddress.getLocalHost(), port);
+ OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+ Marshaller marsh = new JdkMarshaller();
+
+ int[] values = new int[CNT];
+ for (int i = 0; i < CNT; i++) {
+ values[i] = i;
+ }
+
+ byte[] msg = marsh.marshal(new Message(values));
+
+ os.write(msg.length >>> 24);
+ os.write(msg.length >>> 16);
+ os.write(msg.length >>> 8);
+ os.write(msg.length);
+
+ os.write(msg);
+ }
+ catch (IOException | IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }, false);
}
/**
* @throws Exception If failed.
*/
public void testSizeBasedCustomConverter() throws Exception {
- SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
- @Override public Tuple convert(byte[] msg) {
+ SocketMessageConverter<Message> converter = new SocketMessageConverter<Message>() {
+ @Override public Message convert(byte[] msg) {
int i = (msg[0] & 0xFF) << 24;
i |= (msg[1] & 0xFF) << 16;
i |= (msg[2] & 0xFF) << 8;
i |= msg[3] & 0xFF;
- return new Tuple(i);
+ return new Message(i);
}
};
@@ -164,7 +197,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
throw new IgniteException(e);
}
}
- });
+ }, true);
}
/**
@@ -178,7 +211,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
Marshaller marsh = new JdkMarshaller();
for (int i = 0; i < CNT; i++) {
- byte[] msg = marsh.marshal(new Tuple(i));
+ byte[] msg = marsh.marshal(new Message(i));
os.write(msg);
os.write(DELIM);
@@ -188,7 +221,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
throw new IgniteException(e);
}
}
- });
+ }, true);
}
@@ -196,14 +229,14 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testDelimiterBasedCustomConverter() throws Exception {
- SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
- @Override public Tuple convert(byte[] msg) {
+ SocketMessageConverter<Message> converter = new SocketMessageConverter<Message>() {
+ @Override public Message convert(byte[] msg) {
int i = (msg[0] & 0xFF) << 24;
i |= (msg[1] & 0xFF) << 16;
i |= (msg[2] & 0xFF) << 8;
i |= msg[3] & 0xFF;
- return new Tuple(i);
+ return new Message(i);
}
};
@@ -225,16 +258,17 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
throw new IgniteException(e);
}
}
- });
+ }, true);
}
/**
* @param converter Converter.
* @param r Runnable..
*/
- private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable byte[] delim, Runnable r) throws Exception
+ private void test(@Nullable SocketMessageConverter<Message> converter, @Nullable byte[] delim, Runnable r,
+ boolean oneMessagePerTuple) throws Exception
{
- SocketStreamer<Tuple, Integer, String> sockStmr = null;
+ SocketStreamer<Message, Integer, String> sockStmr = null;
Ignite ignite = grid(0);
@@ -257,11 +291,24 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
sockStmr.setDelimiter(delim);
- sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, String>() {
- @Override public Map.Entry<Integer, String> extract(Tuple msg) {
- return new IgniteBiTuple<>(msg.key, msg.val);
- }
- });
+ if (oneMessagePerTuple) {
+ sockStmr.setTupleExtractor(new StreamTupleExtractor<Message, Integer, String>() {
+ @Override public Map.Entry<Integer, String> extract(Message msg) {
+ return new IgniteBiTuple<>(msg.key, msg.val);
+ }
+ });
+ }
+ else {
+ sockStmr.setMultipleTupleExtractor(new StreamMultipleTupleExtractor<Message, Integer, String>() {
+ @Override public Map<Integer, String> extract(Message msg) {
+ Map<Integer, String> answer = new HashMap<>();
+ for (int value : msg.values) {
+ answer.put(value, Integer.toString(value));
+ }
+ return answer;
+ }
+ });
+ }
if (converter != null)
sockStmr.setConverter(converter);
@@ -297,9 +344,9 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
}
/**
- * Tuple.
+ * Message.
*/
- private static class Tuple implements Serializable {
+ private static class Message implements Serializable {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
@@ -309,12 +356,25 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
/** Value. */
private final String val;
+ /** Multiple values. */
+ private final int[] values;
+
/**
* @param key Key.
*/
- Tuple(int key) {
+ Message(int key) {
this.key = key;
this.val = Integer.toString(key);
+ this.values = new int[0];
+ }
+
+ /**
+ * @param values Multiple values.
+ */
+ Message(int[] values) {
+ this.key = -1;
+ this.val = null;
+ this.values = values;
}
}
}
\ No newline at end of file
[10/22] ignite git commit: ignite-973 Fixed atomic cache 'remove' to
always provide old value for indexing
Posted by vo...@apache.org.
ignite-973 Fixed atomic cache 'remove' to always provide old value for indexing
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/621eb0f7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/621eb0f7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/621eb0f7
Branch: refs/heads/ignite-1282
Commit: 621eb0f75bbe1a0a623229dded38a3549309eead
Parents: 8b94494
Author: sboikov <se...@inria.fr>
Authored: Mon Sep 21 21:37:52 2015 +0300
Committer: sboikov <se...@inria.fr>
Committed: Mon Sep 21 21:37:52 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheMapEntry.java | 37 +++++++++++++-------
.../processors/cache/GridCacheProcessor.java | 2 +-
.../processors/cache/GridCacheSwapManager.java | 24 ++++++-------
.../datastreamer/DataStreamerImpl.java | 2 --
4 files changed, 37 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index f2bb646..961c792 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1588,6 +1588,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
boolean hasValPtr = hasOffHeapPointer();
+ if (old == null)
+ old = saveValueForIndexUnlocked();
+
// Update index inside synchronization since it can be updated
// in load methods without actually holding entry lock.
clearIndex(old);
@@ -2163,6 +2166,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
// Must persist inside synchronization in non-tx mode.
cctx.store().remove(null, keyValue(false));
+ if (oldVal == null)
+ oldVal = saveValueForIndexUnlocked();
+
// Update index inside synchronization since it can be updated
// in load methods without actually holding entry lock.
clearIndex(oldVal);
@@ -3342,7 +3348,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
try {
synchronized (this) {
- CacheObject expiredVal = saveValueForIndexUnlocked();
+ CacheObject expiredVal = saveOldValueUnlocked(false);
boolean hasOldBytes = hasOffHeapPointer();
@@ -3523,12 +3529,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
try {
GridCacheQueryManager qryMgr = cctx.queries();
- if (qryMgr != null && qryMgr.enabled()) {
- qryMgr.store(key,
- val,
- ver,
- expireTime);
- }
+ if (qryMgr.enabled())
+ qryMgr.store(key, val, ver, expireTime);
}
catch (IgniteCheckedException e) {
throw new GridCacheIndexUpdateException(e);
@@ -3547,8 +3549,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
try {
GridCacheQueryManager<?, ?> qryMgr = cctx.queries();
- if (qryMgr != null)
- qryMgr.remove(key(), prevVal == null ? null : prevVal);
+ if (qryMgr.enabled())
+ qryMgr.remove(key(), prevVal);
}
catch (IgniteCheckedException e) {
throw new GridCacheIndexUpdateException(e);
@@ -3562,10 +3564,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
* @return Previous value or {@code null}.
* @throws IgniteCheckedException If failed to retrieve previous value.
*/
- protected CacheObject saveValueForIndexUnlocked() throws IgniteCheckedException {
+ protected final CacheObject saveValueForIndexUnlocked() throws IgniteCheckedException {
+ return saveOldValueUnlocked(true);
+ }
+
+ /**
+ * @param qryOnly If {@code true} reads old value only if query indexing is enabled.
+ * @return Previous value or {@code null}.
+ * @throws IgniteCheckedException If failed to retrieve previous value.
+ */
+ private CacheObject saveOldValueUnlocked(boolean qryOnly) throws IgniteCheckedException {
assert Thread.holdsLock(this);
- if (cctx.queries() == null)
+ if (qryOnly && !cctx.queries().enabled())
return null;
CacheObject val = rawGetOrUnmarshalUnlocked(false);
@@ -3681,7 +3692,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
if (obsoleteVersionExtras() != null)
return true;
- CacheObject prev = saveValueForIndexUnlocked();
+ CacheObject prev = saveOldValueUnlocked(false);
if (!hasReaders() && markObsolete0(obsoleteVer, false)) {
if (swap) {
@@ -3791,7 +3802,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
GridCacheQueryManager qryMgr = cctx.queries();
- if (qryMgr != null)
+ if (qryMgr.enabled())
qryMgr.onUnswap(key, prevVal);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c92de7d..7c16136 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2759,7 +2759,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
if (spaceName.equals(CU.swapSpaceName(cctx))) {
GridCacheQueryManager qryMgr = cctx.queries();
- if (qryMgr != null) {
+ if (qryMgr.enabled()) {
try {
KeyCacheObject key = cctx.toCacheKeyObject(keyBytes);
http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index 9b6381e..d9a8b5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -696,12 +696,12 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
final GridCacheQueryManager qryMgr = cctx.queries();
- if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr))
+ if (qryMgr.enabled() && !readSwapBeforeRemove(key, swapKey, ldr))
return null; // Not found.
swapMgr.remove(spaceName, swapKey, new CI1<byte[]>() {
@Override public void apply(byte[] rmv) {
- if (qryMgr == null && cctx.config().isStatisticsEnabled())
+ if (!qryMgr.enabled() && cctx.config().isStatisticsEnabled())
cctx.cache().metrics0().onSwapRead(rmv != null);
if (rmv != null) {
@@ -843,7 +843,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
GridCacheSwapEntry entry;
- if (qryMgr != null) {
+ if (qryMgr.enabled()) {
entry = readOffheapBeforeRemove(key, keyBytes, part);
if (entry != null) {
@@ -952,7 +952,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
ClassLoader ldr = cctx.deploy().globalLoader();
- if (qryMgr != null) { // Unswap for indexing.
+ if (qryMgr.enabled()) { // Unswap for indexing.
Iterator<SwapKey> iter = unprocessedKeys.iterator();
while (iter.hasNext()) {
@@ -967,7 +967,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
unprocessedKeys,
new IgniteBiInClosure<SwapKey, byte[]>() {
@Override public void apply(SwapKey swapKey, byte[] rmv) {
- if (qryMgr == null && cctx.config().isStatisticsEnabled())
+ if (!qryMgr.enabled() && cctx.config().isStatisticsEnabled())
cctx.cache().metrics0().onSwapRead(rmv != null);
if (rmv != null) {
@@ -1124,7 +1124,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
*/
public GridCacheSwapEntry readOffheapBeforeRemove(KeyCacheObject key, byte[] keyBytes, int part)
throws IgniteCheckedException {
- assert cctx.queries() != null;
+ assert cctx.queries().enabled();
byte[] entryBytes = offheap.get(spaceName, part, key, keyBytes);
@@ -1155,7 +1155,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
*/
private boolean readSwapBeforeRemove(@Nullable KeyCacheObject key, SwapKey swapKey, ClassLoader ldr)
throws IgniteCheckedException {
- assert cctx.queries() != null;
+ assert cctx.queries().enabled();
byte[] entryBytes = swapMgr.read(spaceName, swapKey, ldr);
@@ -1196,7 +1196,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (offheapEnabled) {
byte[] keyBytes = key.valueBytes(cctx.cacheObjectContext());
- if ((qryMgr == null || readOffheapBeforeRemove(key, keyBytes, part) != null) &&
+ if ((!qryMgr.enabled() || readOffheapBeforeRemove(key, keyBytes, part) != null) &&
offheap.removex(spaceName, part, key, keyBytes)) {
if (cctx.config().isStatisticsEnabled())
cctx.cache().metrics0().onOffHeapRemove();
@@ -1212,7 +1212,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
ClassLoader ldr = cctx.deploy().globalLoader();
- if (qryMgr != null && !readSwapBeforeRemove(key, swapKey, ldr))
+ if (qryMgr.enabled() && !readSwapBeforeRemove(key, swapKey, ldr))
return; // Not found.
swapMgr.remove(spaceName,
@@ -1279,7 +1279,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
GridCacheQueryManager qryMgr = cctx.queries();
- if (qryMgr != null)
+ if (qryMgr.enabled())
qryMgr.onSwap(key);
}
@@ -1308,7 +1308,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.events().addEvent(swapEntry.partition(), swapEntry.key(), cctx.nodeId(),
(IgniteUuid)null, null, EVT_CACHE_OBJECT_TO_OFFHEAP, null, false, null, true, null, null, null);
- if (qryMgr != null)
+ if (qryMgr.enabled())
qryMgr.onSwap(swapEntry.key());
}
}
@@ -1330,7 +1330,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.events().addEvent(batchSwapEntry.partition(), batchSwapEntry.key(), cctx.nodeId(),
(IgniteUuid)null, null, EVT_CACHE_OBJECT_SWAPPED, null, false, null, true, null, null, null);
- if (qryMgr != null)
+ if (qryMgr.enabled())
qryMgr.onSwap(batchSwapEntry.key());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/621eb0f7/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index b5d9a7d..ab2a6e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -1569,8 +1569,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
GridCacheEntryEx entry = internalCache.entryEx(e.getKey(), topVer);
- entry.unswap(false);
-
if (plc != null) {
ttl = CU.toTtl(plc.getExpiryForCreation());
[04/22] ignite git commit: IGNITE-535 (WIP) Implement MQTT Streamer.
Posted by vo...@apache.org.
IGNITE-535 (WIP) Implement MQTT Streamer.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6b53f1bb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6b53f1bb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6b53f1bb
Branch: refs/heads/ignite-1282
Commit: 6b53f1bb2699ae7a6ea8ae277de1bfc3ecbd7b6a
Parents: b80b171
Author: Raul Kripalani <ra...@apache.org>
Authored: Tue Sep 15 00:45:58 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Tue Sep 15 22:30:09 2015 +0100
----------------------------------------------------------------------
modules/mqtt/pom.xml | 110 +++++++++
.../apache/ignite/stream/mqtt/MqttStreamer.java | 243 +++++++++++++++++++
.../stream/mqtt/IgniteMqttStreamerTest.java | 50 ++++
.../mqtt/IgniteMqttStreamerTestSuite.java | 34 +++
.../ignite/stream/mqtt/TestTupleExtractors.java | 28 +++
pom.xml | 1 +
6 files changed, 466 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml
new file mode 100644
index 0000000..b108180
--- /dev/null
+++ b/modules/mqtt/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+ POM file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent</relativePath>
+ </parent>
+
+ <artifactId>ignite-mqtt</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <url>http://ignite.apache.org</url>
+
+ <properties>
+ <paho.version>1.0.2</paho.version>
+ <mosquette.version>0.7</mosquette.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.paho</groupId>
+ <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+ <version>${paho.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.eclipse.moquette</groupId>
+ <artifactId>moquette-broker</artifactId>
+ <version>${mosquette.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-log4j</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-spring</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <!-- Repository for Mosquette (embedded MQTT broker for tests) and for Eclipse Paho (MQTT client) -->
+ <repositories>
+ <repository>
+ <id>bintray</id>
+ <url>http://dl.bintray.com/andsel/maven/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ <repository>
+ <id>Eclipse Paho Repo</id>
+ <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+
+</project>
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
new file mode 100644
index 0000000..00a89ab
--- /dev/null
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.stream.StreamAdapter;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ * Streamer that consumes from a MQTT topic and feeds key-value pairs into an {@link IgniteDataStreamer} instance,
+ * using Eclipse Paho as an MQTT client.
+ * <p>
+ * You must also provide a {@link StreamSingleTupleExtractor} or a {@link StreamMultipleTupleExtractor} to extract
+ * cache tuples out of the incoming message.
+ * <p>
+ * This Streamer has many features:
+ *
+ * <ul>
+ * <li>Subscribing to a single topic or multiple topics at once.</li>
+ * <li>Specifying the subscriber's QoS for a single topic or for multiple topics.</li>
+ * <li>Allows setting {@link MqttConnectOptions} to support features like last will testament, persistent
+ * sessions, etc.</li>
+ * <li>Specifying the client ID.</li>
+ * </ul>
+ *
+ * Note: features like durable subscriptions, last will testament, etc. must be configured via the
+ * {@link #connectOptions} property.
+ *
+ * @author Raul Kripalani
+ */
+public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> implements MqttCallback {
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ private MqttClient client;
+
+ private String brokerUrl;
+
+ private String topic;
+
+ private Integer qualityOfService;
+
+ private List<String> topics;
+
+ private List<Integer> qualitiesOfService;
+
+ /** Client ID in case we're using durable subscribers. */
+ private String clientId;
+
+ private MqttClientPersistence persistence;
+
+ private MqttConnectOptions connectOptions;
+
+ // disconnect parameters
+ private Integer disconnectQuiesceTimeout;
+
+ private boolean disconnectForcibly;
+
+ private Integer disconnectForciblyTimeout;
+
+ private volatile boolean stopped = true;
+
+ /**
+ * Starts streamer.
+ *
+ * @throws IgniteException If failed.
+ */
+ public void start() throws IgniteException {
+ if (!stopped)
+ throw new IgniteException("Attempted to start an already started MQTT Streamer");
+
+ // for simplicity, if these are null initialize to empty lists
+ topics = topics == null ? new ArrayList<String>() : topics;
+ qualitiesOfService = qualitiesOfService == null ? new ArrayList<Integer>() : qualitiesOfService;
+
+ try {
+ // parameter validations
+ A.notNull(getStreamer(), "streamer");
+ A.notNull(getIgnite(), "ignite");
+ A.ensure(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null, "tuple extractor missing");
+ A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " +
+ "both single and multiple tuple extractor");
+ A.notNullOrEmpty(brokerUrl, "broker URL");
+ A.notNullOrEmpty(clientId, "client ID");
+
+ // if we have both a single topic and a list of topics, fail
+ if (topic != null && topic.length() > 0 && !topics.isEmpty())
+ throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time");
+
+ // if we have both a single QoS and list, fail
+ if (qualityOfService != null && !qualitiesOfService.isEmpty()) {
+ throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time");
+ }
+
+ // Paho API requires disconnect timeout if providing a quiesce timeout and disconnecting forcibly
+ if (disconnectForcibly && disconnectQuiesceTimeout != null) {
+ A.notNull(disconnectForciblyTimeout, "disconnect timeout cannot be null when disconnecting forcibly " +
+ "with quiesce");
+ }
+
+ // if we have multiple topics
+ if (topics != null && !topics.isEmpty()) {
+ for (String t : topics) {
+ A.notNullOrEmpty(t, "topic in list of topics");
+ }
+ A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(), "qualities of " +
+ "service must be either empty or have the same size as topics list");
+ }
+
+ // create logger
+ log = getIgnite().log();
+
+ // create the mqtt client
+ if (persistence == null)
+ client = new MqttClient(brokerUrl, clientId);
+ else
+ client = new MqttClient(brokerUrl, clientId, persistence);
+
+ connectAndSubscribe();
+
+ stopped = false;
+
+ }
+ catch (Throwable t) {
+ throw new IgniteException("Exception while initializing MqttStreamer", t);
+ }
+
+ }
+
+ private void connectAndSubscribe() throws MqttException {
+ // connect
+ if (connectOptions != null)
+ client.connect();
+ else
+ client.connect(connectOptions);
+
+ // subscribe to multiple topics
+ if (!topics.isEmpty()) {
+ if (qualitiesOfService.isEmpty()) {
+ client.subscribe(topics.toArray(new String[0]));
+ } else {
+ int[] qoses = new int[qualitiesOfService.size()];
+ for (int i = 0; i < qualitiesOfService.size(); i++)
+ qoses[i] = qualitiesOfService.get(i);
+
+ client.subscribe(topics.toArray(new String[0]), qoses);
+ }
+ } else {
+ // subscribe to a single topic
+ if (qualityOfService == null) {
+ client.subscribe(topic);
+ } else {
+ client.subscribe(topic, qualityOfService);
+ }
+ }
+ }
+
+ /**
+ * Stops streamer.
+ */
+ public void stop() throws IgniteException {
+ if (stopped)
+ throw new IgniteException("Attempted to stop an already stopped MQTT Streamer");
+
+ try {
+ if (disconnectForcibly) {
+ if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null) {
+ client.disconnectForcibly();
+ } else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null) {
+ client.disconnectForcibly(disconnectForciblyTimeout);
+ } else {
+ client.disconnectForcibly(disconnectQuiesceTimeout, disconnectForciblyTimeout);
+ }
+ } else {
+ if (disconnectQuiesceTimeout == null) {
+ client.disconnect();
+ } else {
+ client.disconnect(disconnectQuiesceTimeout);
+ }
+ }
+ }
+ catch (Throwable t) {
+ throw new IgniteException("Exception while stopping MqttStreamer", t);
+ }
+ }
+
+ @Override public void connectionLost(Throwable throwable) {
+ log.warning(String.format("MQTT Connection to server %s was lost due to", brokerUrl), throwable);
+ // TODO: handle reconnect attempts with an optional backoff mechanism (linear, exponential, finonacci)
+ try {
+ connectAndSubscribe();
+ }
+ catch (MqttException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override public void messageArrived(String topic, MqttMessage message) throws Exception {
+ if (getMultipleTupleExtractor() != null) {
+ Map<K, V> entries = getMultipleTupleExtractor().extract(message);
+ getStreamer().addData(entries);
+ } else {
+ Map.Entry<K, V> entry = getSingleTupleExtractor().extract(message);
+ getStreamer().addData(entry);
+ }
+ }
+
+ @Override public void deliveryComplete(IMqttDeliveryToken token) {
+ // ignore, we don't send messages
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
new file mode 100644
index 0000000..59730fa
--- /dev/null
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Test for {@link MqttStreamer}.
+ *
+ * @author Raul Kripalani
+ */
+public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
+
+ /** Constructor. */
+ public IgniteMqttStreamerTest() {
+ super(true);
+ }
+
+ @Before @SuppressWarnings("unchecked")
+ public void beforeTest() throws Exception {
+ grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+ }
+
+ @After
+ public void afterTest() throws Exception {
+ grid().cache(null).clear();
+
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
new file mode 100644
index 0000000..ff25145
--- /dev/null
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTestSuite.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * MQTT streamer tests.
+ *
+ * @author Raul Kripalani
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+ IgniteMqttStreamerTest.class
+})
+public class IgniteMqttStreamerTestSuite {
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
new file mode 100644
index 0000000..e2ed0f0
--- /dev/null
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.mqtt;
+
+/**
+ * Test transformers for MqttStreamer tests.
+ *
+ * @author Raul Kripalani
+ */
+public class TestTupleExtractors {
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/6b53f1bb/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b47958f..c19a9b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
<module>modules/kafka</module>
<module>modules/yarn</module>
<module>modules/jms11</module>
+ <module>modules/mqtt</module>
<module>modules/zookeeper</module>
<module>modules/platform</module>
</modules>
[08/22] ignite git commit: IGNITE-535 Finish MQTT Streamer docs and
tests. Upgrade latter to AMQ 5.12.0.
Posted by vo...@apache.org.
IGNITE-535 Finish MQTT Streamer docs and tests. Upgrade latter to AMQ 5.12.0.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/296dd6e7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/296dd6e7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/296dd6e7
Branch: refs/heads/ignite-1282
Commit: 296dd6e7d86fe6d0914a9fbf8062632c04e4d22c
Parents: f03f3a3
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 17:24:44 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 17:24:44 2015 +0100
----------------------------------------------------------------------
modules/mqtt/pom.xml | 9 +-
.../apache/ignite/stream/mqtt/MqttStreamer.java | 156 ++++++++++++++++++-
.../stream/mqtt/IgniteMqttStreamerTest.java | 80 +++++++++-
3 files changed, 224 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml
index 4b0b46c..21511e8 100644
--- a/modules/mqtt/pom.xml
+++ b/modules/mqtt/pom.xml
@@ -37,7 +37,7 @@
<properties>
<paho.version>1.0.2</paho.version>
- <activemq.version>5.11.1</activemq.version>
+ <activemq.version>5.12.0</activemq.version>
<guava-retryier.version>2.0.0</guava-retryier.version>
</properties>
@@ -69,13 +69,6 @@
<dependency>
<groupId>org.apache.activemq</groupId>
- <artifactId>activemq-kahadb-store</artifactId>
- <version>${activemq.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.activemq</groupId>
<artifactId>activemq-mqtt</artifactId>
<version>${activemq.version}</version>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
index b86d385..f18ae42 100644
--- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -62,12 +62,17 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
* <li>Specifying the subscriber's QoS for a single topic or for multiple topics.</li>
* <li>Allows setting {@link MqttConnectOptions} to support features like last will testament, persistent
* sessions, etc.</li>
- * <li>Specifying the client ID.</li>
+ * <li>Specifying the client ID. A random one will be generated and maintained throughout reconnections if the user
+ * does not provide one.</li>
+ * <li>(Re-)Connection retries based on the <i>guava-retrying</i> library. Retry wait and retry stop policies
+ * can be configured.</li>
+ * <li>Blocking the start() method until connected for the first time.</li>
* </ul>
*
- * Note: features like durable subscriptions, last will testament, etc. must be configured via the
+ * Note: features like durable subscriptions, last will testament, etc. can be configured via the
* {@link #connectOptions} property.
*
+ * @see <a href="https://github.com/rholder/guava-retrying">guava-retrying library</a>
* @author Raul Kripalani
*/
public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> implements MqttCallback {
@@ -75,46 +80,65 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
/** Logger. */
private IgniteLogger log;
+ /** The MQTT client object for internal use. */
private MqttClient client;
+ /** The broker URL, set by the user. */
private String brokerUrl;
+ /** The topic to subscribe to, if a single topic. */
private String topic;
+ /** The quality of service to use for a single topic subscription (optional). */
private Integer qualityOfService;
+ /** The topics to subscribe to, if many. */
private List<String> topics;
+ /** The qualities of service to use for multiple topic subscriptions. If specified, it must contain the same
+ * number of elements as {@link #topics}. */
private List<Integer> qualitiesOfService;
- /** Client ID in case we're using durable subscribers. */
+ /** The MQTT client ID (optional). */
private String clientId;
+ /** A configurable persistence mechanism. If not set, Paho will use its default. */
private MqttClientPersistence persistence;
+ /** The MQTT client connect options, where users can configured the last will and testament, durability, etc. */
private MqttConnectOptions connectOptions;
- // disconnect parameters
+ /** Quiesce timeout on disconnection. */
private Integer disconnectQuiesceTimeout;
+ /** Whether to disconnect forcibly or not. */
private boolean disconnectForcibly;
+ /** If disconnecting forcibly, the timeout. */
private Integer disconnectForciblyTimeout;
+ /** The strategy to determine how long to wait between retry attempts. By default, this streamer uses a
+ * Fibonacci-based strategy. */
private WaitStrategy retryWaitStrategy = WaitStrategies.fibonacciWait();
+ /** The strategy to determine when to stop retrying to (re-)connect. By default, we never stop. */
private StopStrategy retryStopStrategy = StopStrategies.neverStop();
+ /** The internal connection retrier object with a thread pool of size 1. */
private MqttConnectionRetrier connectionRetrier;
+ /** Whether to block the start() method until connected for the first time. */
+ private boolean blockUntilConnected;
+
+ /** State keeping. */
private volatile boolean stopped = true;
+ /** State keeping. */
private volatile boolean connected;
+ /** Cached log prefix for cache messages. */
private String cachedLogPrefix;
- private boolean blockUntilConnected;
-
/**
* Starts streamer.
*
@@ -136,7 +160,11 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " +
"both single and multiple tuple extractor");
A.notNullOrEmpty(brokerUrl, "broker URL");
- A.notNullOrEmpty(clientId, "client ID");
+
+ // if the client ID is empty, generate one
+ if (clientId == null || clientId.length() == 0) {
+ clientId = MqttClient.generateClientId();
+ }
// if we have both a single topic and a list of topics (but the list of topic is not of
// size 1 and == topic, as this would be a case of re-initialization), fail
@@ -257,6 +285,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
// MQTT Client callback methods
// -------------------------------
+ /**
+ * {@inheritDoc}
+ */
@Override public void connectionLost(Throwable throwable) {
connected = false;
@@ -268,6 +299,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
connectionRetrier.connect();
}
+ /**
+ * {@inheritDoc}
+ */
@Override public void messageArrived(String topic, MqttMessage message) throws Exception {
if (getMultipleTupleExtractor() != null) {
Map<K, V> entries = getMultipleTupleExtractor().extract(message);
@@ -285,6 +319,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
}
}
+ /**
+ * {@inheritDoc}
+ */
@Override public void deliveryComplete(IMqttDeliveryToken token) {
// ignore, as we don't send messages
}
@@ -293,127 +330,229 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
// Getters and setters
// -------------------------------
+ /**
+ * @return
+ */
public String getBrokerUrl() {
return brokerUrl;
}
+ /**
+ * @param brokerUrl The Broker URL (compulsory).
+ */
public void setBrokerUrl(String brokerUrl) {
this.brokerUrl = brokerUrl;
}
+ /**
+ * @return
+ */
public String getTopic() {
return topic;
}
+ /**
+ * @param topic The topic to subscribe to, if a single topic.
+ */
public void setTopic(String topic) {
this.topic = topic;
}
+ /**
+ * @return
+ */
public Integer getQualityOfService() {
return qualityOfService;
}
+ /**
+ * @param qualityOfService The quality of service to use for a single topic subscription (optional).
+ */
public void setQualityOfService(Integer qualityOfService) {
this.qualityOfService = qualityOfService;
}
+ /**
+ * @return
+ */
public List<String> getTopics() {
return topics;
}
+ /**
+ * @param topics The topics to subscribe to, if many.
+ */
public void setTopics(List<String> topics) {
this.topics = topics;
}
+ /**
+ * @return
+ */
public List<Integer> getQualitiesOfService() {
return qualitiesOfService;
}
+ /**
+ * @param qualitiesOfService The qualities of service to use for multiple topic subscriptions.
+ * If specified, the list must contain the same number of elements as {@link #topics}.
+ */
public void setQualitiesOfService(List<Integer> qualitiesOfService) {
this.qualitiesOfService = qualitiesOfService;
}
+ /**
+ * @return
+ */
public String getClientId() {
return clientId;
}
+ /**
+ * @param clientId The MQTT client ID (optional). If one is not provided, we'll create one for you and maintain
+ * it througout any reconnection attempts.
+ */
public void setClientId(String clientId) {
this.clientId = clientId;
}
+ /**
+ * @return
+ */
public MqttClientPersistence getPersistence() {
return persistence;
}
+ /**
+ * @param persistence A configurable persistence mechanism. If not set, Paho will use its default.
+ */
public void setPersistence(MqttClientPersistence persistence) {
this.persistence = persistence;
}
+ /**
+ * @return
+ */
public MqttConnectOptions getConnectOptions() {
return connectOptions;
}
+ /**
+ * @param connectOptions The MQTT client connect options, where users can configured the last will and testament, durability, etc.
+ */
public void setConnectOptions(MqttConnectOptions connectOptions) {
this.connectOptions = connectOptions;
}
+ /**
+ * @return
+ */
public boolean isDisconnectForcibly() {
return disconnectForcibly;
}
+ /**
+ * @param disconnectForcibly Whether to disconnect forcibly or not. By default, it's false.
+ */
public void setDisconnectForcibly(boolean disconnectForcibly) {
this.disconnectForcibly = disconnectForcibly;
}
+ /**
+ * @return
+ */
public Integer getDisconnectQuiesceTimeout() {
return disconnectQuiesceTimeout;
}
+ /**
+ * @param disconnectQuiesceTimeout Quiesce timeout on disconnection. If not provided, this streamer won't use any.
+ */
public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) {
this.disconnectQuiesceTimeout = disconnectQuiesceTimeout;
}
+ /**
+ * @return
+ */
public Integer getDisconnectForciblyTimeout() {
return disconnectForciblyTimeout;
}
+ /**
+ * @param disconnectForciblyTimeout If disconnecting forcibly, the timeout. Compulsory in that case.
+ */
public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) {
this.disconnectForciblyTimeout = disconnectForciblyTimeout;
}
+ /**
+ * @return
+ */
public WaitStrategy getRetryWaitStrategy() {
return retryWaitStrategy;
}
+ /**
+ * @param retryWaitStrategy The strategy to determine how long to wait between retry attempts.
+ * By default, this streamer uses a Fibonacci-based strategy.
+ */
public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) {
this.retryWaitStrategy = retryWaitStrategy;
}
+ /**
+ * @return
+ */
public StopStrategy getRetryStopStrategy() {
return retryStopStrategy;
}
+ /**
+ * @param retryStopStrategy The strategy to determine when to stop retrying to (re-)connect. By default, we never stop.
+ */
public void setRetryStopStrategy(StopStrategy retryStopStrategy) {
this.retryStopStrategy = retryStopStrategy;
}
+ /**
+ * @return
+ */
public boolean isBlockUntilConnected() {
return blockUntilConnected;
}
+ /**
+ * @param blockUntilConnected Whether to block the start() method until connected for the first time. By default,
+ * false.
+ */
public void setBlockUntilConnected(boolean blockUntilConnected) {
this.blockUntilConnected = blockUntilConnected;
}
+ /**
+ * A utility class to help us with (re-)connecting to the MQTT broker. It uses a single-threaded executor to perform
+ * the (re-)connections.
+ */
private class MqttConnectionRetrier {
+ /** The guava-retrying retrier object. */
private final Retryer<Boolean> retrier;
+
+ /** Single-threaded pool. */
private ExecutorService executor = Executors.newSingleThreadExecutor();
+ /**
+ * Constructor.
+ * @param retrier The retryier object.
+ */
public MqttConnectionRetrier(Retryer<Boolean> retrier) {
this.retrier = retrier;
}
+ /**
+ * Method that is called by the streamer to ask us to (re-)connect.
+ */
public void connect() {
Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
@@ -460,6 +599,9 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
}
}
+ /**
+ * Stops this connection utility class by shutting down the thread pool.
+ */
public void stop() {
executor.shutdownNow();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/296dd6e7/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 012486a..5ac7339 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -65,24 +65,41 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
*/
public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
+ /** The test data. */
private static final Map<Integer, String> TEST_DATA = new HashMap<>();
+
+ /** Topic name for single topic tests. */
private static final String SINGLE_TOPIC_NAME = "abc";
+
+ /** Topic names for multiple topic tests. */
private static final List<String> MULTIPLE_TOPIC_NAMES = Arrays.asList("def", "ghi", "jkl", "mno");
+ /** The AMQ broker with an MQTT interface. */
private BrokerService broker;
+
+ /** The MQTT client. */
private MqttClient client;
+
+ /** The broker URL. */
private String brokerUrl;
+
+ /** The broker port. **/
private int port;
+
+ /** The MQTT streamer currently under test. */
private MqttStreamer<Integer, String> streamer;
+
+ /** The UUID of the currently active remote listener. */
private UUID remoteListener;
+ /** The Ignite data streamer. */
+ private IgniteDataStreamer<Integer, String> dataStreamer;
+
static {
for (int i = 0; i < 100; i++)
TEST_DATA.put(i, "v" + i);
}
- private IgniteDataStreamer<Integer, String> dataStreamer;
-
/** Constructor. */
public IgniteMqttStreamerTest() {
super(true);
@@ -99,14 +116,17 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
// create the broker
broker = new BrokerService();
- broker.deleteAllMessages();
+ broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(false);
+ broker.setPersistenceAdapter(null);
+ broker.setPersistenceFactory(null);
PolicyMap policyMap = new PolicyMap();
PolicyEntry policy = new PolicyEntry();
policy.setQueuePrefetch(1);
broker.setDestinationPolicy(policyMap);
broker.getDestinationPolicy().setDefaultEntry(policy);
+ broker.setSchedulerSupport(false);
// add the MQTT transport connector to the broker
broker.addConnector("mqtt://localhost:" + port);
@@ -143,6 +163,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
}
+ /**
+ * @throws Exception
+ */
public void testSingleTopic_NoQoS_OneEntryPerMessage() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -162,6 +185,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertCacheEntriesLoaded(50);
}
+ /**
+ * @throws Exception
+ */
public void testMultipleTopics_NoQoS_OneEntryPerMessage() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -185,6 +211,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
}
+ /**
+ * @throws Exception
+ */
public void testSingleTopic_NoQoS_MultipleEntriesOneMessage() throws Exception {
// configure streamer
streamer.setMultipleTupleExtractor(multipleTupleExtractor());
@@ -204,6 +233,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertCacheEntriesLoaded(50);
}
+ /**
+ * @throws Exception
+ */
public void testMultipleTopics_NoQoS_MultipleEntriesOneMessage() throws Exception {
// configure streamer
streamer.setMultipleTupleExtractor(multipleTupleExtractor());
@@ -227,6 +259,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
}
+ /**
+ * @throws Exception
+ */
public void testSingleTopic_NoQoS_ConnectOptions_Durable() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -265,6 +300,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertCacheEntriesLoaded(100);
}
+ /**
+ * @throws Exception
+ */
public void testSingleTopic_NoQoS_Reconnect() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -306,6 +344,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertCacheEntriesLoaded(100);
}
+ /**
+ * @throws Exception
+ */
public void testSingleTopic_NoQoS_RetryOnce() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -339,6 +380,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
}
+ /**
+ * @throws Exception
+ */
public void testMultipleTopics_MultipleQoS_OneEntryPerMessage() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -363,6 +407,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
}
+ /**
+ * @throws Exception
+ */
public void testMultipleTopics_MultipleQoS_Mismatch() throws Exception {
// configure streamer
streamer.setSingleTupleExtractor(singleTupleExtractor());
@@ -379,6 +426,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
}
+ /**
+ * @throws Exception
+ */
private MqttStreamer<Integer, String> createMqttStreamer(IgniteDataStreamer<Integer, String> dataStreamer) {
MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
streamer.setIgnite(grid());
@@ -393,7 +443,10 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
return streamer;
}
- public void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
+ /**
+ * @throws Exception
+ */
+ private void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
if (singleMessage) {
final List<StringBuilder> sbs = new ArrayList<>(topics.size());
// initialize String Builders for each topic
@@ -423,6 +476,9 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
}
}
+ /**
+ * @throws Exception
+ */
private CountDownLatch subscribeToPutEvents(int expect) {
Ignite ignite = grid();
@@ -439,14 +495,16 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
return latch;
}
+ /**
+ * @throws Exception
+ */
private void assertCacheEntriesLoaded(int count) {
// get the cache and check that the entries are present
IgniteCache<Integer, String> cache = grid().cache(null);
// for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache
- for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count)) {
+ for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count))
assertEquals(TEST_DATA.get(key), cache.get(key));
- }
// assert that the cache exactly the specified amount of elements
assertEquals(count, cache.size(CachePeekMode.ALL));
@@ -455,6 +513,11 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener);
}
+ /**
+ * Returns a {@link StreamSingleTupleExtractor} for testing.
+ *
+ * @throws Exception
+ */
public static StreamSingleTupleExtractor<MqttMessage, Integer, String> singleTupleExtractor() {
return new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
@Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
@@ -464,6 +527,11 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
};
}
+ /**
+ * Returns a {@link StreamMultipleTupleExtractor} for testing.
+ *
+ * @throws Exception
+ */
public static StreamMultipleTupleExtractor<MqttMessage, Integer, String> multipleTupleExtractor() {
return new StreamMultipleTupleExtractor<MqttMessage, Integer, String>() {
@Override public Map<Integer, String> extract(MqttMessage msg) {
[15/22] ignite git commit: ignite-973 - fix
Posted by vo...@apache.org.
ignite-973 - fix
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f0be45e3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f0be45e3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f0be45e3
Branch: refs/heads/ignite-1282
Commit: f0be45e309f9a594334209a251c069f9ba3db120
Parents: e51fb42
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Sep 22 13:36:40 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Sep 22 13:36:40 2015 +0300
----------------------------------------------------------------------
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 3 +++
.../internal/processors/query/h2/opt/GridH2Table.java | 10 +++++++++-
.../internal/processors/cache/CacheIndexStreamerTest.java | 4 ++--
3 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0be45e3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index 07c49a5..4a16284 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -119,6 +119,9 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
* @throws IgniteCheckedException If failed.
*/
public synchronized void onUnswap(Object val, boolean beforeRmv) throws IgniteCheckedException {
+ if (peekValue(VAL_COL) != null)
+ return;
+
setValue(VAL_COL, desc.wrap(val, desc.valueType()));
notifyAll();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0be45e3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
index 66241b4..bf318b2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2Table.java
@@ -55,6 +55,8 @@ import org.h2.value.Value;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import static org.apache.ignite.internal.processors.query.h2.opt.GridH2AbstractKeyValueRow.VAL_COL;
+
/**
* H2 Table implementation.
*/
@@ -372,6 +374,12 @@ public class GridH2Table extends TableBase {
if (!del) {
GridH2Row old = pk.put(row); // Put to PK.
+ if (old instanceof GridH2AbstractKeyValueRow) { // Unswap value on replace.
+ GridH2AbstractKeyValueRow kvOld = (GridH2AbstractKeyValueRow)old;
+
+ kvOld.onUnswap(kvOld.getValue(VAL_COL), true);
+ }
+
int len = idxs.size();
int i = 1;
@@ -399,7 +407,7 @@ public class GridH2Table extends TableBase {
GridH2Row old = pk.remove(row);
if (old instanceof GridH2AbstractKeyValueRow) { // Unswap value.
- Value v = row.getValue(GridH2AbstractKeyValueRow.VAL_COL);
+ Value v = row.getValue(VAL_COL);
if (v != null)
((GridH2AbstractKeyValueRow)old).onUnswap(v.getObject(), true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f0be45e3/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
index 25c3b81..23f4e91 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
@@ -31,7 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.spi.swapspace.file.FileSwapSpaceSpi;
+import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -52,7 +52,7 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
- cfg.setSwapSpaceSpi(new FileSwapSpaceSpi());
+ cfg.setSwapSpaceSpi(new GridTestSwapSpaceSpi());
return cfg;
}
[20/22] ignite git commit: Merge branch 'ignite-1.4'
Posted by vo...@apache.org.
Merge branch 'ignite-1.4'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0a41ae57
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0a41ae57
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0a41ae57
Branch: refs/heads/ignite-1282
Commit: 0a41ae57215e9d2d208d33f7a46653c4be43de9c
Parents: 88acd31 1942d75
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Sep 22 17:53:41 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Sep 22 17:53:41 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/cache/CacheAtomicityMode.java | 17 +--
.../configuration/CacheConfiguration.java | 15 +++
.../processors/cache/GridCacheAdapter.java | 8 +-
.../processors/cache/GridCacheMapEntry.java | 51 ++++----
.../processors/cache/GridCacheProcessor.java | 10 +-
.../cache/GridCacheSwapEntryImpl.java | 31 ++++-
.../processors/cache/GridCacheSwapManager.java | 80 ++++++++-----
.../datastreamer/DataStreamerImpl.java | 2 -
.../IgniteCacheEntryListenerAbstractTest.java | 65 +++++++++-
.../IgniteCachePutRetryAbstractSelfTest.java | 33 ++++++
...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 ++++++++++++++++++-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 13 +-
.../processors/query/h2/IgniteH2Indexing.java | 19 +--
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 54 ++++++---
.../query/h2/opt/GridH2KeyValueRowOffheap.java | 11 +-
.../query/h2/opt/GridH2RowDescriptor.java | 5 +
.../processors/query/h2/opt/GridH2Table.java | 10 +-
.../cache/CacheIndexStreamerTest.java | 37 ++++--
.../processors/cache/GridCacheSwapSelfTest.java | 4 +-
.../IgniteCacheWithIndexingTestSuite.java | 2 +
20 files changed, 448 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0a41ae57/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
[16/22] ignite git commit: Merge remote-tracking branch
'origin/ignite-1.4' into ignite-1.4
Posted by vo...@apache.org.
Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/39dace45
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/39dace45
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/39dace45
Branch: refs/heads/ignite-1282
Commit: 39dace45c81aef7cb913fcf4f98a7d71e34beebd
Parents: f0be45e a104087
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Sep 22 13:38:21 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Sep 22 13:38:21 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/cache/CacheAtomicityMode.java | 17 +--
.../processors/cache/GridCacheProcessor.java | 2 +-
...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 ++++++++++++++++++-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 13 +-
4 files changed, 125 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
[19/22] ignite git commit: Added test.
Posted by vo...@apache.org.
Added test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1942d758
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1942d758
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1942d758
Branch: refs/heads/ignite-1282
Commit: 1942d75856ab6d317b743de71b53a29abf81316a
Parents: ca2bce0
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 17:36:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 17:36:18 2015 +0300
----------------------------------------------------------------------
.../IgniteCachePutRetryAbstractSelfTest.java | 33 ++++++++++++++++++++
1 file changed, 33 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1942d758/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
index 943caeb..76f12c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryAbstractSelfTest.java
@@ -168,6 +168,13 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
/**
* @throws Exception If failed.
*/
+ public void testGetAndPut() throws Exception {
+ checkRetry(Test.GET_AND_PUT, TestMemoryMode.HEAP, false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
public void testPutStoreEnabled() throws Exception {
checkRetry(Test.PUT, TestMemoryMode.HEAP, true);
}
@@ -275,6 +282,29 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
break;
}
+ case GET_AND_PUT: {
+ for (int i = 0; i < keysCnt; i++)
+ cache.put(i, 0);
+
+ while (System.currentTimeMillis() < stopTime) {
+ Integer expOld = iter;
+
+ Integer val = ++iter;
+
+ for (int i = 0; i < keysCnt; i++) {
+ Integer old = cache.getAndPut(i, val);
+
+ assertTrue("Unexpected old value [old=" + old + ", exp=" + expOld + ']',
+ expOld.equals(old) || val.equals(old));
+ }
+
+ for (int i = 0; i < keysCnt; i++)
+ assertEquals(val, cache.get(i));
+ }
+
+ break;
+ }
+
case PUT_ALL: {
while (System.currentTimeMillis() < stopTime) {
Integer val = ++iter;
@@ -495,6 +525,9 @@ public abstract class IgniteCachePutRetryAbstractSelfTest extends GridCommonAbst
PUT,
/** */
+ GET_AND_PUT,
+
+ /** */
PUT_ALL,
/** */
[11/22] ignite git commit: IGNITE-1522 - Made cache entry listener
configurations transient in cache configuration
Posted by vo...@apache.org.
IGNITE-1522 - Made cache entry listener configurations transient in cache configuration
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e51fb420
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e51fb420
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e51fb420
Branch: refs/heads/ignite-1282
Commit: e51fb420d1284465c7cbe55a28c2374ddf67d495
Parents: 621eb0f
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Mon Sep 21 23:29:20 2015 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Mon Sep 21 23:29:20 2015 -0700
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 15 +++++
.../IgniteCacheEntryListenerAbstractTest.java | 65 +++++++++++++++++++-
2 files changed, 79 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e51fb420/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 7d1e14d..44a3fa9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.ignite.configuration;
import java.io.Serializable;
import java.util.Collection;
+import java.util.HashSet;
import javax.cache.Cache;
import javax.cache.configuration.CompleteConfiguration;
import javax.cache.configuration.Factory;
@@ -1799,6 +1800,20 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> {
return this;
}
+ /**
+ * Creates a copy of current configuration and removes all cache entry listeners.
+ * They are executed only locally and should never be sent to remote nodes.
+ *
+ * @return Configuration object that will be serialized.
+ */
+ protected Object writeReplace() {
+ CacheConfiguration<K, V> cfg = new CacheConfiguration<>(this);
+
+ cfg.listenerConfigurations = new HashSet<>();
+
+ return cfg;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheConfiguration.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/e51fb420/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index 78a6700..3fdd7fc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -17,6 +17,10 @@
package org.apache.ignite.internal.processors.cache;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -32,11 +36,13 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.configuration.CacheEntryListenerConfiguration;
import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListener;
+import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryRemovedListener;
import javax.cache.event.CacheEntryUpdatedListener;
import javax.cache.event.EventType;
@@ -358,6 +364,34 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
}
/**
+ * @throws Exception If failed.
+ */
+ public void testSerialization() throws Exception {
+ if (cacheMode() == LOCAL)
+ return;
+
+ AtomicBoolean serialized = new AtomicBoolean();
+
+ NonSerializableListener lsnr = new NonSerializableListener(serialized);
+
+ jcache(0).registerCacheEntryListener(new MutableCacheEntryListenerConfiguration<>(
+ FactoryBuilder.factoryOf(lsnr),
+ null,
+ true,
+ false
+ ));
+
+ try {
+ startGrid(gridCount());
+ }
+ finally {
+ stopGrid(gridCount());
+ }
+
+ assertFalse(serialized.get());
+ }
+
+ /**
* @param key Key.
* @param val Value.
* @param cache Cache.
@@ -1190,4 +1224,33 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb
}
}
-}
\ No newline at end of file
+ /**
+ */
+ public static class NonSerializableListener implements CacheEntryCreatedListener<Object, Object>, Externalizable {
+ /** */
+ private final AtomicBoolean serialized;
+
+ /**
+ * @param serialized Serialized flag.
+ */
+ public NonSerializableListener(AtomicBoolean serialized) {
+ this.serialized = serialized;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCreated(Iterable<CacheEntryEvent<? extends Object, ? extends Object>> evts)
+ throws CacheEntryListenerException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ serialized.set(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ // No-op.
+ }
+ }
+}
[07/22] ignite git commit: Merge branch 'master' into
feature/ignite-535-mqtt
Posted by vo...@apache.org.
Merge branch 'master' into feature/ignite-535-mqtt
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f03f3a3b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f03f3a3b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f03f3a3b
Branch: refs/heads/ignite-1282
Commit: f03f3a3b48fa105f318e9493440671188770f4ef
Parents: 53683e2 421a523
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 16:36:53 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 16:36:53 2015 +0100
----------------------------------------------------------------------
.../org/apache/ignite/IgniteAtomicLong.java | 15 +-
.../apache/ignite/IgniteAtomicReference.java | 9 +-
.../org/apache/ignite/IgniteAtomicSequence.java | 9 +-
.../org/apache/ignite/IgniteAtomicStamped.java | 13 +-
.../configuration/NearCacheConfiguration.java | 18 +-
.../apache/ignite/internal/IgniteKernal.java | 7 -
.../processors/cache/GridCacheContext.java | 6 +-
.../cache/GridCacheEvictionManager.java | 6 +-
.../cache/GridCacheEvictionResponse.java | 2 +-
.../processors/cache/GridCacheIoManager.java | 47 ++--
.../processors/cache/GridCacheMessage.java | 7 +
.../processors/cache/GridCacheMvccManager.java | 34 ++-
.../GridCachePartitionExchangeManager.java | 41 +++-
.../processors/cache/GridCacheProcessor.java | 28 ++-
.../GridDistributedLockResponse.java | 6 +-
.../GridDistributedTxPrepareResponse.java | 6 +-
.../distributed/dht/GridDhtLocalPartition.java | 26 +-
.../distributed/dht/GridDhtTopologyFuture.java | 6 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 2 +-
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 12 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 16 +-
.../dht/atomic/GridNearAtomicUpdateRequest.java | 2 +
.../atomic/GridNearAtomicUpdateResponse.java | 11 +-
.../colocated/GridDhtColocatedLockFuture.java | 44 +++-
.../dht/preloader/GridDhtForceKeysFuture.java | 2 +-
.../dht/preloader/GridDhtForceKeysResponse.java | 6 +-
.../GridDhtPartitionsExchangeFuture.java | 19 +-
.../distributed/near/GridNearGetResponse.java | 6 +-
.../distributed/near/GridNearLockFuture.java | 26 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 20 +-
.../near/GridNearTxFinishResponse.java | 6 +-
.../query/GridCacheDistributedQueryFuture.java | 27 +-
.../cache/query/GridCacheLocalQueryFuture.java | 5 +
.../cache/query/GridCacheQueryAdapter.java | 170 ++++++++-----
.../query/GridCacheQueryFutureAdapter.java | 11 +-
.../cache/query/GridCacheQueryManager.java | 30 ++-
.../cache/query/GridCacheQueryResponse.java | 6 +-
.../continuous/CacheContinuousQueryHandler.java | 12 +-
.../transactions/IgniteTxLocalAdapter.java | 4 +-
.../ignite/internal/util/GridSpinBusyLock.java | 10 +
.../ignite/spi/discovery/tcp/ServerImpl.java | 12 +-
.../IgniteCacheEntryProcessorNodeJoinTest.java | 24 +-
.../distributed/CacheAffEarlySelfTest.java | 245 +++++++++++++++++++
.../GridCacheSwapScanQueryAbstractSelfTest.java | 118 ++++-----
.../processors/igfs/IgfsAbstractSelfTest.java | 5 +-
.../loadtests/hashmap/GridCacheTestContext.java | 4 +-
.../ignite/testframework/GridTestUtils.java | 14 +-
.../cache/CacheIndexStreamerTest.java | 137 +++++++++++
...CacheScanPartitionQueryFallbackSelfTest.java | 244 +++++-------------
.../IgniteCacheQueryNodeRestartSelfTest2.java | 2 -
.../Apache.Ignite.Core/Impl/IgniteManager.cs | 2 -
.../ignite/visor/commands/VisorConsole.scala | 37 ++-
.../config/benchmark-put-indexed-val.properties | 2 +-
modules/yardstick/config/ignite-base-config.xml | 2 +-
.../yardstick/IgniteBenchmarkArguments.java | 24 +-
.../org/apache/ignite/yardstick/IgniteNode.java | 12 +-
57 files changed, 1084 insertions(+), 535 deletions(-)
----------------------------------------------------------------------
[13/22] ignite git commit: Added test.
Posted by vo...@apache.org.
Added test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33fe30da
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33fe30da
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33fe30da
Branch: refs/heads/ignite-1282
Commit: 33fe30da620e4f08cee959104805f3527b597700
Parents: e51fb42
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 12:55:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 12:55:18 2015 +0300
----------------------------------------------------------------------
...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 ++++++++++++++++++-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 13 +-
2 files changed, 119 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/33fe30da/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
index 66275b3..14417c1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java
@@ -21,12 +21,25 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingRequest;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+
/**
* Client-based discovery SPI test with failure detection timeout enabled.
*/
@@ -60,7 +73,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
/** {@inheritDoc} */
@Override protected TcpDiscoverySpi getDiscoverySpi() {
- return useTestSpi ? new TestTcpDiscoverySpi() : super.getDiscoverySpi();
+ return useTestSpi ? new TestTcpDiscoverySpi2() : super.getDiscoverySpi();
}
/**
@@ -117,16 +130,16 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
private void checkFailureThresholdWorkability() throws Exception {
useTestSpi = true;
- TestTcpDiscoverySpi firstSpi = null;
- TestTcpDiscoverySpi secondSpi = null;
+ TestTcpDiscoverySpi2 firstSpi = null;
+ TestTcpDiscoverySpi2 secondSpi = null;
try {
startServerNodes(2);
checkNodes(2, 0);
- firstSpi = (TestTcpDiscoverySpi)(G.ignite("server-0").configuration().getDiscoverySpi());
- secondSpi = (TestTcpDiscoverySpi)(G.ignite("server-1").configuration().getDiscoverySpi());
+ firstSpi = (TestTcpDiscoverySpi2)(G.ignite("server-0").configuration().getDiscoverySpi());
+ secondSpi = (TestTcpDiscoverySpi2)(G.ignite("server-1").configuration().getDiscoverySpi());
assert firstSpi.err == null;
@@ -157,9 +170,102 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov
}
/**
+ * Test tries to provoke scenario when client sends reconnect message before router failure detected.
+ *
+ * @throws Exception If failed.
+ */
+ public void _testClientReconnectOnCoordinatorRouterFail() throws Exception {
+ startServerNodes(1);
+
+ Ignite srv = G.ignite("server-0");
+
+ final TcpDiscoveryNode srvNode = (TcpDiscoveryNode)srv.cluster().localNode();
+
+ final UUID srvNodeId = srvNode.id();
+
+ clientIpFinder = new TcpDiscoveryVmIpFinder();
+
+ clientIpFinder.setAddresses(
+ Collections.singleton("localhost:" + srvNode.discoveryPort() + ".." + (srvNode.discoveryPort() + 1)));
+
+ failureThreshold = 1000L;
+ netTimeout = 500L;
+
+ startClientNodes(1); // Client should connect to coordinator.
+
+ failureThreshold = 10_000L;
+ netTimeout = 5000L;
+
+ for (int i = 0; i < 2; i++) {
+ Ignite g = startGrid("server-" + srvIdx.getAndIncrement());
+
+ srvNodeIds.add(g.cluster().localNode().id());
+ }
+
+ checkNodes(3, 1);
+
+ final CountDownLatch latch = new CountDownLatch(3);
+
+ String nodes[] = {"server-1", "server-2", "client-0"};
+
+ final AtomicBoolean err = new AtomicBoolean();
+
+ for (String node : nodes) {
+ G.ignite(node).events().localListen(new IgnitePredicate<Event>() {
+ @Override public boolean apply(Event evt) {
+ DiscoveryEvent disoEvt = (DiscoveryEvent)evt;
+
+ if (disoEvt.eventNode().id().equals(srvNodeId)) {
+ info("Expected node failed event: " + ((DiscoveryEvent) evt).eventNode());
+
+ latch.countDown();
+ }
+ else {
+ log.info("Unexpected node failed event: " + evt);
+
+ err.set(true);
+ }
+
+ return true;
+ }
+ }, EVT_NODE_FAILED);
+ }
+
+ Thread.sleep(5000);
+
+ Ignite client = G.ignite("client-0");
+
+ UUID nodeId = client.cluster().localNode().id();
+
+ log.info("Fail coordinator: " + srvNodeId);
+
+ TestTcpDiscoverySpi srvSpi = (TestTcpDiscoverySpi)srv.configuration().getDiscoverySpi();
+
+ srvSpi.pauseAll(false);
+
+ try {
+ Thread.sleep(2000);
+ }
+ finally {
+ srvSpi.simulateNodeFailure();
+ srvSpi.resumeAll();
+ }
+
+ try {
+ assertTrue(latch.await(10_000, TimeUnit.MILLISECONDS));
+
+ assertFalse("Unexpected event, see log for details.", err.get());
+ assertEquals(nodeId, client.cluster().localNode().id());
+ }
+ finally {
+ srvSpi.resumeAll();
+ }
+ }
+
+ /**
*
*/
- private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ private static class TestTcpDiscoverySpi2 extends TcpDiscoverySpi {
/** */
private long readDelay;
http://git-wip-us.apache.org/repos/asf/ignite/blob/33fe30da/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index c86f06a..9fbf5b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -89,13 +89,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
/** */
- private static final AtomicInteger srvIdx = new AtomicInteger();
+ protected static final AtomicInteger srvIdx = new AtomicInteger();
/** */
private static final AtomicInteger clientIdx = new AtomicInteger();
/** */
- private static Collection<UUID> srvNodeIds;
+ protected static Collection<UUID> srvNodeIds;
/** */
private static Collection<UUID> clientNodeIds;
@@ -128,13 +128,13 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
private UUID nodeId;
/** */
- private TcpDiscoveryVmIpFinder clientIpFinder;
+ protected TcpDiscoveryVmIpFinder clientIpFinder;
/** */
private long joinTimeout = TcpDiscoverySpi.DFLT_JOIN_TIMEOUT;
/** */
- private long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
+ protected long netTimeout = TcpDiscoverySpi.DFLT_NETWORK_TIMEOUT;
/** */
private boolean longSockTimeouts;
@@ -466,7 +466,8 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
@Override public void apply(Socket sock) {
try {
latch.await();
- } catch (InterruptedException e) {
+ }
+ catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@@ -2056,7 +2057,7 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/**
*
*/
- private static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
+ protected static class TestTcpDiscoverySpi extends TcpDiscoverySpi {
/** */
private final Object mux = new Object();
[05/22] ignite git commit: IGNITE-535 (WIP) MQTT Streamer.
Posted by vo...@apache.org.
IGNITE-535 (WIP) MQTT Streamer.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/53683e20
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/53683e20
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/53683e20
Branch: refs/heads/ignite-1282
Commit: 53683e20d304dfd96d544f286e8460d3829598d8
Parents: 6b53f1b
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 16:23:28 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 16:23:28 2015 +0100
----------------------------------------------------------------------
modules/mqtt/pom.xml | 39 +-
.../apache/ignite/stream/mqtt/MqttStreamer.java | 336 +++++++++++---
.../stream/mqtt/IgniteMqttStreamerTest.java | 435 +++++++++++++++++++
.../ignite/stream/mqtt/TestTupleExtractors.java | 28 --
4 files changed, 741 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/modules/mqtt/pom.xml b/modules/mqtt/pom.xml
index b108180..4b0b46c 100644
--- a/modules/mqtt/pom.xml
+++ b/modules/mqtt/pom.xml
@@ -37,7 +37,8 @@
<properties>
<paho.version>1.0.2</paho.version>
- <mosquette.version>0.7</mosquette.version>
+ <activemq.version>5.11.1</activemq.version>
+ <guava-retryier.version>2.0.0</guava-retryier.version>
</properties>
<dependencies>
@@ -54,9 +55,29 @@
</dependency>
<dependency>
- <groupId>org.eclipse.moquette</groupId>
- <artifactId>moquette-broker</artifactId>
- <version>${mosquette.version}</version>
+ <groupId>com.github.rholder</groupId>
+ <artifactId>guava-retrying</artifactId>
+ <version>${guava-retryier.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <version>${activemq.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-kahadb-store</artifactId>
+ <version>${activemq.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-mqtt</artifactId>
+ <version>${activemq.version}</version>
<scope>test</scope>
</dependency>
@@ -86,16 +107,6 @@
<!-- Repository for Mosquette (embedded MQTT broker for tests) and for Eclipse Paho (MQTT client) -->
<repositories>
<repository>
- <id>bintray</id>
- <url>http://dl.bintray.com/andsel/maven/</url>
- <releases>
- <enabled>true</enabled>
- </releases>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
- <repository>
<id>Eclipse Paho Repo</id>
<url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
<releases>
http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
index 00a89ab..b86d385 100644
--- a/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
+++ b/modules/mqtt/src/main/java/org/apache/ignite/stream/mqtt/MqttStreamer.java
@@ -20,6 +20,10 @@ package org.apache.ignite.stream.mqtt;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
@@ -29,12 +33,19 @@ import org.apache.ignite.stream.StreamAdapter;
import org.apache.ignite.stream.StreamMultipleTupleExtractor;
import org.apache.ignite.stream.StreamSingleTupleExtractor;
+import com.github.rholder.retry.Retryer;
+import com.github.rholder.retry.RetryerBuilder;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.StopStrategy;
+import com.github.rholder.retry.WaitStrategies;
+import com.github.rholder.retry.WaitStrategy;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
@@ -90,8 +101,20 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
private Integer disconnectForciblyTimeout;
+ private WaitStrategy retryWaitStrategy = WaitStrategies.fibonacciWait();
+
+ private StopStrategy retryStopStrategy = StopStrategies.neverStop();
+
+ private MqttConnectionRetrier connectionRetrier;
+
private volatile boolean stopped = true;
+ private volatile boolean connected;
+
+ private String cachedLogPrefix;
+
+ private boolean blockUntilConnected;
+
/**
* Starts streamer.
*
@@ -109,18 +132,21 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
// parameter validations
A.notNull(getStreamer(), "streamer");
A.notNull(getIgnite(), "ignite");
- A.ensure(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null, "tuple extractor missing");
+ A.ensure(!(getSingleTupleExtractor() == null && getMultipleTupleExtractor() == null), "tuple extractor missing");
A.ensure(getSingleTupleExtractor() == null || getMultipleTupleExtractor() == null, "cannot provide " +
"both single and multiple tuple extractor");
A.notNullOrEmpty(brokerUrl, "broker URL");
A.notNullOrEmpty(clientId, "client ID");
- // if we have both a single topic and a list of topics, fail
- if (topic != null && topic.length() > 0 && !topics.isEmpty())
+ // if we have both a single topic and a list of topics (but the list of topic is not of
+ // size 1 and == topic, as this would be a case of re-initialization), fail
+ if (topic != null && topic.length() > 0 && !topics.isEmpty() &&
+ topics.size() != 1 && !topics.get(0).equals(topic))
throw new IllegalArgumentException("Cannot specify both a single topic and a list at the same time");
- // if we have both a single QoS and list, fail
- if (qualityOfService != null && !qualitiesOfService.isEmpty()) {
+ // same as above but for QoS
+ if (qualityOfService != null && !qualitiesOfService.isEmpty() && qualitiesOfService.size() != 1 &&
+ !qualitiesOfService.get(0).equals(qualityOfService)) {
throw new IllegalArgumentException("Cannot specify both a single QoS and a list at the same time");
}
@@ -131,12 +157,22 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
}
// if we have multiple topics
- if (topics != null && !topics.isEmpty()) {
- for (String t : topics) {
+ if (!topics.isEmpty()) {
+ for (String t : topics)
A.notNullOrEmpty(t, "topic in list of topics");
- }
+
A.ensure(qualitiesOfService.isEmpty() || qualitiesOfService.size() == topics.size(), "qualities of " +
"service must be either empty or have the same size as topics list");
+
+ cachedLogPrefix = "[" + Joiner.on(",").join(topics) + "]";
+ }
+ else { // just the single topic
+ topics.add(topic);
+
+ if (qualityOfService != null)
+ qualitiesOfService.add(qualityOfService);
+
+ cachedLogPrefix = "[" + topic + "]";
}
// create logger
@@ -148,10 +184,28 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
else
client = new MqttClient(brokerUrl, clientId, persistence);
- connectAndSubscribe();
+ // set this as a callback
+ client.setCallback(this);
+ // set stopped to false, as the connection will start async
stopped = false;
+ // build retrier
+ Retryer<Boolean> retrier = RetryerBuilder.<Boolean>newBuilder()
+ .retryIfResult(new Predicate<Boolean>() {
+ @Override public boolean apply(Boolean connected) {
+ return !connected;
+ }
+ })
+ .retryIfException().retryIfRuntimeException()
+ .withWaitStrategy(retryWaitStrategy)
+ .withStopStrategy(retryStopStrategy)
+ .build();
+
+ // create the connection retrier
+ connectionRetrier = new MqttConnectionRetrier(retrier);
+ connectionRetrier.connect();
+
}
catch (Throwable t) {
throw new IgniteException("Exception while initializing MqttStreamer", t);
@@ -159,34 +213,6 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
}
- private void connectAndSubscribe() throws MqttException {
- // connect
- if (connectOptions != null)
- client.connect();
- else
- client.connect(connectOptions);
-
- // subscribe to multiple topics
- if (!topics.isEmpty()) {
- if (qualitiesOfService.isEmpty()) {
- client.subscribe(topics.toArray(new String[0]));
- } else {
- int[] qoses = new int[qualitiesOfService.size()];
- for (int i = 0; i < qualitiesOfService.size(); i++)
- qoses[i] = qualitiesOfService.get(i);
-
- client.subscribe(topics.toArray(new String[0]), qoses);
- }
- } else {
- // subscribe to a single topic
- if (qualityOfService == null) {
- client.subscribe(topic);
- } else {
- client.subscribe(topic, qualityOfService);
- }
- }
- }
-
/**
* Stops streamer.
*/
@@ -194,50 +220,250 @@ public class MqttStreamer<K, V> extends StreamAdapter<MqttMessage, K, V> impleme
if (stopped)
throw new IgniteException("Attempted to stop an already stopped MQTT Streamer");
+ // stop the retrier
+ connectionRetrier.stop();
+
try {
if (disconnectForcibly) {
- if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null) {
+ if (disconnectQuiesceTimeout == null && disconnectForciblyTimeout == null)
client.disconnectForcibly();
- } else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null) {
+
+ else if (disconnectForciblyTimeout != null && disconnectQuiesceTimeout == null)
client.disconnectForcibly(disconnectForciblyTimeout);
- } else {
+
+ else
client.disconnectForcibly(disconnectQuiesceTimeout, disconnectForciblyTimeout);
- }
+
} else {
- if (disconnectQuiesceTimeout == null) {
+ if (disconnectQuiesceTimeout == null)
client.disconnect();
- } else {
+
+ else
client.disconnect(disconnectQuiesceTimeout);
- }
+
}
+
+ client.close();
+ connected = false;
+ stopped = true;
+
}
catch (Throwable t) {
throw new IgniteException("Exception while stopping MqttStreamer", t);
}
}
+ // -------------------------------
+ // MQTT Client callback methods
+ // -------------------------------
+
@Override public void connectionLost(Throwable throwable) {
- log.warning(String.format("MQTT Connection to server %s was lost due to", brokerUrl), throwable);
- // TODO: handle reconnect attempts with an optional backoff mechanism (linear, exponential, finonacci)
- try {
- connectAndSubscribe();
- }
- catch (MqttException e) {
- e.printStackTrace();
- }
+ connected = false;
+
+ // if we have been stopped, we do not try to establish the connection again
+ if (stopped)
+ return;
+
+ log.warning(String.format("MQTT Connection to server %s was lost.", brokerUrl), throwable);
+ connectionRetrier.connect();
}
@Override public void messageArrived(String topic, MqttMessage message) throws Exception {
if (getMultipleTupleExtractor() != null) {
Map<K, V> entries = getMultipleTupleExtractor().extract(message);
+ if (log.isTraceEnabled()) {
+ log.trace("Adding cache entries: " + entries);
+ }
getStreamer().addData(entries);
- } else {
+ }
+ else {
Map.Entry<K, V> entry = getSingleTupleExtractor().extract(message);
+ if (log.isTraceEnabled()) {
+ log.trace("Adding cache entry: " + entry);
+ }
getStreamer().addData(entry);
}
}
@Override public void deliveryComplete(IMqttDeliveryToken token) {
- // ignore, we don't send messages
+ // ignore, as we don't send messages
+ }
+
+ // -------------------------------
+ // Getters and setters
+ // -------------------------------
+
+ public String getBrokerUrl() {
+ return brokerUrl;
+ }
+
+ public void setBrokerUrl(String brokerUrl) {
+ this.brokerUrl = brokerUrl;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public Integer getQualityOfService() {
+ return qualityOfService;
+ }
+
+ public void setQualityOfService(Integer qualityOfService) {
+ this.qualityOfService = qualityOfService;
+ }
+
+ public List<String> getTopics() {
+ return topics;
+ }
+
+ public void setTopics(List<String> topics) {
+ this.topics = topics;
+ }
+
+ public List<Integer> getQualitiesOfService() {
+ return qualitiesOfService;
+ }
+
+ public void setQualitiesOfService(List<Integer> qualitiesOfService) {
+ this.qualitiesOfService = qualitiesOfService;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public MqttClientPersistence getPersistence() {
+ return persistence;
+ }
+
+ public void setPersistence(MqttClientPersistence persistence) {
+ this.persistence = persistence;
+ }
+
+ public MqttConnectOptions getConnectOptions() {
+ return connectOptions;
+ }
+
+ public void setConnectOptions(MqttConnectOptions connectOptions) {
+ this.connectOptions = connectOptions;
+ }
+
+ public boolean isDisconnectForcibly() {
+ return disconnectForcibly;
+ }
+
+ public void setDisconnectForcibly(boolean disconnectForcibly) {
+ this.disconnectForcibly = disconnectForcibly;
+ }
+
+ public Integer getDisconnectQuiesceTimeout() {
+ return disconnectQuiesceTimeout;
+ }
+
+ public void setDisconnectQuiesceTimeout(Integer disconnectQuiesceTimeout) {
+ this.disconnectQuiesceTimeout = disconnectQuiesceTimeout;
+ }
+
+ public Integer getDisconnectForciblyTimeout() {
+ return disconnectForciblyTimeout;
+ }
+
+ public void setDisconnectForciblyTimeout(Integer disconnectForciblyTimeout) {
+ this.disconnectForciblyTimeout = disconnectForciblyTimeout;
}
+
+ public WaitStrategy getRetryWaitStrategy() {
+ return retryWaitStrategy;
+ }
+
+ public void setRetryWaitStrategy(WaitStrategy retryWaitStrategy) {
+ this.retryWaitStrategy = retryWaitStrategy;
+ }
+
+ public StopStrategy getRetryStopStrategy() {
+ return retryStopStrategy;
+ }
+
+ public void setRetryStopStrategy(StopStrategy retryStopStrategy) {
+ this.retryStopStrategy = retryStopStrategy;
+ }
+
+ public boolean isBlockUntilConnected() {
+ return blockUntilConnected;
+ }
+
+ public void setBlockUntilConnected(boolean blockUntilConnected) {
+ this.blockUntilConnected = blockUntilConnected;
+ }
+
+ private class MqttConnectionRetrier {
+
+ private final Retryer<Boolean> retrier;
+ private ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ public MqttConnectionRetrier(Retryer<Boolean> retrier) {
+ this.retrier = retrier;
+ }
+
+ public void connect() {
+ Callable<Boolean> callable = retrier.wrap(new Callable<Boolean>() {
+ @Override public Boolean call() throws Exception {
+ // if we're already connected, return immediately
+ if (connected)
+ return true;
+
+ if (stopped)
+ return false;
+
+ // connect to broker
+ if (connectOptions == null)
+ client.connect();
+ else
+ client.connect(connectOptions);
+
+ // always use the multiple topics variant of the mqtt client; even if the user specified a single
+ // topic and/or QoS, the initialization code would have placed it inside the 1..n structures
+ if (qualitiesOfService.isEmpty())
+ client.subscribe(topics.toArray(new String[0]));
+
+ else {
+ int[] qoses = new int[qualitiesOfService.size()];
+ for (int i = 0; i < qualitiesOfService.size(); i++)
+ qoses[i] = qualitiesOfService.get(i);
+
+ client.subscribe(topics.toArray(new String[0]), qoses);
+ }
+
+ connected = true;
+ return connected;
+ }
+ });
+
+ Future<Boolean> result = executor.submit(callable);
+
+ if (blockUntilConnected) {
+ try {
+ result.get();
+ }
+ catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public void stop() {
+ executor.shutdownNow();
+ }
+
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
index 59730fa..012486a 100644
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
+++ b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/IgniteMqttStreamerTest.java
@@ -17,11 +17,47 @@
package org.apache.ignite.stream.mqtt;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.events.CacheEvent;
+import org.apache.ignite.internal.util.lang.GridMapEntry;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import com.github.rholder.retry.StopStrategies;
+import com.github.rholder.retry.WaitStrategies;
+import com.google.common.base.Splitter;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.After;
import org.junit.Before;
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+
/**
* Test for {@link MqttStreamer}.
*
@@ -29,6 +65,24 @@ import org.junit.Before;
*/
public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
+ private static final Map<Integer, String> TEST_DATA = new HashMap<>();
+ private static final String SINGLE_TOPIC_NAME = "abc";
+ private static final List<String> MULTIPLE_TOPIC_NAMES = Arrays.asList("def", "ghi", "jkl", "mno");
+
+ private BrokerService broker;
+ private MqttClient client;
+ private String brokerUrl;
+ private int port;
+ private MqttStreamer<Integer, String> streamer;
+ private UUID remoteListener;
+
+ static {
+ for (int i = 0; i < 100; i++)
+ TEST_DATA.put(i, "v" + i);
+ }
+
+ private IgniteDataStreamer<Integer, String> dataStreamer;
+
/** Constructor. */
public IgniteMqttStreamerTest() {
super(true);
@@ -38,13 +92,394 @@ public class IgniteMqttStreamerTest extends GridCommonAbstractTest {
public void beforeTest() throws Exception {
grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+ // find an available local port
+ try (ServerSocket ss = new ServerSocket(0)) {
+ port = ss.getLocalPort();
+ }
+
+ // create the broker
+ broker = new BrokerService();
+ broker.deleteAllMessages();
+ broker.setPersistent(false);
+
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry policy = new PolicyEntry();
+ policy.setQueuePrefetch(1);
+ broker.setDestinationPolicy(policyMap);
+ broker.getDestinationPolicy().setDefaultEntry(policy);
+
+ // add the MQTT transport connector to the broker
+ broker.addConnector("mqtt://localhost:" + port);
+ broker.setStartAsync(false);
+ broker.start(true);
+
+ // create the broker URL
+ brokerUrl = "tcp://localhost:" + port;
+
+ // create the client and connect
+ client = new MqttClient(brokerUrl, UUID.randomUUID().toString(), new MemoryPersistence());
+ client.connect();
+
+ // create mqtt streamer
+ dataStreamer = grid().dataStreamer(null);
+ streamer = createMqttStreamer(dataStreamer);
}
@After
public void afterTest() throws Exception {
+ try {
+ streamer.stop();
+ }
+ catch (Exception e) {
+ // ignore if already stopped
+ }
+
+ dataStreamer.close();
+
grid().cache(null).clear();
+ broker.stop();
+ broker.deleteAllMessages();
+
+ }
+
+ public void testSingleTopic_NoQoS_OneEntryPerMessage() throws Exception {
+ // configure streamer
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setTopic(SINGLE_TOPIC_NAME);
+
+ // subscribe to cache PUT events
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // action time
+ streamer.start();
+
+ // send messages
+ sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+ }
+
+ public void testMultipleTopics_NoQoS_OneEntryPerMessage() throws Exception {
+ // configure streamer
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setTopics(MULTIPLE_TOPIC_NAMES);
+
+ // subscribe to cache PUT events
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // action time
+ streamer.start();
+
+ // send messages
+ sendMessages(MULTIPLE_TOPIC_NAMES, 0, 50, false);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+
+ assertTrue(broker.getBroker().getDestinationMap().size() >= 4);
+ assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("def")));
+ assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
+ }
+
+ public void testSingleTopic_NoQoS_MultipleEntriesOneMessage() throws Exception {
+ // configure streamer
+ streamer.setMultipleTupleExtractor(multipleTupleExtractor());
+ streamer.setTopic(SINGLE_TOPIC_NAME);
+
+ // subscribe to cache PUT events
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // action time
+ streamer.start();
+
+ // send messages
+ sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, true);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+ }
+
+ public void testMultipleTopics_NoQoS_MultipleEntriesOneMessage() throws Exception {
+ // configure streamer
+ streamer.setMultipleTupleExtractor(multipleTupleExtractor());
+ streamer.setTopics(MULTIPLE_TOPIC_NAMES);
+
+ // subscribe to cache PUT events
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // action time
+ streamer.start();
+
+ // send messages
+ sendMessages(MULTIPLE_TOPIC_NAMES, 0, 50, true);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+
+ assertTrue(broker.getBroker().getDestinationMap().size() >= 4);
+ assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("def")));
+ assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
+ }
+
+ public void testSingleTopic_NoQoS_ConnectOptions_Durable() throws Exception {
+ // configure streamer
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setTopic(SINGLE_TOPIC_NAME);
+
+ MqttConnectOptions connOptions = new MqttConnectOptions();
+ connOptions.setCleanSession(false);
+ streamer.setConnectOptions(connOptions);
+
+ // subscribe to cache PUT events
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // action time
+ streamer.start();
+
+ // send messages
+ sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+
+ // explicitly stop the streamer
+ streamer.stop();
+
+ // send messages while stopped
+ sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false);
+
+ latch = subscribeToPutEvents(50);
+
+ // start the streamer again
+ streamer.start();
+
+ // assertions - make sure that messages sent during disconnection were also received
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(100);
+ }
+
+ public void testSingleTopic_NoQoS_Reconnect() throws Exception {
+ // configure streamer
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setRetryWaitStrategy(WaitStrategies.noWait());
+ streamer.setRetryStopStrategy(StopStrategies.neverStop());
+ streamer.setTopic(SINGLE_TOPIC_NAME);
+
+ // subscribe to cache PUT events
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // action time
+ streamer.start();
+
+ // send messages
+ sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+
+ // now shutdown the broker, wait 2 seconds and start it again
+ broker.stop();
+ broker.start(true);
+ broker.waitUntilStarted();
+ Thread.sleep(2000);
+ client.connect();
+
+ // let's ensure we have 2 connections: Ignite and our test
+ assertEquals(2, broker.getTransportConnectorByScheme("mqtt").getConnections().size());
+
+ // subscribe to cache PUT events again
+ latch = subscribeToPutEvents(50);
+
+ // send messages
+ sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(100);
+ }
+
+ public void testSingleTopic_NoQoS_RetryOnce() throws Exception {
+ // configure streamer
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setRetryWaitStrategy(WaitStrategies.noWait());
+ streamer.setRetryStopStrategy(StopStrategies.stopAfterAttempt(1));
+ streamer.setTopic(SINGLE_TOPIC_NAME);
+
+ // subscribe to cache PUT events
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // action time
+ streamer.start();
+
+ // send messages
+ sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 0, 50, false);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+
+ // now shutdown the broker, wait 2 seconds and start it again
+ broker.stop();
+ broker.start(true);
+ broker.waitUntilStarted();
+ client.connect();
+
+ // lets send messages and ensure they are not received, because our retrier desisted
+ sendMessages(Arrays.asList(SINGLE_TOPIC_NAME), 50, 50, false);
+ Thread.sleep(3000);
+ assertNull(grid().cache(null).get(50));
+
+ }
+
+ public void testMultipleTopics_MultipleQoS_OneEntryPerMessage() throws Exception {
+ // configure streamer
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setTopics(MULTIPLE_TOPIC_NAMES);
+ streamer.setQualitiesOfService(Arrays.asList(1, 1, 1, 1));
+
+ // subscribe to cache PUT events
+ CountDownLatch latch = subscribeToPutEvents(50);
+
+ // action time
+ streamer.start();
+
+ // send messages
+ sendMessages(MULTIPLE_TOPIC_NAMES, 0, 50, false);
+
+ // assertions
+ assertTrue(latch.await(10, TimeUnit.SECONDS));
+ assertCacheEntriesLoaded(50);
+
+ assertTrue(broker.getBroker().getDestinationMap().size() >= 4);
+ assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("def")));
+ assertTrue(broker.getBroker().getDestinationMap().containsKey(new ActiveMQTopic("ghi")));
+ }
+
+ public void testMultipleTopics_MultipleQoS_Mismatch() throws Exception {
+ // configure streamer
+ streamer.setSingleTupleExtractor(singleTupleExtractor());
+ streamer.setTopics(MULTIPLE_TOPIC_NAMES);
+ streamer.setQualitiesOfService(Arrays.asList(1, 1, 1));
+
+ try {
+ streamer.start();
+ }
+ catch (Exception e) {
+ return;
+ }
+ fail("Expected an exception reporting invalid parameters");
+
+ }
+
+ private MqttStreamer<Integer, String> createMqttStreamer(IgniteDataStreamer<Integer, String> dataStreamer) {
+ MqttStreamer<Integer, String> streamer = new MqttStreamer<>();
+ streamer.setIgnite(grid());
+ streamer.setStreamer(dataStreamer);
+ streamer.setBrokerUrl(brokerUrl);
+ streamer.setClientId(UUID.randomUUID().toString());
+ streamer.setBlockUntilConnected(true);
+
+ dataStreamer.allowOverwrite(true);
+ dataStreamer.autoFlushFrequency(1);
+
+ return streamer;
+ }
+
+ public void sendMessages(final List<String> topics, int fromIdx, int count, boolean singleMessage) throws MqttException {
+ if (singleMessage) {
+ final List<StringBuilder> sbs = new ArrayList<>(topics.size());
+ // initialize String Builders for each topic
+ F.forEach(topics, new IgniteInClosure<String>() {
+ @Override public void apply(String s) {
+ sbs.add(new StringBuilder());
+ }
+ });
+ // fill String Builders for each topic
+ F.forEach(F.range(fromIdx, fromIdx + count), new IgniteInClosure<Integer>() {
+ @Override public void apply(Integer integer) {
+ sbs.get(integer % topics.size()).append(integer.toString() + "," + TEST_DATA.get(integer) + "\n");
+ }
+ });
+ // send each buffer out
+ for (int i = 0; i < topics.size(); i++) {
+ MqttMessage msg = new MqttMessage(sbs.get(i).toString().getBytes());
+ client.publish(topics.get(i % topics.size()), msg);
+ }
+ }
+ else {
+ for (int i = fromIdx; i < fromIdx + count; i++) {
+ byte[] payload = (i + "," + TEST_DATA.get(i)).getBytes();
+ MqttMessage msg = new MqttMessage(payload);
+ client.publish(topics.get(i % topics.size()), msg);
+ }
+ }
+ }
+
+ private CountDownLatch subscribeToPutEvents(int expect) {
+ Ignite ignite = grid();
+
+ // Listen to cache PUT events and expect as many as messages as test data items
+ final CountDownLatch latch = new CountDownLatch(expect);
+ @SuppressWarnings("serial") IgniteBiPredicate<UUID, CacheEvent> callback = new IgniteBiPredicate<UUID, CacheEvent>() {
+ @Override public boolean apply(UUID uuid, CacheEvent evt) {
+ latch.countDown();
+ return true;
+ }
+ };
+
+ remoteListener = ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(callback, null, EVT_CACHE_OBJECT_PUT);
+ return latch;
+ }
+
+ private void assertCacheEntriesLoaded(int count) {
+ // get the cache and check that the entries are present
+ IgniteCache<Integer, String> cache = grid().cache(null);
+
+ // for each key from 0 to count from the TEST_DATA (ordered by key), check that the entry is present in cache
+ for (Integer key : new ArrayList<>(new TreeSet<>(TEST_DATA.keySet())).subList(0, count)) {
+ assertEquals(TEST_DATA.get(key), cache.get(key));
+ }
+
+ // assert that the cache exactly the specified amount of elements
+ assertEquals(count, cache.size(CachePeekMode.ALL));
+
+ // remove the event listener
+ grid().events(grid().cluster().forCacheNodes(null)).stopRemoteListen(remoteListener);
+ }
+
+ public static StreamSingleTupleExtractor<MqttMessage, Integer, String> singleTupleExtractor() {
+ return new StreamSingleTupleExtractor<MqttMessage, Integer, String>() {
+ @Override public Map.Entry<Integer, String> extract(MqttMessage msg) {
+ List<String> s = Splitter.on(",").splitToList(new String(msg.getPayload()));
+ return new GridMapEntry<>(Integer.parseInt(s.get(0)), s.get(1));
+ }
+ };
+ }
+ public static StreamMultipleTupleExtractor<MqttMessage, Integer, String> multipleTupleExtractor() {
+ return new StreamMultipleTupleExtractor<MqttMessage, Integer, String>() {
+ @Override public Map<Integer, String> extract(MqttMessage msg) {
+ final Map<String, String> map = Splitter.on("\n")
+ .omitEmptyStrings()
+ .withKeyValueSeparator(",")
+ .split(new String(msg.getPayload()));
+ final Map<Integer, String> answer = new HashMap<>();
+ F.forEach(map.keySet(), new IgniteInClosure<String>() {
+ @Override public void apply(String s) {
+ answer.put(Integer.parseInt(s), map.get(s));
+ }
+ });
+ return answer;
+ }
+ };
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/53683e20/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
----------------------------------------------------------------------
diff --git a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java b/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
deleted file mode 100644
index e2ed0f0..0000000
--- a/modules/mqtt/src/test/java/org/apache/ignite/stream/mqtt/TestTupleExtractors.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.mqtt;
-
-/**
- * Test transformers for MqttStreamer tests.
- *
- * @author Raul Kripalani
- */
-public class TestTupleExtractors {
-
-
-}
\ No newline at end of file
[22/22] ignite git commit: IGNITE-1282: Fix after merge from master.
Posted by vo...@apache.org.
IGNITE-1282: Fix after merge from master.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/220ecb30
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/220ecb30
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/220ecb30
Branch: refs/heads/ignite-1282
Commit: 220ecb3064e6714988d95950ec1670f6850811a1
Parents: 281b9f2
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Sep 22 17:58:56 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Sep 22 17:58:56 2015 +0300
----------------------------------------------------------------------
.../configuration/CacheConfiguration.java | 20 ++++++++++----------
1 file changed, 10 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/220ecb30/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 44f7e1b..98f9fe1 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -17,17 +17,17 @@
package org.apache.ignite.configuration;
-import java.io.Serializable;
-import java.util.Collection;
-import javax.cache.Cache;
-import javax.cache.configuration.CompleteConfiguration;
-import javax.cache.configuration.Factory;
-import javax.cache.configuration.MutableConfiguration;
-import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheInterceptor;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.CacheTypeMetadata;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cache.eviction.EvictionFilter;
@@ -44,13 +44,13 @@ import org.apache.ignite.plugin.CachePluginConfiguration;
import org.jetbrains.annotations.Nullable;
import javax.cache.Cache;
-import javax.cache.CacheException;
import javax.cache.configuration.CompleteConfiguration;
import javax.cache.configuration.Factory;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.expiry.ExpiryPolicy;
import java.io.Serializable;
import java.util.Collection;
+import java.util.HashSet;
/**
* This class defines grid cache configuration. This configuration is passed to
[03/22] ignite git commit: Merge branch 'master' into
feature/ignite-1370.
Posted by vo...@apache.org.
Merge branch 'master' into feature/ignite-1370.
# Conflicts:
# modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b80b1719
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b80b1719
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b80b1719
Branch: refs/heads/ignite-1282
Commit: b80b17196269a036da65d781eb772a317039b014
Parents: b1dee96 6e48c9c
Author: Raul Kripalani <ra...@apache.org>
Authored: Tue Sep 15 18:28:09 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Tue Sep 15 18:28:09 2015 +0100
----------------------------------------------------------------------
RELEASE_NOTES.txt | 2 +-
examples/config/example-default.xml | 76 -
examples/config/example-ignite.xml | 56 +-
examples/config/filesystem/README.txt | 2 +-
examples/config/filesystem/example-igfs.xml | 7 -
.../config/portable/example-ignite-portable.xml | 44 -
.../ignite/examples/portable/Address.java | 72 -
.../ignite/examples/portable/Employee.java | 93 -
.../ignite/examples/portable/EmployeeKey.java | 90 -
.../portable/ExamplePortableNodeStartup.java | 36 -
.../ignite/examples/portable/Organization.java | 93 -
.../examples/portable/OrganizationType.java | 32 -
...mputeClientPortableTaskExecutionExample.java | 154 -
.../portable/computegrid/ComputeClientTask.java | 116 -
.../portable/computegrid/package-info.java | 21 -
.../CacheClientPortablePutGetExample.java | 230 --
.../CacheClientPortableQueryExample.java | 325 --
.../portable/datagrid/package-info.java | 21 -
.../ignite/examples/portable/package-info.java | 21 -
.../CacheClientPortableExampleTest.java | 46 -
.../ComputeClientPortableExampleTest.java | 37 -
.../testsuites/IgniteExamplesSelfTestSuite.java | 6 -
modules/clients/pom.xml | 7 +
modules/clients/src/test/config/jdbc-config.xml | 55 +
.../jdbc2/JdbcComplexQuerySelfTest.java | 316 ++
.../internal/jdbc2/JdbcConnectionSelfTest.java | 268 ++
.../internal/jdbc2/JdbcEmptyCacheSelfTest.java | 140 +
.../internal/jdbc2/JdbcLocalCachesSelfTest.java | 156 +
.../internal/jdbc2/JdbcMetadataSelfTest.java | 334 ++
.../jdbc2/JdbcPreparedStatementSelfTest.java | 730 ++++
.../internal/jdbc2/JdbcResultSetSelfTest.java | 751 ++++
.../internal/jdbc2/JdbcStatementSelfTest.java | 292 ++
.../jdbc/suite/IgniteJdbcDriverTestSuite.java | 11 +
modules/core/pom.xml | 21 -
.../src/main/java/org/apache/ignite/Ignite.java | 7 -
.../java/org/apache/ignite/IgniteCache.java | 44 +-
.../org/apache/ignite/IgniteJdbcDriver.java | 281 +-
.../java/org/apache/ignite/IgnitePortables.java | 370 --
.../apache/ignite/IgniteSystemProperties.java | 5 +-
.../configuration/CacheConfiguration.java | 70 +-
.../ignite/internal/GridKernalContext.java | 7 +-
.../ignite/internal/GridKernalContextImpl.java | 10 +-
.../apache/ignite/internal/GridLoggerProxy.java | 6 +-
.../org/apache/ignite/internal/IgniteEx.java | 9 +
.../apache/ignite/internal/IgniteKernal.java | 14 +-
.../ignite/internal/IgniteNodeAttributes.java | 5 +-
.../internal/executor/GridExecutorService.java | 4 +-
.../ignite/internal/jdbc/JdbcConnection.java | 4 +
.../internal/jdbc/JdbcConnectionInfo.java | 91 -
.../internal/jdbc/JdbcDatabaseMetadata.java | 4 +
.../internal/jdbc/JdbcPreparedStatement.java | 4 +
.../ignite/internal/jdbc/JdbcResultSet.java | 4 +
.../internal/jdbc/JdbcResultSetMetadata.java | 4 +
.../ignite/internal/jdbc/JdbcStatement.java | 4 +
.../apache/ignite/internal/jdbc/JdbcUtils.java | 4 +
.../ignite/internal/jdbc2/JdbcConnection.java | 777 ++++
.../internal/jdbc2/JdbcDatabaseMetadata.java | 1401 +++++++
.../internal/jdbc2/JdbcPreparedStatement.java | 411 ++
.../ignite/internal/jdbc2/JdbcQueryTask.java | 361 ++
.../ignite/internal/jdbc2/JdbcResultSet.java | 1520 +++++++
.../internal/jdbc2/JdbcResultSetMetadata.java | 171 +
.../ignite/internal/jdbc2/JdbcStatement.java | 456 +++
.../apache/ignite/internal/jdbc2/JdbcUtils.java | 155 +
.../deployment/GridDeploymentStoreAdapter.java | 4 +-
.../discovery/GridDiscoveryManager.java | 10 -
.../portable/GridPortableMarshaller.java | 2 +-
.../portable/PortableClassDescriptor.java | 10 +-
.../internal/portable/PortableContext.java | 32 +-
.../portable/PortableMetaDataCollector.java | 6 +-
.../portable/PortableMetaDataHandler.java | 4 +-
.../internal/portable/PortableMetaDataImpl.java | 14 +-
.../internal/portable/PortableObjectEx.java | 6 +-
.../internal/portable/PortableObjectImpl.java | 6 +-
.../portable/PortableObjectOffheapImpl.java | 6 +-
.../internal/portable/PortableRawReaderEx.java | 4 +-
.../internal/portable/PortableRawWriterEx.java | 4 +-
.../portable/PortableReaderContext.java | 2 +-
.../internal/portable/PortableReaderExImpl.java | 10 +-
.../ignite/internal/portable/PortableUtils.java | 2 +-
.../internal/portable/PortableWriterExImpl.java | 11 +-
.../internal/portable/api/IgnitePortables.java | 362 ++
.../internal/portable/api/PortableBuilder.java | 136 +
.../portable/api/PortableException.java | 57 +
.../internal/portable/api/PortableIdMapper.java | 54 +
.../api/PortableInvalidClassException.java | 58 +
.../portable/api/PortableMarshalAware.java | 48 +
.../portable/api/PortableMarshaller.java | 358 ++
.../internal/portable/api/PortableMetadata.java | 60 +
.../internal/portable/api/PortableObject.java | 152 +
.../portable/api/PortableProtocolVersion.java | 41 +
.../portable/api/PortableRawReader.java | 234 ++
.../portable/api/PortableRawWriter.java | 219 +
.../internal/portable/api/PortableReader.java | 284 ++
.../portable/api/PortableSerializer.java | 47 +
.../portable/api/PortableTypeConfiguration.java | 195 +
.../internal/portable/api/PortableWriter.java | 266 ++
.../portable/builder/PortableBuilderEnum.java | 2 +-
.../portable/builder/PortableBuilderImpl.java | 14 +-
.../portable/builder/PortableBuilderReader.java | 2 +-
.../builder/PortableBuilderSerializer.java | 2 +-
.../builder/PortableEnumArrayLazyValue.java | 4 +-
.../builder/PortableObjectArrayLazyValue.java | 2 +-
.../builder/PortablePlainPortableObject.java | 2 +-
.../streams/PortableAbstractInputStream.java | 2 +-
.../processors/cache/GridCacheAdapter.java | 31 +-
.../cache/GridCacheClearAllRunnable.java | 4 +-
.../processors/cache/GridCacheContext.java | 4 +-
.../processors/cache/GridCacheIoManager.java | 4 +-
.../processors/cache/GridCacheLogger.java | 4 +-
.../processors/cache/GridCacheMapEntry.java | 15 +-
.../processors/cache/GridCacheMvcc.java | 5 +-
.../processors/cache/GridCacheProcessor.java | 37 +-
.../cache/GridCacheSharedContext.java | 4 +-
.../processors/cache/GridCacheSwapManager.java | 46 +-
.../processors/cache/IgniteCacheProxy.java | 5 -
.../distributed/GridCacheTxRecoveryFuture.java | 11 +-
.../distributed/GridDistributedCacheEntry.java | 6 +-
.../GridDistributedTxFinishRequest.java | 13 +-
.../GridDistributedTxRemoteAdapter.java | 18 +-
.../dht/GridClientPartitionTopology.java | 104 +-
.../distributed/dht/GridDhtLocalPartition.java | 1 +
.../dht/GridDhtPartitionTopology.java | 4 +-
.../dht/GridDhtPartitionTopologyImpl.java | 16 +-
.../dht/GridDhtTransactionalCacheAdapter.java | 514 ++-
.../distributed/dht/GridDhtTxFinishFuture.java | 15 +-
.../distributed/dht/GridDhtTxFinishRequest.java | 84 +-
.../dht/GridDhtTxFinishResponse.java | 89 +-
.../cache/distributed/dht/GridDhtTxLocal.java | 4 +-
.../distributed/dht/GridDhtTxLocalAdapter.java | 67 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 34 +-
.../cache/distributed/dht/GridDhtTxRemote.java | 40 +-
.../dht/GridPartitionedGetFuture.java | 4 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 4 +-
.../colocated/GridDhtColocatedLockFuture.java | 11 +-
.../distributed/near/GridNearLockFuture.java | 11 +-
.../distributed/near/GridNearLockRequest.java | 18 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 55 +-
.../GridNearPessimisticTxPrepareFuture.java | 11 +-
.../near/GridNearTxFinishFuture.java | 345 +-
.../near/GridNearTxFinishRequest.java | 20 +-
.../cache/distributed/near/GridNearTxLocal.java | 64 +-
.../distributed/near/GridNearTxRemote.java | 38 +-
.../CacheDefaultPortableAffinityKeyMapper.java | 2 +-
.../portable/CacheObjectPortableContext.java | 2 +-
.../portable/CacheObjectPortableProcessor.java | 8 +-
.../CacheObjectPortableProcessorImpl.java | 12 +-
.../cache/portable/IgnitePortablesImpl.java | 10 +-
.../cache/store/CacheOsStoreManager.java | 4 +-
.../cache/transactions/IgniteTxAdapter.java | 5 +-
.../cache/transactions/IgniteTxHandler.java | 286 +-
.../transactions/IgniteTxLocalAdapter.java | 37 +-
.../cache/transactions/IgniteTxManager.java | 48 +-
.../continuous/GridContinuousProcessor.java | 22 +-
.../datastructures/DataStructuresProcessor.java | 102 +-
.../datastructures/GridCacheAtomicLongImpl.java | 4 +-
.../GridCacheAtomicReferenceImpl.java | 4 +-
.../GridCacheAtomicSequenceImpl.java | 4 +-
.../GridCacheAtomicStampedImpl.java | 4 +-
.../GridCacheCountDownLatchImpl.java | 4 +-
.../GridTransactionalCacheQueueImpl.java | 15 +-
.../processors/igfs/IgfsFileAffinityRange.java | 4 +-
.../processors/igfs/IgfsFileWorkerBatch.java | 15 +-
.../igfs/IgfsFragmentizerManager.java | 8 +-
.../processors/igfs/IgfsServerManager.java | 5 +-
.../internal/processors/job/GridJobWorker.java | 4 +-
.../dotnet/PlatformDotNetConfiguration.java | 12 +-
.../PlatformDotNetPortableConfiguration.java | 12 +-
...PlatformDotNetPortableTypeConfiguration.java | 12 +-
.../processors/task/GridTaskWorker.java | 4 +-
.../util/GridSpiCloseableIteratorWrapper.java | 5 +
.../ignite/internal/util/IgniteUtils.java | 16 +
.../util/offheap/GridOffHeapEvictListener.java | 2 +-
.../util/offheap/GridOffHeapPartitionedMap.java | 1 +
.../util/offheap/unsafe/GridUnsafeMap.java | 4 +-
.../marshaller/portable/PortableMarshaller.java | 358 --
.../marshaller/portable/package-info.java | 22 -
.../apache/ignite/portable/PortableBuilder.java | 137 -
.../ignite/portable/PortableException.java | 57 -
.../ignite/portable/PortableIdMapper.java | 56 -
.../portable/PortableInvalidClassException.java | 58 -
.../ignite/portable/PortableMarshalAware.java | 48 -
.../ignite/portable/PortableMetadata.java | 61 -
.../apache/ignite/portable/PortableObject.java | 154 -
.../portable/PortableProtocolVersion.java | 41 -
.../ignite/portable/PortableRawReader.java | 234 --
.../ignite/portable/PortableRawWriter.java | 219 -
.../apache/ignite/portable/PortableReader.java | 284 --
.../ignite/portable/PortableSerializer.java | 49 -
.../portable/PortableTypeConfiguration.java | 196 -
.../apache/ignite/portable/PortableWriter.java | 266 --
.../apache/ignite/portable/package-info.java | 22 -
.../resources/META-INF/classnames.properties | 20 +-
.../GridDiscoveryManagerAttributesSelfTest.java | 45 -
.../GridPortableAffinityKeySelfTest.java | 218 -
.../GridPortableBuilderAdditionalSelfTest.java | 1226 ------
.../portable/GridPortableBuilderSelfTest.java | 1021 -----
...eBuilderStringAsCharsAdditionalSelfTest.java | 28 -
...ridPortableBuilderStringAsCharsSelfTest.java | 28 -
...idPortableMarshallerCtxDisabledSelfTest.java | 256 --
.../GridPortableMarshallerSelfTest.java | 3807 ------------------
.../GridPortableMetaDataDisabledSelfTest.java | 238 --
.../portable/GridPortableMetaDataSelfTest.java | 369 --
.../portable/GridPortableWildcardsSelfTest.java | 482 ---
.../GridPortableMarshalerAwareTestClass.java | 67 -
.../mutabletest/GridPortableTestClasses.java | 434 --
.../portable/mutabletest/package-info.java | 22 -
.../ignite/internal/portable/package-info.java | 22 -
.../portable/test/GridPortableTestClass1.java | 28 -
.../portable/test/GridPortableTestClass2.java | 24 -
.../internal/portable/test/package-info.java | 22 -
.../test/subpackage/GridPortableTestClass3.java | 24 -
.../portable/test/subpackage/package-info.java | 22 -
.../CacheStoreUsageMultinodeAbstractTest.java | 16 +-
.../cache/GridCacheAbstractFullApiSelfTest.java | 57 +-
.../GridCacheAbstractRemoveFailureTest.java | 199 +-
.../cache/GridCacheMemoryModeSelfTest.java | 9 +-
.../processors/cache/GridCacheMvccSelfTest.java | 4 +-
.../cache/GridCacheP2PUndeploySelfTest.java | 30 +-
.../cache/GridCachePutAllFailoverSelfTest.java | 88 +-
.../GridCacheVariableTopologySelfTest.java | 5 +-
.../cache/IgniteCacheCreateRestartSelfTest.java | 106 +
.../IgniteCacheEntryProcessorNodeJoinTest.java | 73 +
.../cache/IgniteCachePutAllRestartTest.java | 2 +
.../cache/IgniteInternalCacheTypesTest.java | 4 +-
.../cache/IgniteOnePhaseCommitNearSelfTest.java | 243 ++
.../IgniteTxExceptionAbstractSelfTest.java | 29 +-
.../GridCacheAbstractNodeRestartSelfTest.java | 149 +-
...ridCachePartitionNotLoadedEventSelfTest.java | 27 +-
.../GridCacheTransformEventSelfTest.java | 5 +-
.../GridCacheColocatedTxExceptionSelfTest.java | 2 +-
.../GridCacheDhtAtomicRemoveFailureTest.java | 16 +-
.../dht/GridCacheDhtRemoveFailureTest.java | 16 +-
.../dht/GridCacheTxNodeFailureSelfTest.java | 405 ++
.../dht/GridNearCacheTxNodeFailureSelfTest.java | 31 +
...gniteAtomicLongChangingTopologySelfTest.java | 278 ++
.../IgniteCacheCrossCacheTxFailoverTest.java | 53 +-
.../IgniteCachePutRetryAbstractSelfTest.java | 166 +-
.../dht/IgniteCachePutRetryAtomicSelfTest.java | 2 +
...gniteCachePutRetryTransactionalSelfTest.java | 50 +-
...eAtomicInvalidPartitionHandlingSelfTest.java | 98 +-
...tomicPrimaryWriteOrderRemoveFailureTest.java | 15 +-
.../GridCacheAtomicRemoveFailureTest.java | 15 +-
.../GridCacheAtomicNearRemoveFailureTest.java | 15 +-
...cPrimaryWriteOrderNearRemoveFailureTest.java | 15 +-
.../near/GridCacheNearRemoveFailureTest.java | 15 +-
.../near/GridCacheNearTxExceptionSelfTest.java | 2 +-
.../GridCachePartitionedNodeRestartTest.java | 9 +-
...ePartitionedOptimisticTxNodeRestartTest.java | 9 +-
.../near/IgniteCacheNearOnlyTxTest.java | 14 +-
.../GridCacheReplicatedNodeRestartSelfTest.java | 8 +-
.../GridCacheReplicatedTxExceptionSelfTest.java | 2 +-
.../GridCacheLocalTxExceptionSelfTest.java | 2 +-
...ClientNodePortableMetadataMultinodeTest.java | 295 --
...GridCacheClientNodePortableMetadataTest.java | 286 --
...ableObjectsAbstractDataStreamerSelfTest.java | 190 -
...bleObjectsAbstractMultiThreadedSelfTest.java | 231 --
...ridCachePortableObjectsAbstractSelfTest.java | 978 -----
.../GridCachePortableStoreAbstractSelfTest.java | 297 --
.../GridCachePortableStoreObjectsSelfTest.java | 55 -
...GridCachePortableStorePortablesSelfTest.java | 66 -
...ridPortableCacheEntryMemorySizeSelfTest.java | 55 -
...leDuplicateIndexObjectsAbstractSelfTest.java | 158 -
.../DataStreamProcessorPortableSelfTest.java | 66 -
.../GridDataStreamerImplSelfTest.java | 345 --
...ridCacheAffinityRoutingPortableSelfTest.java | 47 -
...lyPortableDataStreamerMultiNodeSelfTest.java | 29 -
...rtableDataStreamerMultithreadedSelfTest.java | 47 -
...artitionedOnlyPortableMultiNodeSelfTest.java | 28 -
...tionedOnlyPortableMultithreadedSelfTest.java | 47 -
.../GridCacheMemoryModePortableSelfTest.java | 36 -
...acheOffHeapTieredAtomicPortableSelfTest.java | 47 -
...eapTieredEvictionAtomicPortableSelfTest.java | 95 -
...heOffHeapTieredEvictionPortableSelfTest.java | 95 -
.../GridCacheOffHeapTieredPortableSelfTest.java | 47 -
...ateIndexObjectPartitionedAtomicSelfTest.java | 38 -
...xObjectPartitionedTransactionalSelfTest.java | 41 -
...AtomicNearDisabledOffheapTieredSelfTest.java | 29 -
...rtableObjectsAtomicNearDisabledSelfTest.java | 51 -
...tableObjectsAtomicOffheapTieredSelfTest.java | 29 -
.../GridCachePortableObjectsAtomicSelfTest.java | 51 -
...tionedNearDisabledOffheapTieredSelfTest.java | 30 -
...eObjectsPartitionedNearDisabledSelfTest.java | 51 -
...ObjectsPartitionedOffheapTieredSelfTest.java | 30 -
...CachePortableObjectsPartitionedSelfTest.java | 51 -
...sNearPartitionedByteArrayValuesSelfTest.java | 41 -
...sPartitionedOnlyByteArrayValuesSelfTest.java | 42 -
...dCachePortableObjectsReplicatedSelfTest.java | 51 -
...CachePortableObjectsAtomicLocalSelfTest.java | 32 -
...rtableObjectsLocalOffheapTieredSelfTest.java | 29 -
.../GridCachePortableObjectsLocalSelfTest.java | 51 -
.../processors/igfs/IgfsAbstractSelfTest.java | 61 +-
.../igfs/IgfsDualAbstractSelfTest.java | 10 +-
.../processors/igfs/IgfsStartCacheTest.java | 2 +-
.../offheap/GridOffHeapMapAbstractSelfTest.java | 6 +-
...idOffHeapPartitionedMapAbstractSelfTest.java | 10 +-
.../stream/socket/SocketStreamerSelfTest.java | 29 +-
.../ignite/testframework/GridTestUtils.java | 117 +
.../ignite/testframework/junits/IgniteMock.java | 8 +-
.../multijvm/IgniteCacheProcessProxy.java | 5 -
.../junits/multijvm/IgniteProcessProxy.java | 2 +-
.../IgniteCacheFailoverTestSuite.java | 15 +-
.../IgniteCacheFailoverTestSuite3.java | 62 +
.../testsuites/IgniteCacheRestartTestSuite.java | 15 +-
.../IgniteCacheRestartTestSuite2.java | 47 +
.../IgnitePortableCacheFullApiTestSuite.java | 37 -
.../IgnitePortableCacheTestSuite.java | 103 -
.../IgnitePortableObjectsTestSuite.java | 92 -
.../ignite/portable/test1/1.1/test1-1.1.jar | Bin 2548 -> 0 bytes
.../ignite/portable/test1/1.1/test1-1.1.pom | 9 -
.../portable/test1/maven-metadata-local.xml | 12 -
.../ignite/portable/test2/1.1/test2-1.1.jar | Bin 1361 -> 0 bytes
.../ignite/portable/test2/1.1/test2-1.1.pom | 9 -
.../portable/test2/maven-metadata-local.xml | 12 -
.../hadoop/SecondaryFileSystemProvider.java | 4 +-
.../hadoop/igfs/HadoopIgfsWrapper.java | 100 +-
.../HadoopDefaultMapReducePlannerSelfTest.java | 6 +
.../testsuites/IgniteHadoopTestSuite.java | 6 +-
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 31 +-
...CacheScanPartitionQueryFallbackSelfTest.java | 105 +-
...QueryOffheapEvictsMultiThreadedSelfTest.java | 5 -
...lientQueryReplicatedNodeRestartSelfTest.java | 8 +-
.../IgniteCacheQueryNodeRestartSelfTest.java | 4 +-
.../IgniteCacheQueryNodeRestartSelfTest2.java | 10 +-
.../IgniteCacheReplicatedQuerySelfTest.java | 49 +-
.../IgnitePortableCacheQueryTestSuite.java | 117 -
.../platform/PlatformContextImpl.java | 4 +-
.../platform/compute/PlatformCompute.java | 2 +-
.../cpp/PlatformCppConfigurationClosure.java | 2 +-
.../PlatformDotNetConfigurationClosure.java | 6 +-
.../platform/events/PlatformEvents.java | 2 +-
.../services/PlatformAbstractService.java | 3 +-
.../Cache/CacheAbstractTest.cs | 71 +-
.../Config/Compute/compute-grid1.xml | 8 +-
.../PlatformComputePortableArgTask.java | 8 +-
.../ignite/schema/generator/CodeGenerator.java | 4 +-
.../ignite/schema/model/PojoDescriptor.java | 6 +-
.../parser/dialect/OracleMetadataDialect.java | 7 +-
.../org/apache/ignite/IgniteSpringBean.java | 7 -
.../yardstick/config/benchmark-query.properties | 5 +-
modules/yardstick/config/ignite-base-config.xml | 2 +-
modules/yardstick/config/ignite-jdbc-config.xml | 55 +
parent/pom.xml | 10 -
pom.xml | 10 +
343 files changed, 15499 insertions(+), 18992 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b80b1719/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 6786b7e,da15de3..1056990
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@@ -267,10 -233,10 +268,11 @@@ public class SocketStreamerSelfTest ext
* @param converter Converter.
* @param r Runnable..
*/
- private void test(@Nullable SocketMessageConverter<Message> converter, @Nullable byte[] delim, Runnable r,
- boolean oneMessagePerTuple) throws Exception
- {
- private void test(@Nullable SocketMessageConverter<Tuple> converter,
- @Nullable byte[] delim,
- Runnable r) throws Exception {
- SocketStreamer<Tuple, Integer, String> sockStmr = null;
++ private void test(@Nullable SocketMessageConverter<Message> converter,
++ @Nullable byte[] delim,
++ Runnable r,
++ boolean oneMessagePerTuple) throws Exception {
+ SocketStreamer<Message, Integer, String> sockStmr = null;
Ignite ignite = grid(0);
[02/22] ignite git commit: IGNITE-1370 Deprecate StreamTupleExtractor
in favor of new Stream*Single*TupleExtractor.
Posted by vo...@apache.org.
IGNITE-1370 Deprecate StreamTupleExtractor in favor of new Stream*Single*TupleExtractor.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1dee96e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1dee96e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1dee96e
Branch: refs/heads/ignite-1282
Commit: b1dee96e0d2b6130959400615b29b02721075392
Parents: 4d9734a
Author: Raul Kripalani <ra...@apache.org>
Authored: Tue Sep 15 00:48:46 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Tue Sep 15 00:48:46 2015 +0100
----------------------------------------------------------------------
.../socket/WordsSocketStreamerServer.java | 5 +-
.../org/apache/ignite/stream/StreamAdapter.java | 64 ++++++++++++++++----
.../stream/StreamSingleTupleExtractor.java | 40 ++++++++++++
.../ignite/stream/StreamTupleExtractor.java | 23 +++----
.../ignite/stream/socket/SocketStreamer.java | 2 +-
.../stream/socket/SocketStreamerSelfTest.java | 6 +-
6 files changed, 107 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
index 8e961b9..814d235 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
@@ -20,6 +20,7 @@ package org.apache.ignite.examples.streaming.wordcount.socket;
import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.util.Map;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
@@ -31,7 +32,7 @@ import org.apache.ignite.examples.ExamplesUtils;
import org.apache.ignite.examples.streaming.wordcount.CacheConfig;
import org.apache.ignite.examples.streaming.wordcount.QueryWords;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.stream.StreamTupleExtractor;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.ignite.stream.socket.SocketMessageConverter;
import org.apache.ignite.stream.socket.SocketStreamer;
@@ -108,7 +109,7 @@ public class WordsSocketStreamerServer {
}
});
- sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+ sockStmr.setSingleTupleExtractor(new StreamSingleTupleExtractor<String, AffinityUuid, String>() {
@Override public Map.Entry<AffinityUuid, String> extract(String word) {
// By using AffinityUuid we ensure that identical
// words are processed on the same cluster node.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index ffa0821..e7d224c 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -37,8 +37,9 @@ import org.apache.ignite.IgniteDataStreamer;
* </ol>
*/
public abstract class StreamAdapter<T, K, V> {
- /** Tuple extractor. */
- private StreamTupleExtractor<T, K, V> extractor;
+
+ /** Tuple extractor extracting a single tuple from an event */
+ private StreamSingleTupleExtractor<T, K, V> singleTupleExtractor;
/** Tuple extractor that supports extracting N tuples from a single event (1:n cardinality). */
private StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor;
@@ -60,11 +61,22 @@ public abstract class StreamAdapter<T, K, V> {
* Stream adapter.
*
* @param stmr Streamer.
- * @param extractor Tuple extractor.
+ * @param extractor Tuple extractor (1:1).
*/
- protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
+ protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamSingleTupleExtractor<T, K, V> extractor) {
this.stmr = stmr;
- this.extractor = extractor;
+ this.singleTupleExtractor = extractor;
+ }
+
+ /**
+ * Stream adapter.
+ *
+ * @param stmr Streamer.
+ * @param extractor Tuple extractor (1:n).
+ */
+ protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamMultipleTupleExtractor<T, K, V> extractor) {
+ this.stmr = stmr;
+ this.multipleTupleExtractor = extractor;
}
/**
@@ -83,16 +95,44 @@ public abstract class StreamAdapter<T, K, V> {
/**
* @return Provided tuple extractor.
+ * @see #getSingleTupleExtractor()
*/
+ @Deprecated
public StreamTupleExtractor<T, K, V> getTupleExtractor() {
- return extractor;
+ if (singleTupleExtractor instanceof StreamTupleExtractor) {
+ return (StreamTupleExtractor) singleTupleExtractor;
+ }
+ throw new IllegalArgumentException("This method is deprecated and only relevant if using an old " +
+ "StreamTupleExtractor; use getSingleTupleExtractor instead");
}
/**
- * @param extractor Extractor for key-value tuples from messages.
+ * @param extractor Extractor for a single key-value tuple from the message.
+ * @see #setSingleTupleExtractor(StreamSingleTupleExtractor)
*/
+ @Deprecated
public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
- this.extractor = extractor;
+ if (multipleTupleExtractor != null) {
+ throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
+ }
+ this.singleTupleExtractor = extractor;
+ }
+
+ /**
+ * @return Provided single tuple extractor.
+ */
+ public StreamSingleTupleExtractor<T, K, V> getSingleTupleExtractor() {
+ return singleTupleExtractor;
+ }
+
+ /**
+ * @param singleTupleExtractor Extractor for key-value tuples from messages.
+ */
+ public void setSingleTupleExtractor(StreamSingleTupleExtractor<T, K, V> singleTupleExtractor) {
+ if (multipleTupleExtractor != null) {
+ throw new IllegalArgumentException("Multiple tuple extractor already set; cannot set both types at once.");
+ }
+ this.singleTupleExtractor = singleTupleExtractor;
}
/**
@@ -106,6 +146,9 @@ public abstract class StreamAdapter<T, K, V> {
* @param multipleTupleExtractor Extractor for 1:n tuple extraction.
*/
public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor) {
+ if (singleTupleExtractor != null) {
+ throw new IllegalArgumentException("Single tuple extractor already set; cannot set both types at once.");
+ }
this.multipleTupleExtractor = multipleTupleExtractor;
}
@@ -126,15 +169,12 @@ public abstract class StreamAdapter<T, K, V> {
/**
* Converts given message to 1 or many tuples (depending on the type of extractor) and adds it/them to the
* underlying streamer.
- * <p>
- * If both a {@link #multipleTupleExtractor} and a {@link #extractor} have been set, the former will take precedence
- * and the latter will be ignored.
*
* @param msg Message to convert.
*/
protected void addMessage(T msg) {
if (multipleTupleExtractor == null) {
- Map.Entry<K, V> e = extractor.extract(msg);
+ Map.Entry<K, V> e = singleTupleExtractor.extract(msg);
if (e != null)
stmr.addData(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
new file mode 100644
index 0000000..fd50e93
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamSingleTupleExtractor.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import java.util.Map;
+
+/**
+ * Stream tuple extractor to convert a message to a single Ignite key-value tuple.
+ * <p>
+ * Alternatively, {@link StreamMultipleTupleExtractor} can be used in cases where a single message/event may
+ * produce more than one tuple.
+ * <p>
+ * NOTE: This interface supersedes the former {@link StreamTupleExtractor} which is now deprecated.
+ *
+ * @see StreamMultipleTupleExtractor
+ */
+public interface StreamSingleTupleExtractor<T, K, V> {
+ /**
+ * Extracts a key-value tuple from a message.
+ *
+ * @param msg Message.
+ * @return Key-value tuple.
+ */
+ public Map.Entry<K, V> extract(T msg);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
index aed7d8a..5cd8d55 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
@@ -17,22 +17,13 @@
package org.apache.ignite.stream;
-import java.util.Map;
-
/**
- * Stream tuple extractor to convert messages to Ignite key-value tuples.
- * <p>
- * Alternatively, {@link StreamMultipleTupleExtractor} can be employed in cases where a single message/event may
- * produce more than one tuple.
+ * This interface is provided for backwards compatibility with {@link StreamSingleTupleExtractor}.
*
- * @see StreamMultipleTupleExtractor
+ * @deprecated Will be removed in 2.0.0.
+ * @see StreamSingleTupleExtractor
*/
-public interface StreamTupleExtractor<T, K, V> {
- /**
- * Extracts a key-value tuple from a message.
- *
- * @param msg Message.
- * @return Key-value tuple.
- */
- public Map.Entry<K, V> extract(T msg);
-}
\ No newline at end of file
+@Deprecated
+public interface StreamTupleExtractor<T, K, V> extends StreamSingleTupleExtractor<T, K, V> {
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index c89952d..066a5fd 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -141,7 +141,7 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
* @throws IgniteException If failed.
*/
public void start() {
- A.ensure(getTupleExtractor() != null || getMultipleTupleExtractor() != null,
+ A.ensure(getSingleTupleExtractor() != null || getMultipleTupleExtractor() != null,
"tupleExtractor (single or multiple)");
A.notNull(getStreamer(), "streamer");
A.notNull(getIgnite(), "ignite");
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1dee96e/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 8b05754..6786b7e 100644
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -44,9 +45,10 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.ignite.stream.StreamMultipleTupleExtractor;
-import org.apache.ignite.stream.StreamTupleExtractor;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
@@ -292,7 +294,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
sockStmr.setDelimiter(delim);
if (oneMessagePerTuple) {
- sockStmr.setTupleExtractor(new StreamTupleExtractor<Message, Integer, String>() {
+ sockStmr.setSingleTupleExtractor(new StreamSingleTupleExtractor<Message, Integer, String>() {
@Override public Map.Entry<Integer, String> extract(Message msg) {
return new IgniteBiTuple<>(msg.key, msg.val);
}
[12/22] ignite git commit: Cleaned documentation. Set ATOMIC mode as
default using specific constant.
Posted by vo...@apache.org.
Cleaned documentation. Set ATOMIC mode as default using specific constant.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/50f75bd6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/50f75bd6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/50f75bd6
Branch: refs/heads/ignite-1282
Commit: 50f75bd6111b5b9163391e4c0913ff5b696a2862
Parents: e51fb42
Author: Denis Magda <dm...@gridgain.com>
Authored: Tue Sep 22 11:44:22 2015 +0300
Committer: Denis Magda <dm...@gridgain.com>
Committed: Tue Sep 22 11:44:22 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/cache/CacheAtomicityMode.java | 17 +++++------------
.../processors/cache/GridCacheProcessor.java | 2 +-
2 files changed, 6 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f75bd6/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
index 9e0f81e..92b5aa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheAtomicityMode.java
@@ -33,11 +33,6 @@ public enum CacheAtomicityMode {
/**
* Specified fully {@code ACID}-compliant transactional cache behavior. See
* {@link Transaction} for more information about transactions.
- * <p>
- * This mode is currently the default cache atomicity mode. However, cache
- * atomicity mode will be changed to {@link #ATOMIC} starting from version {@code 5.2},
- * so it is recommended that desired atomicity mode is explicitly configured
- * instead of relying on default value.
*/
TRANSACTIONAL,
@@ -49,18 +44,16 @@ public enum CacheAtomicityMode {
* In addition to transactions and locking, one of the main differences in {@code ATOMIC} mode
* is that bulk writes, such as {@code putAll(...)}, {@code removeAll(...)}, and {@code transformAll(...)}
* methods, become simple batch operations which can partially fail. In case of partial
- * failure {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException} will be thrown which will contain a list of keys
- * for which the update failed. It is recommended that bulk writes are used whenever multiple keys
- * need to be inserted or updated in cache, as they reduce number of network trips and provide
- * better performance.
+ * failure {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException} will be thrown
+ * which will contain a list of keys for which the update failed. It is recommended that bulk writes are used
+ * whenever multiple keys need to be inserted or updated in cache, as they reduce number of network trips and
+ * provide better performance.
* <p>
* Note that even without locking and transactions, {@code ATOMIC} mode still provides
* full consistency guarantees across all cache nodes.
* <p>
* Also note that all data modifications in {@code ATOMIC} mode are guaranteed to be atomic
* and consistent with writes to the underlying persistent store, if one is configured.
- * <p>
- * This mode is currently implemented for {@link CacheMode#PARTITIONED} caches only.
*/
ATOMIC;
@@ -76,4 +69,4 @@ public enum CacheAtomicityMode {
@Nullable public static CacheAtomicityMode fromOrdinal(int ord) {
return ord >= 0 && ord < VALS.length ? VALS[ord] : null;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/50f75bd6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 7c16136..9c325aa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -271,7 +271,7 @@ public class GridCacheProcessor extends GridProcessorAdapter {
cfg.setRebalanceMode(ASYNC);
if (cfg.getAtomicityMode() == null)
- cfg.setAtomicityMode(ATOMIC);
+ cfg.setAtomicityMode(CacheConfiguration.DFLT_CACHE_ATOMICITY_MODE);
if (cfg.getWriteSynchronizationMode() == null)
cfg.setWriteSynchronizationMode(PRIMARY_SYNC);
[17/22] ignite git commit: ignite-1516 Optimize
GridH2AbstractKeyValueRow.getValue
Posted by vo...@apache.org.
ignite-1516 Optimize GridH2AbstractKeyValueRow.getValue
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/72c3eef2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/72c3eef2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/72c3eef2
Branch: refs/heads/ignite-1282
Commit: 72c3eef2aa31df4a68b46a8877809cc0f49c1368
Parents: 39dace4
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 13:51:09 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 13:51:09 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 8 +--
.../processors/cache/GridCacheMapEntry.java | 14 ++---
.../processors/cache/GridCacheProcessor.java | 6 +--
.../cache/GridCacheSwapEntryImpl.java | 31 +++++++++--
.../processors/cache/GridCacheSwapManager.java | 56 +++++++++++++-------
.../processors/query/h2/IgniteH2Indexing.java | 19 ++++---
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 49 ++++++++++-------
.../query/h2/opt/GridH2KeyValueRowOffheap.java | 11 +++-
.../query/h2/opt/GridH2RowDescriptor.java | 5 ++
.../cache/CacheIndexStreamerTest.java | 33 +++++++++---
.../processors/cache/GridCacheSwapSelfTest.java | 4 +-
.../IgniteCacheWithIndexingTestSuite.java | 2 +
12 files changed, 158 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 1fc94ec..ae987b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -805,9 +805,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (modes.offheap || modes.swap) {
GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
- GridCacheSwapEntry swapEntry = swapMgr.read(cacheKey, modes.offheap, modes.swap);
-
- cacheVal = swapEntry != null ? swapEntry.value() : null;
+ cacheVal = swapMgr.readValue(cacheKey, modes.offheap, modes.swap);
}
}
else
@@ -856,9 +854,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (offheap || swap) {
GridCacheSwapManager swapMgr = ctx.isNear() ? ctx.near().dht().context().swap() : ctx.swap();
- GridCacheSwapEntry swapEntry = swapMgr.read(key, offheap, swap);
-
- return swapEntry != null ? swapEntry.value() : null;
+ return swapMgr.readValue(key, offheap, swap);
}
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 961c792..4bf0aa1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -512,7 +512,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
}
else
- e = detached() ? cctx.swap().read(this, true, true, true) : cctx.swap().readAndRemove(this);
+ e = detached() ? cctx.swap().read(this, true, true, true, false) : cctx.swap().readAndRemove(this);
if (log.isDebugEnabled())
log.debug("Read swap entry [swapEntry=" + e + ", cacheEntry=" + this + ']');
@@ -2840,7 +2840,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
}
if (offheap || swap) {
- GridCacheSwapEntry e = cctx.swap().read(this, false, offheap, swap);
+ GridCacheSwapEntry e = cctx.swap().read(this, false, offheap, swap, true);
return e != null ? e.value() : null;
}
@@ -3581,14 +3581,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
CacheObject val = rawGetOrUnmarshalUnlocked(false);
- if (val == null) {
- GridCacheSwapEntry swapEntry = cctx.swap().read(key, true, true);
-
- if (swapEntry == null)
- return null;
-
- return swapEntry.value();
- }
+ if (val == null)
+ val = cctx.swap().readValue(key, true, true);
return val;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 9c325aa..e92ea57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2763,14 +2763,10 @@ public class GridCacheProcessor extends GridProcessorAdapter {
try {
KeyCacheObject key = cctx.toCacheKeyObject(keyBytes);
- GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes);
+ GridCacheSwapEntry swapEntry = GridCacheSwapEntryImpl.unmarshal(valBytes, true);
CacheObject val = swapEntry.value();
- if (val == null)
- val = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), swapEntry.type(),
- swapEntry.valueBytes());
-
assert val != null;
qryMgr.remove(key, val);
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
index b7c66d3..6b1266f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapEntryImpl.java
@@ -94,8 +94,6 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
long expireTime,
@Nullable IgniteUuid keyClsLdrId,
@Nullable IgniteUuid valClsLdrId) {
- assert ver != null;
-
this.valBytes = valBytes;
this.type = type;
this.ver = ver;
@@ -268,9 +266,36 @@ public class GridCacheSwapEntryImpl implements GridCacheSwapEntry {
/**
* @param arr Entry bytes.
+ * @param valOnly If {@code true} unmarshalls only entry value.
* @return Entry.
*/
- public static GridCacheSwapEntryImpl unmarshal(byte[] arr) {
+ public static GridCacheSwapEntryImpl unmarshal(byte[] arr, boolean valOnly) {
+ if (valOnly) {
+ long off = BYTE_ARR_OFF + VERSION_OFFSET; // Skip ttl, expire time.
+
+ boolean verEx = UNSAFE.getByte(arr, off++) != 0;
+
+ off += verEx ? VERSION_EX_SIZE : VERSION_SIZE;
+
+ int arrLen = UNSAFE.getInt(arr, off);
+
+ off += 4;
+
+ byte type = UNSAFE.getByte(arr, off++);
+
+ byte[] valBytes = new byte[arrLen];
+
+ UNSAFE.copyMemory(arr, off, valBytes, BYTE_ARR_OFF, arrLen);
+
+ return new GridCacheSwapEntryImpl(ByteBuffer.wrap(valBytes),
+ type,
+ null,
+ 0L,
+ 0L,
+ null,
+ null);
+ }
+
long off = BYTE_ARR_OFF;
long ttl = UNSAFE.getLong(arr, off);
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
index d9a8b5c..2ab7b5b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSwapManager.java
@@ -569,6 +569,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* @param entryLocked {@code True} if cache entry is locked.
* @param readOffheap Read offheap flag.
* @param readSwap Read swap flag.
+ * @param valOnly If {@code true} unmarshals only entry value.
* @return Value from swap or {@code null}.
* @throws IgniteCheckedException If failed.
*/
@@ -578,7 +579,8 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
int part,
boolean entryLocked,
boolean readOffheap,
- boolean readSwap)
+ boolean readSwap,
+ boolean valOnly)
throws IgniteCheckedException
{
assert readOffheap || readSwap;
@@ -605,7 +607,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.cache().metrics0().onOffHeapRead(bytes != null);
if (bytes != null)
- return swapEntry(unmarshalSwapEntry(bytes));
+ return swapEntry(unmarshalSwapEntry(bytes, valOnly));
}
if (!swapEnabled || !readSwap)
@@ -620,7 +622,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (bytes == null && lsnr != null)
return lsnr.entry;
- return bytes != null ? swapEntry(unmarshalSwapEntry(bytes)) : null;
+ return bytes != null ? swapEntry(unmarshalSwapEntry(bytes, valOnly)) : null;
}
finally {
if (lsnr != null)
@@ -706,7 +708,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (rmv != null) {
try {
- GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
+ GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv, false));
if (entry == null)
return;
@@ -756,20 +758,22 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* @param locked {@code True} if cache entry is locked.
* @param readOffheap Read offheap flag.
* @param readSwap Read swap flag.
+ * @param valOnly If {@code true} unmarshals only entry value.
* @return Read value.
* @throws IgniteCheckedException If read failed.
*/
@Nullable GridCacheSwapEntry read(GridCacheEntryEx entry,
boolean locked,
boolean readOffheap,
- boolean readSwap)
+ boolean readSwap,
+ boolean valOnly)
throws IgniteCheckedException
{
if (!offheapEnabled && !swapEnabled)
return null;
return read(entry.key(), entry.key().valueBytes(cctx.cacheObjectContext()), entry.partition(), locked,
- readOffheap, readSwap);
+ readOffheap, readSwap, valOnly);
}
/**
@@ -805,7 +809,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
* @return Read value.
* @throws IgniteCheckedException If read failed.
*/
- @Nullable public GridCacheSwapEntry read(KeyCacheObject key,
+ @Nullable public CacheObject readValue(KeyCacheObject key,
boolean readOffheap,
boolean readSwap)
throws IgniteCheckedException
@@ -815,7 +819,17 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
int part = cctx.affinity().partition(key);
- return read(key, key.valueBytes(cctx.cacheObjectContext()), part, false, readOffheap, readSwap);
+ GridCacheSwapEntry swapEntry = read(key,
+ key.valueBytes(cctx.cacheObjectContext()),
+ part,
+ false,
+ readOffheap,
+ readSwap,
+ true);
+
+ assert swapEntry == null || swapEntry.value() != null : swapEntry;
+
+ return swapEntry != null ? swapEntry.value() : null;
}
/**
@@ -865,7 +879,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.cache().metrics0().onOffHeapRemove();
}
- entry = entryBytes == null ? null : swapEntry(unmarshalSwapEntry(entryBytes));
+ entry = entryBytes == null ? null : swapEntry(unmarshalSwapEntry(entryBytes, false));
}
return entry;
@@ -972,7 +986,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (rmv != null) {
try {
- GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv));
+ GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(rmv, false));
if (entry == null)
return;
@@ -1078,7 +1092,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
Collection<GridCacheSwapListener> lsnrs = offheapLsnrs.get(part);
if (lsnrs != null) {
- GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry));
+ GridCacheSwapEntry e = swapEntry(GridCacheSwapEntryImpl.unmarshal(entry, false));
for (GridCacheSwapListener lsnr : lsnrs)
lsnr.onEntryUnswapped(part, key, e);
@@ -1132,7 +1146,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
cctx.cache().metrics0().onOffHeapRead(entryBytes != null);
if (entryBytes != null) {
- GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
+ GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes, false));
if (entry != null) {
cctx.queries().onUnswap(key, entry.value());
@@ -1165,7 +1179,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
if (entryBytes == null)
return false;
- GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes));
+ GridCacheSwapEntry entry = swapEntry(unmarshalSwapEntry(entryBytes, true));
if (entry == null)
return false;
@@ -2063,7 +2077,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
try {
for (Map.Entry<byte[], byte[]> e : iter) {
try {
- GridCacheSwapEntry swapEntry = unmarshalSwapEntry(e.getValue());
+ GridCacheSwapEntry swapEntry = unmarshalSwapEntry(e.getValue(), false);
IgniteUuid valLdrId = swapEntry.valueClassLoaderId();
@@ -2120,10 +2134,11 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/**
* @param bytes Bytes to unmarshal.
+ * @param valOnly If {@code true} unmarshalls only value.
* @return Unmarshalled entry.
*/
- private GridCacheSwapEntry unmarshalSwapEntry(byte[] bytes) {
- return GridCacheSwapEntryImpl.unmarshal(bytes);
+ private GridCacheSwapEntry unmarshalSwapEntry(byte[] bytes, boolean valOnly) {
+ return GridCacheSwapEntryImpl.unmarshal(bytes, valOnly);
}
/**
@@ -2169,7 +2184,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
@Override protected Map.Entry<byte[], GridCacheSwapEntry> onNext() throws IgniteCheckedException {
Map.Entry<byte[], byte[]> e = iter.nextX();
- GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue());
+ GridCacheSwapEntry unmarshalled = unmarshalSwapEntry(e.getValue(), false);
return F.t(e.getKey(), swapEntry(unmarshalled));
}
@@ -2446,6 +2461,9 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
abstract protected GridCloseableIterator<T1> partitionIterator(int part) throws IgniteCheckedException;
}
+ /**
+ *
+ */
private class GridVersionedMapEntry<K,V> implements Map.Entry<K,V>, GridCacheVersionAware {
/** */
private Map.Entry<byte[], byte[]> entry;
@@ -2474,7 +2492,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/** {@inheritDoc} */
@Override public V getValue() {
try {
- GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue());
+ GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false);
swapEntry(e);
@@ -2487,7 +2505,7 @@ public class GridCacheSwapManager extends GridCacheManagerAdapter {
/** {@inheritDoc} */
@Override public GridCacheVersion version() {
- GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue());
+ GridCacheSwapEntry e = unmarshalSwapEntry(entry.getValue(), false);
return e.version();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 2af1386..8595187 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -71,7 +71,6 @@ import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheSwapEntry;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
@@ -2108,6 +2107,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
/** */
private final GridUnsafeGuard guard;
+ /** */
+ private final boolean preferSwapVal;
+
/**
* @param type Type descriptor.
* @param schema Schema.
@@ -2136,6 +2138,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
keyType = DataType.getTypeFromClass(type.keyClass());
valType = DataType.getTypeFromClass(type.valueClass());
+
+ preferSwapVal = schema.ccfg.getMemoryMode() == CacheMemoryMode.OFFHEAP_TIERED;
}
/** {@inheritDoc} */
@@ -2263,15 +2267,11 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (cctx.isNear())
cctx = cctx.near().dht().context();
- GridCacheSwapEntry e = cctx.swap().read(cctx.toCacheKeyObject(key), true, true);
+ CacheObject v = cctx.swap().readValue(cctx.toCacheKeyObject(key), true, true);
- if (e == null)
+ if (v == null)
return null;
- CacheObject v = e.value();
-
- assert v != null : "swap must unmarshall it for us";
-
return v.value(cctx.cacheObjectContext(), false);
}
@@ -2312,5 +2312,10 @@ public class IgniteH2Indexing implements GridQueryIndexing {
return new GridH2KeyValueRowOffheap(this, ptr);
}
+
+ /** {@inheritDoc} */
+ @Override public boolean preferSwapValue() {
+ return preferSwapVal;
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index 4a16284..c11f541 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -130,20 +130,23 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
/**
* Atomically updates weak value.
*
- * @param upd New value.
- * @return {@code null} If update succeeded, unexpected value otherwise.
+ * @param valObj New value.
+ * @return New value if old value is empty, old value otherwise.
+ * @throws IgniteCheckedException If failed.
*/
- protected synchronized Value updateWeakValue(Value upd) {
+ protected synchronized Value updateWeakValue(Object valObj) throws IgniteCheckedException {
Value res = peekValue(VAL_COL);
if (res != null && !(res instanceof WeakValue))
return res;
+ Value upd = desc.wrap(valObj, desc.valueType());
+
setValue(VAL_COL, new WeakValue(upd));
notifyAll();
- return null;
+ return upd;
}
/**
@@ -188,21 +191,23 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
Value v;
if (col == VAL_COL) {
- v = syncValue(0);
+ v = peekValue(VAL_COL);
long start = 0;
int attempt = 0;
while ((v = WeakValue.unwrap(v)) == null) {
- v = getOffheapValue(VAL_COL);
+ if (!desc.preferSwapValue()) {
+ v = getOffheapValue(VAL_COL);
- if (v != null) {
- setValue(VAL_COL, v);
+ if (v != null) {
+ setValue(VAL_COL, v);
- if (peekValue(KEY_COL) == null)
- cache();
+ if (peekValue(KEY_COL) == null)
+ cache();
- return v;
+ return v;
+ }
}
Object k = getValue(KEY_COL).getObject();
@@ -213,16 +218,24 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
if (valObj != null) {
// Even if we've found valObj in swap, it is may be some new value,
// while the needed value was already unswapped, so we have to recheck it.
- if ((v = WeakValue.unwrap(syncValue(0))) == null && (v = getOffheapValue(VAL_COL)) == null) {
- Value upd = desc.wrap(valObj, desc.valueType());
-
- v = updateWeakValue(upd);
-
- return v == null ? upd : v;
- }
+ if ((v = getOffheapValue(VAL_COL)) == null)
+ return updateWeakValue(valObj);
}
else {
// If nothing found in swap then we should be already unswapped.
+ if (desc.preferSwapValue()) {
+ v = getOffheapValue(VAL_COL);
+
+ if (v != null) {
+ setValue(VAL_COL, v);
+
+ if (peekValue(KEY_COL) == null)
+ cache();
+
+ return v;
+ }
+ }
+
v = syncValue(attempt);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
index de31fe1..2dd9f25 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2KeyValueRowOffheap.java
@@ -216,12 +216,19 @@ public class GridH2KeyValueRowOffheap extends GridH2AbstractKeyValueRow {
/** {@inheritDoc} */
@SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
- @Override protected synchronized Value updateWeakValue(Value upd) {
+ @Override protected synchronized Value updateWeakValue(Object valObj) throws IgniteCheckedException {
+ Value val = peekValue(VAL_COL);
+
+ if (val != null)
+ return val;
+
+ Value upd = desc.wrap(valObj, desc.valueType());
+
setValue(VAL_COL, upd);
notifyAll();
- return null;
+ return upd;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
index 0edd102..ed3ff7a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2RowDescriptor.java
@@ -111,4 +111,9 @@ public interface GridH2RowDescriptor extends GridOffHeapSmartPointerFactory<Grid
* @throws IgniteCheckedException If failed.
*/
public Value wrap(Object o, int type) throws IgniteCheckedException;
+
+ /**
+ * @return {@code True} if should check swap value before offheap.
+ */
+ public boolean preferSwapValue();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
index 23f4e91..e6bf22b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheIndexStreamerTest.java
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
-import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -36,6 +36,8 @@ import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
@@ -45,7 +47,6 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
/** */
private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -60,14 +61,29 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testStreamer() throws Exception {
+ public void testStreamerAtomic() throws Exception {
+ checkStreamer(ATOMIC);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testStreamerTx() throws Exception {
+ checkStreamer(TRANSACTIONAL);
+ }
+
+ /**
+ * @param atomicityMode Cache atomicity mode.
+ * @throws Exception If failed.
+ */
+ public void checkStreamer(CacheAtomicityMode atomicityMode) throws Exception {
final Ignite ignite = startGrid(0);
- final IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration());
+ final IgniteCache<Integer, String> cache = ignite.createCache(cacheConfiguration(atomicityMode));
final AtomicBoolean stop = new AtomicBoolean();
- final int KEYS= 10_000;
+ final int KEYS = 10_000;
try {
IgniteInternalFuture streamerFut = GridTestUtils.runAsync(new Callable() {
@@ -118,14 +134,15 @@ public class CacheIndexStreamerTest extends GridCommonAbstractTest {
}
/**
+ * @param atomicityMode Cache atomicity mode.
* @return Cache configuration.
*/
- private CacheConfiguration cacheConfiguration() {
+ private CacheConfiguration cacheConfiguration(CacheAtomicityMode atomicityMode) {
CacheConfiguration ccfg = new CacheConfiguration();
- ccfg.setAtomicityMode(ATOMIC);
+ ccfg.setAtomicityMode(atomicityMode);
ccfg.setWriteSynchronizationMode(FULL_SYNC);
- ccfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
+ ccfg.setMemoryMode(OFFHEAP_TIERED);
ccfg.setOffHeapMaxMemory(0);
ccfg.setBackups(1);
ccfg.setIndexedTypes(Integer.class, String.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
index e0e6ff0..cd1fc93 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheSwapSelfTest.java
@@ -244,12 +244,12 @@ public class GridCacheSwapSelfTest extends GridCommonAbstractTest {
}
/**
- * TODO: IGNITE-599.
- *
* @throws Exception If failed.
*/
public void testSwapEviction() throws Exception {
try {
+ fail("https://issues.apache.org/jira/browse/IGNITE-599");
+
final CountDownLatch evicted = new CountDownLatch(10);
startGrids(1);
http://git-wip-us.apache.org/repos/asf/ignite/blob/72c3eef2/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index f30f70e..550c69f 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest;
+import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest;
import org.apache.ignite.internal.processors.cache.GridCacheOffHeapAndSwapSelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheOffHeapSelfTest;
import org.apache.ignite.internal.processors.cache.GridCacheOffheapIndexEntryEvictTest;
@@ -63,6 +64,7 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
suite.addTestSuite(GridCacheOffheapIndexGetSelfTest.class);
suite.addTestSuite(GridCacheOffheapIndexEntryEvictTest.class);
+ suite.addTestSuite(CacheIndexStreamerTest.class);
suite.addTestSuite(CacheConfigurationP2PTest.class);
[21/22] ignite git commit: Merge branch 'master' into ignite-1282
Posted by vo...@apache.org.
Merge branch 'master' into ignite-1282
Conflicts:
modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/281b9f2d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/281b9f2d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/281b9f2d
Branch: refs/heads/ignite-1282
Commit: 281b9f2d652ce9109523734ae3843f93feec285f
Parents: b711a5a 0a41ae5
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Sep 22 17:58:20 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Sep 22 17:58:20 2015 +0300
----------------------------------------------------------------------
.../socket/WordsSocketStreamerServer.java | 5 +-
.../apache/ignite/cache/CacheAtomicityMode.java | 17 +-
.../configuration/CacheConfiguration.java | 21 +
.../processors/cache/GridCacheAdapter.java | 8 +-
.../processors/cache/GridCacheMapEntry.java | 51 +-
.../processors/cache/GridCacheProcessor.java | 10 +-
.../cache/GridCacheSwapEntryImpl.java | 31 +-
.../processors/cache/GridCacheSwapManager.java | 80 ++-
.../datastreamer/DataStreamerImpl.java | 2 -
.../org/apache/ignite/stream/StreamAdapter.java | 104 +++-
.../stream/StreamMultipleTupleExtractor.java | 38 ++
.../stream/StreamSingleTupleExtractor.java | 40 ++
.../ignite/stream/StreamTupleExtractor.java | 20 +-
.../ignite/stream/socket/SocketStreamer.java | 3 +-
.../IgniteCacheEntryListenerAbstractTest.java | 65 +-
.../IgniteCachePutRetryAbstractSelfTest.java | 33 +
...lientDiscoverySpiFailureTimeoutSelfTest.java | 118 +++-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 13 +-
.../stream/socket/SocketStreamerSelfTest.java | 112 +++-
.../processors/query/h2/IgniteH2Indexing.java | 19 +-
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 54 +-
.../query/h2/opt/GridH2KeyValueRowOffheap.java | 11 +-
.../query/h2/opt/GridH2RowDescriptor.java | 5 +
.../processors/query/h2/opt/GridH2Table.java | 10 +-
.../cache/CacheIndexStreamerTest.java | 37 +-
.../processors/cache/GridCacheSwapSelfTest.java | 4 +-
.../IgniteCacheWithIndexingTestSuite.java | 2 +
modules/mqtt/pom.xml | 114 ++++
.../apache/ignite/stream/mqtt/MqttStreamer.java | 611 +++++++++++++++++++
.../stream/mqtt/IgniteMqttStreamerTest.java | 553 +++++++++++++++++
.../mqtt/IgniteMqttStreamerTestSuite.java | 34 ++
pom.xml | 1 +
32 files changed, 2037 insertions(+), 189 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/281b9f2d/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index 9fb56bc,44a3fa9..44f7e1b
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@@ -17,10 -17,25 +17,17 @@@
package org.apache.ignite.configuration;
+ import java.io.Serializable;
+ import java.util.Collection;
-import java.util.HashSet;
+ import javax.cache.Cache;
+ import javax.cache.configuration.CompleteConfiguration;
+ import javax.cache.configuration.Factory;
+ import javax.cache.configuration.MutableConfiguration;
+ import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
-import org.apache.ignite.cache.CacheAtomicityMode;
-import org.apache.ignite.cache.CacheEntryProcessor;
-import org.apache.ignite.cache.CacheInterceptor;
-import org.apache.ignite.cache.CacheMemoryMode;
-import org.apache.ignite.cache.CacheMode;
-import org.apache.ignite.cache.CacheRebalanceMode;
-import org.apache.ignite.cache.CacheTypeMetadata;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.*;
import org.apache.ignite.cache.affinity.AffinityFunction;
import org.apache.ignite.cache.affinity.AffinityKeyMapper;
import org.apache.ignite.cache.eviction.EvictionFilter;
http://git-wip-us.apache.org/repos/asf/ignite/blob/281b9f2d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/281b9f2d/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 268221e,c19a9b7..df383ef
--- a/pom.xml
+++ b/pom.xml
@@@ -75,7 -75,9 +75,8 @@@
<module>modules/kafka</module>
<module>modules/yarn</module>
<module>modules/jms11</module>
+ <module>modules/mqtt</module>
<module>modules/zookeeper</module>
- <module>modules/platform</module>
</modules>
<profiles>
[06/22] ignite git commit: IGNITE-1370 Refactor StreamTupleExtractor
API for 0..1 and 0..n extraction.
Posted by vo...@apache.org.
IGNITE-1370 Refactor StreamTupleExtractor API for 0..1 and 0..n extraction.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/421a5234
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/421a5234
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/421a5234
Branch: refs/heads/ignite-1282
Commit: 421a5234b5a7e56e36952a4c1976b3118310073e
Parents: eae4df1 b80b171
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 16:28:12 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 16:28:12 2015 +0100
----------------------------------------------------------------------
.../socket/WordsSocketStreamerServer.java | 5 +-
.../org/apache/ignite/stream/StreamAdapter.java | 104 +++++++++++++++--
.../stream/StreamMultipleTupleExtractor.java | 38 +++++++
.../stream/StreamSingleTupleExtractor.java | 40 +++++++
.../ignite/stream/StreamTupleExtractor.java | 20 ++--
.../ignite/stream/socket/SocketStreamer.java | 3 +-
.../stream/socket/SocketStreamerSelfTest.java | 112 ++++++++++++++-----
7 files changed, 270 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
[09/22] ignite git commit: IGNITE-535 Merge MQTT Streamer into master.
Posted by vo...@apache.org.
IGNITE-535 Merge MQTT Streamer into master.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/88acd318
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/88acd318
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/88acd318
Branch: refs/heads/ignite-1282
Commit: 88acd318b84ce3bff8c061bb34718e0e5f7127fb
Parents: 421a523 296dd6e
Author: Raul Kripalani <ra...@apache.org>
Authored: Mon Sep 21 17:26:04 2015 +0100
Committer: Raul Kripalani <ra...@apache.org>
Committed: Mon Sep 21 17:26:04 2015 +0100
----------------------------------------------------------------------
modules/mqtt/pom.xml | 114 ++++
.../apache/ignite/stream/mqtt/MqttStreamer.java | 611 +++++++++++++++++++
.../stream/mqtt/IgniteMqttStreamerTest.java | 553 +++++++++++++++++
.../mqtt/IgniteMqttStreamerTestSuite.java | 34 ++
pom.xml | 1 +
5 files changed, 1313 insertions(+)
----------------------------------------------------------------------
[18/22] ignite git commit: Check for WeakValue in
GridH2AbstractKeyValueRow.onUnswap
Posted by vo...@apache.org.
Check for WeakValue in GridH2AbstractKeyValueRow.onUnswap
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca2bce00
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca2bce00
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca2bce00
Branch: refs/heads/ignite-1282
Commit: ca2bce00516142a1204fb9226c938174047e72d6
Parents: 72c3eef
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 15:04:27 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 15:04:27 2015 +0300
----------------------------------------------------------------------
.../processors/query/h2/opt/GridH2AbstractKeyValueRow.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ca2bce00/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
index c11f541..ca5442a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2AbstractKeyValueRow.java
@@ -119,7 +119,9 @@ public abstract class GridH2AbstractKeyValueRow extends GridH2Row {
* @throws IgniteCheckedException If failed.
*/
public synchronized void onUnswap(Object val, boolean beforeRmv) throws IgniteCheckedException {
- if (peekValue(VAL_COL) != null)
+ Value val0 = peekValue(VAL_COL);
+
+ if (val0 != null && !(val0 instanceof WeakValue))
return;
setValue(VAL_COL, desc.wrap(val, desc.valueType()));
[14/22] ignite git commit: Merge remote-tracking branch
'origin/ignite-1.4' into ignite-1.4
Posted by vo...@apache.org.
Merge remote-tracking branch 'origin/ignite-1.4' into ignite-1.4
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a1040872
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a1040872
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a1040872
Branch: refs/heads/ignite-1282
Commit: a1040872f37cf4fd1dc20584c68307f420d0d3af
Parents: 33fe30d 50f75bd
Author: sboikov <sb...@gridgain.com>
Authored: Tue Sep 22 12:59:14 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Tue Sep 22 12:59:14 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/cache/CacheAtomicityMode.java | 17 +++++------------
.../processors/cache/GridCacheProcessor.java | 2 +-
2 files changed, 6 insertions(+), 13 deletions(-)
----------------------------------------------------------------------