You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/09 08:33:00 UTC
[11/24] incubator-ignite git commit: ignite-545: merge from
ignite-sprint-6
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
index 3f81dc4..a03d2c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.util.nio;
import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
import java.io.*;
import java.nio.*;
@@ -33,9 +32,6 @@ import java.nio.*;
* | MSG_SIZE | MESSAGE | MSG_SIZE | MESSAGE |
* +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
* </pre>
- * <p>
- * It expects that first 4 bytes in stream are {@link U#IGNITE_HEADER}. If beginning of a stream,
- * isn't equal to these bytes than exception will be thrown.
*/
public class GridBufferedParser implements GridNioParser {
/** Buffer metadata key. */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
new file mode 100644
index 0000000..256597c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * This class implements stream parser based on {@link GridNioDelimitedBuffer}.
+ * <p>
+ * The rule for this parser is that every message sent over the stream is appended with
+ * delimiter (bytes array). So, the stream structure is as follows:
+ * <pre>
+ * +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+-
+ * | MESSAGE | DELIMITER | MESSAGE | DELIMITER |
+ * +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+-
+ * </pre>
+ */
+public class GridDelimitedParser implements GridNioParser {
+    /** Session metadata key under which the per-session {@link GridNioDelimitedBuffer} is kept. */
+    private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Byte sequence appended after every encoded message and searched for when decoding. */
+    private final byte[] delim;
+
+    /** If {@code true}, {@link #encode} allocates direct byte buffers, otherwise heap buffers. */
+    private final boolean directBuf;
+
+    /**
+     * Creates a parser for delimiter-terminated messages.
+     *
+     * @param delim Delimiter.
+     * @param directBuf Direct buffer.
+     */
+    public GridDelimitedParser(byte[] delim, boolean directBuf) {
+        this.directBuf = directBuf;
+        this.delim = delim;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+        GridNioDelimitedBuffer sesBuf = ses.meta(BUF_META_KEY);
+
+        if (sesBuf != null)
+            return sesBuf.read(buf);
+
+        // First decode call for this session: create the accumulating buffer and attach it to the session.
+        // Decode for a given session is called per one thread, so no other buffer is expected to be registered.
+        sesBuf = new GridNioDelimitedBuffer(delim);
+
+        GridNioDelimitedBuffer prev = ses.addMeta(BUF_META_KEY, sesBuf);
+
+        assert prev == null;
+
+        return sesBuf.read(buf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+        byte[] payload = (byte[])msg;
+
+        int total = payload.length + delim.length;
+
+        ByteBuffer out;
+
+        if (directBuf)
+            out = ByteBuffer.allocateDirect(total);
+        else
+            out = ByteBuffer.allocate(total);
+
+        // Message bytes first, then the delimiter that marks the end of the message.
+        out.put(payload).put(delim);
+
+        // Switch the buffer from writing to reading mode for the caller.
+        out.flip();
+
+        return out;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return getClass().getSimpleName();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
new file mode 100644
index 0000000..2b764ec
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.jetbrains.annotations.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Buffer with message delimiter support.
+ */
+public class GridNioDelimitedBuffer {
+    /** Delimiter (private snapshot of the array passed to the constructor). */
+    private final byte[] delim;
+
+    /**
+     * Failure table of the delimiter (Knuth-Morris-Pratt): {@code fail[i]} is the length of the longest
+     * proper prefix of {@code delim[0..i]} that is also a suffix of {@code delim[0..i]}.
+     */
+    private final int[] fail;
+
+    /** Bytes accumulated for the message currently being read (grows by doubling). */
+    private byte[] data = new byte[16384];
+
+    /** Number of valid bytes in {@link #data}. */
+    private int cnt;
+
+    /** Number of leading delimiter bytes matched by the tail of the accumulated data. */
+    private int idx;
+
+    /**
+     * @param delim Delimiter, must be non-{@code null} and non-empty.
+     */
+    public GridNioDelimitedBuffer(byte[] delim) {
+        assert delim != null;
+        assert delim.length > 0;
+
+        // The failure table below is derived from the delimiter, so keep a snapshot that cannot change later.
+        this.delim = delim.clone();
+
+        fail = new int[delim.length];
+
+        for (int i = 1, k = 0; i < delim.length; i++) {
+            while (k > 0 && delim[i] != delim[k])
+                k = fail[k - 1];
+
+            if (delim[i] == delim[k])
+                k++;
+
+            fail[i] = k;
+        }
+
+        reset();
+    }
+
+    /**
+     * Resets buffer state.
+     */
+    private void reset() {
+        cnt = 0;
+        idx = 0;
+    }
+
+    /**
+     * Consumes bytes from the given buffer until a complete delimiter has been seen or the buffer is exhausted.
+     * Bytes of an incomplete message are retained and continued on the next call.
+     *
+     * @param buf Buffer.
+     * @return Message bytes (without the delimiter) or {@code null} if message is not fully read yet.
+     */
+    @Nullable public byte[] read(ByteBuffer buf) {
+        while (buf.hasRemaining()) {
+            if (cnt == data.length)
+                data = Arrays.copyOf(data, data.length * 2);
+
+            byte b = buf.get();
+
+            data[cnt++] = b;
+
+            // On a mismatch fall back to the longest shorter partial match that can still be extended by 'b'.
+            // The previous hand-rolled rescan compared the wrong positions and could miss a delimiter
+            // that overlaps a failed partial match (e.g. delimiter "abac" in the stream "ababac").
+            while (idx > 0 && b != delim[idx])
+                idx = fail[idx - 1];
+
+            if (b == delim[idx])
+                idx++;
+
+            if (idx == delim.length) {
+                byte[] bytes = Arrays.copyOfRange(data, 0, cnt - delim.length);
+
+                reset();
+
+                return bytes;
+            }
+        }
+
+        return null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
index 49850ab..a945262 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/GridOffHeapPartitionedMap.java
@@ -201,6 +201,15 @@ public interface GridOffHeapPartitionedMap {
public <T> GridCloseableIterator<T> iterator(CX2<T2<Long, Integer>, T2<Long, Integer>, T> c);
/**
+ * Gets iterator over the partition.
+ *
+ * @param c Key/value closure.
+ * @param part Partition.
+ * @return Iterator over the partition.
+ */
+ public <T> GridCloseableIterator<T> iterator(CX2<T2<Long, Integer>, T2<Long, Integer>, T> c, int part);
+
+ /**
* Gets iterator over certain partition.
*
* @param p Partition.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
index ba67b30..4ffc33f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/offheap/unsafe/GridUnsafePartitionedMap.java
@@ -277,21 +277,8 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
/** {@inheritDoc} */
@Override public GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> iterator() {
- return new GridCloseableIteratorAdapter<IgniteBiTuple<byte[], byte[]>>() {
- private int p;
-
- private GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> curIt;
-
- {
- try {
- advance();
- }
- catch (IgniteCheckedException e) {
- e.printStackTrace(); // Should never happen.
- }
- }
-
- private void advance() throws IgniteCheckedException {
+ return new PartitionedMapCloseableIterator<IgniteBiTuple<byte[], byte[]>>() {
+ protected void advance() throws IgniteCheckedException {
curIt = null;
while (p < parts) {
@@ -305,34 +292,6 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
curIt = null;
}
-
- @Override protected IgniteBiTuple<byte[], byte[]> onNext() throws IgniteCheckedException {
- if (curIt == null)
- throw new NoSuchElementException();
-
- IgniteBiTuple<byte[], byte[]> t = curIt.next();
-
- if (!curIt.hasNext()) {
- curIt.close();
-
- advance();
- }
-
- return t;
- }
-
- @Override protected boolean onHasNext() {
- return curIt != null;
- }
-
- @Override protected void onRemove() {
- throw new UnsupportedOperationException();
- }
-
- @Override protected void onClose() throws IgniteCheckedException {
- if (curIt != null)
- curIt.close();
- }
};
}
@@ -340,21 +299,8 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
@Override public <T> GridCloseableIterator<T> iterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c) {
assert c != null;
- return new GridCloseableIteratorAdapter<T>() {
- private int p;
-
- private GridCloseableIterator<T> curIt;
-
- {
- try {
- advance();
- }
- catch (IgniteCheckedException e) {
- e.printStackTrace(); // Should never happen.
- }
- }
-
- private void advance() throws IgniteCheckedException {
+ return new PartitionedMapCloseableIterator<T>() {
+ protected void advance() throws IgniteCheckedException {
curIt = null;
while (p < parts) {
@@ -368,38 +314,16 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
curIt = null;
}
-
- @Override protected T onNext() throws IgniteCheckedException {
- if (curIt == null)
- throw new NoSuchElementException();
-
- T t = curIt.next();
-
- if (!curIt.hasNext()) {
- curIt.close();
-
- advance();
- }
-
- return t;
- }
-
- @Override protected boolean onHasNext() {
- return curIt != null;
- }
-
- @Override protected void onRemove() {
- throw new UnsupportedOperationException();
- }
-
- @Override protected void onClose() throws IgniteCheckedException {
- if (curIt != null)
- curIt.close();
- }
};
}
/** {@inheritDoc} */
+ @Override public <T> GridCloseableIterator<T> iterator(final CX2<T2<Long, Integer>, T2<Long, Integer>, T> c,
+ int part) {
+ return mapFor(part).iterator(c); // Delegates to the single map that mapFor() returns for this partition.
+ }
+
+ /** {@inheritDoc} */
@Override public GridCloseableIterator<IgniteBiTuple<byte[], byte[]>> iterator(int p) {
return mapFor(p).iterator();
}
@@ -430,4 +354,63 @@ public class GridUnsafePartitionedMap implements GridOffHeapPartitionedMap {
public long lruSize() {
return lru.size();
}
+
+ /**
+ * Partitioned closeable iterator.
+ */
+ private abstract class PartitionedMapCloseableIterator<T> extends GridCloseableIteratorAdapter<T> {
+     /** Current partition. */
+     protected int p;
+
+     /** Iterator currently being drained, or {@code null} when there is nothing left to iterate. */
+     protected GridCloseableIterator<T> curIt;
+
+     {
+         // Position on the first iterator right away so that onHasNext() is accurate from the start.
+         try {
+             advance();
+         }
+         catch (IgniteCheckedException e) {
+             e.printStackTrace(); // Should never happen.
+         }
+     }
+
+     /**
+      * Switch to next partition. Implementations are expected to update {@link #curIt}
+      * (leaving it {@code null} when no further elements exist).
+      *
+      * @throws IgniteCheckedException If failed.
+      */
+     abstract void advance() throws IgniteCheckedException;
+
+     /** {@inheritDoc} */
+     @Override protected T onNext() throws IgniteCheckedException {
+         if (curIt == null)
+             throw new NoSuchElementException();
+
+         T res = curIt.next();
+
+         // Current iterator still has elements: nothing else to do.
+         if (curIt.hasNext())
+             return res;
+
+         // Current iterator is drained: release it and move on.
+         curIt.close();
+
+         advance();
+
+         return res;
+     }
+
+     /** {@inheritDoc} */
+     @Override protected boolean onHasNext() {
+         return null != curIt;
+     }
+
+     /** {@inheritDoc} */
+     @Override protected void onRemove() {
+         throw new UnsupportedOperationException();
+     }
+
+     /** {@inheritDoc} */
+     @Override protected void onClose() throws IgniteCheckedException {
+         if (curIt == null)
+             return;
+
+         curIt.close();
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/util/spring/IgniteSpringHelper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/spring/IgniteSpringHelper.java b/modules/core/src/main/java/org/apache/ignite/internal/util/spring/IgniteSpringHelper.java
index 82ef421..a9e9e93 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/spring/IgniteSpringHelper.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/spring/IgniteSpringHelper.java
@@ -69,7 +69,7 @@ public interface IgniteSpringHelper {
* Loads all configurations with given type specified within given configuration file.
*
* @param cfgUrl Configuration file path or URL. This cannot be {@code null}.
- * @param cl Required type of configuration.
+ * @param cls Required type of configuration.
* @param excludedProps Properties to exclude.
* @return Tuple containing all loaded configurations and Spring context used to load them.
* @throws IgniteCheckedException If configuration could not be read.
@@ -81,7 +81,7 @@ public interface IgniteSpringHelper {
* Loads all configurations with given type specified within given configuration input stream.
*
* @param cfgStream Configuration input stream. This cannot be {@code null}.
- * @param cl Required type of configuration.
+ * @param cls Required type of configuration.
* @param excludedProps Properties to exclude.
* @return Tuple containing all loaded configurations and Spring context used to load them.
* @throws IgniteCheckedException If configuration could not be read.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
index c05b9e0..30be424 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheMetrics.java
@@ -29,7 +29,7 @@ import java.io.*;
*/
public class VisorCacheMetrics implements Serializable {
/** */
- private static final int MICROSECONDS_IN_SECOND = 1_000_000;
+ private static final float MICROSECONDS_IN_SECOND = 1_000_000;
/** */
private static final long serialVersionUID = 0L;
@@ -94,14 +94,11 @@ public class VisorCacheMetrics implements Serializable {
/** Reads per second. */
private int readsPerSec;
- /** Writes per second. */
- private int writesPerSec;
+ /** Puts per second. */
+ private int putsPerSec;
- /** Hits per second. */
- private int hitsPerSec;
-
- /** Misses per second. */
- private int missesPerSec;
+ /** Removals per second. */
+ private int removalsPerSec;
/** Commits per second. */
private int commitsPerSec;
@@ -160,15 +157,11 @@ public class VisorCacheMetrics implements Serializable {
/**
* Calculate rate of metric per second.
*
- * @param metric Metric value.
- * @param time Metric finish time.
- * @param createTime Metric start time.
+ * @param meanTime Metric mean time.
* @return Metric per second.
*/
- private static int perSecond(int metric, long time, long createTime) {
- long seconds = (time - createTime) / 1000;
-
- return (seconds > 0) ? (int)(metric / seconds) : 0;
+ private static int perSecond(float meanTime) {
+ return (meanTime > 0) ? (int)(MICROSECONDS_IN_SECOND / meanTime) : 0;
}
/**
@@ -209,12 +202,11 @@ public class VisorCacheMetrics implements Serializable {
cm.avgPutTime = m.getAveragePutTime();
cm.avgRemovalTime = m.getAverageRemoveTime();
- cm.readsPerSec = (int)(MICROSECONDS_IN_SECOND * 1.f / m.getAverageGetTime());
- cm.writesPerSec = (int)(MICROSECONDS_IN_SECOND * 1.f / m.getAveragePutTime());
- cm.hitsPerSec = -1;
- cm.missesPerSec = (int)(MICROSECONDS_IN_SECOND * 1.f / m.getAverageRemoveTime());
- cm.commitsPerSec = (int)(MICROSECONDS_IN_SECOND * 1.f / m.getAverageTxCommitTime());
- cm.rollbacksPerSec = (int)(MICROSECONDS_IN_SECOND * 1.f / m.getAverageTxRollbackTime());
+ cm.readsPerSec = perSecond(m.getAverageGetTime());
+ cm.putsPerSec = perSecond(m.getAveragePutTime());
+ cm.removalsPerSec = perSecond(m.getAverageRemoveTime());
+ cm.commitsPerSec = perSecond(m.getAverageTxCommitTime());
+ cm.rollbacksPerSec = perSecond(m.getAverageTxRollbackTime());
cm.qryMetrics = VisorCacheQueryMetrics.from(c.context().queries().metrics());
@@ -364,24 +356,17 @@ public class VisorCacheMetrics implements Serializable {
}
/**
- * @return Writes per second.
- */
- public int writesPerSecond() {
- return writesPerSec;
- }
-
- /**
- * @return Hits per second.
+ * @return Puts per second.
*/
- public int hitsPerSecond() {
- return hitsPerSec;
+ public int putsPerSecond() {
+ return putsPerSec;
}
/**
- * @return Misses per second.
+ * @return Removals per second.
*/
- public int missesPerSecond() {
- return missesPerSec;
+ public int removalsPerSecond() {
+ return removalsPerSec;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheNearConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheNearConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheNearConfiguration.java
index a968f4f..3706650 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheNearConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheNearConfiguration.java
@@ -33,7 +33,7 @@ public class VisorCacheNearConfiguration implements Serializable {
/** */
private static final long serialVersionUID = 0L;
- /** Flag to enable/disable near cache eviction policy. */
+ /** Flag indicating if near cache enabled. */
private boolean nearEnabled;
/** Near cache start size. */
@@ -66,7 +66,7 @@ public class VisorCacheNearConfiguration implements Serializable {
}
/**
- * @return Flag to enable/disable near cache eviction policy.
+ * @return {@code true} if near cache enabled.
*/
public boolean nearEnabled() {
return nearEnabled;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStartTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStartTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStartTask.java
new file mode 100644
index 0000000..2aa03a2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/cache/VisorCacheStartTask.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.visor.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.compute.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.processors.task.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.internal.visor.*;
+import org.apache.ignite.internal.visor.util.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Task that starts a cache or near cache with the specified configuration.
+ */
+@GridInternal
+public class VisorCacheStartTask extends
+ VisorMultiNodeTask<VisorCacheStartTask.VisorCacheStartArg, Map<UUID, IgniteException>, Void> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} */
+ @Override protected VisorCacheStartJob job(VisorCacheStartArg arg) {
+ return new VisorCacheStartJob(arg, debug);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override protected Map<UUID, IgniteException> reduce0(List<ComputeJobResult> results) throws IgniteException {
+ Map<UUID, IgniteException> map = new HashMap<>();
+
+ for (ComputeJobResult res : results)
+ if (res.getException() != null)
+ map.put(res.getNode().id(), res.getException());
+
+ return map;
+ }
+
+ /**
+ * Cache start arguments.
+ */
+ @SuppressWarnings("PublicInnerClass")
+ public static class VisorCacheStartArg implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final boolean near;
+
+ /** */
+ private final String name;
+
+ /** */
+ private final String cfg;
+
+ /**
+ * @param near {@code true} if near cache should be started.
+ * @param name Name for near cache.
+ * @param cfg Cache XML configuration.
+ */
+ public VisorCacheStartArg(boolean near, String name, String cfg) {
+ this.near = near;
+ this.name = name;
+ this.cfg = cfg;
+ }
+
+ /**
+ * @return {@code true} if near cache should be started.
+ */
+ public boolean near() {
+ return near;
+ }
+
+ /**
+ * @return Name for near cache.
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * @return Cache XML configuration.
+ */
+ public String configuration() {
+ return cfg;
+ }
+ }
+
+ /**
+ * Job that start cache or near cache with specified configuration.
+ */
+ private static class VisorCacheStartJob extends VisorJob<VisorCacheStartArg, Void> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Create job.
+ *
+ * @param arg Contains cache name and XML configurations of cache.
+ * @param debug Debug flag.
+ */
+ private VisorCacheStartJob(VisorCacheStartArg arg, boolean debug) {
+ super(arg, debug);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Void run(VisorCacheStartArg arg) throws IgniteException {
+ String cfg = arg.configuration();
+
+ assert !F.isEmpty(cfg);
+
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(cfg.getBytes())) {
+ if (arg.near) {
+ NearCacheConfiguration nearCfg = Ignition.loadSpringBean(bais, "nearCacheConfiguration");
+
+ ignite.createNearCache(VisorTaskUtils.unescapeName(arg.name()), nearCfg);
+ }
+ else {
+ CacheConfiguration cacheCfg = Ignition.loadSpringBean(bais, "cacheConfiguration");
+
+ ignite.createCache(cacheCfg);
+ }
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(VisorCacheStartJob.class, this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
index becebda..5050414 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryArg.java
@@ -32,17 +32,22 @@ public class VisorQueryArg implements Serializable {
/** Query text. */
private final String qryTxt;
+ /** Flag whether to execute query locally. */
+ private final boolean local;
+
/** Result batch size. */
private final int pageSize;
/**
* @param cacheName Cache name for query.
* @param qryTxt Query text.
+ * @param local Flag whether to execute query locally.
* @param pageSize Result batch size.
*/
- public VisorQueryArg(String cacheName, String qryTxt, int pageSize) {
+ public VisorQueryArg(String cacheName, String qryTxt, boolean local, int pageSize) {
this.cacheName = cacheName;
this.qryTxt = qryTxt;
+ this.local = local;
this.pageSize = pageSize;
}
@@ -61,6 +66,13 @@ public class VisorQueryArg implements Serializable {
}
/**
+ * @return {@code true} if query should be executed locally.
+ */
+ public boolean local() {
+     // Value is assigned once in the constructor; the field is final.
+     return this.local;
+ }
+
+ /**
* @return Page size.
*/
public int pageSize() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
index ebf62fa..4a9daad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/query/VisorQueryJob.java
@@ -75,6 +75,7 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten
if (scan) {
ScanQuery<Object, Object> qry = new ScanQuery<>(null);
qry.setPageSize(arg.pageSize());
+ qry.setLocal(arg.local());
long start = U.currentTimeMillis();
@@ -100,6 +101,7 @@ public class VisorQueryJob extends VisorJob<VisorQueryArg, IgniteBiTuple<? exten
else {
SqlFieldsQuery qry = new SqlFieldsQuery(arg.queryTxt());
qry.setPageSize(arg.pageSize());
+ qry.setLocal(arg.local());
long start = U.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
index 7cfc18f..e8ae76d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/util/VisorTaskUtils.java
@@ -120,6 +120,16 @@ public class VisorTaskUtils {
}
/**
+ * @param name Escaped name.
+ * @return Name or {@code null} for default name.
+ */
+ public static String unescapeName(String name) {
+     assert name != null;
+
+     // The placeholder used for the default (unnamed) entity maps back to null.
+     if (DFLT_EMPTY_NAME.equals(name))
+         return null;
+
+     return name;
+ }
+
+ /**
* @param a First name.
* @param b Second name.
* @return {@code true} if both names equals.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java
index 76ebcee..be05a38 100644
--- a/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteAsyncSupport.java
@@ -22,9 +22,9 @@ package org.apache.ignite.lang;
*/
public interface IgniteAsyncSupport {
/**
- * Gets component with asynchronous mode enabled.
+ * Gets instance of this component with asynchronous mode enabled.
*
- * @return Component with asynchronous mode enabled.
+ * @return Instance of this component with asynchronous mode enabled.
*/
public IgniteAsyncSupport withAsync();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/services/Service.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/services/Service.java b/modules/core/src/main/java/org/apache/ignite/services/Service.java
index 2bd5649..4f927a3 100644
--- a/modules/core/src/main/java/org/apache/ignite/services/Service.java
+++ b/modules/core/src/main/java/org/apache/ignite/services/Service.java
@@ -55,10 +55,7 @@ import java.io.*;
* ...
* GridServices svcs = grid.services();
*
- * GridFuture<?> fut = svcs.deployClusterSingleton("mySingleton", new MyGridService());
- *
- * // Wait for deployment to complete.
- * fut.get();
+ * svcs.deployClusterSingleton("mySingleton", new MyGridService());
* </pre>
* Or from grid configuration on startup:
* <pre name="code" class="java">
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index b43f8a5..871512c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -67,7 +67,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
private String name;
/** Grid SPI context. */
- private volatile IgniteSpiContext spiCtx = new GridDummySpiContext(null, false);
+ private volatile IgniteSpiContext spiCtx = new GridDummySpiContext(null, false, null);
/** Discovery listener. */
private GridLocalEventListener paramsLsnr;
@@ -190,7 +190,7 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
ClusterNode locNode = spiCtx == null ? null : spiCtx.localNode();
// Set dummy no-op context.
- spiCtx = new GridDummySpiContext(locNode, true);
+ spiCtx = new GridDummySpiContext(locNode, true, spiCtx);
}
/**
@@ -551,15 +551,24 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
/** */
private final boolean stopping;
+ /** */
+ private final MessageFactory msgFactory;
+
+ /** */
+ private final MessageFormatter msgFormatter;
+
/**
* Create temp SPI context.
*
* @param locNode Local node.
* @param stopping Node stopping flag.
+ * @param spiCtx SPI context.
*/
- GridDummySpiContext(ClusterNode locNode, boolean stopping) {
+ GridDummySpiContext(ClusterNode locNode, boolean stopping, @Nullable IgniteSpiContext spiCtx) {
this.locNode = locNode;
this.stopping = stopping;
+ this.msgFactory = spiCtx != null ? spiCtx.messageFactory() : null;
+ this.msgFormatter = spiCtx != null ? spiCtx.messageFormatter() : null;
}
/** {@inheritDoc} */
@@ -711,12 +720,12 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
/** {@inheritDoc} */
@Override public MessageFormatter messageFormatter() {
- return null;
+ return msgFormatter;
}
/** {@inheritDoc} */
@Override public MessageFactory messageFactory() {
- return null;
+ return msgFactory;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2d5c541..fd17791 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -203,7 +203,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
public static final int DFLT_ACK_SND_THRESHOLD = 16;
/** Default socket write timeout. */
- public static final long DFLT_SOCK_WRITE_TIMEOUT = GridNioServer.DFLT_SES_WRITE_TIMEOUT;
+ public static final long DFLT_SOCK_WRITE_TIMEOUT = 200;
/** No-op runnable. */
private static final IgniteRunnable NOOP = new IgniteRunnable() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
index e3182c4..4196306 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpi.java
@@ -226,10 +226,6 @@ public class TcpClientDiscoverySpi extends TcpDiscoverySpiAdapter implements Tcp
if (netTimeout < 3000)
U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout);
- // Warn on odd heartbeat frequency.
- if (hbFreq < 2000)
- U.warn(log, "Heartbeat frequency is too high (at least 2000 ms recommended): " + hbFreq);
-
registerMBean(gridName, this, TcpClientDiscoverySpiMBean.class);
try {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 206a8c3..ed0e9dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -288,6 +288,10 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private ConcurrentLinkedDeque<String> debugLog;
+ /** */
+ private final CopyOnWriteArrayList<IgniteInClosure<TcpDiscoveryAbstractMessage>> sendMsgLsnrs =
+ new CopyOnWriteArrayList<>();
+
/** {@inheritDoc} */
@IgniteInstanceResource
@Override public void injectResources(Ignite ignite) {
@@ -853,10 +857,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
if (netTimeout < 3000)
U.warn(log, "Network timeout is too low (at least 3000 ms recommended): " + netTimeout);
- // Warn on odd heartbeat frequency.
- if (hbFreq < 2000)
- U.warn(log, "Heartbeat frequency is too high (at least 2000 ms recommended): " + hbFreq);
-
registerMBean(gridName, this, TcpDiscoverySpiMBean.class);
if (ipFinder instanceof TcpDiscoveryMulticastIpFinder) {
@@ -2068,13 +2068,16 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
/**
* <strong>FOR TEST ONLY!!!</strong>
- * <p>
- * This method is intended for test purposes only.
- *
- * @param msg Message.
*/
- void onBeforeMessageSentAcrossRing(Serializable msg) {
- // No-op.
+ public void addSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> msg) {
+ sendMsgLsnrs.add(msg);
+ }
+
+ /**
+ * <strong>FOR TEST ONLY!!!</strong>
+ */
+ public void removeSendMessageListener(IgniteInClosure<TcpDiscoveryAbstractMessage> msg) {
+ sendMsgLsnrs.remove(msg);
}
/**
@@ -2683,11 +2686,30 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
assert ring.hasRemoteNodes();
- onBeforeMessageSentAcrossRing(msg);
+ for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : sendMsgLsnrs)
+ msgLsnr.apply(msg);
if (redirectToClients(msg)) {
- for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values())
- clientMsgWorker.addMessage(msg);
+ byte[] marshalledMsg = null;
+
+ for (ClientMessageWorker clientMsgWorker : clientMsgWorkers.values()) {
+ // Send a clone to client to avoid ConcurrentModificationException
+ TcpDiscoveryAbstractMessage msgClone;
+
+ try {
+ if (marshalledMsg == null)
+ marshalledMsg = marsh.marshal(msg);
+
+ msgClone = marsh.unmarshal(marshalledMsg, null);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to marshal message: " + msg, e);
+
+ msgClone = msg;
+ }
+
+ clientMsgWorker.addMessage(msgClone);
+ }
}
Collection<TcpDiscoveryNode> failedNodes;
@@ -3930,18 +3952,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
long topVer;
if (locNodeCoord) {
- if (!msg.client() && ipFinder.isShared()) {
- try {
- ipFinder.unregisterAddresses(leftNode.socketAddresses());
- }
- catch (IgniteSpiException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to unregister left node address: " + leftNode);
-
- onException("Failed to unregister left node address: " + leftNode, e);
- }
- }
-
topVer = ring.incrementTopologyVersion();
msg.topologyVersion(topVer);
@@ -4109,20 +4119,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
long topVer;
if (locNodeCoord) {
- if (!node.isClient() && ipFinder.isShared()) {
- try {
- ipFinder.unregisterAddresses(node.socketAddresses());
- }
- catch (IgniteSpiException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to unregister failed node address [node=" + node +
- ", err=" + e.getMessage() + ']');
-
- onException("Failed to unregister failed node address [node=" + node +
- ", err=" + e.getMessage() + ']', e);
- }
- }
-
topVer = ring.incrementTopologyVersion();
msg.topologyVersion(topVer);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index 26f6869..802da02 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -51,20 +51,20 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
/** Default port to listen (value is <tt>47500</tt>). */
public static final int DFLT_PORT = 47500;
- /** Default socket operations timeout in milliseconds (value is <tt>2,000ms</tt>). */
- public static final long DFLT_SOCK_TIMEOUT = 2000;
+ /** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */
+ public static final long DFLT_SOCK_TIMEOUT = 200;
- /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5,000ms</tt>). */
- public static final long DFLT_ACK_TIMEOUT = 5000;
+ /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>50ms</tt>). */
+ public static final long DFLT_ACK_TIMEOUT = 50;
- /** Default network timeout in milliseconds (value is <tt>5,000ms</tt>). */
+ /** Default network timeout in milliseconds (value is <tt>5000ms</tt>). */
public static final long DFLT_NETWORK_TIMEOUT = 5000;
/** Default value for thread priority (value is <tt>10</tt>). */
public static final int DFLT_THREAD_PRI = 10;
- /** Default heartbeat messages issuing frequency (value is <tt>2,000ms</tt>). */
- public static final long DFLT_HEARTBEAT_FREQ = 2000;
+ /** Default heartbeat messages issuing frequency (value is <tt>100ms</tt>). */
+ public static final long DFLT_HEARTBEAT_FREQ = 100;
/** Default size of topology snapshots history. */
public static final int DFLT_TOP_HISTORY_SIZE = 1000;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
new file mode 100644
index 0000000..0c4e2d1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import org.apache.ignite.*;
+
+import java.util.*;
+
+/**
+ * Convenience adapter for streamers. Adapters are optional components for
+ * streaming from different data sources. The purpose of adapters is to
+ * convert different message formats into Ignite stream key-value tuples
+ * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
+ */
+public abstract class StreamAdapter<T, K, V> {
+ /** Tuple extractor. */
+ private StreamTupleExtractor<T, K, V> extractor;
+
+ /** Streamer. */
+ private IgniteDataStreamer<K, V> stmr;
+
+ /** Ignite. */
+ private Ignite ignite;
+
+ /**
+ * Empty constructor.
+ */
+ protected StreamAdapter() {
+ // No-op.
+ }
+
+ /**
+ * Stream adapter.
+ *
+ * @param stmr Streamer.
+ * @param extractor Tuple extractor.
+ */
+ protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
+ this.stmr = stmr;
+ this.extractor = extractor;
+ }
+
+ /**
+ * @return Provided data streamer.
+ */
+ public IgniteDataStreamer<K, V> getStreamer() {
+ return stmr;
+ }
+
+ /**
+ * @param stmr Ignite data streamer.
+ */
+ public void setStreamer(IgniteDataStreamer<K, V> stmr) {
+ this.stmr = stmr;
+ }
+
+ /**
+ * @return Provided tuple extractor.
+ */
+ public StreamTupleExtractor<T, K, V> getTupleExtractor() {
+ return extractor;
+ }
+
+ /**
+ * @param extractor Extractor for key-value tuples from messages.
+ */
+ public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
+ this.extractor = extractor;
+ }
+
+ /**
+ * @return Provided {@link Ignite} instance.
+ */
+ public Ignite getIgnite() {
+ return ignite;
+ }
+
+ /**
+ * @param ignite {@link Ignite} instance.
+ */
+ public void setIgnite(Ignite ignite) {
+ this.ignite = ignite;
+ }
+
+ /**
+ * Converts given message to a tuple and adds it to the underlying streamer.
+ *
+ * @param msg Message to convert.
+ */
+ protected void addMessage(T msg) {
+ Map.Entry<K, V> e = extractor.extract(msg);
+
+ if (e != null)
+ stmr.addData(e);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
new file mode 100644
index 0000000..d2a4ede
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import java.util.*;
+
+/**
+ * Stream tuple extractor to convert messages to Ignite key-value tuples.
+ */
+public interface StreamTupleExtractor<T, K, V> {
+ /**
+ * Extracts a key-value tuple from a message.
+ *
+ * @param msg Message.
+ * @return Key-value tuple.
+ */
+ public Map.Entry<K, V> extract(T msg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
new file mode 100644
index 0000000..8161d86
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+/**
+ * Socket message converter.
+ */
+public interface SocketMessageConverter<T> {
+ /**
+ * Converts a message represented as an array of bytes to an object.
+ *
+ * @param msg Message.
+ * @return Converted object.
+ */
+ public T convert(byte[] msg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
new file mode 100644
index 0000000..07ce77e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.stream.*;
+
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+import java.nio.*;
+
+/**
+ * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and
+ * streams into {@link IgniteDataStreamer} instance.
+ * <p>
+ * By default server uses size-based message processing. That is every message sent over the socket is prepended with
+ * 4-byte integer header containing message size. If message delimiter is defined (see {@link #setDelimiter}) then
+ * delimiter-based message processing will be used. That is every message sent over the socket is appended with
+ * provided delimiter.
+ * <p>
+ * Received messages are converted to Java objects using standard serialization. Conversion functionality
+ * can be customized via user defined {@link SocketMessageConverter} (e.g. in order to convert messages from
+ * non Java clients).
+ */
+public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+ /** Default threads. */
+ private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors();
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Address. */
+ private InetAddress addr;
+
+ /** Server port. */
+ private int port;
+
+ /** Threads number. */
+ private int threads = DFLT_THREADS;
+
+ /** Direct mode. */
+ private boolean directMode;
+
+ /** Delimiter. */
+ private byte[] delim;
+
+ /** Converter. */
+ private SocketMessageConverter<T> converter;
+
+ /** Server. */
+ private GridNioServer<byte[]> srv;
+
+ /**
+ * Sets server address.
+ *
+ * @param addr Address.
+ */
+ public void setAddr(InetAddress addr) {
+ this.addr = addr;
+ }
+
+ /**
+ * Sets port number.
+ *
+ * @param port Port.
+ */
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ /**
+ * Sets the number of threads.
+ *
+ * @param threads Threads.
+ */
+ public void setThreads(int threads) {
+ this.threads = threads;
+ }
+
+ /**
+ * Sets direct mode flag.
+ *
+ * @param directMode Direct mode.
+ */
+ public void setDirectMode(boolean directMode) {
+ this.directMode = directMode;
+ }
+
+ /**
+ * Sets message delimiter.
+ *
+ * @param delim Delimiter.
+ */
+ public void setDelimiter(byte[] delim) {
+ this.delim = delim;
+ }
+
+ /**
+ * Sets message converter.
+ *
+ * @param converter Converter.
+ */
+ public void setConverter(SocketMessageConverter<T> converter) {
+ this.converter = converter;
+ }
+
+ /**
+ * Starts streamer.
+ *
+ * @throws IgniteException If failed.
+ */
+ public void start() {
+ A.notNull(getTupleExtractor(), "tupleExtractor");
+ A.notNull(getStreamer(), "streamer");
+ A.notNull(getIgnite(), "ignite");
+ A.ensure(threads > 0, "threads > 0");
+
+ log = getIgnite().log();
+
+ GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() {
+ @Override public void onConnected(GridNioSession ses) {
+ assert ses.accepted();
+
+ if (log.isDebugEnabled())
+ log.debug("Accepted connection: " + ses.remoteAddress());
+ }
+
+ @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+ if (e != null)
+ log.error("Connection failed with exception", e);
+ }
+
+ @Override public void onMessage(GridNioSession ses, byte[] msg) {
+ addMessage(converter.convert(msg));
+ }
+ };
+
+ ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
+
+ GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) :
+ new GridDelimitedParser(delim, directMode);
+
+ if (converter == null)
+ converter = new DefaultConverter<>();
+
+ GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
+
+ GridNioFilter[] filters = new GridNioFilter[] {codec};
+
+ try {
+ srv = new GridNioServer.Builder<byte[]>()
+ .address(addr == null ? InetAddress.getLocalHost() : addr)
+ .port(port)
+ .listener(lsnr)
+ .logger(log)
+ .selectorCount(threads)
+ .byteOrder(byteOrder)
+ .filters(filters)
+ .build();
+ }
+ catch (IgniteCheckedException | UnknownHostException e) {
+ throw new IgniteException(e);
+ }
+
+ srv.start();
+
+ if (log.isDebugEnabled())
+ log.debug("Socket streaming server started on " + addr + ':' + port);
+ }
+
+ /**
+ * Stops streamer.
+ */
+ public void stop() {
+ srv.stop();
+
+ if (log.isDebugEnabled())
+ log.debug("Socket streaming server stopped");
+ }
+
+ /**
+ * Converts message to Java object using Jdk marshaller.
+ */
+ private static class DefaultConverter<T> implements SocketMessageConverter<T> {
+ /** Marshaller. */
+ private static final JdkMarshaller MARSH = new JdkMarshaller();
+
+ /** {@inheritDoc} */
+ @Override public T convert(byte[] msg) {
+ try {
+ return MARSH.unmarshal(msg, null);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
new file mode 100644
index 0000000..e1cef65
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains socket streamer implementation.
+ */
+package org.apache.ignite.stream.socket;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index a79d5b8..b3eed46 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -115,6 +115,7 @@ org.apache.ignite.configuration.CollectionConfiguration
org.apache.ignite.configuration.DeploymentMode
org.apache.ignite.configuration.IgniteReflectionFactory
org.apache.ignite.configuration.NearCacheConfiguration
+org.apache.ignite.configuration.TopologyValidator
org.apache.ignite.configuration.TransactionConfiguration
org.apache.ignite.events.CacheEvent
org.apache.ignite.events.CacheQueryExecutedEvent
@@ -248,6 +249,7 @@ org.apache.ignite.internal.managers.communication.GridIoManager$ConcurrentHashMa
org.apache.ignite.internal.managers.communication.GridIoMessage
org.apache.ignite.internal.managers.communication.GridIoPolicy
org.apache.ignite.internal.managers.communication.GridIoUserMessage
+org.apache.ignite.internal.managers.communication.GridLifecycleAwareMessageFilter
org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean
org.apache.ignite.internal.managers.deployment.GridDeploymentPerVersionStore$2
org.apache.ignite.internal.managers.deployment.GridDeploymentRequest
@@ -321,13 +323,13 @@ org.apache.ignite.internal.processors.cache.GridCacheAdapter$72
org.apache.ignite.internal.processors.cache.GridCacheAdapter$73
org.apache.ignite.internal.processors.cache.GridCacheAdapter$74
org.apache.ignite.internal.processors.cache.GridCacheAdapter$9
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearAllJob
org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearCallable
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearKeySetCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalClearKeySetJob
org.apache.ignite.internal.processors.cache.GridCacheAdapter$GlobalSizeCallable
org.apache.ignite.internal.processors.cache.GridCacheAdapter$LoadCacheClosure
org.apache.ignite.internal.processors.cache.GridCacheAdapter$LoadKeysCallable
-org.apache.ignite.internal.processors.cache.GridCacheAdapter$SizeCallable
+org.apache.ignite.internal.processors.cache.GridCacheAdapter$SizeJob
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdateGetTimeStatClosure
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutAndGetTimeStatClosure
org.apache.ignite.internal.processors.cache.GridCacheAdapter$UpdatePutTimeStatClosure
@@ -445,6 +447,7 @@ org.apache.ignite.internal.processors.cache.IgniteCacheProxy$2
org.apache.ignite.internal.processors.cache.IgniteCacheProxy$4
org.apache.ignite.internal.processors.cache.IgniteCacheProxy$5
org.apache.ignite.internal.processors.cache.IgniteCacheProxy$6
+org.apache.ignite.internal.processors.cache.IgniteCacheProxy$7
org.apache.ignite.internal.processors.cache.KeyCacheObject
org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl
org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityProxy
@@ -452,8 +455,9 @@ org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresMa
org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$QueueHeaderPredicate
org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$RemoveSetDataCallable
org.apache.ignite.internal.processors.cache.distributed.GridCacheCommittedTxInfo
-org.apache.ignite.internal.processors.cache.distributed.GridCacheOptimisticCheckPreparedTxRequest
-org.apache.ignite.internal.processors.cache.distributed.GridCacheOptimisticCheckPreparedTxResponse
+org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture$1
+org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest
+org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse
org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest
org.apache.ignite.internal.processors.cache.distributed.GridDistributedBaseMessage
org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter
@@ -725,6 +729,7 @@ org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$11
org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$12
org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$13
org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$14
+org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$15
org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$2
org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$3
org.apache.ignite.internal.processors.cache.transactions.IgniteTxHandler$4
@@ -762,9 +767,11 @@ org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter$Po
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter$PostLockClosure2
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter$PostMissClosure
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$2
+org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$3
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$AtomicInt
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$CommitListener
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$CommittedVersion
+org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager$NodeFailureTimeoutObject$1
org.apache.ignite.internal.processors.cache.transactions.IgniteTxMap
org.apache.ignite.internal.processors.cache.transactions.IgniteTxMap$1
org.apache.ignite.internal.processors.cache.transactions.IgniteTxMap$1$1
@@ -809,6 +816,7 @@ org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$Discove
org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$DiscoveryDataItem
org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$StartRequestData
org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$3
+org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$4
org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters$Batched
org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters$BatchedSorted
org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters$Individual
@@ -1132,7 +1140,6 @@ org.apache.ignite.internal.util.lang.GridAbsClosure
org.apache.ignite.internal.util.lang.GridAbsClosureX
org.apache.ignite.internal.util.lang.GridCloseableIterator
org.apache.ignite.internal.util.lang.GridClosureException
-org.apache.ignite.internal.util.lang.GridComputeJobWrapper
org.apache.ignite.internal.util.lang.GridFunc$1
org.apache.ignite.internal.util.lang.GridFunc$10
org.apache.ignite.internal.util.lang.GridFunc$100
@@ -1383,6 +1390,8 @@ org.apache.ignite.internal.visor.cache.VisorCacheMetrics
org.apache.ignite.internal.visor.cache.VisorCacheMetricsCollectorTask
org.apache.ignite.internal.visor.cache.VisorCacheMetricsCollectorTask$VisorCacheMetricsCollectorJob
org.apache.ignite.internal.visor.cache.VisorCacheNearConfiguration
+org.apache.ignite.internal.visor.cache.VisorCacheNodesTask
+org.apache.ignite.internal.visor.cache.VisorCacheNodesTask$VisorCacheNodesJob
org.apache.ignite.internal.visor.cache.VisorCacheQueryConfiguration
org.apache.ignite.internal.visor.cache.VisorCacheQueryMetrics
org.apache.ignite.internal.visor.cache.VisorCacheRebalanceConfiguration
@@ -1390,6 +1399,9 @@ org.apache.ignite.internal.visor.cache.VisorCacheRebalanceTask
org.apache.ignite.internal.visor.cache.VisorCacheRebalanceTask$VisorCachesRebalanceJob
org.apache.ignite.internal.visor.cache.VisorCacheResetMetricsTask
org.apache.ignite.internal.visor.cache.VisorCacheResetMetricsTask$VisorCacheResetMetricsJob
+org.apache.ignite.internal.visor.cache.VisorCacheStartTask
+org.apache.ignite.internal.visor.cache.VisorCacheStartTask$VisorCacheStartArg
+org.apache.ignite.internal.visor.cache.VisorCacheStartTask$VisorCacheStartJob
org.apache.ignite.internal.visor.cache.VisorCacheStopTask
org.apache.ignite.internal.visor.cache.VisorCacheStopTask$VisorCacheStopJob
org.apache.ignite.internal.visor.cache.VisorCacheStoreConfiguration
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/main/resources/ignite.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/ignite.properties b/modules/core/src/main/resources/ignite.properties
index 549bde3..78e294f 100644
--- a/modules/core/src/main/resources/ignite.properties
+++ b/modules/core/src/main/resources/ignite.properties
@@ -15,7 +15,7 @@
# limitations under the License.
#
-ignite.version=1.0.0
+ignite.version=1.2.0-SNAPSHOT
ignite.build=0
ignite.revision=DEV
ignite.rel.date=01011970
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
index 2b119ec..abc9109 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridUpdateNotifierSelfTest.java
@@ -18,10 +18,13 @@
package org.apache.ignite.internal;
import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.plugin.*;
import org.apache.ignite.testframework.junits.common.*;
-import org.h2.constant.*;
import org.jetbrains.annotations.*;
+import java.util.*;
import java.util.concurrent.*;
/**
@@ -29,6 +32,8 @@ import java.util.concurrent.*;
*/
@GridCommonTest(group = "Kernal Self")
public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
+ private String updateStatusParams;
+
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return 30 * 1000;
@@ -39,6 +44,12 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
super.beforeTestsStarted();
System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER, "true");
+
+ Properties props = U.field(IgniteProperties.class, "PROPS");
+
+ updateStatusParams = props.getProperty("ignite.update.status.params");
+
+ props.setProperty("ignite.update.status.params", "ver=" + IgniteProperties.get("ignite.version"));
}
/** {@inheritDoc} */
@@ -46,14 +57,20 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
super.afterTestsStopped();
System.setProperty(IgniteSystemProperties.IGNITE_UPDATE_NOTIFIER, "false");
+
+ Properties props = U.field(IgniteProperties.class, "PROPS");
+
+ props.setProperty("ignite.update.status.params", updateStatusParams);
}
/**
* @throws Exception If failed.
*/
public void testNotifier() throws Exception {
- GridUpdateNotifier ntf = new GridUpdateNotifier(null, IgniteProperties.get("ignite.version"),
- TEST_GATEWAY, false);
+ String nodeVer = IgniteProperties.get("ignite.version");
+
+ GridUpdateNotifier ntf = new GridUpdateNotifier(null, nodeVer,
+ TEST_GATEWAY, Collections.<PluginProvider>emptyList(), false);
ntf.checkForNewVersion(new SelfExecutor(), log);
@@ -63,6 +80,13 @@ public class GridUpdateNotifierSelfTest extends GridCommonAbstractTest {
assertNotNull("Ignite latest version has not been detected.", ver);
+ byte nodeMaintenance = IgniteProductVersion.fromString(nodeVer).maintenance();
+
+ byte lastMaintenance = IgniteProductVersion.fromString(ver).maintenance();
+
+ assertTrue("Wrong latest version.", (nodeMaintenance == 0 && lastMaintenance == 0) ||
+ (nodeMaintenance > 0 && lastMaintenance > 0));
+
ntf.reportStatus(log);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java
new file mode 100644
index 0000000..5859bec
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGetFromJobTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.resources.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.atomic.*;
+
+/**
+ * Job tries to get cache during topology change.
+ */
+public class CacheGetFromJobTest extends GridCacheAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 1;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Broadcasts {@link TestJob} from grid 0 in a loop while three threads concurrently start new grids.
+     *
+     * @throws Exception If a grid failed to start or a job failed.
+     */
+    public void testTopologyChange() throws Exception {
+        final AtomicReference<Exception> err = new AtomicReference<>();
+
+        final AtomicInteger id = new AtomicInteger(1);
+
+        IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new CA() {
+            @Override public void apply() {
+                info("Run topology change.");
+
+                try {
+                    for (int i = 0; i < 5; i++) {
+                        info("Topology change: " + i);
+
+                        startGrid(id.getAndIncrement());
+                    }
+                }
+                catch (Exception e) {
+                    err.compareAndSet(null, e); // Keep the first failure; other threads must not overwrite it.
+
+                    log.error("Unexpected exception in topology-change-thread: " + e, e);
+                }
+            }
+        }, 3, "topology-change-thread");
+
+        int cntr = 0;
+
+        while (!fut.isDone()) {
+            grid(0).compute().broadcast(new TestJob());
+
+            cntr++;
+        }
+
+        log.info("Job execution count: " + cntr);
+
+        fut.get(); // Rethrows whatever failure the future recorded (the catch above only sees Exceptions).
+
+        if (err.get() != null)
+            throw err.get();
+    }
+
+    /** Job asserting that the cache with {@code null} name exists and is locally empty on the executing node. */
+    private static class TestJob implements IgniteCallable<Object> {
+        /** Ignite. */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** Public no-arg constructor. */
+        public TestJob() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            IgniteCache<Object, Object> cache = ignite.cache(null);
+
+            assertNotNull(cache);
+
+            assertEquals(0, cache.localSize());
+
+            return null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/96f0956d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
new file mode 100644
index 0000000..8c7d33d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheOffheapMapEntrySelfTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.processors.cache.local.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.cache.CacheMemoryMode.*;
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Cache map entry self test.
+ */
+public class CacheOffheapMapEntrySelfTest extends GridCacheAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        // No-op: the grid is started in beforeTest() instead.
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        startGrids(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        // No-op: the grid stays up until afterTestsStopped().
+    }
+
+    /**
+     * Takes the parent's cache configuration for the grid and overrides its modes and name.
+     *
+     * @param gridName Grid name.
+     * @param memoryMode Memory mode.
+     * @param atomicityMode Atomicity mode.
+     * @param cacheMode Cache mode.
+     * @param cacheName Cache name.
+     * @return Cache configuration.
+     * @throws Exception If failed.
+     */
+    private CacheConfiguration cacheConfiguration(String gridName,
+        CacheMemoryMode memoryMode,
+        CacheAtomicityMode atomicityMode,
+        CacheMode cacheMode,
+        String cacheName)
+        throws Exception
+    {
+        CacheConfiguration cfg = super.cacheConfiguration(gridName);
+
+        cfg.setCacheMode(cacheMode);
+        cfg.setAtomicityMode(atomicityMode);
+        cfg.setMemoryMode(memoryMode);
+        cfg.setName(cacheName);
+
+        return cfg;
+    }
+
+    /**
+     * Checks the expected map entry class for each of the 18 mode combinations listed below.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCacheMapEntry() throws Exception {
+        // LOCAL + ATOMIC: the same entry class is expected for every memory mode.
+        checkCacheMapEntry(ONHEAP_TIERED, ATOMIC, LOCAL, GridLocalCacheEntry.class);
+        checkCacheMapEntry(OFFHEAP_TIERED, ATOMIC, LOCAL, GridLocalCacheEntry.class);
+        checkCacheMapEntry(OFFHEAP_VALUES, ATOMIC, LOCAL, GridLocalCacheEntry.class);
+
+        // LOCAL + TRANSACTIONAL: the same entry class is expected for every memory mode.
+        checkCacheMapEntry(ONHEAP_TIERED, TRANSACTIONAL, LOCAL, GridLocalCacheEntry.class);
+        checkCacheMapEntry(OFFHEAP_TIERED, TRANSACTIONAL, LOCAL, GridLocalCacheEntry.class);
+        checkCacheMapEntry(OFFHEAP_VALUES, TRANSACTIONAL, LOCAL, GridLocalCacheEntry.class);
+
+        // PARTITIONED + ATOMIC: both off-heap modes expect the off-heap entry class.
+        checkCacheMapEntry(ONHEAP_TIERED, ATOMIC, PARTITIONED, GridNearCacheEntry.class);
+        checkCacheMapEntry(OFFHEAP_TIERED, ATOMIC, PARTITIONED, GridNearOffHeapCacheEntry.class);
+        checkCacheMapEntry(OFFHEAP_VALUES, ATOMIC, PARTITIONED, GridNearOffHeapCacheEntry.class);
+
+        // PARTITIONED + TRANSACTIONAL: both off-heap modes expect the off-heap entry class.
+        checkCacheMapEntry(ONHEAP_TIERED, TRANSACTIONAL, PARTITIONED, GridNearCacheEntry.class);
+        checkCacheMapEntry(OFFHEAP_TIERED, TRANSACTIONAL, PARTITIONED, GridNearOffHeapCacheEntry.class);
+        checkCacheMapEntry(OFFHEAP_VALUES, TRANSACTIONAL, PARTITIONED, GridNearOffHeapCacheEntry.class);
+
+        // REPLICATED + ATOMIC: both off-heap modes expect the off-heap entry class.
+        checkCacheMapEntry(ONHEAP_TIERED, ATOMIC, REPLICATED, GridDhtAtomicCacheEntry.class);
+        checkCacheMapEntry(OFFHEAP_TIERED, ATOMIC, REPLICATED, GridDhtAtomicOffHeapCacheEntry.class);
+        checkCacheMapEntry(OFFHEAP_VALUES, ATOMIC, REPLICATED, GridDhtAtomicOffHeapCacheEntry.class);
+
+        // REPLICATED + TRANSACTIONAL: both off-heap modes expect the off-heap entry class.
+        checkCacheMapEntry(ONHEAP_TIERED, TRANSACTIONAL, REPLICATED, GridDhtColocatedCacheEntry.class);
+        checkCacheMapEntry(OFFHEAP_TIERED, TRANSACTIONAL, REPLICATED, GridDhtColocatedOffHeapCacheEntry.class);
+        checkCacheMapEntry(OFFHEAP_VALUES, TRANSACTIONAL, REPLICATED, GridDhtColocatedOffHeapCacheEntry.class);
+    }
+
+    /**
+     * Creates cache "Cache" with the given modes, puts one entry and checks the class of its map entry.
+     *
+     * @param memoryMode Cache memory mode.
+     * @param atomicityMode Cache atomicity mode.
+     * @param cacheMode Cache mode.
+     * @param entryCls Expected class of cache map entry.
+     * @throws Exception If failed.
+     */
+    private void checkCacheMapEntry(CacheMemoryMode memoryMode,
+        CacheAtomicityMode atomicityMode,
+        CacheMode cacheMode,
+        Class<?> entryCls)
+        throws Exception
+    {
+        log.info("Test cache [memMode=" + memoryMode +
+            ", atomicityMode=" + atomicityMode +
+            ", cacheMode=" + cacheMode + ']');
+
+        CacheConfiguration cfg = cacheConfiguration(grid(0).name(),
+            memoryMode,
+            atomicityMode,
+            cacheMode,
+            "Cache");
+
+        try (IgniteCache jcache = grid(0).getOrCreateCache(cfg)) { // Closed after each check; name is reused.
+            GridCacheAdapter<Integer, String> cache = ((IgniteKernal)grid(0)).internalCache(jcache.getName());
+
+            Integer key = primaryKey(grid(0).cache(null));
+
+            cache.put(key, "val");
+
+            GridCacheEntryEx entry = cache.entryEx(key);
+
+            assertNotNull(entry); // Checked before the first dereference so a null fails as an assertion.
+
+            entry.unswap(true);
+
+            assertEquals(entryCls, entry.getClass()); // Expected value first, per the JUnit contract.
+        }
+    }
+}