You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/05/26 18:17:29 UTC
[3/3] incubator-ignite git commit: # IGNITE-943 Extract
SocketMultiConnector class.
# IGNITE-943 Extract SocketMultiConnector class.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/2f169f57
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/2f169f57
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/2f169f57
Branch: refs/heads/ignite-943
Commit: 2f169f5799fe9fca46b022f9c7f55dc37b816a74
Parents: f43cbbb
Author: sevdokimov <se...@gridgain.com>
Authored: Tue May 26 19:16:43 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Tue May 26 19:16:43 2015 +0300
----------------------------------------------------------------------
.../spi/discovery/tcp/SocketMultiConnector.java | 144 +++++++++++++++++++
.../spi/discovery/tcp/TcpDiscoverySpi.java | 2 +-
.../discovery/tcp/TcpDiscoverySpiAdapter.java | 116 ---------------
3 files changed, 145 insertions(+), 117 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f169f57/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java
new file mode 100644
index 0000000..b988ceb
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/SocketMultiConnector.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import org.apache.ignite.internal.util.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.spi.*;
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Connects to several addresses in parallel and hands out the results in completion order.
+ */
+class SocketMultiConnector implements AutoCloseable {
+    /** Number of submitted attempts whose results have not been returned by {@link #next()} yet. */
+    private int connInProgress;
+
+    /** Pool that runs the connection attempts. */
+    private final ExecutorService executor;
+
+    /** Yields (address, socket, last error) tuples as connection attempts finish. */
+    private final CompletionService<GridTuple3<InetSocketAddress, Socket, Exception>> completionSrvc;
+
+    /**
+     * @param spi Discovery SPI used to open sockets.
+     * @param addrs Addresses to connect to; one task is submitted per address.
+     * @param retryCnt Maximum number of connection attempts per address.
+     */
+    SocketMultiConnector(final TcpDiscoverySpiAdapter spi, Collection<InetSocketAddress> addrs,
+        final int retryCnt) {
+        connInProgress = addrs.size();
+        // One thread per address so attempts really overlap; at least 1 since a fixed pool rejects size 0.
+        executor = Executors.newFixedThreadPool(Math.max(1, addrs.size()));
+
+        completionSrvc = new ExecutorCompletionService<>(executor);
+
+        for (final InetSocketAddress addr : addrs) {
+            completionSrvc.submit(new Callable<GridTuple3<InetSocketAddress, Socket, Exception>>() {
+                @Override public GridTuple3<InetSocketAddress, Socket, Exception> call() {
+                    Exception ex = null;
+                    Socket sock = null;
+
+                    for (int i = 0; i < retryCnt; i++) {
+                        if (Thread.currentThread().isInterrupted())
+                            return null; // Executor is shutdown.
+
+                        try {
+                            sock = spi.openSocket(addr);
+
+                            break;
+                        }
+                        catch (Exception e) {
+                            ex = e; // Keep the latest failure and try again.
+                        }
+                    }
+
+                    return new GridTuple3<>(addr, sock, ex);
+                }
+            });
+        }
+    }
+
+    /**
+     * @return Next finished (address, socket, error) tuple, or {@code null} if no attempts are pending.
+     */
+    @Nullable public GridTuple3<InetSocketAddress, Socket, Exception> next() {
+        if (connInProgress == 0)
+            return null;
+
+        try {
+            Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut = completionSrvc.take();
+
+            connInProgress--;
+
+            return fut.get();
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // Keep the interrupt status visible to the caller.
+
+            throw new IgniteSpiException("Thread has been interrupted.", e);
+        }
+        catch (ExecutionException e) {
+            throw new IgniteSpiException(e);
+        }
+    }
+
+    /** Shuts the pool down; sockets of results never consumed are closed by a background daemon thread. */
+    @Override public void close() {
+        List<Runnable> unstartedTasks = executor.shutdownNow();
+        // Tasks that never started will not produce results, so stop counting them.
+        connInProgress -= unstartedTasks.size();
+
+        if (connInProgress > 0) {
+            Thread thread = new Thread(new Runnable() {
+                @Override public void run() {
+                    try {
+                        executor.awaitTermination(5, TimeUnit.MINUTES);
+
+                        Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut;
+
+                        while ((fut = completionSrvc.poll()) != null) {
+                            try {
+                                GridTuple3<InetSocketAddress, Socket, Exception> tuple3 = fut.get();
+
+                                if (tuple3 != null)
+                                    IgniteUtils.closeQuiet(tuple3.get2());
+                            }
+                            catch (ExecutionException ignore) {
+                                // No-op: the task failed, so it returned no socket to close.
+                            }
+                        }
+                    }
+                    catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+
+            thread.setDaemon(true);
+
+            thread.start();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f169f57/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 37a07d6..8ceac1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -1344,7 +1344,7 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
boolean retry = false;
Collection<Exception> errs = new ArrayList<>();
- try (SocketMultiConnector multiConnector = new SocketMultiConnector(addrs, 2)) {
+ try (SocketMultiConnector multiConnector = new SocketMultiConnector(this, addrs, 2)) {
GridTuple3<InetSocketAddress, Socket, Exception> tuple;
while ((tuple = multiConnector.next()) != null) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/2f169f57/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index 4349ebc..ddbea0c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -22,7 +22,6 @@ import org.apache.ignite.cluster.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.io.*;
-import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
@@ -39,7 +38,6 @@ import org.jetbrains.annotations.*;
import java.io.*;
import java.net.*;
import java.util.*;
-import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
/**
@@ -1184,118 +1182,4 @@ public abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements
}
}
- /**
- * Allow to connect to addresses parallel.
- */
- protected class SocketMultiConnector implements AutoCloseable {
- /** */
- private int connInProgress;
-
- /** */
- private final ExecutorService executor;
-
- /** */
- private final CompletionService<GridTuple3<InetSocketAddress, Socket, Exception>> completionSrvc;
-
- /**
- * @param addrs Addresses.
- * @param retryCnt Retry count.
- */
- public SocketMultiConnector(Collection<InetSocketAddress> addrs, final int retryCnt) {
- connInProgress = addrs.size();
-
- executor = Executors.newFixedThreadPool(Math.min(1, addrs.size()));
-
- completionSrvc = new ExecutorCompletionService<>(executor);
-
- for (final InetSocketAddress addr : addrs) {
- completionSrvc.submit(new Callable<GridTuple3<InetSocketAddress, Socket, Exception>>() {
- @Override public GridTuple3<InetSocketAddress, Socket, Exception> call() {
- Exception ex = null;
- Socket sock = null;
-
- for (int i = 0; i < retryCnt; i++) {
- if (Thread.currentThread().isInterrupted())
- return null; // Executor is shutdown.
-
- try {
- sock = openSocket(addr);
-
- break;
- }
- catch (Exception e) {
- ex = e;
- }
- }
-
- return new GridTuple3<>(addr, sock, ex);
- }
- });
- }
- }
-
- /**
- *
- */
- @Nullable public GridTuple3<InetSocketAddress, Socket, Exception> next() {
- if (connInProgress == 0)
- return null;
-
- try {
- Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut = completionSrvc.take();
-
- connInProgress--;
-
- return fut.get();
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteSpiException("Thread has been interrupted.", e);
- }
- catch (ExecutionException e) {
- throw new IgniteSpiException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- List<Runnable> unstartedTasks = executor.shutdownNow();
-
- connInProgress -= unstartedTasks.size();
-
- if (connInProgress > 0) {
- Thread thread = new Thread(new Runnable() {
- @Override public void run() {
- try {
- executor.awaitTermination(5, TimeUnit.MINUTES);
-
- Future<GridTuple3<InetSocketAddress, Socket, Exception>> fut;
-
- while ((fut = completionSrvc.poll()) != null) {
- try {
- GridTuple3<InetSocketAddress, Socket, Exception> tuple3 = fut.get();
-
- if (tuple3 != null)
- IgniteUtils.closeQuiet(tuple3.get2());
- }
- catch (ExecutionException ignore) {
-
- }
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new RuntimeException(e);
- }
- }
- });
-
- thread.setDaemon(true);
-
- thread.start();
- }
- }
- }
}