You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/03/02 10:49:49 UTC

[04/50] [abbrv] incubator-ignite git commit: ignite-11 use ExecutorCompletionService.

ignite-11 use ExecutorCompletionService.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8b9e7f8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8b9e7f8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8b9e7f8f

Branch: refs/heads/ignite-342
Commit: 8b9e7f8f5ae205da389411f5aa763d329e3ea6bd
Parents: de75adb
Author: sevdokimov <se...@gridgain.com>
Authored: Thu Feb 26 17:23:52 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Thu Feb 26 17:23:52 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/util/IgniteUtils.java       |  1 +
 .../discovery/tcp/TcpDiscoverySpiAdapter.java   | 85 +++++++++-----------
 2 files changed, 40 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b9e7f8f/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index a8b6991..b24743a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -298,6 +298,7 @@ public abstract class IgniteUtils {
     private static final Map<Class<? extends IgniteCheckedException>, C1<IgniteCheckedException, IgniteException>>
         exceptionConverters;
 
+    /** */
     private volatile static IgniteBiTuple<Collection<String>, Collection<String>> cachedLocalAddr;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8b9e7f8f/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index 322b954..80b793a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -1018,13 +1018,10 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
         private int connInProgress;
 
         /** */
-        private boolean closed;
-
-        /** */
         private final ExecutorService executor;
 
         /** */
-        private final Queue<GridTuple3<InetSocketAddress, Socket, Exception>> queue = new LinkedList<>();
+        private final CompletionService<GridTuple3<InetSocketAddress, Socket, Exception>> completionSrvc;
 
         /**
          * @param addrs Addresses.
@@ -1033,19 +1030,19 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
         public SocketMultiConnector(Collection<InetSocketAddress> addrs, final int retryCnt) {
             connInProgress = addrs.size();
 
-            executor = Executors.newFixedThreadPool(Math.min(10, addrs.size()));
+            executor = Executors.newFixedThreadPool(Math.min(1, addrs.size()));
+
+            completionSrvc = new ExecutorCompletionService<>(executor);
 
             for (final InetSocketAddress addr : addrs) {
-                executor.execute(new Runnable() {
-                    @Override public void run() {
+                completionSrvc.submit(new Callable<GridTuple3<InetSocketAddress, Socket, Exception>>() {
+                    @Override public GridTuple3<InetSocketAddress, Socket, Exception> call() {
                         Exception ex = null;
                         Socket sock = null;
 
                         for (int i = 0; i < retryCnt; i++) {
-                            synchronized (SocketMultiConnector.this) {
-                                if (closed)
-                                    return;
-                            }
+                            if (Thread.currentThread().isInterrupted())
+                                return null; // Executor is shutdown.
 
                             try {
                                 sock = openSocket(addr);
@@ -1057,16 +1054,7 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
                             }
                         }
 
-                        synchronized (SocketMultiConnector.this) {
-                            if (closed)
-                                U.closeQuiet(sock);
-                            else
-                                queue.add(new GridTuple3<>(addr, sock, ex));
-
-                            connInProgress--;
-
-                            SocketMultiConnector.this.notifyAll();
-                        }
+                        return new GridTuple3<>(addr, sock, ex);
                     }
                 });
             }
@@ -1075,46 +1063,51 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
         /**
          *
          */
-        @Nullable public synchronized GridTuple3<InetSocketAddress, Socket, Exception> next() {
-            try {
-                do {
-                    if (closed)
-                        return null;
+        @Nullable public GridTuple3<InetSocketAddress, Socket, Exception> next() {
+            if (connInProgress == 0)
+                return null;
 
-                    GridTuple3<InetSocketAddress, Socket, Exception> res = queue.poll();
-
-                    if (res != null)
-                        return res;
-
-                    if (connInProgress == 0)
-                        return null;
+            try {
+                connInProgress--;
 
-                    wait();
-                }
-                while (true);
+                return completionSrvc.take().get();
             }
             catch (InterruptedException e) {
                 throw new IgniteSpiException("Thread has been interrupted.", e);
             }
+            catch (ExecutionException e) {
+                throw new IgniteSpiException(e);
+            }
         }
 
         /**
          *
          */
         public void close() {
-            synchronized (this) {
-                if (closed)
-                    return;
-
-                closed = true;
+            executor.shutdown();
 
-                notifyAll();
-            }
+            if (connInProgress > 0) {
+                new Thread(new Runnable() {
+                    @Override public void run() {
+                        try {
+                            for (int i = 0; i < connInProgress; i++) {
+                                try {
+                                    GridTuple3<InetSocketAddress, Socket, Exception> take = completionSrvc.take().get();
 
-            executor.shutdown();
+                                    if (take != null)
+                                        IgniteUtils.closeQuiet(take.get2());
+                                }
+                                catch (ExecutionException ignored) {
 
-            for (GridTuple3<InetSocketAddress, Socket, Exception> tuple : queue)
-                U.closeQuiet(tuple.get2());
+                                }
+                            }
+                        }
+                        catch (InterruptedException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }).start();
+            }
         }
     }
 }