You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/26 18:17:29 UTC

[3/3] incubator-ignite git commit: # IGNITE-943 Extract SocketMultiConnector class.

# IGNITE-943 Extract SocketMultiConnector class.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2f169f57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2f169f57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2f169f57

Branch: refs/heads/ignite-943
Commit: 2f169f5799fe9fca46b022f9c7f55dc37b816a74
Parents: f43cbbb
Author: sevdokimov <se...@gridgain.com>
Authored: Tue May 26 19:16:43 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Tue May 26 19:16:43 2015 +0300

----------------------------------------------------------------------
 .../spi/discovery/tcp/SocketMultiConnector.java | 144 +++++++++++++++++++
 .../spi/discovery/tcp/TcpDiscoverySpi.java      |   2 +-
 .../discovery/tcp/TcpDiscoverySpiAdapter.java   | 116 ---------------
 3 files changed, 145 insertions(+), 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f169f57/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java
new file mode 100644
index 0000000..b988ceb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.spi.*;
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Allow to connect to addresses parallel.
+ */
+class SocketMultiConnector implements AutoCloseable {
+    /** */
+    private int connInProgress;
+
+    /** */
+    private final ExecutorService executor;
+
+    /** */
+    private final CompletionService<GridTuple3<InetSocketAddress, Socket, Exception>> completionSrvc;
+
+    /**
+     * @param spi Discovery SPI.
+     * @param addrs Addresses.
+     * @param retryCnt Retry count.
+     */
+    SocketMultiConnector(final TcpDiscoverySpiAdapter spi, Collection<InetSocketAddress> addrs,
+        final int retryCnt) {
+        connInProgress = addrs.size();
+
+        executor = Executors.newFixedThreadPool(Math.min(1, addrs.size()));
+
+        completionSrvc = new ExecutorCompletionService<>(executor);
+
+        for (final InetSocketAddress addr : addrs) {
+            completionSrvc.submit(new Callable<GridTuple3<InetSocketAddress, Socket, Exception>>() {
+                @Override public GridTuple3<InetSocketAddress, Socket, Exception> call() {
+                    Exception ex = null;
+                    Socket sock = null;
+
+                    for (int i = 0; i < retryCnt; i++) {
+                        if (Thread.currentThread().isInterrupted())
+                            return null; // Executor is shutdown.
+
+                        try {
+                            sock = spi.openSocket(addr);
+
+                            break;
+                        }
+                        catch (Exception e) {
+                            ex = e;
+                        }
+                    }
+
+                    return new GridTuple3<>(addr, sock, ex);
+                }
+            });
+        }
+    }
+
+    /**
+     *
+     */
+    @Nullable public GridTuple3<InetSocketAddress, Socket, Exception> next() {
+        if (connInProgress == 0)
+            return null;
+
+        try {
+            Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut = completionSrvc.take();
+
+            connInProgress--;
+
+            return fut.get();
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteSpiException("Thread has been interrupted.", e);
+        }
+        catch (ExecutionException e) {
+            throw new IgniteSpiException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        List<Runnable> unstartedTasks = executor.shutdownNow();
+
+        connInProgress -= unstartedTasks.size();
+
+        if (connInProgress > 0) {
+            Thread thread = new Thread(new Runnable() {
+                @Override public void run() {
+                    try {
+                        executor.awaitTermination(5, TimeUnit.MINUTES);
+
+                        Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut;
+
+                        while ((fut = completionSrvc.poll()) != null) {
+                            try {
+                                GridTuple3<InetSocketAddress, Socket, Exception> tuple3 = fut.get();
+
+                                if (tuple3 != null)
+                                    IgniteUtils.closeQuiet(tuple3.get2());
+                            }
+                            catch (ExecutionException ignore) {
+
+                            }
+                        }
+                    }
+                    catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+
+            thread.setDaemon(true);
+
+            thread.start();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f169f57/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 37a07d6..8ceac1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1344,7 +1344,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
             boolean retry = false;
             Collection<Exception> errs = new ArrayList<>();
 
-            try (SocketMultiConnector multiConnector = new SocketMultiConnector(addrs, 2)) {
+            try (SocketMultiConnector multiConnector = new SocketMultiConnector(this, addrs, 2)) {
                 GridTuple3<InetSocketAddress, Socket, Exception> tuple;
 
                 while ((tuple = multiConnector.next()) != null) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f169f57/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index 4349ebc..ddbea0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -22,7 +22,6 @@ import org.apache.ignite.cluster.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.io.*;
-import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
@@ -39,7 +38,6 @@ import org.jetbrains.annotations.*;
 import java.io.*;
 import java.net.*;
 import java.util.*;
-import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
 /**
@@ -1184,118 +1182,4 @@ public abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements
         }
     }
 
-    /**
-     * Allow to connect to addresses parallel.
-     */
-    protected class SocketMultiConnector implements AutoCloseable {
-        /** */
-        private int connInProgress;
-
-        /** */
-        private final ExecutorService executor;
-
-        /** */
-        private final CompletionService<GridTuple3<InetSocketAddress, Socket, Exception>> completionSrvc;
-
-        /**
-         * @param addrs Addresses.
-         * @param retryCnt Retry count.
-         */
-        public SocketMultiConnector(Collection<InetSocketAddress> addrs, final int retryCnt) {
-            connInProgress = addrs.size();
-
-            executor = Executors.newFixedThreadPool(Math.min(1, addrs.size()));
-
-            completionSrvc = new ExecutorCompletionService<>(executor);
-
-            for (final InetSocketAddress addr : addrs) {
-                completionSrvc.submit(new Callable<GridTuple3<InetSocketAddress, Socket, Exception>>() {
-                    @Override public GridTuple3<InetSocketAddress, Socket, Exception> call() {
-                        Exception ex = null;
-                        Socket sock = null;
-
-                        for (int i = 0; i < retryCnt; i++) {
-                            if (Thread.currentThread().isInterrupted())
-                                return null; // Executor is shutdown.
-
-                            try {
-                                sock = openSocket(addr);
-
-                                break;
-                            }
-                            catch (Exception e) {
-                                ex = e;
-                            }
-                        }
-
-                        return new GridTuple3<>(addr, sock, ex);
-                    }
-                });
-            }
-        }
-
-        /**
-         *
-         */
-        @Nullable public GridTuple3<InetSocketAddress, Socket, Exception> next() {
-            if (connInProgress == 0)
-                return null;
-
-            try {
-                Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut = completionSrvc.take();
-
-                connInProgress--;
-
-                return fut.get();
-            }
-            catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-
-                throw new IgniteSpiException("Thread has been interrupted.", e);
-            }
-            catch (ExecutionException e) {
-                throw new IgniteSpiException(e);
-            }
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() {
-            List<Runnable> unstartedTasks = executor.shutdownNow();
-
-            connInProgress -= unstartedTasks.size();
-
-            if (connInProgress > 0) {
-                Thread thread = new Thread(new Runnable() {
-                    @Override public void run() {
-                        try {
-                            executor.awaitTermination(5, TimeUnit.MINUTES);
-
-                            Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut;
-
-                            while ((fut = completionSrvc.poll()) != null) {
-                                try {
-                                    GridTuple3<InetSocketAddress, Socket, Exception> tuple3 = fut.get();
-
-                                    if (tuple3 != null)
-                                        IgniteUtils.closeQuiet(tuple3.get2());
-                                }
-                                catch (ExecutionException ignore) {
-
-                                }
-                            }
-                        }
-                        catch (InterruptedException e) {
-                            Thread.currentThread().interrupt();
-
-                            throw new RuntimeException(e);
-                        }
-                    }
-                });
-
-                thread.setDaemon(true);
-
-                thread.start();
-            }
-        }
-    }
 }