You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/02 12:46:43 UTC

[08/50] incubator-ignite git commit: # IGNITE-943 Remove SocketMultiConnector

# IGNITE-943 Remove SocketMultiConnector


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6adc9743
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6adc9743
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6adc9743

Branch: refs/heads/ignite-960
Commit: 6adc9743d4912ad59a50832f4869e8e41f5f9a04
Parents: 154bd9e
Author: sevdokimov <se...@gridgain.com>
Authored: Thu May 28 17:03:39 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Thu May 28 17:03:39 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  21 +--
 .../spi/discovery/tcp/SocketMultiConnector.java | 144 -------------------
 2 files changed, 13 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6adc9743/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index a966363..59e25fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -810,17 +810,22 @@ class ServerImpl extends TcpDiscoveryImpl {
             boolean retry = false;
             Collection<Exception> errs = new ArrayList<>();
 
-            try (SocketMultiConnector multiConnector = new SocketMultiConnector(spi, addrs, 2)) {
-                GridTuple3<InetSocketAddress, Socket, Exception> tuple;
+            for (int j = 2; --j >= 0;) {
+                for (InetSocketAddress addr : addrs) {
+                    Socket sock = null;
+                    Exception ex = null;
 
-                while ((tuple = multiConnector.next()) != null) {
-                    InetSocketAddress addr = tuple.get1();
-                    Socket sock = tuple.get2();
-                    Exception ex = tuple.get3();
+                    try {
+                        sock = spi.openSocket(addr);
+                    }
+                    catch (Exception e) {
+                        if (j > 0)
+                            continue;
 
-                    if (ex == null) {
-                        assert sock != null;
+                        ex = e;
+                    }
 
+                    if (ex == null) {
                         try {
                             Integer res = sendMessageDirectly(joinReq, addr, sock);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6adc9743/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java
deleted file mode 100644
index 698735e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.spi.discovery.tcp;
-
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.spi.*;
-import org.jetbrains.annotations.*;
-
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Allow to connect to addresses parallel.
- */
-class SocketMultiConnector implements AutoCloseable {
-    /** */
-    private int connInProgress;
-
-    /** */
-    private final ExecutorService executor;
-
-    /** */
-    private final CompletionService<GridTuple3<InetSocketAddress, Socket, Exception>> completionSrvc;
-
-    /**
-     * @param spi Discovery SPI.
-     * @param addrs Addresses.
-     * @param retryCnt Retry count.
-     */
-    SocketMultiConnector(final TcpDiscoverySpi spi, Collection<InetSocketAddress> addrs,
-        final int retryCnt) {
-        connInProgress = addrs.size();
-
-        executor = Executors.newFixedThreadPool(Math.min(1, addrs.size()));
-
-        completionSrvc = new ExecutorCompletionService<>(executor);
-
-        for (final InetSocketAddress addr : addrs) {
-            completionSrvc.submit(new Callable<GridTuple3<InetSocketAddress, Socket, Exception>>() {
-                @Override public GridTuple3<InetSocketAddress, Socket, Exception> call() {
-                    Exception ex = null;
-                    Socket sock = null;
-
-                    for (int i = 0; i < retryCnt; i++) {
-                        if (Thread.currentThread().isInterrupted())
-                            return null; // Executor is shutdown.
-
-                        try {
-                            sock = spi.openSocket(addr);
-
-                            break;
-                        }
-                        catch (Exception e) {
-                            ex = e;
-                        }
-                    }
-
-                    return new GridTuple3<>(addr, sock, ex);
-                }
-            });
-        }
-    }
-
-    /**
-     *
-     */
-    @Nullable public GridTuple3<InetSocketAddress, Socket, Exception> next() {
-        if (connInProgress == 0)
-            return null;
-
-        try {
-            Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut = completionSrvc.take();
-
-            connInProgress--;
-
-            return fut.get();
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteSpiException("Thread has been interrupted.", e);
-        }
-        catch (ExecutionException e) {
-            throw new IgniteSpiException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void close() {
-        List<Runnable> unstartedTasks = executor.shutdownNow();
-
-        connInProgress -= unstartedTasks.size();
-
-        if (connInProgress > 0) {
-            Thread thread = new Thread(new Runnable() {
-                @Override public void run() {
-                    try {
-                        executor.awaitTermination(5, TimeUnit.MINUTES);
-
-                        Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut;
-
-                        while ((fut = completionSrvc.poll()) != null) {
-                            try {
-                                GridTuple3<InetSocketAddress, Socket, Exception> tuple3 = fut.get();
-
-                                if (tuple3 != null)
-                                    IgniteUtils.closeQuiet(tuple3.get2());
-                            }
-                            catch (ExecutionException ignore) {
-
-                            }
-                        }
-                    }
-                    catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-
-                        throw new RuntimeException(e);
-                    }
-                }
-            });
-
-            thread.setDaemon(true);
-
-            thread.start();
-        }
-    }
-}