You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xi...@apache.org on 2018/08/17 05:42:06 UTC

hadoop git commit: HADOOP-15655. Enhance KMS client retry behavior. Contributed by Kitti Nanasi.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 2d13e410d -> edeb2a356


HADOOP-15655. Enhance KMS client retry behavior. Contributed by Kitti Nanasi.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/edeb2a35
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/edeb2a35
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/edeb2a35

Branch: refs/heads/trunk
Commit: edeb2a356ad671d962764c6e2aee9f9e7d6f394c
Parents: 2d13e41
Author: Xiao Chen <xi...@apache.org>
Authored: Thu Aug 16 22:32:32 2018 -0700
Committer: Xiao Chen <xi...@apache.org>
Committed: Thu Aug 16 22:42:03 2018 -0700

----------------------------------------------------------------------
 .../key/kms/LoadBalancingKMSClientProvider.java |  43 ++---
 .../kms/TestLoadBalancingKMSClientProvider.java | 181 ++++++++++++++++++-
 2 files changed, 193 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/edeb2a35/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
index 23cdc50..e68e844 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
@@ -113,8 +113,8 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
     return providers;
   }
 
-  private <T> T doOp(ProviderCallable<T> op, int currPos)
-      throws IOException {
+  private <T> T doOp(ProviderCallable<T> op, int currPos,
+      boolean isIdempotent) throws IOException {
     if (providers.length == 0) {
       throw new IOException("No providers configured !");
     }
@@ -143,7 +143,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
         }
         RetryAction action = null;
         try {
-          action = retryPolicy.shouldRetry(ioe, 0, numFailovers, false);
+          action = retryPolicy.shouldRetry(ioe, 0, numFailovers, isIdempotent);
         } catch (Exception e) {
           if (e instanceof IOException) {
             throw (IOException)e;
@@ -201,7 +201,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
       public Token<?>[] call(KMSClientProvider provider) throws IOException {
         return provider.addDelegationTokens(renewer, credentials);
       }
-    }, nextIdx());
+    }, nextIdx(), false);
   }
 
   @Override
@@ -211,7 +211,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
       public Long call(KMSClientProvider provider) throws IOException {
         return provider.renewDelegationToken(token);
       }
-    }, nextIdx());
+    }, nextIdx(), false);
   }
 
   @Override
@@ -222,7 +222,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
         provider.cancelDelegationToken(token);
         return null;
       }
-    }, nextIdx());
+    }, nextIdx(), false);
   }
 
   // This request is sent to all providers in the load-balancing group
@@ -275,7 +275,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
             throws IOException, GeneralSecurityException {
           return provider.generateEncryptedKey(encryptionKeyName);
         }
-      }, nextIdx());
+      }, nextIdx(), true);
     } catch (WrapperException we) {
       if (we.getCause() instanceof GeneralSecurityException) {
         throw (GeneralSecurityException) we.getCause();
@@ -295,7 +295,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
             throws IOException, GeneralSecurityException {
           return provider.decryptEncryptedKey(encryptedKeyVersion);
         }
-      }, nextIdx());
+      }, nextIdx(), true);
     } catch (WrapperException we) {
       if (we.getCause() instanceof GeneralSecurityException) {
         throw (GeneralSecurityException) we.getCause();
@@ -315,7 +315,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
             throws IOException, GeneralSecurityException {
           return provider.reencryptEncryptedKey(ekv);
         }
-      }, nextIdx());
+      }, nextIdx(), true);
     } catch (WrapperException we) {
       if (we.getCause() instanceof GeneralSecurityException) {
         throw (GeneralSecurityException) we.getCause();
@@ -335,7 +335,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
           provider.reencryptEncryptedKeys(ekvs);
           return null;
         }
-      }, nextIdx());
+      }, nextIdx(), true);
     } catch (WrapperException we) {
       if (we.getCause() instanceof GeneralSecurityException) {
         throw (GeneralSecurityException) we.getCause();
@@ -351,7 +351,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
       public KeyVersion call(KMSClientProvider provider) throws IOException {
         return provider.getKeyVersion(versionName);
       }
-    }, nextIdx());
+    }, nextIdx(), true);
   }
 
   @Override
