You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/09 14:30:05 UTC
[13/40] incubator-ignite git commit: # ignite-883
# ignite-883
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fb827a77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fb827a77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fb827a77
Branch: refs/heads/ignite-gg-10299
Commit: fb827a7784614343ae639ea8b856d2f9f88d46db
Parents: db57652
Author: sboikov <sb...@gridgain.com>
Authored: Fri Jun 5 11:41:46 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Jun 5 15:10:00 2015 +0300
----------------------------------------------------------------------
.../datastructures/DataStructuresProcessor.java | 31 +-
.../timeout/GridSpiTimeoutObject.java | 14 +
.../util/nio/GridCommunicationClient.java | 6 -
.../util/nio/GridTcpCommunicationClient.java | 554 -------------------
.../util/nio/GridTcpNioCommunicationClient.java | 8 -
.../org/apache/ignite/spi/IgniteSpiAdapter.java | 4 +
.../communication/tcp/TcpCommunicationSpi.java | 97 +---
.../tcp/TcpCommunicationSpiMBean.java | 2 -
.../internal/util/nio/GridNioSelfTest.java | 2 +-
.../GridTcpCommunicationSpiAbstractTest.java | 4 +-
...mmunicationSpiConcurrentConnectSelfTest.java | 2 +-
.../GridTcpCommunicationSpiConfigSelfTest.java | 2 -
...cpCommunicationSpiMultithreadedSelfTest.java | 2 +-
.../discovery/AbstractDiscoverySelfTest.java | 13 +-
14 files changed, 61 insertions(+), 680 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 2138639..aa3bfe2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -101,7 +101,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
private IgniteInternalCache<CacheDataStructuresCacheKey, List<CacheCollectionInfo>> utilityDataCache;
/** */
- private UUID qryId;
+ private volatile UUID qryId;
/**
* @param ctx Context.
@@ -144,11 +144,22 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
seqView = atomicsCache;
dsCacheCtx = atomicsCache.context();
+ }
+ }
- qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(),
- new DataStructuresEntryFilter(),
- dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(),
- false);
+ /**
+ * @throws IgniteCheckedException If failed.
+ */
+ private void startQuery() throws IgniteCheckedException {
+ if (qryId == null) {
+ synchronized (this) {
+ if (qryId == null) {
+ qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(),
+ new DataStructuresEntryFilter(),
+ dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(),
+ false);
+ }
+ }
}
}
@@ -178,6 +189,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
checkAtomicsConfiguration();
+ startQuery();
+
return getAtomic(new IgniteOutClosureX<IgniteAtomicSequence>() {
@Override public IgniteAtomicSequence applyx() throws IgniteCheckedException {
GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -304,6 +317,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
checkAtomicsConfiguration();
+ startQuery();
+
return getAtomic(new IgniteOutClosureX<IgniteAtomicLong>() {
@Override public IgniteAtomicLong applyx() throws IgniteCheckedException {
final GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -507,6 +522,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
checkAtomicsConfiguration();
+ startQuery();
+
return getAtomic(new IgniteOutClosureX<IgniteAtomicReference>() {
@Override public IgniteAtomicReference<T> applyx() throws IgniteCheckedException {
GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
@@ -608,6 +625,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
checkAtomicsConfiguration();
+ startQuery();
+
return getAtomic(new IgniteOutClosureX<IgniteAtomicStamped>() {
@Override public IgniteAtomicStamped<T, S> applyx() throws IgniteCheckedException {
GridCacheInternalKeyImpl key = new GridCacheInternalKeyImpl(name);
@@ -916,6 +935,8 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
checkAtomicsConfiguration();
+ startQuery();
+
return getAtomic(new IgniteOutClosureX<IgniteCountDownLatch>() {
@Override public IgniteCountDownLatch applyx() throws IgniteCheckedException {
GridCacheInternalKey key = new GridCacheInternalKeyImpl(name);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
index 82267a2..a0fd9b4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridSpiTimeoutObject.java
@@ -53,6 +53,20 @@ public class GridSpiTimeoutObject implements GridTimeoutObject {
}
/** {@inheritDoc} */
+ @Override public int hashCode() {
+ assert false;
+
+ return super.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ assert false;
+
+ return super.equals(obj);
+ }
+
+ /** {@inheritDoc} */
@Override public final String toString() {
return S.toString(GridSpiTimeoutObject.class, this);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 2f7fd88..693a5a4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -100,12 +100,6 @@ public interface GridCommunicationClient {
public boolean sendMessage(@Nullable UUID nodeId, Message msg) throws IgniteCheckedException;
/**
- * @param timeout Timeout.
- * @throws IOException If failed.
- */
- public void flushIfNeeded(long timeout) throws IOException;
-
- /**
* @return {@code True} if send is asynchronous.
*/
public boolean async();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
deleted file mode 100644
index 72c20f8..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpCommunicationClient.java
+++ /dev/null
@@ -1,554 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.util.nio;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.plugin.extensions.communication.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.nio.*;
-import java.util.*;
-import java.util.concurrent.locks.*;
-
-/**
- * Grid client for NIO server.
- */
-public class GridTcpCommunicationClient extends GridAbstractCommunicationClient {
- /** Socket. */
- private final Socket sock;
-
- /** Output stream. */
- private final UnsafeBufferedOutputStream out;
-
- /** Minimum buffered message count. */
- private final int minBufferedMsgCnt;
-
- /** Communication buffer size ratio. */
- private final double bufSizeRatio;
-
- /** */
- private final ByteBuffer writeBuf;
-
- /** */
- private final MessageFormatter formatter;
-
- /**
- * @param metricsLsnr Metrics listener.
- * @param addr Address.
- * @param locHost Local address.
- * @param connTimeout Connect timeout.
- * @param tcpNoDelay Value for {@code TCP_NODELAY} socket option.
- * @param sockRcvBuf Socket receive buffer.
- * @param sockSndBuf Socket send buffer.
- * @param bufSize Buffer size (or {@code 0} to disable buffer).
- * @param minBufferedMsgCnt Minimum buffered message count.
- * @param bufSizeRatio Communication buffer size ratio.
- * @param formatter Message formatter.
- * @throws IgniteCheckedException If failed.
- */
- public GridTcpCommunicationClient(
- GridNioMetricsListener metricsLsnr,
- InetSocketAddress addr,
- InetAddress locHost,
- long connTimeout,
- boolean tcpNoDelay,
- int sockRcvBuf,
- int sockSndBuf,
- int bufSize,
- int minBufferedMsgCnt,
- double bufSizeRatio,
- MessageFormatter formatter
- ) throws IgniteCheckedException {
- super(metricsLsnr);
-
- assert metricsLsnr != null;
- assert addr != null;
- assert locHost != null;
- assert connTimeout >= 0;
- assert bufSize >= 0;
-
- A.ensure(minBufferedMsgCnt >= 0,
- "Value of minBufferedMessageCount property cannot be less than zero.");
- A.ensure(bufSizeRatio > 0 && bufSizeRatio < 1,
- "Value of bufSizeRatio property must be between 0 and 1 (exclusive).");
-
- this.minBufferedMsgCnt = minBufferedMsgCnt;
- this.bufSizeRatio = bufSizeRatio;
- this.formatter = formatter;
-
- writeBuf = ByteBuffer.allocate(8 << 10);
-
- writeBuf.order(ByteOrder.nativeOrder());
-
- sock = new Socket();
-
- boolean success = false;
-
- try {
- sock.bind(new InetSocketAddress(locHost, 0));
-
- sock.setTcpNoDelay(tcpNoDelay);
-
- if (sockRcvBuf > 0)
- sock.setReceiveBufferSize(sockRcvBuf);
-
- if (sockSndBuf > 0)
- sock.setSendBufferSize(sockSndBuf);
-
- sock.connect(addr, (int)connTimeout);
-
- out = new UnsafeBufferedOutputStream(sock.getOutputStream(), bufSize);
-
- success = true;
- }
- catch (IOException e) {
- throw new IgniteCheckedException("Failed to connect to remote host " +
- "[addr=" + addr + ", localHost=" + locHost + ']', e);
- }
- finally {
- if (!success)
- U.closeQuiet(sock);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void doHandshake(IgniteInClosure2X<InputStream, OutputStream> handshakeC) throws IgniteCheckedException {
- try {
- handshakeC.applyx(sock.getInputStream(), sock.getOutputStream());
- }
- catch (IOException e) {
- throw new IgniteCheckedException("Failed to access IO streams when executing handshake with remote node: " +
- sock.getRemoteSocketAddress(), e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean close() {
- boolean res = super.close();
-
- if (res) {
- U.closeQuiet(out);
- U.closeQuiet(sock);
- }
-
- return res;
- }
-
- /** {@inheritDoc} */
- @Override public void forceClose() {
- super.forceClose();
-
- try {
- out.flush();
- }
- catch (IOException ignored) {
- // No-op.
- }
-
- // Do not call (directly or indirectly) out.close() here
- // since it may cause a deadlock.
- out.forceClose();
-
- U.closeQuiet(sock);
- }
-
- /** {@inheritDoc} */
- @Override public void sendMessage(byte[] data, int len) throws IgniteCheckedException {
- if (closed())
- throw new IgniteCheckedException("Client was closed: " + this);
-
- try {
- out.write(data, 0, len);
-
- metricsLsnr.onBytesSent(len);
- }
- catch (IOException e) {
- throw new IgniteCheckedException("Failed to send message to remote node: " + sock.getRemoteSocketAddress(), e);
- }
-
- markUsed();
- }
-
- /** {@inheritDoc} */
- @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg)
- throws IgniteCheckedException {
- if (closed())
- throw new IgniteCheckedException("Client was closed: " + this);
-
- assert writeBuf.hasArray();
-
- try {
- int cnt = U.writeMessageFully(msg, out, writeBuf, formatter.writer());
-
- metricsLsnr.onBytesSent(cnt);
- }
- catch (IOException e) {
- throw new IgniteCheckedException("Failed to send message to remote node: " + sock.getRemoteSocketAddress(), e);
- }
-
- markUsed();
-
- return false;
- }
-
- /**
- * @param timeout Timeout.
- * @throws IOException If failed.
- */
- @Override public void flushIfNeeded(long timeout) throws IOException {
- assert timeout > 0;
-
- out.flushOnTimeout(timeout);
- }
-
- /** {@inheritDoc} */
- @Override public void sendMessage(ByteBuffer data) throws IgniteCheckedException {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridTcpCommunicationClient.class, this, super.toString());
- }
-
- /**
- *
- */
- private class UnsafeBufferedOutputStream extends FilterOutputStream {
- /** The internal buffer where data is stored. */
- private final byte buf[];
-
- /** Current size. */
- private int size;
-
- /** Count. */
- private int cnt;
-
- /** Message count. */
- private int msgCnt;
-
- /** Total messages size. */
- private int totalCnt;
-
- /** Lock. */
- private final ReentrantLock lock = new ReentrantLock();
-
- /** Last flushed timestamp. */
- private volatile long lastFlushed = U.currentTimeMillis();
-
- /** Cached flush timeout. */
- private volatile long flushTimeout;
-
- /** Buffer adjusted timestamp. */
- private long lastAdjusted = U.currentTimeMillis();
-
- /**
- * Creates a new buffered output stream to write data to the
- * specified underlying output stream.
- *
- * @param out The underlying output stream.
- */
- UnsafeBufferedOutputStream(OutputStream out) {
- this(out, 8192);
- }
-
- /**
- * Creates a new buffered output stream to write data to the
- * specified underlying output stream with the specified buffer
- * size.
- *
- * @param out The underlying output stream.
- * @param size The buffer size.
- */
- UnsafeBufferedOutputStream(OutputStream out, int size) {
- super(out);
-
- assert size >= 0;
-
- this.size = size;
- buf = size > 0 ? new byte[size] : null;
- }
-
- /** {@inheritDoc} */
- @Override public void write(int b) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public void write(byte[] b, int off, int len) throws IOException {
- assert b != null;
- assert off == 0;
-
- // No buffering.
- if (buf == null) {
- lock.lock();
-
- try {
- out.write(b, 0, len);
- }
- finally {
- lock.unlock();
- }
-
- return;
- }
-
- // Buffering is enabled.
- lock.lock();
-
- try {
- msgCnt++;
- totalCnt += len;
-
- if (len >= size) {
- flushLocked();
-
- out.write(b, 0, len);
-
- lastFlushed = U.currentTimeMillis();
-
- adjustBufferIfNeeded();
-
- return;
- }
-
- if (cnt + len > size) {
- flushLocked();
-
- messageToBuffer0(b, off, len, buf, 0);
-
- cnt = len;
-
- assert cnt < size;
-
- adjustBufferIfNeeded();
-
- return;
- }
-
- messageToBuffer0(b, 0, len, buf, cnt);
-
- cnt += len;
-
- if (cnt == size)
- flushLocked();
- else
- flushIfNeeded();
- }
- finally {
- lock.unlock();
- }
- }
-
- /**
- * @throws IOException If failed.
- */
- private void flushIfNeeded() throws IOException {
- assert lock.isHeldByCurrentThread();
- assert buf != null;
-
- long flushTimeout0 = flushTimeout;
-
- if (flushTimeout0 > 0)
- flushOnTimeoutLocked(flushTimeout0);
- }
-
- /**
- *
- */
- private void adjustBufferIfNeeded() {
- assert lock.isHeldByCurrentThread();
- assert buf != null;
-
- long flushTimeout0 = flushTimeout;
-
- if (flushTimeout0 > 0)
- adjustBufferLocked(flushTimeout0);
- }
-
- /** {@inheritDoc} */
- @Override public void flush() throws IOException {
- lock.lock();
-
- try {
- flushLocked();
- }
- finally {
- lock.unlock();
- }
- }
-
- /**
- * @param timeout Timeout.
- * @throws IOException If failed.
- */
- public void flushOnTimeout(long timeout) throws IOException {
- assert buf != null;
- assert timeout > 0;
-
- // Overwrite cached value.
- flushTimeout = timeout;
-
- if (lastFlushed + timeout > U.currentTimeMillis() || !lock.tryLock())
- return;
-
- try {
- flushOnTimeoutLocked(timeout);
- }
- finally {
- lock.unlock();
- }
- }
-
- /**
- * @param timeout Timeout.
- * @throws IOException If failed.
- */
- private void flushOnTimeoutLocked(long timeout) throws IOException {
- assert lock.isHeldByCurrentThread();
- assert timeout > 0;
-
- // Double check.
- if (cnt == 0 || lastFlushed + timeout > U.currentTimeMillis())
- return;
-
- flushLocked();
-
- adjustBufferLocked(timeout);
- }
-
- /**
- * @param timeout Timeout.
- */
- private void adjustBufferLocked(long timeout) {
- assert lock.isHeldByCurrentThread();
- assert timeout > 0;
-
- long time = U.currentTimeMillis();
-
- if (lastAdjusted + timeout < time) {
- if (msgCnt <= minBufferedMsgCnt)
- size = 0;
- else {
- size = (int)(totalCnt * bufSizeRatio);
-
- if (size > buf.length)
- size = buf.length;
- }
-
- msgCnt = 0;
- totalCnt = 0;
-
- lastAdjusted = time;
- }
- }
-
- /**
- * @throws IOException If failed.
- */
- private void flushLocked() throws IOException {
- assert lock.isHeldByCurrentThread();
-
- if (buf != null && cnt > 0) {
- out.write(buf, 0, cnt);
-
- cnt = 0;
- }
-
- out.flush();
-
- lastFlushed = U.currentTimeMillis();
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws IOException {
- lock.lock();
-
- try {
- flushLocked();
- }
- finally {
- try {
- out.close();
- }
- finally {
- lock.unlock();
- }
- }
- }
-
- /**
- * Forcibly closes underlying stream ignoring any possible exception.
- */
- public void forceClose() {
- try {
- out.close();
- }
- catch (IOException ignored) {
- // No-op.
- }
- }
-
- /**
- * @param b Buffer to copy from.
- * @param off Offset in source buffer.
- * @param len Length.
- * @param resBuf Result buffer.
- * @param resOff Result offset.
- */
- private void messageToBuffer(byte[] b, int off, int len, byte[] resBuf, int resOff) {
- assert b.length == len;
- assert off == 0;
- assert resBuf.length >= resOff + len + 4;
-
- U.intToBytes(len, resBuf, resOff);
-
- U.arrayCopy(b, off, resBuf, resOff + 4, len);
- }
-
- /**
- * @param b Buffer to copy from (length included).
- * @param off Offset in source buffer.
- * @param len Length.
- * @param resBuf Result buffer.
- * @param resOff Result offset.
- */
- private void messageToBuffer0(byte[] b, int off, int len, byte[] resBuf, int resOff) {
- assert off == 0;
- assert resBuf.length >= resOff + len;
-
- U.arrayCopy(b, off, resBuf, resOff, len);
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- lock.lock();
-
- try {
- return S.toString(UnsafeBufferedOutputStream.class, this);
- }
- finally {
- lock.unlock();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 788a8e6..abad875 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -122,14 +122,6 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
return false;
}
- /**
- * @param timeout Timeout.
- * @throws IOException If failed.
- */
- @Override public void flushIfNeeded(long timeout) throws IOException {
- // No-op.
- }
-
/** {@inheritDoc} */
@Override public boolean async() {
return true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index c9c633f..d095491 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -730,11 +730,15 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
/** {@inheritDoc} */
@Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
+ assert ignite instanceof IgniteKernal : ignite;
+
((IgniteKernal)ignite).context().timeout().addTimeoutObject(new GridSpiTimeoutObject(obj));
}
/** {@inheritDoc} */
@Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
+ assert ignite instanceof IgniteKernal : ignite;
+
((IgniteKernal)ignite).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index b324ab2..359de1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -157,12 +157,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Default idle connection timeout (value is <tt>30000</tt>ms). */
public static final long DFLT_IDLE_CONN_TIMEOUT = 30000;
- /** Default value for connection buffer flush frequency (value is <tt>100</tt> ms). */
- public static final long DFLT_CONN_BUF_FLUSH_FREQ = 100;
-
- /** Default value for connection buffer size (value is <tt>0</tt>). */
- public static final int DFLT_CONN_BUF_SIZE = 0;
-
/** Default socket send and receive buffer size. */
public static final int DFLT_SOCK_BUF_SIZE = 32 * 1024;
@@ -603,13 +597,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Idle connection timeout. */
private long idleConnTimeout = DFLT_IDLE_CONN_TIMEOUT;
- /** Connection buffer flush frequency. */
- private volatile long connBufFlushFreq = DFLT_CONN_BUF_FLUSH_FREQ;
-
- /** Connection buffer size. */
- @SuppressWarnings("RedundantFieldInitialization")
- private int connBufSize = DFLT_CONN_BUF_SIZE;
-
/** Connect timeout. */
private long connTimeout = DFLT_CONN_TIMEOUT;
@@ -647,9 +634,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/** Socket write timeout. */
private long sockWriteTimeout = DFLT_SOCK_WRITE_TIMEOUT;
- /** Flush client worker. */
- private ClientFlushWorker clientFlushWorker;
-
/** Recovery and idle clients handler. */
private CommunicationWorker commWorker;
@@ -876,31 +860,29 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
* Sets connection buffer size. If set to {@code 0} connection buffer is disabled.
- * <p>
- * If not provided, default value is {@link #DFLT_CONN_BUF_SIZE}.
*
* @param connBufSize Connection buffer size.
* @see #setConnectionBufferFlushFrequency(long)
*/
@IgniteSpiConfiguration(optional = true)
public void setConnectionBufferSize(int connBufSize) {
- this.connBufSize = connBufSize;
+ // No-op.
}
/** {@inheritDoc} */
@Override public int getConnectionBufferSize() {
- return connBufSize;
+ return 0;
}
/** {@inheritDoc} */
@IgniteSpiConfiguration(optional = true)
@Override public void setConnectionBufferFlushFrequency(long connBufFlushFreq) {
- this.connBufFlushFreq = connBufFlushFreq;
+ // No-op.
}
/** {@inheritDoc} */
@Override public long getConnectionBufferFlushFrequency() {
- return connBufFlushFreq;
+ return 0;
}
/**
@@ -1168,8 +1150,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
assertParameter(locPort <= 0xffff, "locPort < 0xffff");
assertParameter(locPortRange >= 0, "locPortRange >= 0");
assertParameter(idleConnTimeout > 0, "idleConnTimeout > 0");
- assertParameter(connBufFlushFreq > 0, "connBufFlushFreq > 0");
- assertParameter(connBufSize >= 0, "connBufSize >= 0");
assertParameter(sockRcvBuf >= 0, "sockRcvBuf >= 0");
assertParameter(sockSndBuf >= 0, "sockSndBuf >= 0");
assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
@@ -1239,8 +1219,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug(configInfo("idleConnTimeout", idleConnTimeout));
log.debug(configInfo("directBuf", directBuf));
log.debug(configInfo("directSendBuf", directSndBuf));
- log.debug(configInfo("connBufSize", connBufSize));
- log.debug(configInfo("connBufFlushFreq", connBufFlushFreq));
log.debug(configInfo("selectorsCnt", selectorsCnt));
log.debug(configInfo("tcpNoDelay", tcpNoDelay));
log.debug(configInfo("sockSndBuf", sockSndBuf));
@@ -1255,11 +1233,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
log.debug(configInfo("unackedMsgsBufSize", unackedMsgsBufSize));
}
- if (connBufSize > 8192)
- U.warn(log, "Specified communication IO buffer size is larger than recommended (ignore if done " +
- "intentionally) [specified=" + connBufSize + ", recommended=8192]",
- "Specified communication IO buffer size is larger than recommended (ignore if done intentionally).");
-
if (!tcpNoDelay)
U.quietAndWarn(log, "'TCP_NO_DELAY' for communication is off, which should be used with caution " +
"since may produce significant delays with some scenarios.");
@@ -1272,12 +1245,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
commWorker.start();
- if (connBufSize > 0) {
- clientFlushWorker = new ClientFlushWorker();
-
- clientFlushWorker.start();
- }
-
// Ack start.
if (log.isDebugEnabled())
log.debug(startInfo());
@@ -1431,10 +1398,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (nioSrvr != null)
nioSrvr.stop();
- U.interrupt(clientFlushWorker);
U.interrupt(commWorker);
- U.join(clientFlushWorker, log);
U.join(commWorker, log);
// Force closing on stop (safety).
@@ -2023,10 +1988,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
if (nioSrvr != null)
nioSrvr.stop();
- U.interrupt(clientFlushWorker);
U.interrupt(commWorker);
- U.join(clientFlushWorker, log);
U.join(commWorker, log);
for (GridCommunicationClient client : clients.values())
@@ -2134,58 +2097,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
/**
*
*/
- private class ClientFlushWorker extends IgniteSpiThread {
- /**
- *
- */
- ClientFlushWorker() {
- super(gridName, "nio-client-flusher", log);
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings({"BusyWait"})
- @Override protected void body() throws InterruptedException {
- while (!isInterrupted()) {
- long connBufFlushFreq0 = connBufFlushFreq;
-
- for (Map.Entry<UUID, GridCommunicationClient> entry : clients.entrySet()) {
- GridCommunicationClient client = entry.getValue();
-
- if (client.reserve()) {
- boolean err = true;
-
- try {
- client.flushIfNeeded(connBufFlushFreq0);
-
- err = false;
- }
- catch (IOException e) {
- if (getSpiContext().pingNode(entry.getKey()))
- U.error(log, "Failed to flush client: " + client, e);
- else {
- if (log.isDebugEnabled())
- log.debug("Failed to flush client (node left): " + client);
-
- onException("Failed to flush client (node left): " + client, e);
- }
- }
- finally {
- if (err)
- client.forceClose();
- else
- client.release();
- }
- }
- }
-
- Thread.sleep(connBufFlushFreq0);
- }
- }
- }
-
- /**
- *
- */
private class CommunicationWorker extends IgniteSpiThread {
/** */
private final BlockingQueue<GridNioRecoveryDescriptor> q = new LinkedBlockingQueue<>();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index 5c80e6e..6f5a738 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -171,8 +171,6 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
* This frequency defines how often the system will advise flushing
* the connection buffer.
* <p>
- * If not provided, default value is {@link TcpCommunicationSpi#DFLT_CONN_BUF_FLUSH_FREQ}.
- * <p>
* This property is used only if {@link #getConnectionBufferSize()} is greater than {@code 0}.
*
* @param connBufFlushFreq Flush frequency.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
index e3baeb0..bdf9929 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java
@@ -1286,7 +1286,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest {
}
/**
- * Test client to use instead of {@link GridTcpCommunicationClient}
+ * Test client to use instead of {@link GridTcpNioCommunicationClient}
*/
private static class TestClient implements AutoCloseable {
/** Socket implementation to use. */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
index ea51aff..8d27485 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiAbstractTest.java
@@ -64,7 +64,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
// Test idle clients remove.
for (CommunicationSpi spi : spis.values()) {
- ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients");
+ ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
assertEquals(2, clients.size());
@@ -77,7 +77,7 @@ abstract class GridTcpCommunicationSpiAbstractTest extends GridAbstractCommunica
super.afterTest();
for (CommunicationSpi spi : spis.values()) {
- ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients");
+ ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
for (int i = 0; i < 20 && !clients.isEmpty(); i++) {
info("Check failed for SPI [grid=" +
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index c038ee7..2d175f5 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -256,7 +256,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
assertTrue(latch.await(10, TimeUnit.SECONDS));
for (CommunicationSpi spi : spis) {
- ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients");
+ ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
assertEquals(1, clients.size());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
index 34fa610..c4a0916 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
@@ -32,8 +32,6 @@ public class GridTcpCommunicationSpiConfigSelfTest extends GridSpiAbstractConfig
checkNegativeSpiProperty(new TcpCommunicationSpi(), "localPort", 65636);
checkNegativeSpiProperty(new TcpCommunicationSpi(), "localPortRange", -1);
checkNegativeSpiProperty(new TcpCommunicationSpi(), "idleConnectionTimeout", 0);
- checkNegativeSpiProperty(new TcpCommunicationSpi(), "connectionBufferSize", -1);
- checkNegativeSpiProperty(new TcpCommunicationSpi(), "connectionBufferFlushFrequency", 0);
checkNegativeSpiProperty(new TcpCommunicationSpi(), "socketReceiveBuffer", -1);
checkNegativeSpiProperty(new TcpCommunicationSpi(), "socketSendBuffer", -1);
checkNegativeSpiProperty(new TcpCommunicationSpi(), "messageQueueLimit", -1);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index e7ae957..3916f02 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -501,7 +501,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
}
for (CommunicationSpi spi : spis.values()) {
- final ConcurrentMap<UUID, GridTcpCommunicationClient> clients = U.field(spi, "clients");
+ final ConcurrentMap<UUID, GridCommunicationClient> clients = U.field(spi, "clients");
assert GridTestUtils.waitForCondition(new PA() {
@Override public boolean apply() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fb827a77/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
index 9c6fbb4..61bb944 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.marshaller.*;
import org.apache.ignite.spi.*;
+import org.apache.ignite.testframework.*;
import org.apache.ignite.testframework.config.*;
import org.apache.ignite.testframework.junits.*;
import org.apache.ignite.testframework.junits.spi.*;
@@ -267,9 +268,8 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri
Collection<UUID> nodeIds = new HashSet<>();
- for (IgniteTestResources rsrc : spiRsrcs) {
+ for (IgniteTestResources rsrc : spiRsrcs)
nodeIds.add(rsrc.getNodeId());
- }
for (ClusterNode node : spi.getRemoteNodes()) {
if (nodeIds.contains(node.id())) {
@@ -390,6 +390,10 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri
}
});
+ GridSpiTestContext ctx = initSpiContext();
+
+ GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "spiCtx", ctx);
+
spi.spiStart(getTestGridName() + i);
spis.add(spi);
@@ -397,7 +401,7 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri
spiRsrcs.add(rsrcMgr);
// Force to use test context instead of default dummy context.
- spi.onContextInitialized(initSpiContext());
+ spi.onContextInitialized(ctx);
}
}
catch (Throwable e) {
@@ -438,9 +442,8 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri
spi.spiStop();
}
- for (IgniteTestResources rscrs : spiRsrcs) {
+ for (IgniteTestResources rscrs : spiRsrcs)
rscrs.stopThreads();
- }
// Clear.
spis.clear();