You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2017/02/14 19:27:40 UTC

spark git commit: [SPARK-19529][BRANCH-1.6] Backport PR #16866 to branch-1.6

Repository: spark
Updated Branches:
  refs/heads/branch-1.6 e78138a43 -> a50ef3d9a


[SPARK-19529][BRANCH-1.6] Backport PR #16866 to branch-1.6

## What changes were proposed in this pull request?

This PR backports PR #16866 to branch-1.6

## How was this patch tested?

Existing tests.

Author: Cheng Lian <li...@databricks.com>

Closes #16917 from liancheng/spark-19529-1.6-backport.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a50ef3d9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a50ef3d9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a50ef3d9

Branch: refs/heads/branch-1.6
Commit: a50ef3d9a06fcbb8c5eca0762fdf0967f4aa7a88
Parents: e78138a
Author: Cheng Lian <li...@databricks.com>
Authored: Tue Feb 14 11:27:37 2017 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Feb 14 11:27:37 2017 -0800

----------------------------------------------------------------------
 .../network/client/TransportClientFactory.java      | 10 ++++++----
 .../spark/network/TransportClientFactorySuite.java  |  6 ++++--
 .../network/shuffle/ExternalShuffleClient.java      |  4 ++--
 .../spark/network/shuffle/RetryingBlockFetcher.java |  3 ++-
 .../shuffle/mesos/MesosExternalShuffleClient.java   |  3 ++-
 .../spark/network/sasl/SaslIntegrationSuite.java    |  4 ++--
 .../shuffle/ExternalShuffleIntegrationSuite.java    |  2 +-
 .../shuffle/ExternalShuffleSecuritySuite.java       |  7 ++++---
 .../network/shuffle/RetryingBlockFetcherSuite.java  | 16 ++++++++--------
 9 files changed, 31 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a50ef3d9/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