@@ -361,7 +361,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
       public List<String> call(KMSClientProvider provider) throws IOException {
         return provider.getKeys();
       }
-    }, nextIdx());
+    }, nextIdx(), true);
   }
 
   @Override
@@ -371,7 +371,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
       public Metadata[] call(KMSClientProvider provider) throws IOException {
         return provider.getKeysMetadata(names);
       }
-    }, nextIdx());
+    }, nextIdx(), true);
   }
 
   @Override
@@ -382,7 +382,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
           throws IOException {
         return provider.getKeyVersions(name);
       }
-    }, nextIdx());
+    }, nextIdx(), true);
   }
 
   @Override
@@ -392,8 +392,9 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
       public KeyVersion call(KMSClientProvider provider) throws IOException {
         return provider.getCurrentKey(name);
       }
-    }, nextIdx());
+    }, nextIdx(), true);
   }
+
   @Override
   public Metadata getMetadata(final String name) throws IOException {
     return doOp(new ProviderCallable<Metadata>() {
@@ -401,7 +402,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
       public Metadata call(KMSClientProvider provider) throws IOException {
         return provider.getMetadata(name);
       }
-    }, nextIdx());
+    }, nextIdx(), true);
   }
 
   @Override
@@ -412,7 +413,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
       public KeyVersion call(KMSClientProvider provider) throws IOException {
         return provider.createKey(name, material, options);
       }
-    }, nextIdx());
+    }, nextIdx(), false);
   }
 
   @Override
@@ -425,7 +426,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
             NoSuchAlgorithmException {
           return provider.createKey(name, options);
         }
-      }, nextIdx());
+      }, nextIdx(), false);
     } catch (WrapperException e) {
       if (e.getCause() instanceof GeneralSecurityException) {
         throw (NoSuchAlgorithmException) e.getCause();
@@ -442,7 +443,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
         provider.deleteKey(name);
         return null;
       }
-    }, nextIdx());
+    }, nextIdx(), false);
   }
 
   @Override
@@ -453,7 +454,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
       public KeyVersion call(KMSClientProvider provider) throws IOException {
         return provider.rollNewVersion(name, material);
       }
-    }, nextIdx());
+    }, nextIdx(), false);
     invalidateCache(name);
     return newVersion;
   }
@@ -468,7 +469,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
             NoSuchAlgorithmException {
           return provider.rollNewVersion(name);
         }
