You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/03/01 21:01:51 UTC
[07/10] incubator-ignite git commit: ignite-11 use
ExecutorCompletionService.
ignite-11 use ExecutorCompletionService.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8b9e7f8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8b9e7f8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8b9e7f8f
Branch: refs/heads/sprint-2
Commit: 8b9e7f8f5ae205da389411f5aa763d329e3ea6bd
Parents: de75adb
Author: sevdokimov <se...@gridgain.com>
Authored: Thu Feb 26 17:23:52 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Thu Feb 26 17:23:52 2015 +0300
----------------------------------------------------------------------
.../ignite/internal/util/IgniteUtils.java | 1 +
.../discovery/tcp/TcpDiscoverySpiAdapter.java | 85 +++++++++-----------
2 files changed, 40 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b9e7f8f/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index a8b6991..b24743a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -298,6 +298,7 @@ public abstract class IgniteUtils {
private static final Map<Class<? extends IgniteCheckedException>, C1<IgniteCheckedException, IgniteException>>
exceptionConverters;
+ /** */
private volatile static IgniteBiTuple<Collection<String>, Collection<String>> cachedLocalAddr;
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b9e7f8f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index 322b954..80b793a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -1018,13 +1018,10 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
private int connInProgress;
/** */
- private boolean closed;
-
- /** */
private final ExecutorService executor;
/** */
- private final Queue<GridTuple3<InetSocketAddress, Socket, Exception>> queue = new LinkedList<>();
+ private final CompletionService<GridTuple3<InetSocketAddress, Socket, Exception>> completionSrvc;
/**
* @param addrs Addresses.
@@ -1033,19 +1030,19 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
public SocketMultiConnector(Collection<InetSocketAddress> addrs, final int retryCnt) {
connInProgress = addrs.size();
- executor = Executors.newFixedThreadPool(Math.min(10, addrs.size()));
+ executor = Executors.newFixedThreadPool(Math.min(1, addrs.size()));
+
+ completionSrvc = new ExecutorCompletionService<>(executor);
for (final InetSocketAddress addr : addrs) {
- executor.execute(new Runnable() {
- @Override public void run() {
+ completionSrvc.submit(new Callable<GridTuple3<InetSocketAddress, Socket, Exception>>() {
+ @Override public GridTuple3<InetSocketAddress, Socket, Exception> call() {
Exception ex = null;
Socket sock = null;
for (int i = 0; i < retryCnt; i++) {
- synchronized (SocketMultiConnector.this) {
- if (closed)
- return;
- }
+ if (Thread.currentThread().isInterrupted())
+ return null; // Executor is shutdown.
try {
sock = openSocket(addr);
@@ -1057,16 +1054,7 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
}
}
- synchronized (SocketMultiConnector.this) {
- if (closed)
- U.closeQuiet(sock);
- else
- queue.add(new GridTuple3<>(addr, sock, ex));
-
- connInProgress--;
-
- SocketMultiConnector.this.notifyAll();
- }
+ return new GridTuple3<>(addr, sock, ex);
}
});
}
@@ -1075,46 +1063,51 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
/**
*
*/
- @Nullable public synchronized GridTuple3<InetSocketAddress, Socket, Exception> next() {
- try {
- do {
- if (closed)
- return null;
+ @Nullable public GridTuple3<InetSocketAddress, Socket, Exception> next() {
+ if (connInProgress == 0)
+ return null;
- GridTuple3<InetSocketAddress, Socket, Exception> res = queue.poll();
-
- if (res != null)
- return res;
-
- if (connInProgress == 0)
- return null;
+ try {
+ connInProgress--;
- wait();
- }
- while (true);
+ return completionSrvc.take().get();
}
catch (InterruptedException e) {
throw new IgniteSpiException("Thread has been interrupted.", e);
}
+ catch (ExecutionException e) {
+ throw new IgniteSpiException(e);
+ }
}
/**
*
*/
public void close() {
- synchronized (this) {
- if (closed)
- return;
-
- closed = true;
+ executor.shutdown();
- notifyAll();
- }
+ if (connInProgress > 0) {
+ new Thread(new Runnable() {
+ @Override public void run() {
+ try {
+ for (int i = 0; i < connInProgress; i++) {
+ try {
+ GridTuple3<InetSocketAddress, Socket, Exception> take = completionSrvc.take().get();
- executor.shutdown();
+ if (take != null)
+ IgniteUtils.closeQuiet(take.get2());
+ }
+ catch (ExecutionException ignored) {
- for (GridTuple3<InetSocketAddress, Socket, Exception> tuple : queue)
- U.closeQuiet(tuple.get2());
+ }
+ }
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }).start();
+ }
}
}
}