You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/04/13 00:28:11 UTC

spark git commit: [SPARK-14547] Avoid DNS resolution for reusing connections

Repository: spark
Updated Branches:
  refs/heads/master 1ef5f8cfa -> c439d88e9


[SPARK-14547] Avoid DNS resolution for reusing connections

## What changes were proposed in this pull request?
This patch changes the connection creation logic in the network client module to avoid DNS resolution when reusing connections.

## How was this patch tested?
Testing in production. This is too difficult to test in isolation (for high fidelity unit tests, we'd need to change the DNS resolution behavior in the JVM).

Author: Reynold Xin <rx...@databricks.com>

Closes #12315 from rxin/SPARK-14547.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c439d88e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c439d88e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c439d88e

Branch: refs/heads/master
Commit: c439d88e99c35a5f29f071715addfee8cbb215dc
Parents: 1ef5f8c
Author: Reynold Xin <rx...@databricks.com>
Authored: Tue Apr 12 15:28:08 2016 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Apr 12 15:28:08 2016 -0700

----------------------------------------------------------------------
 .../network/client/TransportClientFactory.java  | 31 +++++++++++++-------
 1 file changed, 20 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c439d88e/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index b5a9d66..a27aaf2 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -123,16 +123,15 @@ public class TransportClientFactory implements Closeable {
   public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
     // Get connection from the connection pool first.
     // If it is not found or not active, create a new one.
-    long preResolveHost = System.nanoTime();
-    final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
-    long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
-    logger.info("Spent {} ms to resolve {}", hostResolveTimeMs, address);
+    // Use unresolved address here to avoid DNS resolution each time we create a client.
+    final InetSocketAddress unresolvedAddress =
+      InetSocketAddress.createUnresolved(remoteHost, remotePort);
 
     // Create the ClientPool if we don't have it yet.
-    ClientPool clientPool = connectionPool.get(address);
+    ClientPool clientPool = connectionPool.get(unresolvedAddress);
     if (clientPool == null) {
-      connectionPool.putIfAbsent(address, new ClientPool(numConnectionsPerPeer));
-      clientPool = connectionPool.get(address);
+      connectionPool.putIfAbsent(unresolvedAddress, new ClientPool(numConnectionsPerPeer));
+      clientPool = connectionPool.get(unresolvedAddress);
     }
 
     int clientIndex = rand.nextInt(numConnectionsPerPeer);
@@ -149,25 +148,35 @@ public class TransportClientFactory implements Closeable {
       }
 
       if (cachedClient.isActive()) {
-        logger.trace("Returning cached connection to {}: {}", address, cachedClient);
+        logger.trace("Returning cached connection to {}: {}",
+          cachedClient.getSocketAddress(), cachedClient);
         return cachedClient;
       }
     }
 
     // If we reach here, we don't have an existing connection open. Let's create a new one.
     // Multiple threads might race here to create new connections. Keep only one of them active.
+    final long preResolveHost = System.nanoTime();
+    final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
+    final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
+    if (hostResolveTimeMs > 2000) {
+      logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
+    } else {
+      logger.trace("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
+    }
+
     synchronized (clientPool.locks[clientIndex]) {
       cachedClient = clientPool.clients[clientIndex];
 
       if (cachedClient != null) {
         if (cachedClient.isActive()) {
-          logger.trace("Returning cached connection to {}: {}", address, cachedClient);
+          logger.trace("Returning cached connection to {}: {}", resolvedAddress, cachedClient);
           return cachedClient;
         } else {
-          logger.info("Found inactive connection to {}, creating a new one.", address);
+          logger.info("Found inactive connection to {}, creating a new one.", resolvedAddress);
         }
       }
-      clientPool.clients[clientIndex] = createClient(address);
+      clientPool.clients[clientIndex] = createClient(resolvedAddress);
       return clientPool.clients[clientIndex];
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org