----------------------------------------------------------------------
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 61bafc8..5b438b7 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -120,7 +120,8 @@ public class TransportClientFactory implements Closeable {
    *
    * Concurrency: This method is safe to call from multiple threads.
    */
-  public TransportClient createClient(String remoteHost, int remotePort) throws IOException {
+  public TransportClient createClient(String remoteHost, int remotePort)
+      throws IOException, InterruptedException {
     // Get connection from the connection pool first.
     // If it is not found or not active, create a new one.
     final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
@@ -176,13 +177,14 @@ public class TransportClientFactory implements Closeable {
    * As with {@link #createClient(String, int)}, this method is blocking.
    */
   public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
-      throws IOException {
+      throws IOException, InterruptedException {
     final InetSocketAddress address = new InetSocketAddress(remoteHost, remotePort);
     return createClient(address);
   }
 
   /** Create a completely new {@link TransportClient} to the remote address. */
-  private TransportClient createClient(InetSocketAddress address) throws IOException {
+  private TransportClient createClient(InetSocketAddress address)
+      throws IOException, InterruptedException {
     logger.debug("Creating new connection to " + address);
 
     Bootstrap bootstrap = new Bootstrap();
@@ -209,7 +211,7 @@ public class TransportClientFactory implements Closeable {
     // Connect to the remote server
     long preConnect = System.nanoTime();
     ChannelFuture cf = bootstrap.connect(address);
-    if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
+    if (!cf.await(conf.connectionTimeoutMs())) {
       throw new IOException(
         String.format("Connecting to %s timed out (%s ms)", address, conf.connectionTimeoutMs()));
     } else if (cf.cause() != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/a50ef3d9/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
----------------------------------------------------------------------
diff --git a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
index dac7d4a..6b77e34 100644
--- a/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
+++ b/network/common/src/test/java/org/apache/spark/network/TransportClientFactorySuite.java
@@ -99,6 +99,8 @@ public class TransportClientFactorySuite {
             clients.add(client);
           } catch (IOException e) {
             failed.incrementAndGet();
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
           }
         }
       };
@@ -140,7 +142,7 @@ public class TransportClientFactorySuite {
   }
 
   @Test
-  public void returnDifferentClientsForDifferentServers() throws IOException {
+  public void returnDifferentClientsForDifferentServers() throws IOException, InterruptedException {
     TransportClientFactory factory = context.createClientFactory();
     TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
     TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());
@@ -169,7 +171,7 @@ public class TransportClientFactorySuite {
   }
 
   @Test
-  public void closeBlockClientsWithFactory() throws IOException {
+  public void closeBlockClientsWithFactory() throws IOException, InterruptedException {
     TransportClientFactory factory = context.createClientFactory();
     TransportClient c1 = factory.createClient(TestUtils.getLocalHost(), server1.getPort());
     TransportClient c2 = factory.createClient(TestUtils.getLocalHost(), server2.getPort());

http://git-wip-us.apache.org/repos/asf/spark/blob/a50ef3d9/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index 58ca87d..fa6a06e 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -101,7 +101,7 @@ public class ExternalShuffleClient extends ShuffleClient {
         new RetryingBlockFetcher.BlockFetchStarter() {
           @Override
           public void createAndStart(String[] blockIds, BlockFetchingListener listener)
-              throws IOException {
+              throws IOException, InterruptedException {
             TransportClient client = clientFactory.createClient(host, port);
             new OneForOneBlockFetcher(client, appId, execId, blockIds, listener).start();
           }
@@ -136,7 +136,7 @@ public class ExternalShuffleClient extends ShuffleClient {
       String host,
       int port,
       String execId,
-      ExecutorShuffleInfo executorInfo) throws IOException {
+      ExecutorShuffleInfo executorInfo) throws IOException, InterruptedException {
     checkInit();
     TransportClient client = clientFactory.createUnmanagedClient(host, port);
     try {

http://git-wip-us.apache.org/repos/asf/spark/blob/a50ef3d9/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
index 4bb0498..44a515b 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
@@ -57,7 +57,8 @@ public class RetryingBlockFetcher {
      * {@link org.apache.spark.network.client.TransportClientFactory} in order to fix connection
      * issues.
      */
-    void createAndStart(String[] blockIds, BlockFetchingListener listener) throws IOException;
+    void createAndStart(String[] blockIds, BlockFetchingListener listener)
+         throws IOException, InterruptedException;
   }
 
   /** Shared executor service used for waiting and retrying. */

http://git-wip-us.apache.org/repos/asf/spark/blob/a50ef3d9/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
index 6758203..1eac227 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
@@ -53,7 +53,8 @@ public class MesosExternalShuffleClient extends ExternalShuffleClient {
     super(conf, secretKeyHolder, saslEnabled, saslEncryptionEnabled);
   }
 
-  public void registerDriverWithShuffleService(String host, int port) throws IOException {
+  public void registerDriverWithShuffleService(String host, int port)
+      throws IOException, InterruptedException {
     checkInit();
     ByteBuffer registerDriver = new RegisterDriver(appId).toByteBuffer();
     TransportClient client = clientFactory.createClient(host, port);

http://git-wip-us.apache.org/repos/asf/spark/blob/a50ef3d9/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 19c870a..a94fd18 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -102,7 +102,7 @@ public class SaslIntegrationSuite {
   }
 
   @Test
-  public void testGoodClient() throws IOException {
+  public void testGoodClient() throws IOException, InterruptedException {
     clientFactory = context.createClientFactory(
       Lists.<TransportClientBootstrap>newArrayList(
         new SaslClientBootstrap(conf, "app-1", secretKeyHolder)));
@@ -132,7 +132,7 @@ public class SaslIntegrationSuite {
   }
 
   @Test
-  public void testNoSaslClient() throws IOException {
+  public void testNoSaslClient() throws IOException, InterruptedException {
     clientFactory = context.createClientFactory(
       Lists.<TransportClientBootstrap>newArrayList());
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a50ef3d9/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 2095f41..3551a96 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -273,7 +273,7 @@ public class ExternalShuffleIntegrationSuite {
   }
 
   private void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
-      throws IOException {
+      throws IOException, InterruptedException {
     ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, false);
     client.init(APP_ID);
     client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),

http://git-wip-us.apache.org/repos/asf/spark/blob/a50ef3d9/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index 08ddb37..50a5c8e 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -60,7 +60,7 @@ public class ExternalShuffleSecuritySuite {
   }
 
   @Test
-  public void testValid() throws IOException {
+  public void testValid() throws IOException, InterruptedException {
     validate("my-app-id", "secret", false);
   }
 
@@ -83,12 +83,13 @@ public class ExternalShuffleSecuritySuite {
   }
 
   @Test
-  public void testEncryption() throws IOException {
+  public void testEncryption() throws IOException, InterruptedException {
     validate("my-app-id", "secret", true);
   }
 
   /** Creates an ExternalShuffleClient and attempts to register with the server. */
-  private void validate(String appId, String secretKey, boolean encrypt) throws IOException {
+  private void validate(String appId, String secretKey, boolean encrypt)
+      throws IOException, InterruptedException {
     ExternalShuffleClient client =
       new ExternalShuffleClient(conf, new TestSecretKeyHolder(appId, secretKey), true, encrypt);
     client.init(appId);

http://git-wip-us.apache.org/repos/asf/spark/blob/a50ef3d9/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
----------------------------------------------------------------------
diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
index 3a6ef0d..3e2c532 100644
--- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
+++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
@@ -66,7 +66,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testNoFailures() throws IOException {
+  public void testNoFailures() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -85,7 +85,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testUnrecoverableFailure() throws IOException {
+  public void testUnrecoverableFailure() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -104,7 +104,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testSingleIOExceptionOnFirst() throws IOException {
+  public void testSingleIOExceptionOnFirst() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -127,7 +127,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testSingleIOExceptionOnSecond() throws IOException {
+  public void testSingleIOExceptionOnSecond() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -149,7 +149,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testTwoIOExceptions() throws IOException {
+  public void testTwoIOExceptions() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -177,7 +177,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testThreeIOExceptions() throws IOException {
+  public void testThreeIOExceptions() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -209,7 +209,7 @@ public class RetryingBlockFetcherSuite {
   }
 
   @Test
-  public void testRetryAndUnrecoverable() throws IOException {
+  public void testRetryAndUnrecoverable() throws IOException, InterruptedException {
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
 
     List<? extends Map<String, Object>> interactions = Arrays.asList(
@@ -252,7 +252,7 @@ public class RetryingBlockFetcherSuite {
   @SuppressWarnings("unchecked")
   private static void performInteractions(List<? extends Map<String, Object>> interactions,
                                           BlockFetchingListener listener)
-    throws IOException {
+      throws IOException, InterruptedException {
 
     TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
     BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org