You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2018/08/20 22:07:46 UTC
[13/14] hadoop git commit: HADOOP-15655. Enhance KMS client retry
behavior. Contributed by Kitti Nanasi.
HADOOP-15655. Enhance KMS client retry behavior. Contributed by Kitti Nanasi.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a630a27c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a630a27c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a630a27c
Branch: refs/heads/branch-3.1
Commit: a630a27c53107322a72f9b76e395c4537b09c3fc
Parents: 8656500
Author: Xiao Chen <xi...@apache.org>
Authored: Thu Aug 16 22:32:32 2018 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Mon Aug 20 14:57:51 2018 -0700
----------------------------------------------------------------------
.../key/kms/LoadBalancingKMSClientProvider.java | 43 ++---
.../kms/TestLoadBalancingKMSClientProvider.java | 181 ++++++++++++++++++-
2 files changed, 193 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a630a27c/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
index 9677b0d..e0ffdb1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java
@@ -113,8 +113,8 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
return providers;
}
- private <T> T doOp(ProviderCallable<T> op, int currPos)
- throws IOException {
+ private <T> T doOp(ProviderCallable<T> op, int currPos,
+ boolean isIdempotent) throws IOException {
if (providers.length == 0) {
throw new IOException("No providers configured !");
}
@@ -143,7 +143,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
}
RetryAction action = null;
try {
- action = retryPolicy.shouldRetry(ioe, 0, numFailovers, false);
+ action = retryPolicy.shouldRetry(ioe, 0, numFailovers, isIdempotent);
} catch (Exception e) {
if (e instanceof IOException) {
throw (IOException)e;
@@ -201,7 +201,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
public Token<?>[] call(KMSClientProvider provider) throws IOException {
return provider.addDelegationTokens(renewer, credentials);
}
- }, nextIdx());
+ }, nextIdx(), false);
}
@Override
@@ -211,7 +211,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
public Long call(KMSClientProvider provider) throws IOException {
return provider.renewDelegationToken(token);
}
- }, nextIdx());
+ }, nextIdx(), false);
}
@Override
@@ -222,7 +222,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
provider.cancelDelegationToken(token);
return null;
}
- }, nextIdx());
+ }, nextIdx(), false);
}
// This request is sent to all providers in the load-balancing group
@@ -275,7 +275,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
throws IOException, GeneralSecurityException {
return provider.generateEncryptedKey(encryptionKeyName);
}
- }, nextIdx());
+ }, nextIdx(), true);
} catch (WrapperException we) {
if (we.getCause() instanceof GeneralSecurityException) {
throw (GeneralSecurityException) we.getCause();
@@ -295,7 +295,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
throws IOException, GeneralSecurityException {
return provider.decryptEncryptedKey(encryptedKeyVersion);
}
- }, nextIdx());
+ }, nextIdx(), true);
} catch (WrapperException we) {
if (we.getCause() instanceof GeneralSecurityException) {
throw (GeneralSecurityException) we.getCause();
@@ -315,7 +315,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
throws IOException, GeneralSecurityException {
return provider.reencryptEncryptedKey(ekv);
}
- }, nextIdx());
+ }, nextIdx(), true);
} catch (WrapperException we) {
if (we.getCause() instanceof GeneralSecurityException) {
throw (GeneralSecurityException) we.getCause();
@@ -335,7 +335,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
provider.reencryptEncryptedKeys(ekvs);
return null;
}
- }, nextIdx());
+ }, nextIdx(), true);
} catch (WrapperException we) {
if (we.getCause() instanceof GeneralSecurityException) {
throw (GeneralSecurityException) we.getCause();
@@ -351,7 +351,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
public KeyVersion call(KMSClientProvider provider) throws IOException {
return provider.getKeyVersion(versionName);
}
- }, nextIdx());
+ }, nextIdx(), true);
}
@Override
@@ -361,7 +361,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
public List<String> call(KMSClientProvider provider) throws IOException {
return provider.getKeys();
}
- }, nextIdx());
+ }, nextIdx(), true);
}
@Override
@@ -371,7 +371,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
public Metadata[] call(KMSClientProvider provider) throws IOException {
return provider.getKeysMetadata(names);
}
- }, nextIdx());
+ }, nextIdx(), true);
}
@Override
@@ -382,7 +382,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
throws IOException {
return provider.getKeyVersions(name);
}
- }, nextIdx());
+ }, nextIdx(), true);
}
@Override
@@ -392,8 +392,9 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
public KeyVersion call(KMSClientProvider provider) throws IOException {
return provider.getCurrentKey(name);
}
- }, nextIdx());
+ }, nextIdx(), true);
}
+
@Override
public Metadata getMetadata(final String name) throws IOException {
return doOp(new ProviderCallable<Metadata>() {
@@ -401,7 +402,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
public Metadata call(KMSClientProvider provider) throws IOException {
return provider.getMetadata(name);
}
- }, nextIdx());
+ }, nextIdx(), true);
}
@Override
@@ -412,7 +413,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
public KeyVersion call(KMSClientProvider provider) throws IOException {
return provider.createKey(name, material, options);
}
- }, nextIdx());
+ }, nextIdx(), false);
}
@Override
@@ -425,7 +426,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
NoSuchAlgorithmException {
return provider.createKey(name, options);
}
- }, nextIdx());
+ }, nextIdx(), false);
} catch (WrapperException e) {
if (e.getCause() instanceof GeneralSecurityException) {
throw (NoSuchAlgorithmException) e.getCause();
@@ -442,7 +443,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
provider.deleteKey(name);
return null;
}
- }, nextIdx());
+ }, nextIdx(), false);
}
@Override
@@ -453,7 +454,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
public KeyVersion call(KMSClientProvider provider) throws IOException {
return provider.rollNewVersion(name, material);
}
- }, nextIdx());
+ }, nextIdx(), false);
invalidateCache(name);
return newVersion;
}
@@ -468,7 +469,7 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
NoSuchAlgorithmException {
return provider.rollNewVersion(name);
}
- }, nextIdx());
+ }, nextIdx(), false);
invalidateCache(name);
return newVersion;
} catch (WrapperException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a630a27c/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
index 4e7aed9..058db92 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
@@ -29,10 +29,13 @@ import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
+import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.List;
import javax.net.ssl.SSLHandshakeException;
@@ -355,24 +358,27 @@ public class TestLoadBalancingKMSClientProvider {
}
/**
- * Tests whether retryPolicy fails immediately, after trying each provider
- * once, on encountering IOException which is not SocketException.
+ * Tests whether retryPolicy fails immediately on non-idempotent operations,
+ * after trying each provider once,
+ * on encountering IOException which is not SocketException.
* @throws Exception
*/
@Test
- public void testClientRetriesWithIOException() throws Exception {
+ public void testClientRetriesNonIdempotentOpWithIOExceptionFailsImmediately()
+ throws Exception {
Configuration conf = new Configuration();
+ final String keyName = "test";
// Setting total failover attempts to .
conf.setInt(
CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 10);
KMSClientProvider p1 = mock(KMSClientProvider.class);
- when(p1.getMetadata(Mockito.anyString()))
+ when(p1.createKey(Mockito.anyString(), Mockito.any(Options.class)))
.thenThrow(new IOException("p1"));
KMSClientProvider p2 = mock(KMSClientProvider.class);
- when(p2.getMetadata(Mockito.anyString()))
+ when(p2.createKey(Mockito.anyString(), Mockito.any(Options.class)))
.thenThrow(new IOException("p2"));
KMSClientProvider p3 = mock(KMSClientProvider.class);
- when(p3.getMetadata(Mockito.anyString()))
+ when(p3.createKey(Mockito.anyString(), Mockito.any(Options.class)))
.thenThrow(new IOException("p3"));
when(p1.getKMSUrl()).thenReturn("p1");
@@ -381,17 +387,61 @@ public class TestLoadBalancingKMSClientProvider {
LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
new KMSClientProvider[] {p1, p2, p3}, 0, conf);
try {
- kp.getMetadata("test3");
+ kp.createKey(keyName, new Options(conf));
fail("Should fail since all providers threw an IOException");
} catch (Exception e) {
assertTrue(e instanceof IOException);
}
verify(kp.getProviders()[0], Mockito.times(1))
- .getMetadata(Mockito.eq("test3"));
+ .createKey(Mockito.eq(keyName), Mockito.any(Options.class));
+ verify(kp.getProviders()[1], Mockito.times(1))
+ .createKey(Mockito.eq(keyName), Mockito.any(Options.class));
+ verify(kp.getProviders()[2], Mockito.times(1))
+ .createKey(Mockito.eq(keyName), Mockito.any(Options.class));
+ }
+
+ /**
+ * Tests whether retryPolicy retries on idempotent operations
+ * when encountering IOException.
+ * @throws Exception
+ */
+ @Test
+ public void testClientRetriesIdempotentOpWithIOExceptionSucceedsSecondTime()
+ throws Exception {
+ Configuration conf = new Configuration();
+ final String keyName = "test";
+ final KeyProvider.KeyVersion keyVersion
+ = new KMSClientProvider.KMSKeyVersion(keyName, "v1",
+ new byte[0]);
+ // Setting total failover attempts to .
+ conf.setInt(
+ CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 10);
+ KMSClientProvider p1 = mock(KMSClientProvider.class);
+ when(p1.getCurrentKey(Mockito.anyString()))
+ .thenThrow(new IOException("p1"))
+ .thenReturn(keyVersion);
+ KMSClientProvider p2 = mock(KMSClientProvider.class);
+ when(p2.getCurrentKey(Mockito.anyString()))
+ .thenThrow(new IOException("p2"));
+ KMSClientProvider p3 = mock(KMSClientProvider.class);
+ when(p3.getCurrentKey(Mockito.anyString()))
+ .thenThrow(new IOException("p3"));
+
+ when(p1.getKMSUrl()).thenReturn("p1");
+ when(p2.getKMSUrl()).thenReturn("p2");
+ when(p3.getKMSUrl()).thenReturn("p3");
+ LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
+ new KMSClientProvider[] {p1, p2, p3}, 0, conf);
+
+ KeyProvider.KeyVersion result = kp.getCurrentKey(keyName);
+
+ assertEquals(keyVersion, result);
+ verify(kp.getProviders()[0], Mockito.times(2))
+ .getCurrentKey(Mockito.eq(keyName));
verify(kp.getProviders()[1], Mockito.times(1))
- .getMetadata(Mockito.eq("test3"));
+ .getCurrentKey(Mockito.eq(keyName));
verify(kp.getProviders()[2], Mockito.times(1))
- .getMetadata(Mockito.eq("test3"));
+ .getCurrentKey(Mockito.eq(keyName));
}
/**
@@ -717,4 +767,115 @@ public class TestLoadBalancingKMSClientProvider {
verify(p2, Mockito.times(1)).createKey(Mockito.eq(keyName),
Mockito.any(Options.class));
}
+
+ /**
+ * Tests that if an idempotent operation succeeds second time after
+ * SocketTimeoutException, then the operation is successful.
+ * @throws Exception
+ */
+ @Test
+ public void testClientRetriesIdempotentOpWithSocketTimeoutExceptionSucceeds()
+ throws Exception {
+ Configuration conf = new Configuration();
+ conf.setInt(
+ CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 3);
+ final List<String> keys = Arrays.asList("testKey");
+ KMSClientProvider p1 = mock(KMSClientProvider.class);
+ when(p1.getKeys())
+ .thenThrow(new SocketTimeoutException("p1"))
+ .thenReturn(keys);
+ KMSClientProvider p2 = mock(KMSClientProvider.class);
+ when(p2.getKeys()).thenThrow(new SocketTimeoutException("p2"));
+
+ when(p1.getKMSUrl()).thenReturn("p1");
+ when(p2.getKMSUrl()).thenReturn("p2");
+
+ LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
+ new KMSClientProvider[] {p1, p2}, 0, conf);
+
+ List<String> result = kp.getKeys();
+ assertEquals(keys, result);
+ verify(p1, Mockito.times(2)).getKeys();
+ verify(p2, Mockito.times(1)).getKeys();
+ }
+
+ /**
+ * Tests that if a non idempotent operation fails at every attempt
+ * after SocketTimeoutException, then SocketTimeoutException is thrown.
+ * @throws Exception
+ */
+ @Test
+ public void testClientRetriesIdempotentOpWithSocketTimeoutExceptionFails()
+ throws Exception {
+ Configuration conf = new Configuration();
+ conf.setInt(
+ CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 2);
+ final String keyName = "test";
+ final String exceptionMessage = "p1 exception message";
+ KMSClientProvider p1 = mock(KMSClientProvider.class);
+ Exception originalEx = new SocketTimeoutException(exceptionMessage);
+ when(p1.getKeyVersions(Mockito.anyString()))
+ .thenThrow(originalEx);
+ KMSClientProvider p2 = mock(KMSClientProvider.class);
+ when(p2.getKeyVersions(Mockito.anyString()))
+ .thenThrow(new SocketTimeoutException("p2 exception message"));
+
+ when(p1.getKMSUrl()).thenReturn("p1");
+ when(p2.getKMSUrl()).thenReturn("p2");
+
+ LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
+ new KMSClientProvider[] {p1, p2}, 0, conf);
+
+ Exception interceptedEx = intercept(SocketTimeoutException.class,
+ "SocketTimeoutException: " + exceptionMessage,
+ ()-> kp.getKeyVersions(keyName));
+ assertEquals(originalEx, interceptedEx);
+
+ verify(p1, Mockito.times(2))
+ .getKeyVersions(Mockito.eq(keyName));
+ verify(p2, Mockito.times(1))
+ .getKeyVersions(Mockito.eq(keyName));
+ }
+
+ /**
+ * Tests whether retryPolicy fails immediately on non-idempotent operations,
+ * after trying each provider once, on encountering SocketTimeoutException.
+ * @throws Exception
+ */
+ @Test
+ public void testClientRetriesNonIdempotentOpWithSocketTimeoutExceptionFails()
+ throws Exception {
+ Configuration conf = new Configuration();
+ final String keyName = "test";
+ // Setting total failover attempts to .
+ conf.setInt(
+ CommonConfigurationKeysPublic.KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, 10);
+ KMSClientProvider p1 = mock(KMSClientProvider.class);
+ when(p1.createKey(Mockito.anyString(), Mockito.any(Options.class)))
+ .thenThrow(new SocketTimeoutException("p1"));
+ KMSClientProvider p2 = mock(KMSClientProvider.class);
+ when(p2.createKey(Mockito.anyString(), Mockito.any(Options.class)))
+ .thenThrow(new SocketTimeoutException("p2"));
+ KMSClientProvider p3 = mock(KMSClientProvider.class);
+ when(p3.createKey(Mockito.anyString(), Mockito.any(Options.class)))
+ .thenThrow(new SocketTimeoutException("p3"));
+
+ when(p1.getKMSUrl()).thenReturn("p1");
+ when(p2.getKMSUrl()).thenReturn("p2");
+ when(p3.getKMSUrl()).thenReturn("p3");
+ LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
+ new KMSClientProvider[] {p1, p2, p3}, 0, conf);
+ try {
+ kp.createKey(keyName, new Options(conf));
+ fail("Should fail since all providers threw a SocketTimeoutException");
+ } catch (Exception e) {
+ assertTrue(e instanceof SocketTimeoutException);
+ }
+ verify(kp.getProviders()[0], Mockito.times(1))
+ .createKey(Mockito.eq(keyName), Mockito.any(Options.class));
+ verify(kp.getProviders()[1], Mockito.times(1))
+ .createKey(Mockito.eq(keyName), Mockito.any(Options.class));
+ verify(kp.getProviders()[2], Mockito.times(1))
+ .createKey(Mockito.eq(keyName), Mockito.any(Options.class));
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org