-      }, nextIdx());
+      }, nextIdx(), false);
       invalidateCache(name);
       return newVersion;
     } catch (WrapperException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/edeb2a35/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
index 4e7aed9..058db92 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
@@ -29,10 +29,13 @@ import static org.mockito.Mockito.verify;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.NoRouteToHostException;
+import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.security.GeneralSecurityException;
 import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.List;
 
 import javax.net.ssl.SSLHandshakeException;
 
@@ -355,24 +358,27 @@ public class TestLoadBalancingKMSClientProvider {
   }
 
   /**
-   * Tests whether retryPolicy fails immediately, after trying each provider
-   * once, on encountering IOException which is not SocketException.
+   * Tests whether retryPolicy fails immediately on non-idempotent operations,
+   * after trying each provider once,
+   * on encountering IOException which is not SocketException.
    * @throws Exception
    */
   @Test
-  public void testClientRetriesWithIOException() throws Exception {
+  public void testClientRetriesNonIdempotentOpWithIOExceptionFailsImmediately()
+      throws Exception {
     Configuration conf = new Configuration();
+    final String keyName = "test";
     // Setting total failover attempts to .
     conf.setInt(
         CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 10);
     KMSClientProvider p1 = mock(KMSClientProvider.class);
-    when(p1.getMetadata(Mockito.anyString()))
+    when(p1.createKey(Mockito.anyString(), Mockito.any(Options.class)))
         .thenThrow(new IOException("p1"));
     KMSClientProvider p2 = mock(KMSClientProvider.class);
-    when(p2.getMetadata(Mockito.anyString()))
+    when(p2.createKey(Mockito.anyString(), Mockito.any(Options.class)))
         .thenThrow(new IOException("p2"));
     KMSClientProvider p3 = mock(KMSClientProvider.class);
-    when(p3.getMetadata(Mockito.anyString()))
+    when(p3.createKey(Mockito.anyString(), Mockito.any(Options.class)))
         .thenThrow(new IOException("p3"));
 
     when(p1.getKMSUrl()).thenReturn("p1");
@@ -381,17 +387,61 @@ public class TestLoadBalancingKMSClientProvider {
     LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
         new KMSClientProvider[] {p1, p2, p3}, 0, conf);
     try {
-      kp.getMetadata("test3");
+      kp.createKey(keyName, new Options(conf));
       fail("Should fail since all providers threw an IOException");
     } catch (Exception e) {
       assertTrue(e instanceof IOException);
     }
     verify(kp.getProviders()[0], Mockito.times(1))
-        .getMetadata(Mockito.eq("test3"));
+        .createKey(Mockito.eq(keyName), Mockito.any(Options.class));
+    verify(kp.getProviders()[1], Mockito.times(1))
+        .createKey(Mockito.eq(keyName), Mockito.any(Options.class));
+    verify(kp.getProviders()[2], Mockito.times(1))
+        .createKey(Mockito.eq(keyName), Mockito.any(Options.class));
+  }
+
+  /**
+   * Tests whether retryPolicy retries on idempotent operations
+   * when encountering IOException.
+   * @throws Exception
+   */
+  @Test
+  public void testClientRetriesIdempotentOpWithIOExceptionSucceedsSecondTime()
+      throws Exception {
+    Configuration conf = new Configuration();
+    final String keyName = "test";
+    final KeyProvider.KeyVersion keyVersion
+        = new KMSClientProvider.KMSKeyVersion(keyName, "v1",
+        new byte[0]);
+    // Setting total failover attempts to 10.
+    conf.setInt(
+        CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 10);
+    KMSClientProvider p1 = mock(KMSClientProvider.class);
+    when(p1.getCurrentKey(Mockito.anyString()))
+        .thenThrow(new IOException("p1"))
+        .thenReturn(keyVersion);
+    KMSClientProvider p2 = mock(KMSClientProvider.class);
+    when(p2.getCurrentKey(Mockito.anyString()))
+        .thenThrow(new IOException("p2"));
+    KMSClientProvider p3 = mock(KMSClientProvider.class);
+    when(p3.getCurrentKey(Mockito.anyString()))
+        .thenThrow(new IOException("p3"));
+
+    when(p1.getKMSUrl()).thenReturn("p1");
+    when(p2.getKMSUrl()).thenReturn("p2");
+    when(p3.getKMSUrl()).thenReturn("p3");
+    LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
+        new KMSClientProvider[] {p1, p2, p3}, 0, conf);
+
+    KeyProvider.KeyVersion result = kp.getCurrentKey(keyName);
+
+    assertEquals(keyVersion, result);
+    verify(kp.getProviders()[0], Mockito.times(2))
+        .getCurrentKey(Mockito.eq(keyName));
     verify(kp.getProviders()[1], Mockito.times(1))
-        .getMetadata(Mockito.eq("test3"));
+        .getCurrentKey(Mockito.eq(keyName));
     verify(kp.getProviders()[2], Mockito.times(1))
-        .getMetadata(Mockito.eq("test3"));
+        .getCurrentKey(Mockito.eq(keyName));
   }
 
   /**
@@ -717,4 +767,115 @@ public class TestLoadBalancingKMSClientProvider {
     verify(p2, Mockito.times(1)).createKey(Mockito.eq(keyName),
         Mockito.any(Options.class));
   }
+
+  /**
+   * Tests that if an idempotent operation succeeds second time after
+   * SocketTimeoutException, then the operation is successful.
+   * @throws Exception
+   */
+  @Test
+  public void testClientRetriesIdempotentOpWithSocketTimeoutExceptionSucceeds()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(
+        CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 3);
+    final List<String> keys = Arrays.asList("testKey");
+    KMSClientProvider p1 = mock(KMSClientProvider.class);
+    when(p1.getKeys())
+        .thenThrow(new SocketTimeoutException("p1"))
+        .thenReturn(keys);
+    KMSClientProvider p2 = mock(KMSClientProvider.class);
+    when(p2.getKeys()).thenThrow(new SocketTimeoutException("p2"));
+
+    when(p1.getKMSUrl()).thenReturn("p1");
+    when(p2.getKMSUrl()).thenReturn("p2");
+
+    LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
+        new KMSClientProvider[] {p1, p2}, 0, conf);
+
+    List<String> result = kp.getKeys();
+    assertEquals(keys, result);
+    verify(p1, Mockito.times(2)).getKeys();
+    verify(p2, Mockito.times(1)).getKeys();
+  }
+
+  /**
+   * Tests that if an idempotent operation fails at every attempt
+   * after SocketTimeoutException, then SocketTimeoutException is thrown.
+   * @throws Exception
+   */
+  @Test
+  public void testClientRetriesIdempotentOpWithSocketTimeoutExceptionFails()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(
+        CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 2);
+    final String keyName = "test";
+    final String exceptionMessage = "p1 exception message";
+    KMSClientProvider p1 = mock(KMSClientProvider.class);
+    Exception originalEx = new SocketTimeoutException(exceptionMessage);
+    when(p1.getKeyVersions(Mockito.anyString()))
+        .thenThrow(originalEx);
+    KMSClientProvider p2 = mock(KMSClientProvider.class);
+    when(p2.getKeyVersions(Mockito.anyString()))
+        .thenThrow(new SocketTimeoutException("p2 exception message"));
+
+    when(p1.getKMSUrl()).thenReturn("p1");
+    when(p2.getKMSUrl()).thenReturn("p2");
+
+    LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
+        new KMSClientProvider[] {p1, p2}, 0, conf);
+
+    Exception interceptedEx = intercept(SocketTimeoutException.class,
+        "SocketTimeoutException: " + exceptionMessage,
+        ()-> kp.getKeyVersions(keyName));
+    assertEquals(originalEx, interceptedEx);
+
+    verify(p1, Mockito.times(2))
+        .getKeyVersions(Mockito.eq(keyName));
+    verify(p2, Mockito.times(1))
+        .getKeyVersions(Mockito.eq(keyName));
+  }
+
+  /**
+   * Tests whether retryPolicy fails immediately on non-idempotent operations,
+   * after trying each provider once, on encountering SocketTimeoutException.
+   * @throws Exception
+   */
+  @Test
+  public void testClientRetriesNonIdempotentOpWithSocketTimeoutExceptionFails()
+      throws Exception {
+    Configuration conf = new Configuration();
+    final String keyName = "test";
+    // Setting total failover attempts to 10.
+    conf.setInt(
+        CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 10);
+    KMSClientProvider p1 = mock(KMSClientProvider.class);
+    when(p1.createKey(Mockito.anyString(), Mockito.any(Options.class)))
+        .thenThrow(new SocketTimeoutException("p1"));
+    KMSClientProvider p2 = mock(KMSClientProvider.class);
+    when(p2.createKey(Mockito.anyString(), Mockito.any(Options.class)))
+        .thenThrow(new SocketTimeoutException("p2"));
+    KMSClientProvider p3 = mock(KMSClientProvider.class);
+    when(p3.createKey(Mockito.anyString(), Mockito.any(Options.class)))
+        .thenThrow(new SocketTimeoutException("p3"));
+
+    when(p1.getKMSUrl()).thenReturn("p1");
+    when(p2.getKMSUrl()).thenReturn("p2");
+    when(p3.getKMSUrl()).thenReturn("p3");
+    LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
+        new KMSClientProvider[] {p1, p2, p3}, 0, conf);
+    try {
+      kp.createKey(keyName, new Options(conf));
+      fail("Should fail since all providers threw a SocketTimeoutException");
+    } catch (Exception e) {
+      assertTrue(e instanceof SocketTimeoutException);
+    }
+    verify(kp.getProviders()[0], Mockito.times(1))
+        .createKey(Mockito.eq(keyName), Mockito.any(Options.class));
+    verify(kp.getProviders()[1], Mockito.times(1))
+        .createKey(Mockito.eq(keyName), Mockito.any(Options.class));
+    verify(kp.getProviders()[2], Mockito.times(1))
+        .createKey(Mockito.eq(keyName), Mockito.any(Options.class));
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org